From 7987157bfd99b8e2ec78f5030a49c2e16bf08321 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Sat, 22 Apr 2017 13:00:04 -0400 Subject: it works Multi-user client-server debug-me is working, almost perfectly. All that was missing was replaying the log when the developer connected. A number of race conditions had to be avoided to do that sanely. This commit was sponsored by Ignacio on Patreon. --- Server.hs | 144 ++++++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 102 insertions(+), 42 deletions(-) (limited to 'Server.hs') diff --git a/Server.hs b/Server.hs index 1de02a4..c2589f1 100644 --- a/Server.hs +++ b/Server.hs @@ -1,4 +1,4 @@ -{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE OverloadedStrings, BangPatterns #-} module Server where @@ -16,21 +16,72 @@ import Network.HTTP.Types import Control.Concurrent.STM import Control.Concurrent.STM.TMChan import Control.Concurrent.Async +import Control.Exception +import Control.Monad import qualified Data.Map as M import qualified Data.Text as T -import Control.Exception import Data.Time.Clock.POSIX +import System.IO -server :: ServerOpts -> IO () -server o = run (serverPort o) . app o =<< newServerState - --- | A server is a map of sessions, each of which consists of a broadcast --- TMChan, which both users and developers write messages to. -type ServerState = M.Map SessionID (TMChan Log) +type ServerState = M.Map SessionID Session newServerState :: IO (TVar ServerState) newServerState = newTVarIO M.empty +-- | A session consists of a broadcast TMChan, which both users and +-- developers write messages to. Writes are stored in the log file, +-- and a log lock allows atomic access to the log file for replays. +data Session = Session (TMChan Log) (TVar Handle) (TMVar LogLock) + +data LogLock = LogLock + +newSession :: TVar Handle -> IO Session +newSession loghv = Session + <$> newBroadcastTMChanIO + <*> pure loghv + <*> newTMVarIO LogLock + +listenSession :: Session -> STM (TMChan Log) +listenSession (Session bchan _ _) = dupTMChan bchan + +-- | While writing a log to the session the LogLock is drained until +-- the write has reached the log file. This prevents concurrent writes +-- to the file, and allows writes to be blocked while reading the log file. +writeSession :: Session -> Log -> IO () +writeSession (Session bchan loghv loglock) l = do + (ll, logh) <- atomically $ (,) + <$> takeTMVar loglock + <*> readTVar loghv + writeLogHandle l logh + atomically $ do + putTMVar loglock ll + writeTMChan bchan l + +-- | Run an action with the log file quiescent (and its write handle closed), +-- and nothing being added to the session's broadcast TMChan. +preventWriteWhile :: Session -> ServerOpts -> SessionID -> IO a -> IO a +preventWriteWhile (Session _ loghv loglock) o sid a = bracket setup cleanup go + where + setup = do + (ll, logh) <- atomically $ (,) + <$> takeTMVar loglock + <*> readTVar loghv + hClose logh + return ll + cleanup ll = do + let f = sessionLogFile (serverDirectory o) sid + h <- openFile f WriteMode + atomically $ do + putTMVar loglock ll + writeTVar loghv h + go _ = a + +closeSession :: Session -> STM () +closeSession (Session bchan _ _) = closeTMChan bchan + +server :: ServerOpts -> IO () +server o = run (serverPort o) . app o =<< newServerState + app :: ServerOpts -> TVar ServerState -> Application app o ssv = websocketsOr WS.defaultConnectionOptions (websocketApp o ssv) webapp where @@ -49,42 +100,30 @@ websocketApp o ssv pending_conn = do Just sid -> developer o ssv sid conn user :: ServerOpts -> TVar ServerState -> WS.Connection -> IO () -user o ssv conn = withSessionID (serverDirectory o) $ \(logh, sid) -> do +user o ssv conn = withSessionID (serverDirectory o) $ \(loghv, sid) -> do sendTextData conn sid - bracket (setup sid) (cleanup sid) (go logh) + bracket (setup sid loghv) (cleanup sid) go where - setup sid = do - bchan <- newBroadcastTMChanIO - atomically $ modifyTVar' ssv $ M.insert sid bchan - return bchan + setup sid loghv = do + session <- newSession loghv + atomically $ modifyTVar' ssv $ M.insert sid session + return session - cleanup sid bchan = atomically $ do - closeTMChan bchan + cleanup sid session = atomically $ do + closeSession session modifyTVar' ssv $ M.delete sid - go logh bchan = do - logchan <- atomically $ dupTMChan bchan - userchan <- atomically $ dupTMChan bchan - _ <- storelog logh logchan - `concurrently` relaytouser userchan - `concurrently` relayfromuser bchan + go session = do + userchan <- atomically $ listenSession session + _ <- relaytouser userchan + `concurrently` relayfromuser session return () -- Relay all messages from the user's websocket to the - -- broadcast channel. - relayfromuser bchan = relayFromSocket conn $ \msg -> do - print ("got from user", msg) + -- session broadcast channel. + relayfromuser session = relayFromSocket conn $ \msg -> do l <- mkLog (User msg) <$> getPOSIXTime - atomically $ writeTMChan bchan l - - -- Read from logchan and store each message to the log file. - storelog logh logchan = do - v <- atomically $ readTMChan logchan - case v of - Nothing -> return () - Just l -> do - writeLogHandle l logh - storelog logh logchan + writeSession session l -- Relay developer messages from the channel to the user's websocket. relaytouser userchan = relayToSocket conn $ do @@ -101,20 +140,21 @@ developer o ssv sid conn = bracket setup cleanup go setup = atomically $ M.lookup sid <$> readTVar ssv cleanup _ = return () go Nothing = error "Invalid session id!" - go (Just bchan) = do + go (Just session) = do + -- Sending the SessionID to the developer is redundant, but + -- is done to make the protocol startup sequence the same as + -- it is for the user. sendTextData conn sid - -- TODO replay backlog - devchan <- atomically $ dupTMChan bchan - _ <- relayfromdeveloper bchan + devchan <- replayBacklog o sid session conn + _ <- relayfromdeveloper session `concurrently` relaytodeveloper devchan return () -- Relay all messages from the developer's websocket to the -- broadcast channel. - relayfromdeveloper bchan = relayFromSocket conn $ \msg -> do - print ("got from developer", msg) + relayfromdeveloper session = relayFromSocket conn $ \msg -> do l <- mkLog (Developer msg) <$> getPOSIXTime - atomically $ writeTMChan bchan l + writeSession session l -- Relay user messages from the channel to the developer's websocket. relaytodeveloper devchan = relayToSocket conn $ do @@ -124,3 +164,23 @@ developer o ssv sid conn = bracket setup cleanup go User m -> Just m Developer _ -> Nothing Nothing -> Nothing + +-- | Replay the log of what's happened in the session so far, +-- and return a channel that will get new session activity. +-- +-- This is done atomically; even if new activity arrives while it's +-- running nothing more will be logged until the log file has been +-- replayed and the channel set up. +-- +-- Note that the session may appear to freeze for other users while +-- this is running. +replayBacklog :: ServerOpts -> SessionID -> Session -> WS.Connection -> IO (TMChan Log) +replayBacklog o sid session conn = preventWriteWhile session o sid $ do + ls <- streamLog (sessionLogFile (serverDirectory o) sid) + forM_ ls $ \l -> case loggedMessage <$> l of + Right (User m) -> sendBinaryData conn m + Right (Developer _) -> return () + -- This should not happen, since writes to the log + -- are blocked. Unless there's a disk error.. + Left _ -> return () + atomically $ listenSession session -- cgit v1.2.3