summaryrefslogtreecommitdiffhomepage
path: root/WebSockets.hs
diff options
context:
space:
mode:
Diffstat (limited to 'WebSockets.hs')
-rw-r--r--WebSockets.hs25
1 files changed, 19 insertions, 6 deletions
diff --git a/WebSockets.hs b/WebSockets.hs
index f159271..00f762a 100644
--- a/WebSockets.hs
+++ b/WebSockets.hs
@@ -18,6 +18,7 @@ module WebSockets (
import Types
import SessionID
import ProtocolBuffers
+import PrevActivity
import Network.WebSockets hiding (Message)
import Control.Concurrent.STM
@@ -62,11 +63,12 @@ runClientApp app = do
-- | Make a client that sends and receives AnyMessages over a websocket.
clientApp
:: Mode
+ -> RecentActivity
-> (sent -> AnyMessage)
-> (AnyMessage -> Maybe received)
-> (TMChan sent -> TMChan received -> SessionID -> IO a)
-> ClientApp a
-clientApp mode mksent filterreceived a conn = do
+clientApp mode recentactivity mksent filterreceived a conn = do
-- Ping every 30 seconds to avoid timeouts caused by proxies etc.
forkPingThread conn 30
_v <- negotiateWireVersion conn
@@ -83,7 +85,7 @@ clientApp mode mksent filterreceived a conn = do
sthread <- async $ relayToSocket conn mksent $
atomically (readTMChan schan)
rthread <- async $ do
- relayFromSocket conn $ \v -> do
+ relayFromSocket conn recentactivity (waitTillDrained rchan) $ \v -> do
case filterreceived v of
Nothing -> return ()
Just r -> atomically $ writeTMChan rchan r
@@ -101,14 +103,24 @@ clientApp mode mksent filterreceived a conn = do
void $ waitCatch rthread
go sid (schan, rchan, _, _) = a schan rchan sid
-relayFromSocket :: Connection -> (AnyMessage -> IO ()) -> IO ()
-relayFromSocket conn sender = go
+waitTillDrained :: TMChan a -> IO ()
+waitTillDrained c = atomically $ do
+ e <- isEmptyTMChan c
+ if e
+ then return ()
+ else retry
+
+relayFromSocket :: Connection -> RecentActivity -> IO () -> (AnyMessage -> IO ()) -> IO ()
+relayFromSocket conn recentactivity waitprevprocessed sender = go
where
go = do
r <- receiveData conn
case r of
AnyMessage msg -> do
- sender msg
+ waitprevprocessed
+ msg' <- atomically $
+ restorePrevActivityHash recentactivity msg
+ sender msg'
go
Done -> return ()
WireProtocolError e -> protocolError conn e
@@ -122,7 +134,8 @@ relayToSocket conn mksent getter = go
case mmsg of
Nothing -> return ()
Just msg -> do
- sendBinaryData conn $ AnyMessage $ mksent msg
+ sendBinaryData conn $ AnyMessage $
+ removePrevActivityHash $ mksent msg
go
-- | Framing protocol used over a websocket connection.