{-# LANGUAGE OverloadedStrings, BangPatterns #-}

module Server where

import CmdLine
import WebSockets
import SessionID
import Log

import Network.Wai
import Network.Wai.Handler.Warp
import Network.Wai.Handler.WebSockets
import Network.WebSockets hiding (Message)
import qualified Network.WebSockets as WS
import Network.HTTP.Types
import Control.Concurrent.STM
import Control.Concurrent.STM.TMChan
import Control.Concurrent.Async
import Control.Exception
import Control.Monad
import qualified Data.Map as M
import qualified Data.Text as T
import Data.Time.Clock.POSIX
import System.IO

-- | The sessions currently known to the server, keyed by 'SessionID'.
type ServerState = M.Map SessionID Session

-- | Create a server state that contains no sessions.
newServerState :: IO (TVar ServerState)
newServerState = newTVarIO M.empty

-- | A session consists of a broadcast TMChan, which both users and
-- developers write messages to. Writes are stored in the log file,
-- and a log lock allows atomic access to the log file for replays.
data Session = Session (TMChan Log) (TVar Handle) (TMVar LogLock)

-- | Token held in a session's 'TMVar'. Whoever has taken it out of the
-- 'TMVar' has exclusive access to the session's log file.
data LogLock = LogLock

-- | Build a session around the given log handle variable, with a fresh
-- broadcast channel and the log lock available.
newSession :: TVar Handle -> IO Session
newSession loghv = Session
    <$> newBroadcastTMChanIO
    <*> pure loghv
    <*> newTMVarIO LogLock

-- | Duplicate the session's broadcast channel, so the caller gets its own
-- read end.
listenSession :: Session -> STM (TMChan Log)
listenSession (Session bchan _ _) = dupTMChan bchan

-- | While writing a log to the session the LogLock is drained until
-- the write has reached the log file. This prevents concurrent writes
-- to the file, and allows writes to be blocked while reading the log file.
--
-- The lock is always handed back, even when the write to the log file
-- throws or the thread receives an asynchronous exception; otherwise a
-- single failed write would leave the lock empty and block every later
-- writer and replay forever. In that failure case the message is not
-- broadcast and the exception propagates to the caller.
writeSession :: Session -> Log -> IO ()
writeSession (Session bchan loghv loglock) l = mask $ \restore -> do
    -- Blocking here is still interruptible under 'mask'; if it is
    -- interrupted the lock has not been taken, so nothing leaks.
    (ll, logh) <- atomically $ (,)
        <$> takeTMVar loglock
        <*> readTVar loghv
    -- Only the file write runs with asynchronous exceptions unmasked.
    restore (writeLogHandle l logh)
        `onException` atomically (putTMVar loglock ll)
    -- Release the lock and broadcast in a single transaction, so that
    -- nobody can take the lock between the message reaching the log file
    -- and it reaching the channel.
    atomically $ do
        putTMVar loglock ll
        writeTMChan bchan l

-- | Run an action with the log file quiescent (and its write handle closed),
-- and nothing being added to the session's broadcast TMChan.
preventWriteWhile :: Session -> ServerOpts -> SessionID -> IO a -> IO a
preventWriteWhile (Session _ loghv loglock) o sid a = bracket setup cleanup go
  where
    -- Take the log lock, which keeps writers out, and close the current
    -- write handle before the action runs.
    setup = do
        (ll, logh) <- atomically $ (,)
            <$> takeTMVar loglock
            <*> readTVar loghv
        hClose logh
        return ll
    -- Reopen the log file and hand the lock back together with the new
    -- handle. The file must be opened in AppendMode: WriteMode truncates
    -- an existing file, which would throw away the whole session log every
    -- time this function is used.
    cleanup ll = do
        let f = sessionLogFile (serverDirectory o) sid
        h <- openFile f AppendMode
        atomically $ do
            putTMVar loglock ll
            writeTVar loghv h
    go _ = a

-- | Close the session's broadcast channel.
closeSession :: Session -> STM ()
closeSession (Session bchan _ _) = closeTMChan bchan

-- | Run the server on the configured port, starting with no sessions.
server :: ServerOpts -> IO ()
server o = run (serverPort o) . app o =<< newServerState

-- | The WAI application: WebSocket requests are handled by 'websocketApp',
-- and every other request is answered with a 400 response.
app :: ServerOpts -> TVar ServerState -> Application
app o ssv = websocketsOr WS.defaultConnectionOptions (websocketApp o ssv) webapp
  where
    webapp _ respond = respond $
        responseLBS status400 [] "Not a WebSocket request"

-- | Accept a WebSocket connection, negotiate the wire version, and then
-- dispatch on the mode the peer asks for: 'InitMode' starts a new session
-- as a user, 'ConnectMode' attaches to an existing session as a developer.
websocketApp :: ServerOpts -> TVar ServerState -> WS.ServerApp
websocketApp o ssv pending_conn = do
    conn <- WS.acceptRequest pending_conn
    _v <- negotiateWireVersion conn
    theirmode <- getMode conn
    case theirmode of
        InitMode _ -> user o ssv conn
        ConnectMode t -> case mkSessionID (T.unpack t) of
            Nothing -> error "Invalid session id!"
            Just sid -> developer o ssv sid conn

-- | Serve the user side of a new session: send the user the session id,
-- register the session in the server state for as long as the connection
-- is being served, and relay messages in both directions.
user :: ServerOpts -> TVar ServerState -> WS.Connection -> IO ()
user o ssv conn = withSessionID (serverDirectory o) $ \(loghv, sid) -> do
    sendTextData conn sid
    bracket (setup sid loghv) (cleanup sid) go
  where
    -- Create the session and make it visible to developers.
    setup sid loghv = do
        session <- newSession loghv
        atomically $ modifyTVar' ssv $ M.insert sid session
        return session

    -- Close the broadcast channel and forget the session.
    cleanup sid session = atomically $ do
        closeSession session
        modifyTVar' ssv $ M.delete sid

    go session = do
        userchan <- atomically $ listenSession session
        _ <- relaytouser userchan
            `concurrently` relayfromuser session
        return ()

    -- Relay all messages from the user's websocket to the
    -- session broadcast channel.
    relayfromuser session = relayFromSocket conn $ \msg -> do
        l <- mkLog (User msg) <$> getPOSIXTime
        writeSession session l

    -- Relay developer messages from the channel to the user's websocket.
    --
    -- NOTE(review): Nothing is returned both when the channel has been
    -- closed and when the message read was a User message. If
    -- relayToSocket treats Nothing as end of stream, this relay stops at
    -- the first User message -- confirm against relayToSocket.
    relaytouser userchan = relayToSocket conn $ do
        v <- atomically $ readTMChan userchan
        return $ case v of
            Just l -> case loggedMessage l of
                Developer m -> Just m
                User _ -> Nothing
            Nothing -> Nothing

-- | Serve the developer side of an existing session: look the session up,
-- replay what has happened so far, and relay messages in both directions.
-- Fails with an error when no session with the given id is registered.
developer :: ServerOpts -> TVar ServerState -> SessionID -> WS.Connection -> IO ()
developer o ssv sid conn = bracket setup cleanup go
  where
    setup = atomically $ M.lookup sid <$> readTVar ssv

    -- Nothing to release; looking up the session acquires no resource.
    cleanup _ = return ()

    go Nothing = error "Invalid session id!"
    go (Just session) = do
        -- Sending the SessionID to the developer is redundant, but
        -- is done to make the protocol startup sequence the same as
        -- it is for the user.
        sendTextData conn sid
        devchan <- replayBacklog o sid session conn
        _ <- relayfromdeveloper session
            `concurrently` relaytodeveloper devchan
        return ()

    -- Relay all messages from the developer's websocket to the
    -- broadcast channel.
    relayfromdeveloper session = relayFromSocket conn $ \msg -> do
        l <- mkLog (Developer msg) <$> getPOSIXTime
        writeSession session l

    -- Relay user messages from the channel to the developer's websocket.
    --
    -- NOTE(review): Nothing is returned both when the channel has been
    -- closed and when the message read was a Developer message. If
    -- relayToSocket treats Nothing as end of stream, this relay stops at
    -- the first Developer message -- confirm against relayToSocket.
    relaytodeveloper devchan = relayToSocket conn $ do
        v <- atomically $ readTMChan devchan
        return $ case v of
            Just l -> case loggedMessage l of
                User m -> Just m
                Developer _ -> Nothing
            Nothing -> Nothing

-- | Replay the log of what's happened in the session so far,
-- and return a channel that will get new session activity.
--
-- This is done atomically; even if new activity arrives while it's
-- running nothing more will be logged until the log file has been
-- replayed and the channel set up.
--
-- Note that the session may appear to freeze for other users while
-- this is running.
replayBacklog :: ServerOpts -> SessionID -> Session -> WS.Connection -> IO (TMChan Log)
replayBacklog o sid session conn = preventWriteWhile session o sid $ do
    entries <- streamLog logfile
    mapM_ replay entries
    -- Subscribe while writes are still blocked, so the returned channel
    -- starts where the replayed log ends.
    atomically (listenSession session)
  where
    logfile = sessionLogFile (serverDirectory o) sid

    -- Send one logged entry to the connection. Only User messages are
    -- sent; Developer messages are skipped.
    replay entry = case fmap loggedMessage entry of
        Right (User m) -> sendBinaryData conn m
        Right (Developer _) -> return ()
        -- This should not happen, since writes to the log
        -- are blocked. Unless there's a disk error..
        Left _ -> return ()