From 9ece23c48f4162139ee2ffcf868390cf633ba2c4 Mon Sep 17 00:00:00 2001 From: Joey Hess Date: Tue, 13 Sep 2016 19:50:21 -0400 Subject: made fallback request queue fair Once on the queue, requests should not need to contend with other requests that are not on the queue, so added a fallback request bucket. tokenBucketWait is not fair, so ensure FIFO processing of the queue by using a FairRWLock. --- HTTP/RateLimit.hs | 91 +++++++++++++++++++++++++++++++++++-------------------- keysafe.cabal | 1 + 2 files changed, 59 insertions(+), 33 deletions(-) diff --git a/HTTP/RateLimit.hs b/HTTP/RateLimit.hs index e09da6e..194e798 100644 --- a/HTTP/RateLimit.hs +++ b/HTTP/RateLimit.hs @@ -17,6 +17,7 @@ import Servant import Control.Concurrent import Control.Concurrent.STM import Control.Concurrent.TokenBucket +import qualified Control.Concurrent.FairRWLock as FairRWLock import Control.Concurrent.Thread.Delay import qualified Data.BloomFilter.Mutable as BloomFilter import qualified Data.BloomFilter.Hash as BloomFilter @@ -38,13 +39,13 @@ import Control.Monad.IO.Class data RateLimiter = RateLimiter { buckets :: TMVar [Bucket] , unusedBuckets :: TMVar [Bucket] + , fallbackQueue :: FallbackQueue , assignedRandomSalts :: BloomFilter , assignedRandomSaltsOld :: BloomFilter , usedRandomSalts :: BloomFilter , usedRandomSaltsOld :: BloomFilter , numRandomSalts :: TMVar Int , randomSaltGenerationLimiter :: TokenBucket - , blockedRequestQueue :: TMVar [()] , requestCounter :: TMVar Integer } @@ -74,13 +75,13 @@ newRateLimiter cfg storedir logger = do rl <- RateLimiter <$> (newTMVarIO =<< mkbuckets (sdiv maxProofOfWork 2) []) <*> newTMVarIO [] + <*> newFallbackQueue <*> mkBloomFilter <*> mkBloomFilter <*> mkBloomFilter <*> mkBloomFilter <*> newTMVarIO 0 <*> newTokenBucket - <*> newTMVarIO [] <*> newTMVarIO 0 _ <- forkIO (adjusterThread cfg storedir rl logger) return rl @@ -107,7 +108,7 @@ newRateLimiter cfg storedir logger = do <*> pure (Seconds 0) <*> pure minFillInterval return (b:bs) - + sdiv (Seconds n) d = Seconds (n `div` d) mkBloomFilter :: IO BloomFilter @@ -135,7 +136,7 @@ rateLimit ratelimiter logger mpow p a = do then go bs else assignWork ratelimiter logger bs where - go [] = allBucketsEmpty ratelimiter logger a + go [] = fallback ratelimiter logger a go (b:bs) = case mkProofOfWorkRequirement (proofOfWorkRequired b) of Nothing -> checkbucket b bs Just mkreq -> case mpow of @@ -227,49 +228,69 @@ putBuckets rl bs = liftIO $ atomically $ do _ <- takeTMVar (buckets rl) putTMVar (buckets rl) bs --- When all buckets are empty, and the client has provided a good enough --- proof of work to access the last bucket, the request is still processed, --- but blocked until the last token bucket refills. +-- The fallback queue is used when a client has provided a good enough +-- proof of work to access all buckets, but all are empty. +-- +-- Only a limited number of requests can be in the queue, since they take +-- up server memory while blocked, and since too large a queue would stall +-- requests for too long. -- --- Only 100 requests are allowed to block this way at a time, since the --- requests are taking up server memory while blocked, and because we don't --- want to stall legitimate clients too long. -allBucketsEmpty :: RateLimiter -> Logger -> Handler a -> Handler (POWGuarded a) -allBucketsEmpty ratelimiter logger a = +-- Once in the queue, requests are run in FIFO order. +-- +-- A separate bucket is used to rate limit requests in the fallback queue, +-- so requests in the queue do not need to contend with requests not in the +-- queue. +data FallbackQueue = FallbackQueue + { fallbackBucket :: TokenBucket + , blockedRequestLock :: FairRWLock.RWLock + , fallbackQueueSlots :: TMVar Int + } + +newFallbackQueue :: IO FallbackQueue +newFallbackQueue = FallbackQueue + <$> newTokenBucket + <*> FairRWLock.new + <*> newTMVarIO 100 + +fallback :: RateLimiter -> Logger -> Handler a -> Handler (POWGuarded a) +fallback ratelimiter logger a = bracket (liftIO addq) (liftIO . removeq) go where - q = blockedRequestQueue ratelimiter + q = fallbackQueueSlots (fallbackQueue ratelimiter) addq = liftIO $ atomically $ do - l <- takeTMVar q - if length l >= 100 + n <- takeTMVar q + if n <= 0 then do - putTMVar q l + putTMVar q n return False else do - putTMVar q (():l) + putTMVar q (n-1) return True removeq False = return () removeq True = liftIO $ atomically $ do - l <- takeTMVar q - putTMVar q (drop 1 l) + n <- takeTMVar q + putTMVar q (n+1) - waitlast = do - bs <- getBuckets ratelimiter - case reverse bs of - (lastb:_) -> do - logStderr logger "** warning: All token buckets are empty. Delaying request.." - tokenBucketWait (tokenBucket lastb) burstSize (fillInterval lastb) - return True - [] -> return False - + -- tokenBucketWait is not fair, so use the blockedRequestLock + -- to get fair FIFO ordering. + waitbucket = do + logStderr logger "** warning: All token buckets are empty. Delaying request.." + FairRWLock.withWrite (blockedRequestLock (fallbackQueue ratelimiter)) $ do + -- For simplicity, use the same fillInterval as the + -- last bucket in the rate limiter for the fallback + -- bucket. + bs <- getBuckets ratelimiter + case reverse bs of + (lastb:_) -> tokenBucketWait + (fallbackBucket (fallbackQueue ratelimiter)) + burstSize (fillInterval lastb) + [] -> return () go False = giveup go True = do - ok <- liftIO waitlast - if ok - then allowRequest ratelimiter a - else giveup + liftIO waitbucket + allowRequest ratelimiter a giveup = do liftIO $ logStderr logger "** warning: All token buckets are empty and request queue is large; possible DOS attack? Rejected request.." @@ -281,7 +302,11 @@ allBucketsEmpty ratelimiter logger a = maximumStorageRate :: RateLimiter -> IO Integer maximumStorageRate ratelimiter = do bs <- getBuckets ratelimiter - return $ sum $ map calc bs + -- The last bucket is counted a second time, because the fallback + -- request queue has its own bucket with the same characteristics + -- as this bucket. + let fallbackb = take 1 (reverse bs) + return $ sum $ map calc (bs ++ fallbackb) where storesize = maximum knownObjectSizes calc b = fromIntegral $ diff --git a/keysafe.cabal b/keysafe.cabal index e34d334..0d256bc 100644 --- a/keysafe.cabal +++ b/keysafe.cabal @@ -66,6 +66,7 @@ Executable keysafe , lifted-base == 0.2.* , unbounded-delays == 0.1.* , fast-logger == 2.4.* + , SafeSemaphore == 0.10.* -- Temporarily inlined due to FTBFS bug -- https://github.com/ocharles/argon2/issues/2 -- argon2 == 1.1.* -- cgit v1.2.3