From 48e49d83867a5335f5e45a42dbac202caa42cd5d Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 13 Sep 2016 17:13:19 -0400 Subject: implemented dynamic rate limiting --- HTTP/RateLimit.hs | 116 +++++++++++++++++++++++++++++++++++++++++------------- HTTP/Server.hs | 20 +++++----- 2 files changed, 100 insertions(+), 36 deletions(-) (limited to 'HTTP') diff --git a/HTTP/RateLimit.hs b/HTTP/RateLimit.hs index da22b92..d9ec752 100644 --- a/HTTP/RateLimit.hs +++ b/HTTP/RateLimit.hs @@ -9,15 +9,22 @@ import Types.Cost import HTTP import HTTP.ProofOfWork import Tunables +import CmdLine (ServerConfig(..)) +import Types.Storage +import Storage.Local import Servant +import Control.Concurrent import Control.Concurrent.STM import Control.Concurrent.TokenBucket +import Control.Concurrent.Thread.Delay import qualified Data.BloomFilter.Mutable as BloomFilter import qualified Data.BloomFilter.Hash as BloomFilter import Data.BloomFilter.Easy (suggestSizing) +import Control.Monad import Control.Monad.ST import Control.Exception.Lifted (bracket) import System.IO +import System.DiskSpace import Data.Maybe import Data.Word import Control.Monad.IO.Class @@ -38,6 +45,7 @@ data RateLimiter = RateLimiter , numRandomSalts :: TMVar Int , randomSaltGenerationLimiter :: TokenBucket , blockedRequestQueue :: TMVar [()] + , requestCounter :: TMVar Integer } type BloomFilter = TMVar (BloomFilter.MBloom RealWorld RandomSalt) @@ -61,17 +69,21 @@ minFillInterval = 2 * 60 * 1000000 -- 1 token every other minute burstSize :: Word64 burstSize = 4 -- 256 kb immediate storage -newRateLimiter :: IO RateLimiter -newRateLimiter = RateLimiter - <$> (newTMVarIO =<< mkbuckets (sdiv maxProofOfWork 2) []) - <*> newTMVarIO [] - <*> mkBloomFilter - <*> mkBloomFilter - <*> mkBloomFilter - <*> mkBloomFilter - <*> newTMVarIO 0 - <*> newTokenBucket - <*> newTMVarIO [] +newRateLimiter :: ServerConfig -> Maybe LocalStorageDirectory -> IO RateLimiter +newRateLimiter cfg storedir = do + rl <- RateLimiter + <$> (newTMVarIO =<< mkbuckets (sdiv maxProofOfWork 2) []) + <*> newTMVarIO [] + <*> mkBloomFilter + <*> mkBloomFilter + <*> mkBloomFilter + <*> mkBloomFilter + <*> newTMVarIO 0 + <*> newTokenBucket + <*> newTMVarIO [] + <*> newTMVarIO 0 + _ <- forkIO (adjusterThread cfg storedir rl) + return rl where -- The last bucket takes half of maxProofOfWork to access, and -- each earlier bucket quarters that time, down to the first bucket, @@ -136,7 +148,7 @@ rateLimit ratelimiter mpow p a = do allowed <- liftIO $ tokenBucketTryAlloc (tokenBucket b) burstSize (fillInterval b) 1 if allowed - then Result <$> a + then allowRequest ratelimiter a else go bs checkValidSalt :: RateLimiter -> Maybe ProofOfWork -> IO Bool @@ -255,7 +267,7 @@ allBucketsEmpty ratelimiter a = bracket (liftIO addq) (liftIO . removeq) go go True = do ok <- liftIO waitlast if ok - then Result <$> a + then allowRequest ratelimiter a else giveup giveup = do @@ -265,13 +277,14 @@ allBucketsEmpty ratelimiter a = bracket (liftIO addq) (liftIO . removeq) go -- | How much data could be stored, in bytes per second, assuming all -- buckets in the rate limiter being constantly drained by requests, -- and all requests store objects. -maximumStorageRate :: RateLimiter -> IO Int +maximumStorageRate :: RateLimiter -> IO Integer maximumStorageRate ratelimiter = do bs <- getBuckets ratelimiter return $ sum $ map calc bs where storesize = maximum knownObjectSizes - calc b = (storesize * 1000000) `div` fromIntegral (fillInterval b) + calc b = fromIntegral $ + (storesize * 1000000) `div` fromIntegral (fillInterval b) describeRateLimiter :: RateLimiter -> IO String describeRateLimiter ratelimiter = do @@ -280,13 +293,14 @@ describeRateLimiter ratelimiter = do return $ concat [ "rate limiter buckets: " ++ show bs , " ; maximum allowed storage rate: " - , showrate (storerate * 60 * 60 * 24 * 31) ++ "/month" + , showBytes (storerate * 60 * 60 * 24 * 31) ++ "/month" ] - where - showrate n - | n < 1024*1024 = show (n `div` 1024) ++ " KiB" - | n < 1024*1024*1024 = show (n `div` (1024 * 1024)) ++ " MiB" - | otherwise = show (n `div` (1024 * 1024 * 1024)) ++ " GiB" + +showBytes :: Integer -> String +showBytes n + | n <= 1024*1024 = show (n `div` 1024) ++ " KiB" + | n <= 1024*1024*1024 = show (n `div` (1024 * 1024)) ++ " MiB" + | otherwise = show (n `div` (1024 * 1024 * 1024)) ++ " GiB" instance Show Bucket where show b = show (fillInterval b `div` (60 * 1000000)) ++ " Second/Request" @@ -297,11 +311,13 @@ increaseDifficulty ratelimiter = do bs <- getBuckets ratelimiter case bs of [] -> unable - (b:[]) -> do - -- Make the remaining bucket take longer to fill. - let b' = b { fillInterval = fillInterval b * 2 } - putBuckets ratelimiter [b'] - done + (b:[]) + | fillInterval b < maxBound `div` 2 -> do + -- Make the remaining bucket take longer to fill. + let b' = b { fillInterval = fillInterval b * 2 } + putBuckets ratelimiter [b'] + done + | otherwise -> unable (b:rest) -> do -- Remove less expensive to access buckets, -- so that clients have to do some work. @@ -313,7 +329,7 @@ increaseDifficulty ratelimiter = do putBuckets ratelimiter rest done where - unable = putStrLn "unable to increase difficulty; out of buckets" + unable = putStrLn "Unable to increase difficulty any further!" done = do desc <- describeRateLimiter ratelimiter putStrLn $ "increased difficulty -- " ++ desc @@ -348,3 +364,49 @@ reduceDifficulty ratelimiter = do done = do desc <- describeRateLimiter ratelimiter putStrLn $ "reduced difficulty -- " ++ desc + +allowRequest :: RateLimiter -> Handler a -> Handler (POWGuarded a) +allowRequest ratelimiter a = do + liftIO $ addRequest ratelimiter 1 + Result <$> a + +addRequest :: RateLimiter -> Integer -> IO () +addRequest ratelimiter n = liftIO $ atomically $ do + v <- takeTMVar c + putTMVar c (v + n) + where + c = requestCounter ratelimiter + +-- Thread that wakes up periodically and checks the request rate +-- against the available disk space. If the disk is filling too quickly, +-- the difficulty is increased. +adjusterThread :: ServerConfig -> Maybe LocalStorageDirectory -> RateLimiter -> IO () +adjusterThread cfg storedir ratelimiter = forever $ do + delay (1000000 * intervalsecs) + checkRequestRate cfg storedir ratelimiter intervalsecs + where + intervalsecs = 60*60 + +checkRequestRate :: ServerConfig -> Maybe LocalStorageDirectory -> RateLimiter -> Integer -> IO () +checkRequestRate cfg storedir ratelimiter intervalsecs = do + let storesize = maximum knownObjectSizes + n <- liftIO $ atomically $ swapTMVar (requestCounter ratelimiter) 0 + let maxstoredinterval = n * fromIntegral storesize + let maxstoredthismonth = maxstoredinterval * (intervalsecs `div` (60*60)) * 24 * 31 + freespace <- diskFree <$> localDiskUsage storedir + let target = monthsToFillHalfDisk cfg + let estimate = if maxstoredthismonth <= 0 + then 10000 + else freespace `div` maxstoredthismonth `div` 2 + putStrLn $ unlines + [ "rate limit check" + , " free disk space:" ++ showBytes freespace + , " number of requests since last check: " ++ show n + , " estimated max incoming data in the next month: " ++ showBytes maxstoredthismonth + , " estimate min " ++ show estimate ++ " months to fill half of disk" + ] + if estimate > target * 2 + then reduceDifficulty ratelimiter + else if estimate < target + then increaseDifficulty ratelimiter + else return () diff --git a/HTTP/Server.hs b/HTTP/Server.hs index 65d3d32..aab3dab 100644 --- a/HTTP/Server.hs +++ b/HTTP/Server.hs @@ -13,6 +13,7 @@ import HTTP.RateLimit import Types import Types.Storage import Tunables +import CmdLine (ServerConfig(..)) import Storage.Local import Serialization () import Servant @@ -20,6 +21,7 @@ import Network.Wai import Network.Wai.Handler.Warp import Control.Monad.IO.Class import Control.Concurrent +import Control.Concurrent.Thread.Delay import Control.Concurrent.STM import Data.String import qualified Data.ByteString as B @@ -30,20 +32,20 @@ data ServerState = ServerState , rateLimiter :: RateLimiter } -newServerState :: Maybe LocalStorageDirectory -> IO ServerState -newServerState d = ServerState +newServerState :: Maybe LocalStorageDirectory -> ServerConfig -> IO ServerState +newServerState d cfg = ServerState <$> newEmptyTMVarIO <*> pure d - <*> newRateLimiter + <*> newRateLimiter cfg d -runServer :: Maybe LocalStorageDirectory -> String -> Port -> IO () -runServer d bindaddress port = do - st <- newServerState d +runServer :: Maybe LocalStorageDirectory -> ServerConfig -> IO () +runServer d cfg = do + st <- newServerState d cfg _ <- forkIO $ obscurerThread st runSettings settings (app st) where - settings = setHost host $ setPort port $ defaultSettings - host = fromString bindaddress + settings = setHost host $ setPort (serverPort cfg) $ defaultSettings + host = fromString (serverAddress cfg) serverStorage :: ServerState -> Storage serverStorage st = localStorage (storageDir $ storageDirectory st) "server" @@ -100,7 +102,7 @@ obscurerThread :: ServerState -> IO () obscurerThread st = do _ <- obscureShares (serverStorage st) putStrLn "obscured shares" - threadDelay (1000000*60*30) + delay (1000000*60*30) _ <- atomically $ takeTMVar (obscurerRequest st) obscurerThread st -- cgit v1.2.3