1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
|
{- Copyright 2017 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
{-# LANGUAGE OverloadedStrings, BangPatterns #-}
module Server where
import Types
import CmdLine
import WebSockets
import SessionID
import Log
import Network.Wai
import Network.Wai.Handler.Warp
import Network.Wai.Handler.WebSockets
import Network.WebSockets hiding (Message)
import qualified Network.WebSockets as WS
import Network.HTTP.Types
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.TMChan
import Control.Concurrent.Async
import Control.Exception
import Control.Monad
import Data.Maybe
import qualified Data.Map as M
import qualified Data.Text as T
import Data.Time.Clock.POSIX
import System.IO
import System.Directory
import System.Mem.Weak
import Network.Mail.Mime
import System.Environment
type ServerState = M.Map SessionID Session
newServerState :: IO (TVar ServerState)
newServerState = newTVarIO M.empty
-- | A session consists of a broadcast TMChan, which both users and
-- developers write messages to. Writes are stored in the log file,
-- and a log lock allows atomic access to the log file for replays.
data Session = Session (TMChan (Broadcast Log)) (TVar Handle) (TMVar LogLock)
data LogLock = LogLock
-- | A broadcast message, with the ThreadId of the sending thread
-- (which probably wants to ignore the message it sent).
data Broadcast a = Broadcast a (Weak ThreadId)
newSession :: TVar Handle -> IO Session
newSession loghv = Session
<$> newBroadcastTMChanIO
<*> pure loghv
<*> newTMVarIO LogLock
listenSession :: Session -> STM (TMChan (Broadcast Log))
listenSession (Session bchan _ _) = dupTMChan bchan
-- | While writing a log to the session the LogLock is drained until
-- the write has reached the log file. This prevents concurrent writes
-- to the file, and allows writes to be blocked while reading the log file.
writeSession :: Weak ThreadId -> Session -> Log -> IO ()
writeSession tid (Session bchan loghv loglock) l = do
(ll, logh) <- atomically $ (,)
<$> takeTMVar loglock
<*> readTVar loghv
writeLogHandle l logh
atomically $ do
putTMVar loglock ll
writeTMChan bchan (Broadcast l tid)
-- | Run an action with the log file quiescent (and its write handle closed),
-- and nothing being added to the session's broadcast TMChan.
preventWriteWhile :: Session -> ServerOpts -> SessionID -> IO a -> IO a
preventWriteWhile (Session _ loghv loglock) o sid a = bracket setup cleanup go
where
setup = do
(ll, logh) <- atomically $ (,)
<$> takeTMVar loglock
<*> readTVar loghv
hClose logh
return ll
cleanup ll = do
let f = sessionLogFile (serverDirectory o) sid
h <- openFile f AppendMode
atomically $ do
putTMVar loglock ll
writeTVar loghv h
go _ = a
closeSession :: Session -> STM ()
closeSession (Session bchan _ _) = closeTMChan bchan
server :: ServerOpts -> IO ()
server o = do
o' <- checkEnv o
runSettings settings . app o' =<< newServerState
where
settings =
-- Prefer IPv6 but allow IPv4 as well
-- (Workaround for
-- https://github.com/jaspervdj/websockets/issues/140)
setHost "*6" $
setPort (serverPort o) $
defaultSettings
checkEnv :: ServerOpts -> IO ServerOpts
checkEnv o = go <$> lookupEnv "DEBUG_ME_FROM_EMAIL"
where
go Nothing = o
go (Just email) = o { serverEmail = Just (T.pack email) }
app :: ServerOpts -> TVar ServerState -> Application
app o ssv = websocketsOr connectionOptions (websocketApp o ssv) webapp
where
webapp _ respond = respond $
responseLBS status400 [] "This is a debug-me server, it does not serve any html pages."
websocketApp :: ServerOpts -> TVar ServerState -> WS.ServerApp
websocketApp o ssv pending_conn = do
conn <- WS.acceptRequest pending_conn
_v <- negotiateWireVersion conn
r <- receiveData conn
case r of
SelectMode ClientSends (InitMode email) -> user email o ssv conn
SelectMode ClientSends (ConnectMode t) ->
case mkSessionID (T.unpack t) of
Nothing -> protocolError conn "Invalid session id!"
Just sid -> developer o ssv sid conn
_ -> protocolError conn "Expected SelectMode"
user :: EmailAddress -> ServerOpts -> TVar ServerState -> WS.Connection -> IO ()
user email o ssv conn = do
sid <- withSessionID (serverDirectory o) $ \(loghv, sid) -> do
sendBinaryData conn (Ready ServerSends sid)
bracket (setup sid loghv) (cleanup sid) go
return sid
doneSessionLog email o sid
where
setup sid loghv = do
session <- newSession loghv
atomically $ modifyTVar' ssv $ M.insert sid session
return session
cleanup sid session = do
atomically $ do
closeSession session
modifyTVar' ssv $ M.delete sid
go session = do
mytid <- mkWeakThreadId =<< myThreadId
userchan <- atomically $ listenSession session
_ <- relaytouser userchan
`race` relayfromuser mytid session
return ()
-- Relay all messages from the user's websocket to the
-- session broadcast channel.
-- (The user is allowed to send Developer messages too.. perhaps
-- they got them from a developer connected to them some other
-- way.)
relayfromuser mytid session = relayFromSocket conn $ \msg -> do
l <- mkLog msg <$> getPOSIXTime
writeSession mytid session l
-- Relay Developer messages from the channel to the user's websocket.
relaytouser userchan = do
v <- atomically $ readTMChan userchan
case v of
Just (Broadcast l _from) -> case loggedMessage l of
Developer m -> do
sendBinaryData conn (AnyMessage (Developer m))
relaytouser userchan
User _ -> relaytouser userchan
Nothing -> return ()
developer :: ServerOpts -> TVar ServerState -> SessionID -> WS.Connection -> IO ()
developer o ssv sid conn = bracket setup cleanup go
where
setup = atomically $ M.lookup sid <$> readTVar ssv
cleanup _ = return ()
go Nothing = do
exists <- doesFileExist $
sessionLogFile (serverDirectory o) sid
if exists
then do
sendBinaryData conn (Ready ServerSends sid)
replayBacklog o sid conn
sendBinaryData conn Done
else protocolError conn "Unknown session ID"
go (Just session) = do
sendBinaryData conn (Ready ServerSends sid)
devchan <- replayBacklogAndListen o sid session conn
mytid <- mkWeakThreadId =<< myThreadId
_ <- relayfromdeveloper mytid session
`concurrently` relaytodeveloper mytid devchan
return ()
-- Relay all Developer amessages from the developer's websocket
-- to the broadcast channel.
relayfromdeveloper mytid session = relayFromSocket conn
$ \msg -> case msg of
Developer _ -> do
l <- mkLog msg <$> getPOSIXTime
writeSession mytid session l
-- developer cannot send User messages
User _ -> return ()
-- Relay user messages from the developer's clone of the
-- broadcast channel to the developer's websocket.
relaytodeveloper mytid devchan = do
v <- atomically $ readTMChan devchan
case v of
Just (Broadcast l from) -> do
let sendit = sendBinaryData conn
(AnyMessage $ loggedMessage l)
case loggedMessage l of
User _ -> sendit
-- Relay messages from other
-- developers, without looping
-- back the developer's own messages.
Developer _ -> do
rfrom <- deRefWeak from
rmy <- deRefWeak mytid
if rfrom == rmy
then return ()
else sendit
relaytodeveloper mytid devchan
Nothing -> do
sendBinaryData conn Done
return ()
-- | Replay the log of what's happened in the session so far,
-- and return a channel that will get new session activity.
--
-- This is done atomically; even if new activity arrives while it's
-- running nothing more will be logged until the log file has been
-- replayed and the channel set up.
--
-- Note that the session may appear to freeze for other users while
-- this is running.
replayBacklogAndListen :: ServerOpts -> SessionID -> Session -> WS.Connection -> IO (TMChan (Broadcast Log))
replayBacklogAndListen o sid session conn =
preventWriteWhile session o sid $ do
replayBacklog o sid conn
atomically $ listenSession session
replayBacklog :: ServerOpts -> SessionID -> WS.Connection -> IO ()
replayBacklog o sid conn = do
ls <- streamLog (sessionLogFile (serverDirectory o) sid)
forM_ ls $ \l -> case loggedMessage <$> l of
Right m -> sendBinaryData conn (AnyMessage m)
Left _ -> return ()
doneSessionLog :: EmailAddress -> ServerOpts -> SessionID -> IO ()
doneSessionLog email o sid = do
let logfile = sessionLogFile (serverDirectory o) sid
emailSessionLog email o logfile
if serverDeleteOldLogs o
then removeFile logfile
else return ()
emailSessionLog :: EmailAddress -> ServerOpts -> FilePath -> IO ()
emailSessionLog email o logfile
| isemail = renderSendMail
=<< simpleMail to from subject body body [("text/plain", logfile)]
| otherwise = return ()
where
to = Address Nothing email
from = Address Nothing $ fromMaybe "postmaster" (serverEmail o)
subject = "Your recent debug-me session"
body = "Attached is the log from your recent debug-me session."
isemail = "@" `T.isInfixOf` email
|