summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorJoey Hess <joeyh@joeyh.name>2017-04-22 13:00:04 -0400
committerJoey Hess <joeyh@joeyh.name>2017-04-22 13:00:04 -0400
commit7987157bfd99b8e2ec78f5030a49c2e16bf08321 (patch)
tree68230afb5bac635426d7c50c86bbc5345e4fc4b6
parent362d3a437c16c10d221caeac21e9f685d7ddf3e6 (diff)
downloaddebug-me-7987157bfd99b8e2ec78f5030a49c2e16bf08321.tar.gz
it works
Multi-user client-server debug-me is working, almost perfectly. All that was missing was replaying the log when the developer connected. A number of race conditions had to be avoided to do that sanely. This commit was sponsored by Ignacio on Patreon.
-rw-r--r--Log.hs1
-rw-r--r--Role/Developer.hs2
-rw-r--r--Server.hs144
-rw-r--r--SessionID.hs15
-rw-r--r--TODO1
5 files changed, 115 insertions, 48 deletions
diff --git a/Log.hs b/Log.hs
index 6c45074..eb7bf3c 100644
--- a/Log.hs
+++ b/Log.hs
@@ -8,7 +8,6 @@ import Memory
import Serialization
import Data.Char
-import Data.Either
import Data.Time.Clock.POSIX
import qualified Data.ByteString.Lazy as L
import System.IO
diff --git a/Role/Developer.hs b/Role/Developer.hs
index 90f7606..89f6ea9 100644
--- a/Role/Developer.hs
+++ b/Role/Developer.hs
@@ -47,9 +47,7 @@ sendTtyInput :: TChan (Message Entered) -> TVar DeveloperState -> Logger -> IO (
sendTtyInput ichan devstate logger = go
where
go = do
- print "in sendTtyInput"
b <- B.hGetSome stdin 1024
- print "sending from dev"
if b == B.empty
then return ()
else send b
diff --git a/Server.hs b/Server.hs
index 1de02a4..c2589f1 100644
--- a/Server.hs
+++ b/Server.hs
@@ -1,4 +1,4 @@
-{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE OverloadedStrings, BangPatterns #-}
module Server where
@@ -16,21 +16,72 @@ import Network.HTTP.Types
import Control.Concurrent.STM
import Control.Concurrent.STM.TMChan
import Control.Concurrent.Async
+import Control.Exception
+import Control.Monad
import qualified Data.Map as M
import qualified Data.Text as T
-import Control.Exception
import Data.Time.Clock.POSIX
+import System.IO
-server :: ServerOpts -> IO ()
-server o = run (serverPort o) . app o =<< newServerState
-
--- | A server is a map of sessions, each of which consists of a broadcast
--- TMChan, which both users and developers write messages to.
-type ServerState = M.Map SessionID (TMChan Log)
+type ServerState = M.Map SessionID Session
newServerState :: IO (TVar ServerState)
newServerState = newTVarIO M.empty
+-- | A session consists of a broadcast TMChan, which both users and
+-- developers write messages to. Writes are stored in the log file,
+-- and a log lock allows atomic access to the log file for replays.
+data Session = Session (TMChan Log) (TVar Handle) (TMVar LogLock)
+
+data LogLock = LogLock
+
+newSession :: TVar Handle -> IO Session
+newSession loghv = Session
+ <$> newBroadcastTMChanIO
+ <*> pure loghv
+ <*> newTMVarIO LogLock
+
+listenSession :: Session -> STM (TMChan Log)
+listenSession (Session bchan _ _) = dupTMChan bchan
+
+-- | While writing a log to the session the LogLock is drained until
+-- the write has reached the log file. This prevents concurrent writes
+-- to the file, and allows writes to be blocked while reading the log file.
+writeSession :: Session -> Log -> IO ()
+writeSession (Session bchan loghv loglock) l = do
+ (ll, logh) <- atomically $ (,)
+ <$> takeTMVar loglock
+ <*> readTVar loghv
+ writeLogHandle l logh
+ atomically $ do
+ putTMVar loglock ll
+ writeTMChan bchan l
+
+-- | Run an action with the log file quiescent (and its write handle closed),
+-- and nothing being added to the session's broadcast TMChan.
+preventWriteWhile :: Session -> ServerOpts -> SessionID -> IO a -> IO a
+preventWriteWhile (Session _ loghv loglock) o sid a = bracket setup cleanup go
+ where
+ setup = do
+ (ll, logh) <- atomically $ (,)
+ <$> takeTMVar loglock
+ <*> readTVar loghv
+ hClose logh
+ return ll
+ cleanup ll = do
+ let f = sessionLogFile (serverDirectory o) sid
+ h <- openFile f WriteMode
+ atomically $ do
+ putTMVar loglock ll
+ writeTVar loghv h
+ go _ = a
+
+closeSession :: Session -> STM ()
+closeSession (Session bchan _ _) = closeTMChan bchan
+
+server :: ServerOpts -> IO ()
+server o = run (serverPort o) . app o =<< newServerState
+
app :: ServerOpts -> TVar ServerState -> Application
app o ssv = websocketsOr WS.defaultConnectionOptions (websocketApp o ssv) webapp
where
@@ -49,42 +100,30 @@ websocketApp o ssv pending_conn = do
Just sid -> developer o ssv sid conn
user :: ServerOpts -> TVar ServerState -> WS.Connection -> IO ()
-user o ssv conn = withSessionID (serverDirectory o) $ \(logh, sid) -> do
+user o ssv conn = withSessionID (serverDirectory o) $ \(loghv, sid) -> do
sendTextData conn sid
- bracket (setup sid) (cleanup sid) (go logh)
+ bracket (setup sid loghv) (cleanup sid) go
where
- setup sid = do
- bchan <- newBroadcastTMChanIO
- atomically $ modifyTVar' ssv $ M.insert sid bchan
- return bchan
+ setup sid loghv = do
+ session <- newSession loghv
+ atomically $ modifyTVar' ssv $ M.insert sid session
+ return session
- cleanup sid bchan = atomically $ do
- closeTMChan bchan
+ cleanup sid session = atomically $ do
+ closeSession session
modifyTVar' ssv $ M.delete sid
- go logh bchan = do
- logchan <- atomically $ dupTMChan bchan
- userchan <- atomically $ dupTMChan bchan
- _ <- storelog logh logchan
- `concurrently` relaytouser userchan
- `concurrently` relayfromuser bchan
+ go session = do
+ userchan <- atomically $ listenSession session
+ _ <- relaytouser userchan
+ `concurrently` relayfromuser session
return ()
-- Relay all messages from the user's websocket to the
- -- broadcast channel.
- relayfromuser bchan = relayFromSocket conn $ \msg -> do
- print ("got from user", msg)
+ -- session broadcast channel.
+ relayfromuser session = relayFromSocket conn $ \msg -> do
l <- mkLog (User msg) <$> getPOSIXTime
- atomically $ writeTMChan bchan l
-
- -- Read from logchan and store each message to the log file.
- storelog logh logchan = do
- v <- atomically $ readTMChan logchan
- case v of
- Nothing -> return ()
- Just l -> do
- writeLogHandle l logh
- storelog logh logchan
+ writeSession session l
-- Relay developer messages from the channel to the user's websocket.
relaytouser userchan = relayToSocket conn $ do
@@ -101,20 +140,21 @@ developer o ssv sid conn = bracket setup cleanup go
setup = atomically $ M.lookup sid <$> readTVar ssv
cleanup _ = return ()
go Nothing = error "Invalid session id!"
- go (Just bchan) = do
+ go (Just session) = do
+ -- Sending the SessionID to the developer is redundant, but
+ -- is done to make the protocol startup sequence the same as
+ -- it is for the user.
sendTextData conn sid
- -- TODO replay backlog
- devchan <- atomically $ dupTMChan bchan
- _ <- relayfromdeveloper bchan
+ devchan <- replayBacklog o sid session conn
+ _ <- relayfromdeveloper session
`concurrently` relaytodeveloper devchan
return ()
-- Relay all messages from the developer's websocket to the
-- broadcast channel.
- relayfromdeveloper bchan = relayFromSocket conn $ \msg -> do
- print ("got from developer", msg)
+ relayfromdeveloper session = relayFromSocket conn $ \msg -> do
l <- mkLog (Developer msg) <$> getPOSIXTime
- atomically $ writeTMChan bchan l
+ writeSession session l
-- Relay user messages from the channel to the developer's websocket.
relaytodeveloper devchan = relayToSocket conn $ do
@@ -124,3 +164,23 @@ developer o ssv sid conn = bracket setup cleanup go
User m -> Just m
Developer _ -> Nothing
Nothing -> Nothing
+
+-- | Replay the log of what's happened in the session so far,
+-- and return a channel that will get new session activity.
+--
+-- This is done atomically; even if new activity arrives while it's
+-- running nothing more will be logged until the log file has been
+-- replayed and the channel set up.
+--
+-- Note that the session may appear to freeze for other users while
+-- this is running.
+replayBacklog :: ServerOpts -> SessionID -> Session -> WS.Connection -> IO (TMChan Log)
+replayBacklog o sid session conn = preventWriteWhile session o sid $ do
+ ls <- streamLog (sessionLogFile (serverDirectory o) sid)
+ forM_ ls $ \l -> case loggedMessage <$> l of
+ Right (User m) -> sendBinaryData conn m
+ Right (Developer _) -> return ()
+ -- This should not happen, since writes to the log
+ -- are blocked. Unless there's a disk error..
+ Left _ -> return ()
+ atomically $ listenSession session
diff --git a/SessionID.hs b/SessionID.hs
index d643a28..8bf8f7d 100644
--- a/SessionID.hs
+++ b/SessionID.hs
@@ -20,6 +20,8 @@ import Data.Maybe
import Data.List
import Data.UUID
import Data.UUID.V4
+import Control.Concurrent.STM
+import Control.Exception
-- | A SessionID is the base name of the log file to use,
-- and may not contain any path information.
@@ -51,11 +53,11 @@ mkSessionID s
sessionLogFile :: FilePath -> SessionID -> FilePath
sessionLogFile dir (SessionID f) = dir </> "debug-me." ++ f ++ ".log"
--- | Allocate a new SessionID and return an open Handle to its log file.
+-- | Allocate a new SessionID open a Handle to its log file.
--
-- A UUID is used, to avoid ever generating a SessionID that has been used
-- before.
-withSessionID :: FilePath -> ((Handle, SessionID) -> IO a) -> IO a
+withSessionID :: FilePath -> ((TVar Handle, SessionID) -> IO a) -> IO a
withSessionID dir a = do
createDirectoryIfMissing False dir
sid <- SessionID . toString <$> nextRandom
@@ -66,7 +68,14 @@ withSessionID dir a = do
exists <- doesFileExist f
if exists
then withSessionID dir a
- else withFile f WriteMode $ \h -> a (h, sid)
+ else bracket (setup f) cleanup (go sid)
+ where
+ setup f = do
+ h <- openFile f WriteMode
+ hv <- newTVarIO h
+ return hv
+ cleanup hv = hClose =<< atomically (readTVar hv)
+ go sid hv = a (hv, sid)
type UrlString = String
diff --git a/TODO b/TODO
index 3c78f89..8affef0 100644
--- a/TODO
+++ b/TODO
@@ -1,3 +1,4 @@
+* Developer keeps running when user quits.
* The current rules for when an Activity Entered is accepted allow it to
refer to an older activity than the last one. If echoing is disabled,
two Activity Entered could be sent, each pointing at the most recent