summaryrefslogtreecommitdiffhomepage
path: root/WebSockets.hs
diff options
context:
space:
mode:
Diffstat (limited to 'WebSockets.hs')
-rw-r--r--WebSockets.hs30
1 files changed, 10 insertions, 20 deletions
diff --git a/WebSockets.hs b/WebSockets.hs
index 7cb140b..bbf21e3 100644
--- a/WebSockets.hs
+++ b/WebSockets.hs
@@ -63,12 +63,11 @@ runClientApp app = do
-- | Make a client that sends and receives AnyMessages over a websocket.
clientApp
:: Mode
- -> RecentActivity
-> (sent -> AnyMessage)
-> (AnyMessage -> Maybe received)
- -> (TMChan sent -> TMChan received -> SessionID -> IO a)
+ -> (TMChan sent -> TMChan (MissingHashes received) -> SessionID -> IO a)
-> ClientApp a
-clientApp mode recentactivity mksent filterreceived a conn = do
+clientApp mode mksent filterreceived a conn = do
-- Ping every 30 seconds to avoid timeouts caused by proxies etc.
forkPingThread conn 30
_v <- negotiateWireVersion conn
@@ -85,10 +84,10 @@ clientApp mode recentactivity mksent filterreceived a conn = do
sthread <- async $ relayToSocket conn mksent $
atomically (readTMChan schan)
rthread <- async $ do
- relayFromSocket conn recentactivity (waitTillDrained rchan) $ \v -> do
+ relayFromSocket conn $ \v -> do
case filterreceived v of
Nothing -> return ()
- Just r -> atomically $ writeTMChan rchan r
+ Just r -> atomically $ writeTMChan rchan (MissingHashes r)
-- Server sent Done, so close channels.
atomically $ do
closeTMChan schan
@@ -104,24 +103,14 @@ clientApp mode recentactivity mksent filterreceived a conn = do
void $ waitCatch rthread
go sid (schan, rchan, _, _) = a schan rchan sid
-waitTillDrained :: TMChan a -> IO ()
-waitTillDrained c = atomically $ do
- e <- isEmptyTMChan c
- if e
- then return ()
- else retry
-
-relayFromSocket :: Connection -> RecentActivity -> IO () -> (AnyMessage -> IO ()) -> IO ()
-relayFromSocket conn recentactivity waitprevprocessed sender = go
+relayFromSocket :: Connection -> (AnyMessage -> IO ()) -> IO ()
+relayFromSocket conn sender = go
where
go = do
r <- receiveData conn
case r of
AnyMessage msg -> do
- waitprevprocessed
- msg' <- atomically $
- restorePrevActivityHash recentactivity msg
- sender msg'
+ sender msg
go
Done -> return ()
WireProtocolError e -> protocolError conn e
@@ -135,8 +124,9 @@ relayToSocket conn mksent getter = go
case mmsg of
Nothing -> return ()
Just msg -> do
- sendBinaryData conn $ AnyMessage $
- removePrevActivityHash $ mksent msg
+ let MissingHashes wiremsg =
+ removeHashes $ mksent msg
+ sendBinaryData conn $ AnyMessage wiremsg
go
-- | Framing protocol used over a websocket connection.