From 3adfdf1ae27cd4b6419ce5be14ffb3712339065a Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Sat, 22 Apr 2017 15:14:03 -0400 Subject: add framing protocol for websockets --- Server.hs | 86 +++++++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 53 insertions(+), 33 deletions(-) (limited to 'Server.hs') diff --git a/Server.hs b/Server.hs index c2589f1..c1a302a 100644 --- a/Server.hs +++ b/Server.hs @@ -2,6 +2,7 @@ module Server where +import Types import CmdLine import WebSockets import SessionID @@ -22,6 +23,7 @@ import qualified Data.Map as M import qualified Data.Text as T import Data.Time.Clock.POSIX import System.IO +import System.Directory type ServerState = M.Map SessionID Session @@ -92,16 +94,18 @@ websocketApp :: ServerOpts -> TVar ServerState -> WS.ServerApp websocketApp o ssv pending_conn = do conn <- WS.acceptRequest pending_conn _v <- negotiateWireVersion conn - theirmode <- getMode conn - case theirmode of - InitMode _ -> user o ssv conn - ConnectMode t -> case mkSessionID (T.unpack t) of - Nothing -> error "Invalid session id!" - Just sid -> developer o ssv sid conn + r <- receiveData conn + case r of + SelectMode ClientSends (InitMode _) -> user o ssv conn + SelectMode ClientSends (ConnectMode t) -> + case mkSessionID (T.unpack t) of + Nothing -> protocolError conn "Invalid session id!" + Just sid -> developer o ssv sid conn + _ -> protocolError conn "Expected SelectMode" user :: ServerOpts -> TVar ServerState -> WS.Connection -> IO () user o ssv conn = withSessionID (serverDirectory o) $ \(loghv, sid) -> do - sendTextData conn sid + sendBinaryData conn (Ready ServerSends sid) bracket (setup sid loghv) (cleanup sid) go where setup sid loghv = do @@ -109,28 +113,32 @@ user o ssv conn = withSessionID (serverDirectory o) $ \(loghv, sid) -> do atomically $ modifyTVar' ssv $ M.insert sid session return session - cleanup sid session = atomically $ do - closeSession session - modifyTVar' ssv $ M.delete sid + cleanup sid session = do + atomically $ do + closeSession session + modifyTVar' ssv $ M.delete sid go session = do userchan <- atomically $ listenSession session _ <- relaytouser userchan - `concurrently` relayfromuser session + `race` relayfromuser session return () -- Relay all messages from the user's websocket to the -- session broadcast channel. + -- (The user is allowed to send Developer messages too.. perhaps + -- they got them from a developer connected to them some other + -- way.) relayfromuser session = relayFromSocket conn $ \msg -> do - l <- mkLog (User msg) <$> getPOSIXTime + l <- mkLog msg <$> getPOSIXTime writeSession session l - -- Relay developer messages from the channel to the user's websocket. + -- Relay Developer messages from the channel to the user's websocket. relaytouser userchan = relayToSocket conn $ do v <- atomically $ readTMChan userchan return $ case v of Just l -> case loggedMessage l of - Developer m -> Just m + Developer m -> Just (Developer m) User _ -> Nothing Nothing -> Nothing @@ -139,29 +147,39 @@ developer o ssv sid conn = bracket setup cleanup go where setup = atomically $ M.lookup sid <$> readTVar ssv cleanup _ = return () - go Nothing = error "Invalid session id!" + go Nothing = do + exists <- doesFileExist $ + sessionLogFile (serverDirectory o) sid + if exists + then do + sendBinaryData conn (Ready ServerSends sid) + replayBacklog o sid conn + sendBinaryData conn Done + else protocolError conn "Unknown session ID" go (Just session) = do - -- Sending the SessionID to the developer is redundant, but - -- is done to make the protocol startup sequence the same as - -- it is for the user. - sendTextData conn sid - devchan <- replayBacklog o sid session conn + sendBinaryData conn (Ready ServerSends sid) + devchan <- replayBacklogAndListen o sid session conn _ <- relayfromdeveloper session `concurrently` relaytodeveloper devchan return () - -- Relay all messages from the developer's websocket to the - -- broadcast channel. - relayfromdeveloper session = relayFromSocket conn $ \msg -> do - l <- mkLog (Developer msg) <$> getPOSIXTime - writeSession session l + -- Relay all Developer amessages from the developer's websocket + -- to the broadcast channel. + relayfromdeveloper session = relayFromSocket conn $ \msg -> case msg of + Developer _ -> do + l <- mkLog msg <$> getPOSIXTime + writeSession session l + User _ -> return () -- developer cannot send User messages -- Relay user messages from the channel to the developer's websocket. relaytodeveloper devchan = relayToSocket conn $ do v <- atomically $ readTMChan devchan return $ case v of Just l -> case loggedMessage l of - User m -> Just m + User m -> Just (User m) + -- TODO: Relay messages from other + -- developers, without looping back + -- the developer's own messages. Developer _ -> Nothing Nothing -> Nothing @@ -174,13 +192,15 @@ developer o ssv sid conn = bracket setup cleanup go -- -- Note that the session may appear to freeze for other users while -- this is running. -replayBacklog :: ServerOpts -> SessionID -> Session -> WS.Connection -> IO (TMChan Log) -replayBacklog o sid session conn = preventWriteWhile session o sid $ do +replayBacklogAndListen :: ServerOpts -> SessionID -> Session -> WS.Connection -> IO (TMChan Log) +replayBacklogAndListen o sid session conn = + preventWriteWhile session o sid $ do + replayBacklog o sid conn + atomically $ listenSession session + +replayBacklog :: ServerOpts -> SessionID -> WS.Connection -> IO () +replayBacklog o sid conn = do ls <- streamLog (sessionLogFile (serverDirectory o) sid) forM_ ls $ \l -> case loggedMessage <$> l of - Right (User m) -> sendBinaryData conn m - Right (Developer _) -> return () - -- This should not happen, since writes to the log - -- are blocked. Unless there's a disk error.. + Right m -> sendBinaryData conn (LogMessage m) Left _ -> return () - atomically $ listenSession session -- cgit v1.2.3