From b47e621749257331788e82e44d1565cf4d32d04b Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Sun, 30 Apr 2017 13:54:02 -0400 Subject: fix probable race in use of restoreHashes I think there was a race where a SessionKey message had been drained from the TChan, but not yet added to the developer state, which was resonsible for recent instability at startup. It manifested as protocol errors where the prevActivity hash was wrongly Nothing. Fixed by adding a MissingHashes type to tag things whose hashes have been stripped, and adding back the hashes when needed, which always happens inside atomically blocks, so won't have such a race. --- WebSockets.hs | 30 ++++++++++-------------------- 1 file changed, 10 insertions(+), 20 deletions(-) (limited to 'WebSockets.hs') diff --git a/WebSockets.hs b/WebSockets.hs index 7cb140b..bbf21e3 100644 --- a/WebSockets.hs +++ b/WebSockets.hs @@ -63,12 +63,11 @@ runClientApp app = do -- | Make a client that sends and receives AnyMessages over a websocket. clientApp :: Mode - -> RecentActivity -> (sent -> AnyMessage) -> (AnyMessage -> Maybe received) - -> (TMChan sent -> TMChan received -> SessionID -> IO a) + -> (TMChan sent -> TMChan (MissingHashes received) -> SessionID -> IO a) -> ClientApp a -clientApp mode recentactivity mksent filterreceived a conn = do +clientApp mode mksent filterreceived a conn = do -- Ping every 30 seconds to avoid timeouts caused by proxies etc. forkPingThread conn 30 _v <- negotiateWireVersion conn @@ -85,10 +84,10 @@ clientApp mode recentactivity mksent filterreceived a conn = do sthread <- async $ relayToSocket conn mksent $ atomically (readTMChan schan) rthread <- async $ do - relayFromSocket conn recentactivity (waitTillDrained rchan) $ \v -> do + relayFromSocket conn $ \v -> do case filterreceived v of Nothing -> return () - Just r -> atomically $ writeTMChan rchan r + Just r -> atomically $ writeTMChan rchan (MissingHashes r) -- Server sent Done, so close channels. atomically $ do closeTMChan schan @@ -104,24 +103,14 @@ clientApp mode recentactivity mksent filterreceived a conn = do void $ waitCatch rthread go sid (schan, rchan, _, _) = a schan rchan sid -waitTillDrained :: TMChan a -> IO () -waitTillDrained c = atomically $ do - e <- isEmptyTMChan c - if e - then return () - else retry - -relayFromSocket :: Connection -> RecentActivity -> IO () -> (AnyMessage -> IO ()) -> IO () -relayFromSocket conn recentactivity waitprevprocessed sender = go +relayFromSocket :: Connection -> (AnyMessage -> IO ()) -> IO () +relayFromSocket conn sender = go where go = do r <- receiveData conn case r of AnyMessage msg -> do - waitprevprocessed - msg' <- atomically $ - restorePrevActivityHash recentactivity msg - sender msg' + sender msg go Done -> return () WireProtocolError e -> protocolError conn e @@ -135,8 +124,9 @@ relayToSocket conn mksent getter = go case mmsg of Nothing -> return () Just msg -> do - sendBinaryData conn $ AnyMessage $ - removePrevActivityHash $ mksent msg + let MissingHashes wiremsg = + removeHashes $ mksent msg + sendBinaryData conn $ AnyMessage wiremsg go -- | Framing protocol used over a websocket connection. -- cgit v1.2.3