diff options
Diffstat (limited to 'HTTP/RateLimit.hs')
-rw-r--r-- | HTTP/RateLimit.hs | 419 |
1 files changed, 419 insertions, 0 deletions
{- Copyright 2016 Joey Hess <id@joeyh.name>
 -
 - Licensed under the GNU AGPL version 3 or higher.
 -}

-- | Server-side rate limiting: token buckets gated by proof-of-work
-- requirements, with bloom filters preventing reuse of proofs of work.
module HTTP.RateLimit where

import Types.Cost
import HTTP
import HTTP.ProofOfWork
import HTTP.Logger
import Tunables
import CmdLine (ServerConfig(..))
import Types.Storage
import Storage.Local
import Servant
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.TokenBucket
import qualified Control.Concurrent.FairRWLock as FairRWLock
import Control.Concurrent.Thread.Delay
import qualified Data.BloomFilter.Mutable as BloomFilter
import qualified Data.BloomFilter.Hash as BloomFilter
import Data.BloomFilter.Easy (suggestSizing)
import Control.Monad
import Control.Monad.ST
import Control.Exception.Lifted (bracket)
import System.DiskSpace
import Data.Maybe
import Data.Word
import Control.Monad.IO.Class

-- | A rate limiter is a series of buckets. Each bucket has a
-- successively more difficult proof of work access requirement.
--
-- To guard against DOS attacks that reuse the same proof of work,
-- bloom filters keep track of RequestIDs that have been used before.
data RateLimiter = RateLimiter
    { buckets :: TMVar [Bucket]
    -- ^ Active buckets, ordered from cheapest to most expensive to access.
    , unusedBuckets :: TMVar [Bucket]
    -- ^ Buckets removed by difficulty increases, kept so
    -- difficulty reductions can restore them.
    , fallbackQueue :: FallbackQueue
    -- ^ FIFO queue used when all buckets are drained.
    , usedRequestIDs :: BloomFilter
    -- ^ RequestIDs seen since the last bloom filter rotation.
    , usedRequestIDsOld :: BloomFilter
    -- ^ Previous generation of used RequestIDs; still checked so
    -- rotation does not instantly forget recent requests.
    , numUsedRequestIDs :: TMVar Int
    -- ^ Count of RequestIDs inserted into 'usedRequestIDs', used to
    -- decide when to rotate the bloom filters.
    , requestIDSecret :: RequestIDSecret
    -- ^ Secret used to generate and validate RequestIDs.
    , requestCounter :: TMVar Integer
    -- ^ Requests allowed since the last rate check; consumed by the
    -- disk-space adjuster thread.
    }

-- | Mutable bloom filter of RequestIDs, guarded by a TMVar.
type BloomFilter = TMVar (BloomFilter.MBloom RealWorld RequestID)

-- | Buckets fill up at a fixed rate, and accessing a bucket
-- removes one unit from it.
data Bucket = Bucket
    { tokenBucket :: TokenBucket
    -- ^ The underlying token bucket from Control.Concurrent.TokenBucket.
    , proofOfWorkRequired :: Seconds
    -- ^ How much proof of work a client must do to access this bucket.
    , fillInterval :: Word64
    -- ^ Microseconds between tokens being added to the bucket.
    }

-- | Lower bound on 'fillInterval' for newly created buckets.
minFillInterval :: Word64
minFillInterval = 2 * 60 * 1000000 -- 1 token every other minute

-- | Size of the bucket. This allows a burst of accesses after an idle
-- period, which is especially useful when retrieving keys that were
-- split into multiple chunks. However, setting this too high lets clients
-- cheaply store lots of data on a server that has been idle for a while,
-- which could be an attractive way to abuse keysafe servers.
burstSize :: Word64
burstSize = 4 -- 256 kb immediate storage

-- | Creates a rate limiter and forks off the background thread that
-- adjusts its difficulty based on disk usage.
newRateLimiter :: ServerConfig -> Maybe LocalStorageDirectory -> Logger -> IO RateLimiter
newRateLimiter cfg storedir logger = do
    rl <- RateLimiter
        <$> (newTMVarIO =<< mkbuckets (sdiv maxProofOfWork 2) [])
        <*> newTMVarIO []
        <*> newFallbackQueue
        <*> mkBloomFilter
        <*> mkBloomFilter
        <*> newTMVarIO 0
        <*> newRequestIDSecret
        <*> newTMVarIO 0
    _ <- forkIO (adjusterThread cfg storedir rl logger)
    return rl
  where
    -- The last bucket takes half of maxProofOfWork to access, and
    -- each earlier bucket quarters that time, down to the first bucket,
    -- which needs no proof of work. This ensures that in the edge case
    -- where a client keeps getting bumped up to more and more expensive
    -- buckets, it doesn't need to do more than maxProofOfWork total work.
    --
    -- Buckets are accumulated in bs cheapest-first, since each
    -- recursive step prepends a cheaper bucket.
    mkbuckets s@(Seconds n) bs
        | n <= 0 = finalbucket bs
        | otherwise = do
            case mkProofOfWorkRequirement s of
                Nothing -> finalbucket bs
                Just _ -> do
                    b <- Bucket
                        <$> newTokenBucket
                        <*> pure s
                        <*> pure minFillInterval
                    mkbuckets (sdiv s 4) (b:bs)
    -- Prepends the free (no proof of work) bucket, which becomes the
    -- head of the list.
    finalbucket bs = do
        b <- Bucket
            <$> newTokenBucket
            <*> pure (Seconds 0)
            <*> pure minFillInterval
        return (b:bs)

    sdiv (Seconds n) d = Seconds (n / d)

-- | Allocates a fresh mutable bloom filter wrapped in a TMVar.
mkBloomFilter :: IO BloomFilter
mkBloomFilter = do
    b <- stToIO $ BloomFilter.new (BloomFilter.cheapHashes bloomhashes) bloomsize
    newTMVarIO b
  where
    -- Size the bloom filter to hold 1 million items, with a false
    -- positive rate of 1 in 100 thousand. This will use around 32 mb
    -- of memory.
    (bloomsize, bloomhashes) = suggestSizing bloomMaxSize (1/100000)

-- | Maximum number of RequestIDs that can be stored in a bloom filter
-- without the false positive rate getting bad.
bloomMaxSize :: Int
bloomMaxSize = 1000000

-- A request is tried in each bucket in turn which its proof of work allows
-- access to, until one is found that accepts it.
-- | Rate limits a request, requiring proof of work as needed.
-- Falls back to the FIFO queue when every accessible bucket is empty.
rateLimit :: POWIdent p => RateLimiter -> Logger -> Maybe ProofOfWork -> p -> Handler a -> Handler (POWGuarded a)
rateLimit ratelimiter logger mpow p a = do
    bs <- getBuckets ratelimiter
    validrequest <- liftIO $ checkValidRequestID ratelimiter logger mpow
    if validrequest
        then go bs
        else assignWork ratelimiter bs
  where
    go [] = fallback ratelimiter logger a
    go (b:bs) = case mkProofOfWorkRequirement (proofOfWorkRequired b) of
        Nothing -> checkbucket b bs
        Just mkreq -> case mpow of
            Nothing -> assignWork ratelimiter (b:bs)
            Just pow@(ProofOfWork _ rid) ->
                if isValidProofOfWork pow (mkreq rid) p
                    then checkbucket b bs
                    else assignWork ratelimiter (b:bs)
    checkbucket b bs = do
        allowed <- liftIO $ tokenBucketTryAlloc (tokenBucket b)
            burstSize (fillInterval b) 1
        if allowed
            then allowRequest ratelimiter a
            else go bs

-- | Checks that a request's RequestID is valid and has not been used
-- before. A valid, fresh RequestID is recorded as used; the bloom
-- filters are rotated when the current one approaches capacity.
checkValidRequestID :: RateLimiter -> Logger -> Maybe ProofOfWork -> IO Bool
checkValidRequestID _ _ Nothing = return True
checkValidRequestID rl logger (Just (ProofOfWork _ rid))
    | validRequestID (requestIDSecret rl) rid = do
        used <- iselem usedRequestIDs
        oldused <- iselem usedRequestIDsOld
        if not used && not oldused
            then do
                withBloomFilter rl usedRequestIDs
                    (`BloomFilter.insert` rid)
                checkbloomsize
                return True
            else return False
    | otherwise = return False
  where
    iselem f = withBloomFilter rl f (BloomFilter.elem rid)

    checkbloomsize = do
        needrot <- atomically $ do
            n <- takeTMVar (numUsedRequestIDs rl)
            if n > bloomMaxSize `div` 2
                then return (Just n)
                else do
                    putTMVar (numUsedRequestIDs rl) (n+1)
                    return Nothing
        handlerotation needrot

    handlerotation Nothing = return ()
    handlerotation (Just n) = do
        logStderr logger $ "rotating bloom filters after processing " ++ show n ++ " requests"
        newused <- mkBloomFilter
        atomically $ do
            oldused <- takeTMVar (usedRequestIDs rl)
            putTMVar (usedRequestIDsOld rl) oldused
            putTMVar (usedRequestIDs rl) =<< takeTMVar newused
            putTMVar (numUsedRequestIDs rl) 0

-- | Tells the client the cheapest proof of work that would let the
-- request proceed, or 404s when no bucket has a requirement.
assignWork :: RateLimiter -> [Bucket] -> Handler (POWGuarded a)
assignWork ratelimiter bs =
    case mapMaybe (mkProofOfWorkRequirement . proofOfWorkRequired) bs of
        [] -> throwError err404
        (mkreq:_) -> do
            rid <- liftIO $ mkRequestID $ requestIDSecret ratelimiter
            return $ NeedProofOfWork $ mkreq rid

-- | Runs an ST action against one of the rate limiter's bloom filters.
withBloomFilter
    :: RateLimiter
    -> (RateLimiter -> BloomFilter)
    -> (BloomFilter.MBloom RealWorld RequestID -> ST RealWorld a)
    -> IO a
withBloomFilter rl field a = do
    b <- atomically $ readTMVar (field rl)
    stToIO (a b)

-- | Reads the current bucket list.
getBuckets :: MonadIO m => RateLimiter -> m [Bucket]
getBuckets = liftIO . atomically . readTMVar . buckets

-- | Replaces the current bucket list.
putBuckets :: MonadIO m => RateLimiter -> [Bucket] -> m ()
putBuckets rl bs = liftIO $ atomically $ do
    _ <- takeTMVar (buckets rl)
    putTMVar (buckets rl) bs

-- The fallback queue is used when a client has provided a good enough
-- proof of work to access all buckets, but all are empty.
--
-- Only a limited number of requests can be in the queue, since they take
-- up server memory while blocked, and since too large a queue would stall
-- requests for too long.
--
-- Once in the queue, requests are run in FIFO order.
--
-- A separate bucket is used to rate limit requests in the fallback queue,
-- so requests in the queue do not need to contend with requests not in the
-- queue.
data FallbackQueue = FallbackQueue
    { fallbackBucket :: TokenBucket
    -- ^ Rate limits requests that got into the queue.
    , blockedRequestLock :: FairRWLock.RWLock
    -- ^ Provides fair FIFO ordering, which tokenBucketWait lacks.
    , fallbackQueueSlots :: TMVar Int
    -- ^ Number of free queue slots remaining.
    }

-- | Creates a fallback queue with 100 slots.
newFallbackQueue :: IO FallbackQueue
newFallbackQueue = FallbackQueue
    <$> newTokenBucket
    <*> FairRWLock.new
    <*> newTMVarIO 100

-- | Blocks the request in the fallback queue until the fallback bucket
-- allows it, or rejects it when the queue is full.
fallback :: RateLimiter -> Logger -> Handler a -> Handler (POWGuarded a)
fallback ratelimiter logger a =
    bracket (liftIO addq) (liftIO . removeq) go
  where
    q = fallbackQueueSlots (fallbackQueue ratelimiter)

    -- Tries to take a queue slot; False when the queue is full.
    addq = atomically $ do
        n <- takeTMVar q
        if n <= 0
            then do
                putTMVar q n
                return False
            else do
                putTMVar q (n-1)
                return True

    removeq False = return ()
    removeq True = atomically $ do
        n <- takeTMVar q
        putTMVar q (n+1)

    -- tokenBucketWait is not fair, so use the blockedRequestLock
    -- to get fair FIFO ordering.
    waitbucket = do
        logStderr logger "** warning: All token buckets are empty. Delaying request.."
        FairRWLock.withWrite (blockedRequestLock (fallbackQueue ratelimiter)) $ do
            -- For simplicity, use the same fillInterval as the
            -- last bucket in the rate limiter for the fallback
            -- bucket.
            bs <- getBuckets ratelimiter
            case reverse bs of
                (lastb:_) -> tokenBucketWait
                    (fallbackBucket (fallbackQueue ratelimiter))
                    burstSize (fillInterval lastb)
                [] -> return ()
    go False = giveup
    go True = do
        liftIO waitbucket
        allowRequest ratelimiter a

    giveup = do
        liftIO $ logStderr logger "** warning: All token buckets are empty and request queue is large; possible DOS attack? Rejected request.."
        assignWork ratelimiter =<< getBuckets ratelimiter

-- | How much data could be stored, in bytes per second, assuming all
-- buckets in the rate limiter being constantly drained by requests,
-- and all requests store objects.
maximumStorageRate :: RateLimiter -> IO Integer
maximumStorageRate ratelimiter = do
    bs <- getBuckets ratelimiter
    -- The last bucket is counted a second time, because the fallback
    -- request queue has its own bucket with the same characteristics
    -- as this bucket.
    let fallbackb = take 1 (reverse bs)
    return $ sum $ map calc (bs ++ fallbackb)
  where
    storesize = maximum knownObjectSizes
    -- fillInterval is in microseconds per token, so multiply by
    -- 1000000 to get bytes per second.
    calc b = fromIntegral $
        (storesize * 1000000) `div` fromIntegral (fillInterval b)

-- | Human-readable summary of the rate limiter state, for logging.
describeRateLimiter :: RateLimiter -> IO String
describeRateLimiter ratelimiter = do
    storerate <- maximumStorageRate ratelimiter
    bs <- getBuckets ratelimiter
    return $ concat
        [ "rate limiter buckets: " ++ show bs
        , " ; maximum allowed storage rate: "
        , showBytes (storerate * 60 * 60 * 24 * 31) ++ "/month"
        ]

-- | Formats a byte count using binary units.
showBytes :: Integer -> String
showBytes n
    | n <= 1024*1024 = show (n `div` 1024) ++ " KiB"
    | n <= 1024*1024*1024 = show (n `div` (1024 * 1024)) ++ " MiB"
    | otherwise = show (n `div` (1024 * 1024 * 1024)) ++ " GiB"

instance Show Bucket where
    -- fillInterval is in microseconds per token, so divide by
    -- 1000000 (not 60 * 1000000, which would yield minutes and
    -- contradict the "Second/Request" label).
    show b = show (fillInterval b `div` 1000000) ++ " Second/Request"
        ++ " (PoW=" ++ show (proofOfWorkRequired b) ++ ")"

-- | Makes requests more expensive, either by removing the cheapest
-- bucket or, when only one bucket remains, slowing its fill rate.
increaseDifficulty :: Logger -> RateLimiter -> IO ()
increaseDifficulty logger ratelimiter = do
    bs <- getBuckets ratelimiter
    case bs of
        [] -> unable
        (b:[])
            | fillInterval b < maxBound `div` 2 -> do
                -- Make the remaining bucket take longer to fill.
                let b' = b { fillInterval = fillInterval b * 2 }
                putBuckets ratelimiter [b']
                done
            | otherwise -> unable
        (b:rest) -> do
            -- Remove less expensive to access buckets,
            -- so that clients have to do some work.
            -- This is done first to cut off any freeloaders
            -- that may be abusing the keysafe server.
            atomically $ do
                unused <- takeTMVar (unusedBuckets ratelimiter)
                putTMVar (unusedBuckets ratelimiter) (b:unused)
            putBuckets ratelimiter rest
            done
  where
    unable = logStderr logger "Unable to increase difficulty any further!"
    done = do
        desc <- describeRateLimiter ratelimiter
        logStdout logger $ "increased difficulty -- " ++ desc

-- Should undo the effect of increaseDifficulty.
-- | Undoes one step of 'increaseDifficulty': either speeds up the
-- lone remaining bucket, or restores the most recently removed bucket.
reduceDifficulty :: Logger -> RateLimiter -> IO ()
reduceDifficulty logger ratelimiter = do
    current <- getBuckets ratelimiter
    case current of
        [b] | fillInterval b > minFillInterval -> do
            -- A single bucket that was slowed down; speed it back up.
            putBuckets ratelimiter [b { fillInterval = fillInterval b `div` 2 }]
            announce
        _ -> do
            restored <- popUnused
            case restored of
                Nothing -> return ()
                Just b -> do
                    -- Put back the cheapest removed bucket.
                    putBuckets ratelimiter (b:current)
                    announce
  where
    -- Atomically removes the head of the unused bucket stash, if any.
    popUnused = atomically $ do
        stash <- takeTMVar (unusedBuckets ratelimiter)
        case stash of
            [] -> do
                putTMVar (unusedBuckets ratelimiter) []
                return Nothing
            (b:rest) -> do
                putTMVar (unusedBuckets ratelimiter) rest
                return (Just b)
    announce = do
        desc <- describeRateLimiter ratelimiter
        logStdout logger $ "reduced difficulty -- " ++ desc

-- | Counts the request and runs its handler, wrapping the result.
allowRequest :: RateLimiter -> Handler a -> Handler (POWGuarded a)
allowRequest ratelimiter action = do
    liftIO $ addRequest ratelimiter 1
    fmap Result action

-- | Bumps the request counter by n.
addRequest :: RateLimiter -> Integer -> IO ()
addRequest ratelimiter n = liftIO $ atomically $
    takeTMVar counter >>= putTMVar counter . (+ n)
  where
    counter = requestCounter ratelimiter

-- Thread that wakes up periodically and checks the request rate
-- against the available disk space. If the disk is filling too quickly,
-- the difficulty is increased.
-- | Loops forever, checking the request rate every 15 minutes.
adjusterThread :: ServerConfig -> Maybe LocalStorageDirectory -> RateLimiter -> Logger -> IO ()
adjusterThread cfg storedir ratelimiter logger = forever $ do
    delay (1000000 * intervalsecs)
    checkRequestRate cfg storedir ratelimiter logger intervalsecs
  where
    intervalsecs = 60*15

-- | Estimates months until half the disk fills at the current request
-- rate, and adjusts difficulty to keep that near the configured target.
checkRequestRate :: ServerConfig -> Maybe LocalStorageDirectory -> RateLimiter -> Logger -> Integer -> IO ()
checkRequestRate cfg storedir ratelimiter logger intervalsecs = do
    let storesize = maximum knownObjectSizes
    n <- atomically $ swapTMVar (requestCounter ratelimiter) 0
    let maxstoredinterval = n * fromIntegral storesize
    -- Scale from one interval to a month: a 31 day month contains
    -- (60*60*24*31) `div` intervalsecs intervals. (The previous
    -- expression, intervalsecs * 24*31 `div` (60*60), inverted the
    -- ratio and underestimated the monthly volume.)
    let maxstoredthismonth = maxstoredinterval * ((60*60*24*31) `div` intervalsecs)
    freespace <- diskFree <$> localDiskUsage storedir
    let target = monthsToFillHalfDisk cfg
    -- When nothing was stored this interval, use a large sentinel
    -- estimate so difficulty is reduced.
    let estimate = if maxstoredthismonth <= 0
        then 10000
        else freespace `div` maxstoredthismonth `div` 2
    logStdout logger $ unlines
        [ "rate limit check"
        , "  free disk space: " ++ showBytes freespace
        , "  number of requests since last check: " ++ show n
        , "  estimated max incoming data in the next month: " ++ showBytes maxstoredthismonth
        , "  estimate min " ++ show estimate ++ " months to fill half of disk"
        ]
    if estimate > target * 2
        then reduceDifficulty logger ratelimiter
        else if estimate < target
            then increaseDifficulty logger ratelimiter
            else return ()