summaryrefslogtreecommitdiffhomepage
path: root/Server.hs
blob: c2589f1ad828f221ff2e4e1c0942b06e67f18943 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
{-# LANGUAGE OverloadedStrings, BangPatterns #-}

module Server where

import CmdLine
import WebSockets
import SessionID
import Log

import Network.Wai
import Network.Wai.Handler.Warp
import Network.Wai.Handler.WebSockets
import Network.WebSockets hiding (Message)
import qualified Network.WebSockets as WS
import Network.HTTP.Types
import Control.Concurrent.STM
import Control.Concurrent.STM.TMChan
import Control.Concurrent.Async
import Control.Exception
import Control.Monad
import qualified Data.Map as M
import qualified Data.Text as T
import Data.Time.Clock.POSIX
import System.IO

-- | The sessions currently registered with the server, keyed by their
-- 'SessionID'.
type ServerState = M.Map SessionID Session

-- | Allocate the shared server state, initially holding no sessions.
newServerState :: IO (TVar ServerState)
newServerState = newTVarIO mempty

-- | A session consists of a broadcast TMChan, which both users and
-- developers write messages to. Writes are stored in the log file,
-- and a log lock allows atomic access to the log file for replays.
--
-- Fields, in order:
--
-- * the broadcast channel; listeners get their own copy of it with
--   'listenSession'
--
-- * the handle currently used to write the session's log file (held in a
--   TVar because 'preventWriteWhile' closes it and installs a new one)
--
-- * the log lock; taking the 'LogLock' out of this TMVar gives exclusive
--   access to the log file until it is put back
data Session = Session (TMChan Log) (TVar Handle) (TMVar LogLock)

-- | Token kept in a session's TMVar. Taking it out of the TMVar acquires
-- the log lock; putting it back releases the lock.
data LogLock = LogLock

-- | Build a session around the given log handle variable, with a fresh
-- broadcast channel and the log lock initially available.
newSession :: TVar Handle -> IO Session
newSession loghv = do
	broadcast <- newBroadcastTMChanIO
	lock <- newTMVarIO LogLock
	return (Session broadcast loghv lock)

-- | Subscribe to a session by duplicating its broadcast channel.
listenSession :: Session -> STM (TMChan Log)
listenSession session = case session of
	Session broadcast _ _ -> dupTMChan broadcast

-- | While writing a log to the session the LogLock is drained until
-- the write has reached the log file. This prevents concurrent writes
-- to the file, and allows writes to be blocked while reading the log file.
--
-- The lock is put back even when writing to the log file throws, so a
-- failed write cannot leave every later writer (and 'preventWriteWhile')
-- blocked forever. In that case the message is not broadcast and the
-- exception propagates to the caller.
writeSession :: Session -> Log -> IO ()
writeSession (Session bchan loghv loglock) l = mask $ \restore -> do
	-- Take the lock and look up the current log handle in a single
	-- transaction. Async exceptions are masked from here on so the
	-- lock cannot be lost between acquiring and releasing it.
	(ll, logh) <- atomically $ (,)
		<$> takeTMVar loglock
		<*> readTVar loghv
	restore (writeLogHandle l logh)
		`onException` atomically (putTMVar loglock ll)
	-- Release the lock and broadcast in the same transaction, so the
	-- message reaches listeners at the moment the lock becomes
	-- available again.
	atomically $ do
		putTMVar loglock ll
		writeTMChan bchan l

-- | Run an action with the log file quiescent (and its write handle closed),
-- and nothing being added to the session's broadcast TMChan.
--
-- The log lock is held for the duration of the action. Afterwards the log
-- file is reopened for appending, so everything logged before the action
-- ran is kept, the new handle is installed, and the lock is released.
-- If closing or reopening the log file throws, the lock is still released
-- before the exception propagates.
preventWriteWhile :: Session -> ServerOpts -> SessionID -> IO a -> IO a
preventWriteWhile (Session _ loghv loglock) o sid a = bracket setup cleanup go
  where
	setup = do
		(ll, logh) <- atomically $ (,)
			<$> takeTMVar loglock
			<*> readTVar loghv
		-- Give the lock back if closing fails, rather than leaving
		-- the session locked forever.
		hClose logh `onException` release ll
		return ll
	cleanup ll = do
		let f = sessionLogFile (serverDirectory o) sid
		-- AppendMode, not WriteMode: WriteMode truncates an existing
		-- file, which would throw away the backlog that later
		-- replays need.
		h <- openFile f AppendMode `onException` release ll
		atomically $ do
			putTMVar loglock ll
			writeTVar loghv h
	go _ = a
	release = atomically . putTMVar loglock

-- | Close the session's broadcast channel.
closeSession :: Session -> STM ()
closeSession = \(Session broadcast _ _) -> closeTMChan broadcast

-- | Start the server: create an empty server state and serve 'app' on
-- the port given in the options.
server :: ServerOpts -> IO ()
server o = do
	ssv <- newServerState
	run (serverPort o) (app o ssv)

-- | The WAI application: websocket requests go to 'websocketApp', and
-- anything else is answered with status 400.
app :: ServerOpts -> TVar ServerState -> Application
app o ssv = websocketsOr WS.defaultConnectionOptions wsapp fallback
  where
	wsapp = websocketApp o ssv
	fallback _request respond = respond notwebsocket
	notwebsocket = responseLBS status400 [] "Not a WebSocket request"

-- | Accept a websocket connection, negotiate the wire version, and then
-- dispatch on the mode the peer asks for: 'InitMode' is handled by 'user',
-- and 'ConnectMode' by 'developer' once its session id has been parsed.
websocketApp :: ServerOpts -> TVar ServerState -> WS.ServerApp
websocketApp o ssv pending_conn = do
	conn <- WS.acceptRequest pending_conn
	_v <- negotiateWireVersion conn
	getMode conn >>= dispatch conn
  where
	dispatch conn (InitMode _) = user o ssv conn
	dispatch conn (ConnectMode t) = maybe
		(error "Invalid session id!")
		(\sid -> developer o ssv sid conn)
		(mkSessionID (T.unpack t))

-- | Handle a user connection. 'withSessionID' supplies the log handle
-- variable and the 'SessionID'; the id is sent to the user, a session is
-- registered in the server state, and messages are relayed in both
-- directions. When relaying finishes or throws, the session's channel is
-- closed and the session is removed from the server state.
user :: ServerOpts -> TVar ServerState -> WS.Connection -> IO ()
user o ssv conn = withSessionID (serverDirectory o) $ \(loghv, sid) -> do
	sendTextData conn sid
	bracket (setup sid loghv) (cleanup sid) go
  where
	-- Register the session and subscribe the user to it in a single
	-- transaction. Subscribing any later would leave a window in which
	-- a developer could look the session up and write to its broadcast
	-- channel before the user's copy exists; a broadcast channel does
	-- not keep messages for listeners that subscribe afterwards.
	setup sid loghv = do
		session <- newSession loghv
		userchan <- atomically $ do
			modifyTVar' ssv $ M.insert sid session
			listenSession session
		return (session, userchan)

	cleanup sid (session, _) = atomically $ do
		closeSession session
		modifyTVar' ssv $ M.delete sid

	go (session, userchan) = do
		_ <- relaytouser userchan
			`concurrently` relayfromuser session
		return ()

	-- Relay all messages from the user's websocket to the
	-- session broadcast channel.
	relayfromuser session = relayFromSocket conn $ \msg -> do
		l <- mkLog (User msg) <$> getPOSIXTime
		writeSession session l

	-- Relay developer messages from the channel to the user's websocket.
	-- The user's own messages, and a closed channel, both yield Nothing.
	-- NOTE(review): confirm that relayToSocket's handling of Nothing is
	-- right for both of those cases.
	relaytouser userchan = relayToSocket conn $ do
		v <- atomically $ readTMChan userchan
		return $ case v of
			Just l -> case loggedMessage l of
				Developer m -> Just m
				User _ -> Nothing
			Nothing -> Nothing

-- | Handle a developer connection to the session with the given id.
-- Fails with an error when no such session is registered. Otherwise the
-- developer is sent the session id and the backlog, and then messages are
-- relayed in both directions.
developer :: ServerOpts -> TVar ServerState -> SessionID -> WS.Connection -> IO ()
developer o ssv sid conn = M.lookup sid <$> readTVarIO ssv >>= go
  where
	go Nothing = error "Invalid session id!"
	go (Just session) = do
		-- The developer already knows the SessionID; it is sent
		-- anyway so that the protocol starts the same way as it
		-- does for the user.
		sendTextData conn sid
		devchan <- replayBacklog o sid session conn
		void $ concurrently
			(relayfromdeveloper session)
			(relaytodeveloper devchan)

	-- Everything arriving on the developer's websocket is logged as a
	-- Developer message and written to the session.
	relayfromdeveloper session = relayFromSocket conn $ \msg -> do
		now <- getPOSIXTime
		writeSession session (mkLog (Developer msg) now)

	-- Only user messages from the channel are passed on to the
	-- developer's websocket; developer messages and a closed channel
	-- both yield Nothing.
	relaytodeveloper devchan = relayToSocket conn $ do
		v <- atomically $ readTMChan devchan
		return (v >>= fromuser . loggedMessage)

	fromuser (User m) = Just m
	fromuser (Developer _) = Nothing

-- | Send the developer everything users have done in the session so far,
-- then return a channel that will receive new session activity.
--
-- The whole replay runs under 'preventWriteWhile', so nothing more is
-- logged or broadcast until the log file has been replayed and the
-- channel has been set up.
--
-- Note that the session may appear to freeze for other users while
-- this is running.
replayBacklog :: ServerOpts -> SessionID -> Session -> WS.Connection -> IO (TMChan Log)
replayBacklog o sid session conn = preventWriteWhile session o sid $ do
	streamLog logfile >>= mapM_ replay
	atomically $ listenSession session
  where
	logfile = sessionLogFile (serverDirectory o) sid

	-- An unparseable entry should not happen, since writes to the log
	-- are blocked. Unless there's a disk error..
	replay (Left _) = return ()
	replay (Right l) = case loggedMessage l of
		User m -> sendBinaryData conn m
		Developer _ -> return ()