diff options
Diffstat (limited to 'WebSockets.hs')
-rw-r--r-- | WebSockets.hs | 25 |
1 files changed, 19 insertions, 6 deletions
diff --git a/WebSockets.hs b/WebSockets.hs index f159271..00f762a 100644 --- a/WebSockets.hs +++ b/WebSockets.hs @@ -18,6 +18,7 @@ module WebSockets ( import Types import SessionID import ProtocolBuffers +import PrevActivity import Network.WebSockets hiding (Message) import Control.Concurrent.STM @@ -62,11 +63,12 @@ runClientApp app = do -- | Make a client that sends and receives AnyMessages over a websocket. clientApp :: Mode + -> RecentActivity -> (sent -> AnyMessage) -> (AnyMessage -> Maybe received) -> (TMChan sent -> TMChan received -> SessionID -> IO a) -> ClientApp a -clientApp mode mksent filterreceived a conn = do +clientApp mode recentactivity mksent filterreceived a conn = do -- Ping every 30 seconds to avoid timeouts caused by proxies etc. forkPingThread conn 30 _v <- negotiateWireVersion conn @@ -83,7 +85,7 @@ clientApp mode mksent filterreceived a conn = do sthread <- async $ relayToSocket conn mksent $ atomically (readTMChan schan) rthread <- async $ do - relayFromSocket conn $ \v -> do + relayFromSocket conn recentactivity (waitTillDrained rchan) $ \v -> do case filterreceived v of Nothing -> return () Just r -> atomically $ writeTMChan rchan r @@ -101,14 +103,24 @@ clientApp mode mksent filterreceived a conn = do void $ waitCatch rthread go sid (schan, rchan, _, _) = a schan rchan sid -relayFromSocket :: Connection -> (AnyMessage -> IO ()) -> IO () -relayFromSocket conn sender = go +waitTillDrained :: TMChan a -> IO () +waitTillDrained c = atomically $ do + e <- isEmptyTMChan c + if e + then return () + else retry + +relayFromSocket :: Connection -> RecentActivity -> IO () -> (AnyMessage -> IO ()) -> IO () +relayFromSocket conn recentactivity waitprevprocessed sender = go where go = do r <- receiveData conn case r of AnyMessage msg -> do - sender msg + waitprevprocessed + msg' <- atomically $ + restorePrevActivityHash recentactivity msg + sender msg' go Done -> return () WireProtocolError e -> protocolError conn e @@ -122,7 +134,8 @@ relayToSocket conn mksent getter = go case mmsg of Nothing -> return () Just msg -> do - sendBinaryData conn $ AnyMessage $ mksent msg + sendBinaryData conn $ AnyMessage $ + removePrevActivityHash $ mksent msg go -- | Framing protocol used over a websocket connection. |