diff options
Diffstat (limited to 'Utility/Metered.hs')
-rw-r--r-- | Utility/Metered.hs | 251 |
1 files changed, 172 insertions, 79 deletions
diff --git a/Utility/Metered.hs b/Utility/Metered.hs index ec16e33..a8a7111 100644 --- a/Utility/Metered.hs +++ b/Utility/Metered.hs @@ -1,6 +1,6 @@ {- Metered IO and actions - - - Copyright 2012-2018 Joey Hess <id@joeyh.name> + - Copyright 2012-2021 Joey Hess <id@joeyh.name> - - License: BSD-2-clause -} @@ -9,8 +9,10 @@ module Utility.Metered ( MeterUpdate, + MeterState(..), nullMeterUpdate, combineMeterUpdate, + TotalSize(..), BytesProcessed(..), toBytesProcessed, fromBytesProcessed, @@ -29,10 +31,13 @@ module Utility.Metered ( ProgressParser, commandMeter, commandMeter', + commandMeterExitCode, + commandMeterExitCode', demeterCommand, demeterCommandEnv, avoidProgress, rateLimitMeterUpdate, + bwLimitMeterUpdate, Meter, mkMeter, setMeterTotalSize, @@ -46,6 +51,9 @@ import Common import Utility.Percentage import Utility.DataUnits import Utility.HumanTime +import Utility.SimpleProtocol as Proto +import Utility.ThreadScheduler +import Utility.SafeOutput import qualified Data.ByteString.Lazy as L import qualified Data.ByteString as S @@ -73,7 +81,7 @@ combineMeterUpdate a b = \n -> a n >> b n {- Total number of bytes processed so far. -} newtype BytesProcessed = BytesProcessed Integer - deriving (Eq, Ord, Show) + deriving (Eq, Ord, Show, Read) class AsBytesProcessed a where toBytesProcessed :: a -> BytesProcessed @@ -113,23 +121,24 @@ withMeteredFile :: FilePath -> MeterUpdate -> (L.ByteString -> IO a) -> IO a withMeteredFile f meterupdate a = withBinaryFile f ReadMode $ \h -> hGetContentsMetered h meterupdate >>= a -{- Writes a ByteString to a Handle, updating a meter as it's written. -} -meteredWrite :: MeterUpdate -> Handle -> L.ByteString -> IO () -meteredWrite meterupdate h = void . meteredWrite' meterupdate h +{- Calls the action repeatedly with chunks from the lazy ByteString. + - Updates the meter after each chunk is processed. -} +meteredWrite :: MeterUpdate -> (S.ByteString -> IO ()) -> L.ByteString -> IO () +meteredWrite meterupdate a = void . meteredWrite' meterupdate a -meteredWrite' :: MeterUpdate -> Handle -> L.ByteString -> IO BytesProcessed -meteredWrite' meterupdate h = go zeroBytesProcessed . L.toChunks +meteredWrite' :: MeterUpdate -> (S.ByteString -> IO ()) -> L.ByteString -> IO BytesProcessed +meteredWrite' meterupdate a = go zeroBytesProcessed . L.toChunks where go sofar [] = return sofar go sofar (c:cs) = do - S.hPut h c + a c let !sofar' = addBytesProcessed sofar $ S.length c meterupdate sofar' go sofar' cs meteredWriteFile :: MeterUpdate -> FilePath -> L.ByteString -> IO () meteredWriteFile meterupdate f b = withBinaryFile f WriteMode $ \h -> - meteredWrite meterupdate h b + meteredWrite meterupdate (S.hPut h) b {- Applies an offset to a MeterUpdate. This can be useful when - performing a sequence of actions, such as multiple meteredWriteFiles, @@ -165,8 +174,9 @@ hGetMetered h wantsize meterupdate = lazyRead zeroBytesProcessed c <- S.hGet h (nextchunksize (fromBytesProcessed sofar)) if S.null c then do - hClose h - return $ L.empty + when (wantsize /= Just 0) $ + hClose h + return L.empty else do let !sofar' = addBytesProcessed sofar (S.length c) meterupdate sofar' @@ -218,7 +228,8 @@ watchFileSize f p a = bracket p sz watcher sz getsz = catchDefaultIO zeroBytesProcessed $ - toBytesProcessed <$> getFileSize f + toBytesProcessed <$> getFileSize f' + f' = toRawFilePath f data OutputHandler = OutputHandler { quietMode :: Bool @@ -226,31 +237,45 @@ data OutputHandler = OutputHandler } {- Parses the String looking for a command's progress output, and returns - - Maybe the number of bytes done so far, and any any remainder of the - - string that could be an incomplete progress output. That remainder - - should be prepended to future output, and fed back in. This interface - - allows the command's output to be read in any desired size chunk, or - - even one character at a time. + - Maybe the number of bytes done so far, optionally a total size, + - and any any remainder of the string that could be an incomplete + - progress output. That remainder should be prepended to future output, + - and fed back in. This interface allows the command's output to be read + - in any desired size chunk, or even one character at a time. -} -type ProgressParser = String -> (Maybe BytesProcessed, String) +type ProgressParser = String -> (Maybe BytesProcessed, Maybe TotalSize, String) + +newtype TotalSize = TotalSize Integer + deriving (Show, Eq) {- Runs a command and runs a ProgressParser on its output, in order - to update a meter. + - + - If the Meter is provided, the ProgressParser can report the total size, + - which allows creating a Meter before the size is known. -} -commandMeter :: ProgressParser -> OutputHandler -> MeterUpdate -> FilePath -> [CommandParam] -> IO Bool -commandMeter progressparser oh meterupdate cmd params = do - ret <- commandMeter' progressparser oh meterupdate cmd params +commandMeter :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> IO Bool +commandMeter progressparser oh meter meterupdate cmd params = + commandMeter' progressparser oh meter meterupdate cmd params id + +commandMeter' :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> (CreateProcess -> CreateProcess) -> IO Bool +commandMeter' progressparser oh meter meterupdate cmd params mkprocess = do + ret <- commandMeterExitCode' progressparser oh meter meterupdate cmd params mkprocess return $ case ret of Just ExitSuccess -> True _ -> False -commandMeter' :: ProgressParser -> OutputHandler -> MeterUpdate -> FilePath -> [CommandParam] -> IO (Maybe ExitCode) -commandMeter' progressparser oh meterupdate cmd params = - outputFilter cmd params Nothing - (feedprogress zeroBytesProcessed []) +commandMeterExitCode :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> IO (Maybe ExitCode) +commandMeterExitCode progressparser oh meter meterupdate cmd params = + commandMeterExitCode' progressparser oh meter meterupdate cmd params id + +commandMeterExitCode' :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> (CreateProcess -> CreateProcess) -> IO (Maybe ExitCode) +commandMeterExitCode' progressparser oh mmeter meterupdate cmd params mkprocess = + outputFilter cmd params mkprocess Nothing + (const $ feedprogress mmeter zeroBytesProcessed []) handlestderr where - feedprogress prev buf h = do + feedprogress sendtotalsize prev buf h = do b <- S.hGetSome h 80 if S.null b then return () @@ -259,17 +284,24 @@ commandMeter' progressparser oh meterupdate cmd params = S.hPut stdout b hFlush stdout let s = decodeBS b - let (mbytes, buf') = progressparser (buf++s) + let (mbytes, mtotalsize, buf') = progressparser (buf++s) + sendtotalsize' <- case (sendtotalsize, mtotalsize) of + (Just meter, Just t) -> do + setMeterTotalSize meter t + return Nothing + _ -> return sendtotalsize case mbytes of - Nothing -> feedprogress prev buf' h + Nothing -> feedprogress sendtotalsize' prev buf' h (Just bytes) -> do when (bytes /= prev) $ meterupdate bytes - feedprogress bytes buf' h + feedprogress sendtotalsize' bytes buf' h - handlestderr h = unlessM (hIsEOF h) $ do - stderrHandler oh =<< hGetLine h - handlestderr h + handlestderr ph h = hGetLineUntilExitOrEOF ph h >>= \case + Just l -> do + stderrHandler oh l + handlestderr ph h + Nothing -> return () {- Runs a command, that may display one or more progress meters on - either stdout or stderr, and prevents the meters from being displayed. @@ -281,46 +313,54 @@ demeterCommand oh cmd params = demeterCommandEnv oh cmd params Nothing demeterCommandEnv :: OutputHandler -> FilePath -> [CommandParam] -> Maybe [(String, String)] -> IO Bool demeterCommandEnv oh cmd params environ = do - ret <- outputFilter cmd params environ - (\outh -> avoidProgress True outh stdouthandler) - (\errh -> avoidProgress True errh $ stderrHandler oh) + ret <- outputFilter cmd params id environ + (\ph outh -> avoidProgress True ph outh stdouthandler) + (\ph errh -> avoidProgress True ph errh $ stderrHandler oh) return $ case ret of Just ExitSuccess -> True _ -> False where stdouthandler l = unless (quietMode oh) $ - putStrLn l + putStrLn (safeOutput l) {- To suppress progress output, while displaying other messages, - filter out lines that contain \r (typically used to reset to the - beginning of the line when updating a progress display). -} -avoidProgress :: Bool -> Handle -> (String -> IO ()) -> IO () -avoidProgress doavoid h emitter = unlessM (hIsEOF h) $ do - s <- hGetLine h - unless (doavoid && '\r' `elem` s) $ - emitter s - avoidProgress doavoid h emitter +avoidProgress :: Bool -> ProcessHandle -> Handle -> (String -> IO ()) -> IO () +avoidProgress doavoid ph h emitter = hGetLineUntilExitOrEOF ph h >>= \case + Just s -> do + unless (doavoid && '\r' `elem` s) $ + emitter s + avoidProgress doavoid ph h emitter + Nothing -> return () outputFilter :: FilePath -> [CommandParam] + -> (CreateProcess -> CreateProcess) -> Maybe [(String, String)] - -> (Handle -> IO ()) - -> (Handle -> IO ()) + -> (ProcessHandle -> Handle -> IO ()) + -> (ProcessHandle -> Handle -> IO ()) -> IO (Maybe ExitCode) -outputFilter cmd params environ outfilter errfilter = catchMaybeIO $ do - (_, Just outh, Just errh, pid) <- createProcess p - { std_out = CreatePipe +outputFilter cmd params mkprocess environ outfilter errfilter = + catchMaybeIO $ withCreateProcess p go + where + go _ (Just outh) (Just errh) ph = do + outt <- async $ tryIO (outfilter ph outh) >> hClose outh + errt <- async $ tryIO (errfilter ph errh) >> hClose errh + ret <- waitForProcess ph + wait outt + wait errt + return ret + go _ _ _ _ = error "internal" + + p = mkprocess (proc cmd (toCommand params)) + { env = environ + , std_out = CreatePipe , std_err = CreatePipe } - void $ async $ tryIO (outfilter outh) >> hClose outh - void $ async $ tryIO (errfilter errh) >> hClose errh - waitForProcess pid - where - p = (proc cmd (toCommand params)) - { env = environ } -- | Limit a meter to only update once per unit of time. -- @@ -333,7 +373,7 @@ rateLimitMeterUpdate delta (Meter totalsizev _ _ _) meterupdate = do return $ mu lastupdate where mu lastupdate n@(BytesProcessed i) = readMVar totalsizev >>= \case - Just t | i >= t -> meterupdate n + Just (TotalSize t) | i >= t -> meterupdate n _ -> do now <- getPOSIXTime prev <- takeMVar lastupdate @@ -343,46 +383,95 @@ rateLimitMeterUpdate delta (Meter totalsizev _ _ _) meterupdate = do meterupdate n else putMVar lastupdate prev -data Meter = Meter (MVar (Maybe Integer)) (MVar MeterState) (MVar String) DisplayMeter +-- | Bandwidth limiting by inserting a delay at the point that a meter is +-- updated. +-- +-- This will only work when the actions that use bandwidth are run in the +-- same process and thread as the call to the MeterUpdate. +-- +-- For example, if the desired bandwidth is 100kb/s, and over the past +-- 1/10th of a second, 30kb was sent, then the current bandwidth is +-- 300kb/s, 3x as fast as desired. So, after getting the next chunk, +-- pause for twice as long as it took to get it. +bwLimitMeterUpdate :: ByteSize -> Duration -> MeterUpdate -> IO MeterUpdate +bwLimitMeterUpdate bwlimit duration meterupdate + | bwlimit <= 0 = return meterupdate + | otherwise = do + nowtime <- getPOSIXTime + mv <- newMVar (nowtime, Nothing) + return (mu mv) + where + mu mv n@(BytesProcessed i) = do + endtime <- getPOSIXTime + (starttime, mprevi) <- takeMVar mv + + case mprevi of + Just previ -> do + let runtime = endtime - starttime + let currbw = fromIntegral (i - previ) / runtime + let pausescale = if currbw > bwlimit' + then (currbw / bwlimit') - 1 + else 0 + unboundDelay (floor (runtime * pausescale * msecs)) + Nothing -> return () + + meterupdate n -type MeterState = (BytesProcessed, POSIXTime) + nowtime <- getPOSIXTime + putMVar mv (nowtime, Just i) -type DisplayMeter = MVar String -> Maybe Integer -> (BytesProcessed, POSIXTime) -> (BytesProcessed, POSIXTime) -> IO () + bwlimit' = fromIntegral (bwlimit * durationSeconds duration) + msecs = fromIntegral oneSecond -type RenderMeter = Maybe Integer -> (BytesProcessed, POSIXTime) -> (BytesProcessed, POSIXTime) -> String +data Meter = Meter (MVar (Maybe TotalSize)) (MVar MeterState) (MVar String) DisplayMeter + +data MeterState = MeterState + { meterBytesProcessed :: BytesProcessed + , meterTimeStamp :: POSIXTime + } deriving (Show) + +type DisplayMeter = MVar String -> Maybe TotalSize -> MeterState -> MeterState -> IO () + +type RenderMeter = Maybe TotalSize -> MeterState -> MeterState -> String -- | Make a meter. Pass the total size, if it's known. -mkMeter :: Maybe Integer -> DisplayMeter -> IO Meter -mkMeter totalsize displaymeter = Meter - <$> newMVar totalsize - <*> ((\t -> newMVar (zeroBytesProcessed, t)) =<< getPOSIXTime) - <*> newMVar "" - <*> pure displaymeter - -setMeterTotalSize :: Meter -> Integer -> IO () +mkMeter :: Maybe TotalSize -> DisplayMeter -> IO Meter +mkMeter totalsize displaymeter = do + ts <- getPOSIXTime + Meter + <$> newMVar totalsize + <*> newMVar (MeterState zeroBytesProcessed ts) + <*> newMVar "" + <*> pure displaymeter + +setMeterTotalSize :: Meter -> TotalSize -> IO () setMeterTotalSize (Meter totalsizev _ _ _) = void . swapMVar totalsizev . Just -- | Updates the meter, displaying it if necessary. updateMeter :: Meter -> MeterUpdate updateMeter (Meter totalsizev sv bv displaymeter) new = do now <- getPOSIXTime - (old, before) <- swapMVar sv (new, now) - when (old /= new) $ do + let curms = MeterState new now + oldms <- swapMVar sv curms + when (meterBytesProcessed oldms /= new) $ do totalsize <- readMVar totalsizev - displaymeter bv totalsize (old, before) (new, now) + displaymeter bv totalsize oldms curms -- | Display meter to a Handle. displayMeterHandle :: Handle -> RenderMeter -> DisplayMeter displayMeterHandle h rendermeter v msize old new = do + olds <- takeMVar v let s = rendermeter msize old new - olds <- swapMVar v s + let padding = replicate (length olds - length s) ' ' + let s' = s <> padding + putMVar v s' -- Avoid writing when the rendered meter has not changed. - when (olds /= s) $ do - let padding = replicate (length olds - length s) ' ' - hPutStr h ('\r':s ++ padding) + when (olds /= s') $ do + hPutStr h ('\r':s') hFlush h --- | Clear meter displayed by displayMeterHandle. +-- | Clear meter displayed by displayMeterHandle. May be called before +-- outputting something else, followed by more calls to displayMeterHandle. clearMeterHandle :: Meter -> Handle -> IO () clearMeterHandle (Meter _ _ v _) h = do olds <- readMVar v @@ -394,7 +483,7 @@ clearMeterHandle (Meter _ _ v _) h = do -- or when total size is not known: -- 1.3 MiB 300 KiB/s bandwidthMeter :: RenderMeter -bandwidthMeter mtotalsize (BytesProcessed old, before) (BytesProcessed new, now) = +bandwidthMeter mtotalsize (MeterState (BytesProcessed old) before) (MeterState (BytesProcessed new) now) = unwords $ catMaybes [ Just percentamount -- Pad enough for max width: "100% xxxx.xx KiB xxxx KiB/s" @@ -403,22 +492,26 @@ bandwidthMeter mtotalsize (BytesProcessed old, before) (BytesProcessed new, now) , estimatedcompletion ] where - amount = roughSize' memoryUnits True 2 new + amount = roughSize' committeeUnits True 2 new percentamount = case mtotalsize of - Just totalsize -> + Just (TotalSize totalsize) -> let p = showPercentage 0 $ percentage totalsize (min new totalsize) in p ++ replicate (6 - length p) ' ' ++ amount Nothing -> amount - rate = roughSize' memoryUnits True 0 bytespersecond ++ "/s" + rate = roughSize' committeeUnits True 0 bytespersecond ++ "/s" bytespersecond | duration == 0 = fromIntegral transferred | otherwise = floor $ fromIntegral transferred / duration transferred = max 0 (new - old) duration = max 0 (now - before) estimatedcompletion = case mtotalsize of - Just totalsize + Just (TotalSize totalsize) | bytespersecond > 0 -> Just $ fromDuration $ Duration $ (totalsize - new) `div` bytespersecond _ -> Nothing + +instance Proto.Serializable BytesProcessed where + serialize (BytesProcessed n) = show n + deserialize = BytesProcessed <$$> readish |