summaryrefslogtreecommitdiffhomepage
path: root/Server.hs
diff options
context:
space:
mode:
authorJoey Hess <joeyh@joeyh.name>2017-04-22 13:00:04 -0400
committerJoey Hess <joeyh@joeyh.name>2017-04-22 13:00:04 -0400
commit7987157bfd99b8e2ec78f5030a49c2e16bf08321 (patch)
tree68230afb5bac635426d7c50c86bbc5345e4fc4b6 /Server.hs
parent362d3a437c16c10d221caeac21e9f685d7ddf3e6 (diff)
downloaddebug-me-7987157bfd99b8e2ec78f5030a49c2e16bf08321.tar.gz
it works
Multi-user client-server debug-me is working, almost perfectly. All that was missing was replaying the log when the developer connected. A number of race conditions had to be avoided to do that sanely. This commit was sponsored by Ignacio on Patreon.
Diffstat (limited to 'Server.hs')
-rw-r--r--Server.hs144
1 files changed, 102 insertions, 42 deletions
diff --git a/Server.hs b/Server.hs
index 1de02a4..c2589f1 100644
--- a/Server.hs
+++ b/Server.hs
@@ -1,4 +1,4 @@
-{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE OverloadedStrings, BangPatterns #-}
module Server where
@@ -16,21 +16,72 @@ import Network.HTTP.Types
import Control.Concurrent.STM
import Control.Concurrent.STM.TMChan
import Control.Concurrent.Async
+import Control.Exception
+import Control.Monad
import qualified Data.Map as M
import qualified Data.Text as T
-import Control.Exception
import Data.Time.Clock.POSIX
+import System.IO
-server :: ServerOpts -> IO ()
-server o = run (serverPort o) . app o =<< newServerState
-
--- | A server is a map of sessions, each of which consists of a broadcast
--- TMChan, which both users and developers write messages to.
-type ServerState = M.Map SessionID (TMChan Log)
+type ServerState = M.Map SessionID Session
newServerState :: IO (TVar ServerState)
newServerState = newTVarIO M.empty
+-- | A session consists of a broadcast TMChan, which both users and
+-- developers write messages to. Writes are stored in the log file,
+-- and a log lock allows atomic access to the log file for replays.
+data Session = Session (TMChan Log) (TVar Handle) (TMVar LogLock)
+
+data LogLock = LogLock
+
+newSession :: TVar Handle -> IO Session
+newSession loghv = Session
+ <$> newBroadcastTMChanIO
+ <*> pure loghv
+ <*> newTMVarIO LogLock
+
+listenSession :: Session -> STM (TMChan Log)
+listenSession (Session bchan _ _) = dupTMChan bchan
+
+-- | While writing a log to the session the LogLock is drained until
+-- the write has reached the log file. This prevents concurrent writes
+-- to the file, and allows writes to be blocked while reading the log file.
+writeSession :: Session -> Log -> IO ()
+writeSession (Session bchan loghv loglock) l = do
+ (ll, logh) <- atomically $ (,)
+ <$> takeTMVar loglock
+ <*> readTVar loghv
+ writeLogHandle l logh
+ atomically $ do
+ putTMVar loglock ll
+ writeTMChan bchan l
+
+-- | Run an action with the log file quiescent (and its write handle closed),
+-- and nothing being added to the session's broadcast TMChan.
+preventWriteWhile :: Session -> ServerOpts -> SessionID -> IO a -> IO a
+preventWriteWhile (Session _ loghv loglock) o sid a = bracket setup cleanup go
+ where
+ setup = do
+ (ll, logh) <- atomically $ (,)
+ <$> takeTMVar loglock
+ <*> readTVar loghv
+ hClose logh
+ return ll
+ cleanup ll = do
+ let f = sessionLogFile (serverDirectory o) sid
+ h <- openFile f WriteMode
+ atomically $ do
+ putTMVar loglock ll
+ writeTVar loghv h
+ go _ = a
+
+closeSession :: Session -> STM ()
+closeSession (Session bchan _ _) = closeTMChan bchan
+
+server :: ServerOpts -> IO ()
+server o = run (serverPort o) . app o =<< newServerState
+
app :: ServerOpts -> TVar ServerState -> Application
app o ssv = websocketsOr WS.defaultConnectionOptions (websocketApp o ssv) webapp
where
@@ -49,42 +100,30 @@ websocketApp o ssv pending_conn = do
Just sid -> developer o ssv sid conn
user :: ServerOpts -> TVar ServerState -> WS.Connection -> IO ()
-user o ssv conn = withSessionID (serverDirectory o) $ \(logh, sid) -> do
+user o ssv conn = withSessionID (serverDirectory o) $ \(loghv, sid) -> do
sendTextData conn sid
- bracket (setup sid) (cleanup sid) (go logh)
+ bracket (setup sid loghv) (cleanup sid) go
where
- setup sid = do
- bchan <- newBroadcastTMChanIO
- atomically $ modifyTVar' ssv $ M.insert sid bchan
- return bchan
+ setup sid loghv = do
+ session <- newSession loghv
+ atomically $ modifyTVar' ssv $ M.insert sid session
+ return session
- cleanup sid bchan = atomically $ do
- closeTMChan bchan
+ cleanup sid session = atomically $ do
+ closeSession session
modifyTVar' ssv $ M.delete sid
- go logh bchan = do
- logchan <- atomically $ dupTMChan bchan
- userchan <- atomically $ dupTMChan bchan
- _ <- storelog logh logchan
- `concurrently` relaytouser userchan
- `concurrently` relayfromuser bchan
+ go session = do
+ userchan <- atomically $ listenSession session
+ _ <- relaytouser userchan
+ `concurrently` relayfromuser session
return ()
-- Relay all messages from the user's websocket to the
- -- broadcast channel.
- relayfromuser bchan = relayFromSocket conn $ \msg -> do
- print ("got from user", msg)
+ -- session broadcast channel.
+ relayfromuser session = relayFromSocket conn $ \msg -> do
l <- mkLog (User msg) <$> getPOSIXTime
- atomically $ writeTMChan bchan l
-
- -- Read from logchan and store each message to the log file.
- storelog logh logchan = do
- v <- atomically $ readTMChan logchan
- case v of
- Nothing -> return ()
- Just l -> do
- writeLogHandle l logh
- storelog logh logchan
+ writeSession session l
-- Relay developer messages from the channel to the user's websocket.
relaytouser userchan = relayToSocket conn $ do
@@ -101,20 +140,21 @@ developer o ssv sid conn = bracket setup cleanup go
setup = atomically $ M.lookup sid <$> readTVar ssv
cleanup _ = return ()
go Nothing = error "Invalid session id!"
- go (Just bchan) = do
+ go (Just session) = do
+ -- Sending the SessionID to the developer is redundant, but
+ -- is done to make the protocol startup sequence the same as
+ -- it is for the user.
sendTextData conn sid
- -- TODO replay backlog
- devchan <- atomically $ dupTMChan bchan
- _ <- relayfromdeveloper bchan
+ devchan <- replayBacklog o sid session conn
+ _ <- relayfromdeveloper session
`concurrently` relaytodeveloper devchan
return ()
-- Relay all messages from the developer's websocket to the
-- broadcast channel.
- relayfromdeveloper bchan = relayFromSocket conn $ \msg -> do
- print ("got from developer", msg)
+ relayfromdeveloper session = relayFromSocket conn $ \msg -> do
l <- mkLog (Developer msg) <$> getPOSIXTime
- atomically $ writeTMChan bchan l
+ writeSession session l
-- Relay user messages from the channel to the developer's websocket.
relaytodeveloper devchan = relayToSocket conn $ do
@@ -124,3 +164,23 @@ developer o ssv sid conn = bracket setup cleanup go
User m -> Just m
Developer _ -> Nothing
Nothing -> Nothing
+
+-- | Replay the log of what's happened in the session so far,
+-- and return a channel that will get new session activity.
+--
+-- This is done atomically; even if new activity arrives while it's
+-- running nothing more will be logged until the log file has been
+-- replayed and the channel set up.
+--
+-- Note that the session may appear to freeze for other users while
+-- this is running.
+replayBacklog :: ServerOpts -> SessionID -> Session -> WS.Connection -> IO (TMChan Log)
+replayBacklog o sid session conn = preventWriteWhile session o sid $ do
+ ls <- streamLog (sessionLogFile (serverDirectory o) sid)
+ forM_ ls $ \l -> case loggedMessage <$> l of
+ Right (User m) -> sendBinaryData conn m
+ Right (Developer _) -> return ()
+ -- This should not happen, since writes to the log
+ -- are blocked. Unless there's a disk error..
+ Left _ -> return ()
+ atomically $ listenSession session