summaryrefslogtreecommitdiffhomepage
path: root/Server.hs
blob: 70ded97edd79b0085a1d30286c53ed6dfbd043a5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
{- Copyright 2017 Joey Hess <id@joeyh.name>
 -
 - Licensed under the GNU AGPL version 3 or higher.
 -}

{-# LANGUAGE OverloadedStrings, BangPatterns #-}

module Server where

import Types
import CmdLine
import WebSockets
import SessionID
import Log

import Network.Wai
import Network.Wai.Handler.Warp
import Network.Wai.Handler.WebSockets
import Network.WebSockets hiding (Message)
import qualified Network.WebSockets as WS
import Network.HTTP.Types
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.TMChan
import Control.Concurrent.Async
import Control.Exception
import Control.Monad
import Data.Maybe
import qualified Data.Map as M
import qualified Data.Text as T
import Data.Time.Clock.POSIX
import System.IO
import System.Directory
import System.Mem.Weak
import Network.Mail.Mime
import System.Environment

type ServerState = M.Map SessionID Session

newServerState :: IO (TVar ServerState)
newServerState = newTVarIO M.empty

-- | A session consists of a broadcast TMChan, which both users and
-- developers write messages to. Writes are stored in the log file,
-- and a log lock allows atomic access to the log file for replays.
data Session = Session (TMChan (Broadcast Log)) (TVar Handle) (TMVar LogLock)

data LogLock = LogLock

-- | A broadcast message, with the ThreadId of the sending thread
-- (which probably wants to ignore the message it sent).
data Broadcast a = Broadcast a (Weak ThreadId)

newSession :: TVar Handle -> IO Session
newSession loghv = Session
	<$> newBroadcastTMChanIO
	<*> pure loghv
	<*> newTMVarIO LogLock

listenSession :: Session -> STM (TMChan (Broadcast Log))
listenSession (Session bchan _ _) = dupTMChan bchan

-- | While writing a log to the session the LogLock is drained until
-- the write has reached the log file. This prevents concurrent writes
-- to the file, and allows writes to be blocked while reading the log file.
writeSession :: Weak ThreadId -> Session -> Log -> IO ()
writeSession tid (Session bchan loghv loglock) l = do
	(ll, logh) <- atomically $ (,)
		<$> takeTMVar loglock
		<*> readTVar loghv
	writeLogHandle l logh
	atomically $ do
		putTMVar loglock ll
		writeTMChan bchan (Broadcast l tid)

-- | Run an action with the log file quiescent (and its write handle closed),
-- and nothing being added to the session's broadcast TMChan.
preventWriteWhile :: Session -> ServerOpts -> SessionID -> IO a -> IO a
preventWriteWhile (Session _ loghv loglock) o sid a = bracket setup cleanup go
  where
	setup = do
		(ll, logh) <- atomically $ (,)
			<$> takeTMVar loglock
			<*> readTVar loghv
		hClose logh
		return ll
	cleanup ll = do
		let f = sessionLogFile (serverDirectory o) sid
		h <- openFile f AppendMode
		atomically $ do
			putTMVar loglock ll
			writeTVar loghv h
	go _ = a

closeSession :: Session -> STM ()
closeSession (Session bchan _ _) = closeTMChan bchan

server :: ServerOpts -> IO ()
server o = do
	o' <- checkEnv o
	runSettings settings . app o' =<< newServerState
  where
	settings =
		-- Prefer IPv6 but allow IPv4 as well
		-- (Workaround for
		-- https://github.com/jaspervdj/websockets/issues/140)
		setHost "*6" $
		setPort (serverPort o) $
		defaultSettings

checkEnv :: ServerOpts -> IO ServerOpts
checkEnv o = go <$> lookupEnv "DEBUG_ME_FROM_EMAIL"
  where
	go Nothing = o
	go (Just email) = o { serverEmail = Just (T.pack email) }

app :: ServerOpts -> TVar ServerState -> Application
app o ssv = websocketsOr connectionOptions (websocketApp o ssv) webapp
  where
	webapp _ respond = respond $
		responseLBS status400 [] "This is a debug-me server, it does not serve any html pages."

websocketApp :: ServerOpts -> TVar ServerState -> WS.ServerApp
websocketApp o ssv pending_conn = do
	conn <- WS.acceptRequest pending_conn
	_v <- negotiateWireVersion conn
	r <- receiveData conn
	case r of
		SelectMode ClientSends (InitMode email) -> user email o ssv conn
		SelectMode ClientSends (ConnectMode t) -> 
			case mkSessionID (T.unpack t) of
				Nothing -> protocolError conn "Invalid session id!"
				Just sid -> developer o ssv sid conn
		_ -> protocolError conn "Expected SelectMode"

user :: EmailAddress -> ServerOpts -> TVar ServerState -> WS.Connection -> IO ()
user email o ssv conn = do
	sid <- withSessionID (serverDirectory o) $ \(loghv, sid) -> do
		sendBinaryData conn (Ready ServerSends sid)
		bracket (setup sid loghv) (cleanup sid) go
		return sid
	doneSessionLog email o sid
  where
	setup sid loghv = do
		session <- newSession loghv
		atomically $ modifyTVar' ssv $ M.insert sid session
		return session

	cleanup sid session = do
		atomically $ do
			closeSession session
			modifyTVar' ssv $ M.delete sid

	go session = do
		mytid <- mkWeakThreadId =<< myThreadId
		userchan <- atomically $ listenSession session
		_ <- relaytouser userchan
			`race` relayfromuser mytid session
		return ()
	
	-- Relay all messages from the user's websocket to the
	-- session broadcast channel.
	-- (The user is allowed to send Developer messages too.. perhaps
	-- they got them from a developer connected to them some other
	-- way.)
	relayfromuser mytid session = relayFromSocket conn $ \msg -> do
		l <- mkLog msg <$> getPOSIXTime
		writeSession mytid session l
	
	-- Relay Developer messages from the channel to the user's websocket.
	relaytouser userchan = do
		v <- atomically $ readTMChan userchan
		case v of
			Just (Broadcast l _from) -> case loggedMessage l of
				Developer m -> do
					sendBinaryData conn (AnyMessage (Developer m))
					relaytouser userchan
				User _ -> relaytouser userchan
			Nothing -> return ()

developer ::  ServerOpts -> TVar ServerState -> SessionID -> WS.Connection -> IO ()
developer o ssv sid conn = bracket setup cleanup go
  where
	setup = atomically $ M.lookup sid <$> readTVar ssv
	cleanup _ = return ()
	go Nothing = do
		exists <- doesFileExist $
			sessionLogFile (serverDirectory o) sid
		if exists
			then do
				sendBinaryData conn (Ready ServerSends sid)
				replayBacklog o sid conn
				sendBinaryData conn Done
			else protocolError conn "Unknown session ID"
	go (Just session) = do
		sendBinaryData conn (Ready ServerSends sid)
		devchan <- replayBacklogAndListen o sid session conn
		mytid <- mkWeakThreadId =<< myThreadId
		_ <- relayfromdeveloper mytid session
			`concurrently` relaytodeveloper mytid devchan
		return ()
	
	-- Relay all Developer amessages from the developer's websocket
	-- to the broadcast channel.
	relayfromdeveloper mytid session = relayFromSocket conn
		$ \msg -> case msg of
			Developer _ -> do
				l <- mkLog msg <$> getPOSIXTime
				writeSession mytid session l
			-- developer cannot send User messages
			User _ -> return ()
	
	-- Relay user messages from the developer's clone of the
	-- broadcast channel to the developer's websocket.
	relaytodeveloper mytid devchan = do
		v <- atomically $ readTMChan devchan
		case v of
			Just (Broadcast l from) -> do
				let sendit = sendBinaryData conn
					(AnyMessage $ loggedMessage l)
				case loggedMessage l of
					User _ -> sendit
					-- Relay messages from other
					-- developers, without looping
					-- back the developer's own messages.
					Developer _ -> do
						rfrom <- deRefWeak from
						rmy <- deRefWeak mytid
						if rfrom == rmy
							then return ()
							else sendit
				relaytodeveloper mytid devchan
			Nothing -> do
				sendBinaryData conn Done
				return ()

-- | Replay the log of what's happened in the session so far,
-- and return a channel that will get new session activity.
--
-- This is done atomically; even if new activity arrives while it's
-- running nothing more will be logged until the log file has been
-- replayed and the channel set up. 
--
-- Note that the session may appear to freeze for other users while
-- this is running.
replayBacklogAndListen :: ServerOpts -> SessionID -> Session -> WS.Connection -> IO (TMChan (Broadcast Log))
replayBacklogAndListen o sid session conn = 
	preventWriteWhile session o sid $ do
		replayBacklog o sid conn
		atomically $ listenSession session

replayBacklog :: ServerOpts -> SessionID -> WS.Connection -> IO ()
replayBacklog o sid conn = do
	ls <- streamLog (sessionLogFile (serverDirectory o) sid)
	forM_ ls $ \l -> case loggedMessage <$> l of
		Right m -> sendBinaryData conn (AnyMessage m)
		Left _ -> return ()

doneSessionLog :: EmailAddress -> ServerOpts -> SessionID -> IO ()
doneSessionLog email o sid = do
	let logfile = sessionLogFile (serverDirectory o) sid
	emailSessionLog email o logfile
	if serverDeleteOldLogs o
		then removeFile logfile
		else return ()

emailSessionLog :: EmailAddress -> ServerOpts -> FilePath -> IO ()
emailSessionLog email o logfile
	| isemail = renderSendMail 
		=<< simpleMail to from subject body body [("text/plain", logfile)]
	| otherwise = return ()
  where
	to = Address Nothing email
	from = Address Nothing $ fromMaybe "postmaster" (serverEmail o)
	subject = "Your recent debug-me session"
	body = "Attached is the log from your recent debug-me session."
	isemail = "@" `T.isInfixOf` email