{- Copyright 2016 Joey Hess
 -
 - Licensed under the GNU AGPL version 3 or higher.
 -}

module HTTP.RateLimit where

import Types.Cost
import HTTP
import HTTP.ProofOfWork
import HTTP.Logger
import Tunables
import CmdLine (ServerConfig(..))
import Types.Storage
import Storage.Local
import Servant
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.TokenBucket
import Control.Concurrent.Thread.Delay
import qualified Data.BloomFilter.Mutable as BloomFilter
import qualified Data.BloomFilter.Hash as BloomFilter
import Data.BloomFilter.Easy (suggestSizing)
import Control.Monad
import Control.Monad.ST
import Control.Exception.Lifted (bracket)
import System.DiskSpace
import Data.Maybe
import Data.Word
import Control.Monad.IO.Class

-- | A rate limiter is a series of buckets. Each bucket has a
-- successively more difficult proof of work access requirement.
--
-- To guard against DOS attacks that reuse the same proof of work,
-- RandomSalt values are used, and bloom filters keep track of
-- the ones that have been assigned and used.
data RateLimiter = RateLimiter
    { buckets :: TMVar [Bucket]
    -- ^ Buckets in increasing order of proof of work difficulty.
    , unusedBuckets :: TMVar [Bucket]
    -- ^ Buckets removed by 'increaseDifficulty', kept so
    -- 'reduceDifficulty' can restore them.
    , assignedRandomSalts :: BloomFilter
    , assignedRandomSaltsOld :: BloomFilter
    -- ^ Previous generation of assigned salts; kept so that salts
    -- assigned shortly before a rotation are still accepted.
    , usedRandomSalts :: BloomFilter
    , usedRandomSaltsOld :: BloomFilter
    , numRandomSalts :: TMVar Int
    -- ^ Number of salts assigned since the last bloom filter rotation.
    , randomSaltGenerationLimiter :: TokenBucket
    -- ^ Limits the rate at which new salts are generated.
    , blockedRequestQueue :: TMVar [()]
    -- ^ One () per request currently blocked in 'allBucketsEmpty'.
    , requestCounter :: TMVar Integer
    -- ^ Requests allowed since the last 'checkRequestRate'.
    }

type BloomFilter = TMVar (BloomFilter.MBloom RealWorld RandomSalt)

-- | Buckets fill up at a fixed rate, and accessing a bucket
-- removes one unit from it.
data Bucket = Bucket
    { tokenBucket :: TokenBucket
    , proofOfWorkRequired :: Seconds
    , fillInterval :: Word64
    -- ^ Microseconds between token refills.
    }

minFillInterval :: Word64
minFillInterval = 2 * 60 * 1000000 -- 1 token every other minute

-- | Size of the bucket. This allows a burst of accesses after an idle
-- period, which is especially useful when retrieving keys that were
-- split into multiple chunks. However, setting this too high lets clients
-- cheaply store lots of data on a server that has been idle for a while,
-- which could be an attractive way to abuse keysafe servers.
burstSize :: Word64
burstSize = 4 -- 256 kb immediate storage

-- | Creates a rate limiter and forks off the background thread that
-- periodically adjusts its difficulty ('adjusterThread').
newRateLimiter :: ServerConfig -> Maybe LocalStorageDirectory -> Logger -> IO RateLimiter
newRateLimiter cfg storedir logger = do
    rl <- RateLimiter
        <$> (newTMVarIO =<< mkbuckets (sdiv maxProofOfWork 2) [])
        <*> newTMVarIO []
        <*> mkBloomFilter
        <*> mkBloomFilter
        <*> mkBloomFilter
        <*> mkBloomFilter
        <*> newTMVarIO 0
        <*> newTokenBucket
        <*> newTMVarIO []
        <*> newTMVarIO 0
    _ <- forkIO (adjusterThread cfg storedir rl logger)
    return rl
  where
    -- The last bucket takes half of maxProofOfWork to access, and
    -- each earlier bucket quarters that time, down to the first bucket,
    -- which needs no proof of work. This ensures that in the edge case
    -- where a client keeps getting bumped up to more and more expensive
    -- buckets, it doesn't need to do more than maxProofOfWork total work.
    mkbuckets s@(Seconds n) bs
        | n <= 0 = finalbucket bs
        | otherwise = case mkProofOfWorkRequirement s of
            Nothing -> finalbucket bs
            Just _ -> do
                b <- Bucket
                    <$> newTokenBucket
                    <*> pure s
                    <*> pure minFillInterval
                mkbuckets (sdiv s 4) (b:bs)
    -- The cheapest bucket, needing no proof of work, goes at the
    -- front of the list.
    finalbucket bs = do
        b <- Bucket
            <$> newTokenBucket
            <*> pure (Seconds 0)
            <*> pure minFillInterval
        return (b:bs)
    sdiv (Seconds n) d = Seconds (n `div` d)

mkBloomFilter :: IO BloomFilter
mkBloomFilter = do
    b <- stToIO $ BloomFilter.new
        (BloomFilter.cheapHashes bloomhashes) bloomsize
    newTMVarIO b
  where
    -- Size the bloom filter to hold 1 million items, with a false
    -- positive rate of 1 in 100 thousand. This will use around 32 mb
    -- of memory.
    (bloomhashes, bloomsize) = suggestSizing bloomMaxSize (1/100000)

-- | Maximum number of RandomSalts that can be stored in a bloom filter
-- without the false positive rate getting bad.
bloomMaxSize :: Int
bloomMaxSize = 1000000

-- A request is tried in each bucket in turn which its proof of work allows
-- access to, until one is found that accepts it.
rateLimit :: POWIdent p => RateLimiter -> Logger -> Maybe ProofOfWork -> p -> Handler a -> Handler (POWGuarded a)
rateLimit ratelimiter logger mpow p a = do
    validsalt <- liftIO $ checkValidSalt ratelimiter mpow
    bs <- getBuckets ratelimiter
    if validsalt
        then go bs
        else assignWork ratelimiter logger bs
  where
    -- Ran out of buckets; the request may still be processed after
    -- a delay.
    go [] = allBucketsEmpty ratelimiter logger a
    go (b:bs) = case mkProofOfWorkRequirement (proofOfWorkRequired b) of
        -- This bucket needs no proof of work.
        Nothing -> checkbucket b bs
        Just mkreq -> case mpow of
            Nothing -> assignWork ratelimiter logger (b:bs)
            Just pow@(ProofOfWork _ salt) ->
                if isValidProofOfWork pow (mkreq salt) p
                    then checkbucket b bs
                    else assignWork ratelimiter logger (b:bs)
    checkbucket b bs = do
        allowed <- liftIO $ tokenBucketTryAlloc (tokenBucket b)
            burstSize (fillInterval b) 1
        if allowed
            then allowRequest ratelimiter a
            -- Bucket is empty; fall through to the next (more
            -- expensive) bucket.
            else go bs

-- | A salt is valid when this server assigned it -- in either the
-- current or the previous bloom filter generation, since a rotation
-- can happen while a client is computing its proof of work -- and it
-- has not been used before. A valid salt is immediately marked used.
--
-- (Fixed: salts present in assignedRandomSaltsOld were wrongly
-- rejected, which defeated the purpose of retaining the old filter
-- across rotations.)
checkValidSalt :: RateLimiter -> Maybe ProofOfWork -> IO Bool
checkValidSalt _ Nothing = return True
checkValidSalt rl (Just (ProofOfWork _ salt)) = do
    assigned <- iselem assignedRandomSalts
    oldassigned <- iselem assignedRandomSaltsOld
    used <- iselem usedRandomSalts
    oldused <- iselem usedRandomSaltsOld
    if (assigned || oldassigned) && not used && not oldused
        then do
            withBloomFilter rl usedRandomSalts
                (`BloomFilter.insert` salt)
            return True
        else return False
  where
    iselem f = withBloomFilter rl f (BloomFilter.elem salt)

-- | Tells the client to do proof of work for the cheapest bucket that
-- requires any, assigning it a fresh random salt.
assignWork :: RateLimiter -> Logger -> [Bucket] -> Handler (POWGuarded a)
assignWork ratelimiter logger bs =
    case mapMaybe (mkProofOfWorkRequirement . proofOfWorkRequired) bs of
        [] -> throwError err404
        (mkreq:_) -> liftIO $ do
            -- This prevents an attacker flooding requests that
            -- cause new random salts to be assigned, in order
            -- to fill up the bloom table and cause salts assigned
            -- to other clients to be rejected.
            --
            -- Since the bloom filters hold 1 million salts,
            -- the attacker would need to send requests for over 10
            -- hours to force a bloom filter rotation, so would not
            -- impact many users.
            tokenBucketWait (randomSaltGenerationLimiter ratelimiter)
                100 -- burst
                1000000 -- refill 1 token per second
            salt <- liftIO mkRandomSalt
            withBloomFilter ratelimiter assignedRandomSalts
                (`BloomFilter.insert` salt)
            -- Count salts assigned since the last rotation; once more
            -- than half the bloom filter's capacity has been assigned,
            -- signal a rotation (Just n), otherwise just bump the
            -- counter.
            --
            -- (Fixed: the condition was inverted, rotating on every
            -- request and leaving numRandomSalts empty -- or trying to
            -- putTMVar into a full TMVar -- either way wedging the
            -- limiter.)
            needrot <- atomically $ do
                n <- takeTMVar (numRandomSalts ratelimiter)
                if n > bloomMaxSize `div` 2
                    -- Leave the TMVar empty; handlerotation refills
                    -- it with 0 once the filters are swapped.
                    then return (Just n)
                    else do
                        putTMVar (numRandomSalts ratelimiter) (n+1)
                        return Nothing
            handlerotation needrot
            return $ NeedProofOfWork $ mkreq salt
  where
    handlerotation Nothing = return ()
    handlerotation (Just n) = do
        logStderr logger $
            "rotating bloom filters after processing "
                ++ show n ++ " requests"
        newassigned <- mkBloomFilter
        newused <- mkBloomFilter
        atomically $ do
            oldassigned <- takeTMVar (assignedRandomSalts ratelimiter)
            oldused <- takeTMVar (usedRandomSalts ratelimiter)
            putTMVar (assignedRandomSaltsOld ratelimiter) oldassigned
            putTMVar (usedRandomSaltsOld ratelimiter) oldused
            putTMVar (assignedRandomSalts ratelimiter)
                =<< takeTMVar newassigned
            putTMVar (usedRandomSalts ratelimiter)
                =<< takeTMVar newused
            putTMVar (numRandomSalts ratelimiter) 0

-- | Runs an ST action against one of the rate limiter's bloom filters.
withBloomFilter
    :: RateLimiter
    -> (RateLimiter -> BloomFilter)
    -> (BloomFilter.MBloom RealWorld RandomSalt -> ST RealWorld a)
    -> IO a
withBloomFilter rl field a = do
    b <- atomically $ readTMVar (field rl)
    stToIO (a b)

getBuckets :: MonadIO m => RateLimiter -> m [Bucket]
getBuckets = liftIO . atomically . readTMVar . buckets

putBuckets :: MonadIO m => RateLimiter -> [Bucket] -> m ()
putBuckets rl bs = liftIO $ atomically $ do
    _ <- takeTMVar (buckets rl)
    putTMVar (buckets rl) bs

-- | When all buckets are empty, and the client has provided a good enough
-- proof of work to access the last bucket, the request is still processed,
-- but blocked until the last token bucket refills.
--
-- Only 100 requests are allowed to block this way at a time, since the
-- requests are taking up server memory while blocked, and because we don't
-- want to stall legitimate clients too long.
allBucketsEmpty :: RateLimiter -> Logger -> Handler a -> Handler (POWGuarded a)
allBucketsEmpty ratelimiter logger a =
    -- bracket guarantees the queue slot is released even if the
    -- request is interrupted while blocked.
    bracket (liftIO addq) (liftIO . removeq) go
  where
    q = blockedRequestQueue ratelimiter

    -- Try to reserve a slot in the queue; False when it is full.
    addq = liftIO $ atomically $ do
        l <- takeTMVar q
        if length l >= 100
            then do
                putTMVar q l
                return False
            else do
                putTMVar q (():l)
                return True

    removeq False = return ()
    removeq True = liftIO $ atomically $ do
        l <- takeTMVar q
        putTMVar q (drop 1 l)

    -- Block until the most expensive bucket refills.
    waitlast = do
        bs <- getBuckets ratelimiter
        case reverse bs of
            (lastb:_) -> do
                logStderr logger
                    "** warning: All token buckets are empty. Delaying request.."
                tokenBucketWait (tokenBucket lastb)
                    burstSize (fillInterval lastb)
                return True
            [] -> return False

    go False = giveup
    go True = do
        ok <- liftIO waitlast
        if ok
            then allowRequest ratelimiter a
            else giveup

    giveup = do
        liftIO $ logStderr logger
            "** warning: All token buckets are empty and request queue is large; possible DOS attack? Rejected request.."
        assignWork ratelimiter logger =<< getBuckets ratelimiter

-- | How much data could be stored, in bytes per second, assuming all
-- buckets in the rate limiter being constantly drained by requests,
-- and all requests store objects.
maximumStorageRate :: RateLimiter -> IO Integer
maximumStorageRate ratelimiter = do
    bs <- getBuckets ratelimiter
    return $ sum $ map calc bs
  where
    storesize = maximum knownObjectSizes
    -- fillInterval is microseconds per request, so bytes/second =
    -- storesize * 1000000 / fillInterval.
    calc b = fromIntegral $
        (storesize * 1000000) `div` fromIntegral (fillInterval b)

describeRateLimiter :: RateLimiter -> IO String
describeRateLimiter ratelimiter = do
    storerate <- maximumStorageRate ratelimiter
    bs <- getBuckets ratelimiter
    return $ concat
        [ "rate limiter buckets: " ++ show bs
        , " ; maximum allowed storage rate: "
        , showBytes (storerate * 60 * 60 * 24 * 31) ++ "/month"
        ]

showBytes :: Integer -> String
showBytes n
    | n <= 1024*1024 =
        show (n `div` 1024) ++ " KiB"
    | n <= 1024*1024*1024 =
        show (n `div` (1024 * 1024)) ++ " MiB"
    | otherwise =
        show (n `div` (1024 * 1024 * 1024)) ++ " GiB"

instance Show Bucket where
    -- fillInterval is microseconds per token, so dividing by
    -- 60 * 1000000 yields minutes per request. (Fixed: the value was
    -- mislabeled " Second/Request".)
    show b = show (fillInterval b `div` (60 * 1000000)) ++ " Minute/Request"
        ++ " (PoW=" ++ show (proofOfWorkRequired b) ++ ")"

-- | Makes it harder to access the rate limiter, either by dropping the
-- cheapest bucket, or, when only one bucket remains, by doubling how
-- long that bucket takes to refill.
increaseDifficulty :: Logger -> RateLimiter -> IO ()
increaseDifficulty logger ratelimiter = do
    bs <- getBuckets ratelimiter
    case bs of
        [] -> unable
        (b:[])
            | fillInterval b < maxBound `div` 2 -> do
                -- Make the remaining bucket take longer to fill.
                let b' = b { fillInterval = fillInterval b * 2 }
                putBuckets ratelimiter [b']
                done
            | otherwise -> unable
        (b:rest) -> do
            -- Remove less expensive to access buckets,
            -- so that clients have to do some work.
            -- This is done first to cut off any freeloaders
            -- that may be abusing the keysafe server.
            atomically $ do
                unused <- takeTMVar (unusedBuckets ratelimiter)
                putTMVar (unusedBuckets ratelimiter) (b:unused)
            putBuckets ratelimiter rest
            done
  where
    unable = logStderr logger "Unable to increase difficulty any further!"
    done = do
        desc <- describeRateLimiter ratelimiter
        logStdout logger $ "increased difficulty -- " ++ desc

-- | Should undo the effect of increaseDifficulty: either halves the
-- last bucket's fill interval, or restores the most recently removed
-- bucket.
reduceDifficulty :: Logger -> RateLimiter -> IO ()
reduceDifficulty logger ratelimiter = do
    bs <- getBuckets ratelimiter
    case bs of
        (b:[]) | fillInterval b > minFillInterval -> do
            let b' = b { fillInterval = fillInterval b `div` 2 }
            putBuckets ratelimiter [b']
            done
        _ -> do
            mb <- getunused
            case mb of
                Nothing -> unable
                Just b -> do
                    putBuckets ratelimiter (b:bs)
                    done
  where
    getunused = atomically $ do
        unused <- takeTMVar (unusedBuckets ratelimiter)
        case unused of
            (b:bs) -> do
                putTMVar (unusedBuckets ratelimiter) bs
                return (Just b)
            [] -> do
                putTMVar (unusedBuckets ratelimiter) []
                return Nothing
    unable = return ()
    done = do
        desc <- describeRateLimiter ratelimiter
        logStdout logger $ "reduced difficulty -- " ++ desc

-- | Runs the request, counting it toward the request rate.
allowRequest :: RateLimiter -> Handler a -> Handler (POWGuarded a)
allowRequest ratelimiter a = do
    liftIO $ addRequest ratelimiter 1
    Result <$> a

addRequest :: RateLimiter -> Integer -> IO ()
addRequest ratelimiter n = liftIO $ atomically $ do
    v <- takeTMVar c
    putTMVar c (v + n)
  where
    c = requestCounter ratelimiter

-- | Thread that wakes up periodically and checks the request rate
-- against the available disk space. If the disk is filling too quickly,
-- the difficulty is increased.
adjusterThread :: ServerConfig -> Maybe LocalStorageDirectory -> RateLimiter -> Logger -> IO ()
adjusterThread cfg storedir ratelimiter logger = forever $ do
    delay (1000000 * intervalsecs)
    checkRequestRate cfg storedir ratelimiter logger intervalsecs
  where
    intervalsecs = 60*60

checkRequestRate :: ServerConfig -> Maybe LocalStorageDirectory -> RateLimiter -> Logger -> Integer -> IO ()
checkRequestRate cfg storedir ratelimiter logger intervalsecs = do
    let storesize = maximum knownObjectSizes
    -- Read and reset the request counter in one transaction.
    n <- liftIO $ atomically $ swapTMVar (requestCounter ratelimiter) 0
    let maxstoredinterval = n * fromIntegral storesize
    -- NOTE(review): this scaling to a month is only correct when
    -- intervalsecs is one hour (as adjusterThread passes); for other
    -- intervals the factor would need to be
    -- (24*31*60*60) `div` intervalsecs. Verify before changing the
    -- adjuster interval.
    let maxstoredthismonth =
            maxstoredinterval * (intervalsecs `div` (60*60)) * 24 * 31
    freespace <- diskFree <$> localDiskUsage storedir
    let target = monthsToFillHalfDisk cfg
    -- Months until half the disk is full at the current rate;
    -- a large constant when there was no measurable storage.
    let estimate = if maxstoredthismonth <= 0
            then 10000
            else freespace `div` maxstoredthismonth `div` 2
    logStdout logger $ unlines
        [ "rate limit check"
        , "  free disk space:" ++ showBytes freespace
        , "  number of requests since last check: " ++ show n
        , "  estimated max incoming data in the next month: "
            ++ showBytes maxstoredthismonth
        , "  estimate min " ++ show estimate
            ++ " months to fill half of disk"
        ]
    if estimate > target * 2
        then reduceDifficulty logger ratelimiter
        else if estimate < target
            then increaseDifficulty logger ratelimiter
            else return ()