From 6591e2b974ac22cbc2a06141edef76a775726e11 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Wed, 26 Apr 2017 14:23:37 -0400 Subject: have server relay Devloper messages to other Developers --- Server.hs | 45 ++++++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 15 deletions(-) (limited to 'Server.hs') diff --git a/Server.hs b/Server.hs index 5de184d..0906937 100644 --- a/Server.hs +++ b/Server.hs @@ -14,6 +14,7 @@ import Network.Wai.Handler.WebSockets import Network.WebSockets hiding (Message) import qualified Network.WebSockets as WS import Network.HTTP.Types +import Control.Concurrent import Control.Concurrent.STM import Control.Concurrent.STM.TMChan import Control.Concurrent.Async @@ -24,6 +25,7 @@ import qualified Data.Text as T import Data.Time.Clock.POSIX import System.IO import System.Directory +import System.Mem.Weak type ServerState = M.Map SessionID Session @@ -33,17 +35,21 @@ newServerState = newTVarIO M.empty -- | A session consists of a broadcast TMChan, which both users and -- developers write messages to. Writes are stored in the log file, -- and a log lock allows atomic access to the log file for replays. -data Session = Session (TMChan Log) (TVar Handle) (TMVar LogLock) +data Session = Session (TMChan (Broadcast Log)) (TVar Handle) (TMVar LogLock) data LogLock = LogLock +-- | A broadcast message, with the ThreadId of the sending thread +-- (which probably wants to ignore the message it sent). +data Broadcast a = Broadcast a (Weak ThreadId) + newSession :: TVar Handle -> IO Session newSession loghv = Session <$> newBroadcastTMChanIO <*> pure loghv <*> newTMVarIO LogLock -listenSession :: Session -> STM (TMChan Log) +listenSession :: Session -> STM (TMChan (Broadcast Log)) listenSession (Session bchan _ _) = dupTMChan bchan -- | While writing a log to the session the LogLock is drained until @@ -55,9 +61,10 @@ writeSession (Session bchan loghv loglock) l = do <$> takeTMVar loglock <*> readTVar loghv writeLogHandle l logh + tid <- mkWeakThreadId =<< myThreadId atomically $ do putTMVar loglock ll - writeTMChan bchan l + writeTMChan bchan (Broadcast l tid) -- | Run an action with the log file quiescent (and its write handle closed), -- and nothing being added to the session's broadcast TMChan. @@ -145,7 +152,7 @@ user o ssv conn = withSessionID (serverDirectory o) $ \(loghv, sid) -> do relaytouser userchan = do v <- atomically $ readTMChan userchan case v of - Just l -> case loggedMessage l of + Just (Broadcast l _from) -> case loggedMessage l of Developer m -> do sendBinaryData conn (LogMessage (Developer m)) relaytouser userchan @@ -169,8 +176,9 @@ developer o ssv sid conn = bracket setup cleanup go go (Just session) = do sendBinaryData conn (Ready ServerSends sid) devchan <- replayBacklogAndListen o sid session conn + mytid <- myThreadId _ <- relayfromdeveloper session - `concurrently` relaytodeveloper devchan + `concurrently` relaytodeveloper mytid devchan return () -- Relay all Developer amessages from the developer's websocket @@ -183,17 +191,24 @@ developer o ssv sid conn = bracket setup cleanup go -- Relay user messages from the developer's clone of the -- broadcast channel to the developer's websocket. - relaytodeveloper devchan = do + relaytodeveloper mytid devchan = do v <- atomically $ readTMChan devchan case v of - Just l -> case loggedMessage l of - User m -> do - sendBinaryData conn (LogMessage (User m)) - relaytodeveloper devchan - -- TODO: Relay messages from other - -- developers, without looping back - -- the developer's own messages. - Developer _ -> relaytodeveloper devchan + Just (Broadcast l from) -> do + let sendit = sendBinaryData conn + (LogMessage $ loggedMessage l) + case loggedMessage l of + User _ -> sendit + -- Relay messages from other + -- developers, without looping + -- back the developer's own messages. + Developer _ -> do + mtid <- deRefWeak from + case mtid of + Just tid | tid == mytid -> + return () + _ -> sendit + relaytodeveloper mytid devchan Nothing -> do sendBinaryData conn Done return () @@ -207,7 +222,7 @@ developer o ssv sid conn = bracket setup cleanup go -- -- Note that the session may appear to freeze for other users while -- this is running. -replayBacklogAndListen :: ServerOpts -> SessionID -> Session -> WS.Connection -> IO (TMChan Log) +replayBacklogAndListen :: ServerOpts -> SessionID -> Session -> WS.Connection -> IO (TMChan (Broadcast Log)) replayBacklogAndListen o sid session conn = preventWriteWhile session o sid $ do replayBacklog o sid conn -- cgit v1.2.3