summaryrefslogtreecommitdiff
path: root/Utility/Metered.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Utility/Metered.hs')
-rw-r--r--Utility/Metered.hs251
1 files changed, 172 insertions, 79 deletions
diff --git a/Utility/Metered.hs b/Utility/Metered.hs
index ec16e33..a8a7111 100644
--- a/Utility/Metered.hs
+++ b/Utility/Metered.hs
@@ -1,6 +1,6 @@
{- Metered IO and actions
-
- - Copyright 2012-2018 Joey Hess <id@joeyh.name>
+ - Copyright 2012-2021 Joey Hess <id@joeyh.name>
-
- License: BSD-2-clause
-}
@@ -9,8 +9,10 @@
module Utility.Metered (
MeterUpdate,
+ MeterState(..),
nullMeterUpdate,
combineMeterUpdate,
+ TotalSize(..),
BytesProcessed(..),
toBytesProcessed,
fromBytesProcessed,
@@ -29,10 +31,13 @@ module Utility.Metered (
ProgressParser,
commandMeter,
commandMeter',
+ commandMeterExitCode,
+ commandMeterExitCode',
demeterCommand,
demeterCommandEnv,
avoidProgress,
rateLimitMeterUpdate,
+ bwLimitMeterUpdate,
Meter,
mkMeter,
setMeterTotalSize,
@@ -46,6 +51,9 @@ import Common
import Utility.Percentage
import Utility.DataUnits
import Utility.HumanTime
+import Utility.SimpleProtocol as Proto
+import Utility.ThreadScheduler
+import Utility.SafeOutput
import qualified Data.ByteString.Lazy as L
import qualified Data.ByteString as S
@@ -73,7 +81,7 @@ combineMeterUpdate a b = \n -> a n >> b n
{- Total number of bytes processed so far. -}
newtype BytesProcessed = BytesProcessed Integer
- deriving (Eq, Ord, Show)
+ deriving (Eq, Ord, Show, Read)
class AsBytesProcessed a where
toBytesProcessed :: a -> BytesProcessed
@@ -113,23 +121,24 @@ withMeteredFile :: FilePath -> MeterUpdate -> (L.ByteString -> IO a) -> IO a
withMeteredFile f meterupdate a = withBinaryFile f ReadMode $ \h ->
hGetContentsMetered h meterupdate >>= a
-{- Writes a ByteString to a Handle, updating a meter as it's written. -}
-meteredWrite :: MeterUpdate -> Handle -> L.ByteString -> IO ()
-meteredWrite meterupdate h = void . meteredWrite' meterupdate h
+{- Calls the action repeatedly with chunks from the lazy ByteString.
+ - Updates the meter after each chunk is processed. -}
+meteredWrite :: MeterUpdate -> (S.ByteString -> IO ()) -> L.ByteString -> IO ()
+meteredWrite meterupdate a = void . meteredWrite' meterupdate a
-meteredWrite' :: MeterUpdate -> Handle -> L.ByteString -> IO BytesProcessed
-meteredWrite' meterupdate h = go zeroBytesProcessed . L.toChunks
+meteredWrite' :: MeterUpdate -> (S.ByteString -> IO ()) -> L.ByteString -> IO BytesProcessed
+meteredWrite' meterupdate a = go zeroBytesProcessed . L.toChunks
where
go sofar [] = return sofar
go sofar (c:cs) = do
- S.hPut h c
+ a c
let !sofar' = addBytesProcessed sofar $ S.length c
meterupdate sofar'
go sofar' cs
meteredWriteFile :: MeterUpdate -> FilePath -> L.ByteString -> IO ()
meteredWriteFile meterupdate f b = withBinaryFile f WriteMode $ \h ->
- meteredWrite meterupdate h b
+ meteredWrite meterupdate (S.hPut h) b
{- Applies an offset to a MeterUpdate. This can be useful when
- performing a sequence of actions, such as multiple meteredWriteFiles,
@@ -165,8 +174,9 @@ hGetMetered h wantsize meterupdate = lazyRead zeroBytesProcessed
c <- S.hGet h (nextchunksize (fromBytesProcessed sofar))
if S.null c
then do
- hClose h
- return $ L.empty
+ when (wantsize /= Just 0) $
+ hClose h
+ return L.empty
else do
let !sofar' = addBytesProcessed sofar (S.length c)
meterupdate sofar'
@@ -218,7 +228,8 @@ watchFileSize f p a = bracket
p sz
watcher sz
getsz = catchDefaultIO zeroBytesProcessed $
- toBytesProcessed <$> getFileSize f
+ toBytesProcessed <$> getFileSize f'
+ f' = toRawFilePath f
data OutputHandler = OutputHandler
{ quietMode :: Bool
@@ -226,31 +237,45 @@ data OutputHandler = OutputHandler
}
{- Parses the String looking for a command's progress output, and returns
- - Maybe the number of bytes done so far, and any any remainder of the
- - string that could be an incomplete progress output. That remainder
- - should be prepended to future output, and fed back in. This interface
- - allows the command's output to be read in any desired size chunk, or
- - even one character at a time.
+ - Maybe the number of bytes done so far, optionally a total size,
+ - and any any remainder of the string that could be an incomplete
+ - progress output. That remainder should be prepended to future output,
+ - and fed back in. This interface allows the command's output to be read
+ - in any desired size chunk, or even one character at a time.
-}
-type ProgressParser = String -> (Maybe BytesProcessed, String)
+type ProgressParser = String -> (Maybe BytesProcessed, Maybe TotalSize, String)
+
+newtype TotalSize = TotalSize Integer
+ deriving (Show, Eq)
{- Runs a command and runs a ProgressParser on its output, in order
- to update a meter.
+ -
+ - If the Meter is provided, the ProgressParser can report the total size,
+ - which allows creating a Meter before the size is known.
-}
-commandMeter :: ProgressParser -> OutputHandler -> MeterUpdate -> FilePath -> [CommandParam] -> IO Bool
-commandMeter progressparser oh meterupdate cmd params = do
- ret <- commandMeter' progressparser oh meterupdate cmd params
+commandMeter :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> IO Bool
+commandMeter progressparser oh meter meterupdate cmd params =
+ commandMeter' progressparser oh meter meterupdate cmd params id
+
+commandMeter' :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> (CreateProcess -> CreateProcess) -> IO Bool
+commandMeter' progressparser oh meter meterupdate cmd params mkprocess = do
+ ret <- commandMeterExitCode' progressparser oh meter meterupdate cmd params mkprocess
return $ case ret of
Just ExitSuccess -> True
_ -> False
-commandMeter' :: ProgressParser -> OutputHandler -> MeterUpdate -> FilePath -> [CommandParam] -> IO (Maybe ExitCode)
-commandMeter' progressparser oh meterupdate cmd params =
- outputFilter cmd params Nothing
- (feedprogress zeroBytesProcessed [])
+commandMeterExitCode :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> IO (Maybe ExitCode)
+commandMeterExitCode progressparser oh meter meterupdate cmd params =
+ commandMeterExitCode' progressparser oh meter meterupdate cmd params id
+
+commandMeterExitCode' :: ProgressParser -> OutputHandler -> Maybe Meter -> MeterUpdate -> FilePath -> [CommandParam] -> (CreateProcess -> CreateProcess) -> IO (Maybe ExitCode)
+commandMeterExitCode' progressparser oh mmeter meterupdate cmd params mkprocess =
+ outputFilter cmd params mkprocess Nothing
+ (const $ feedprogress mmeter zeroBytesProcessed [])
handlestderr
where
- feedprogress prev buf h = do
+ feedprogress sendtotalsize prev buf h = do
b <- S.hGetSome h 80
if S.null b
then return ()
@@ -259,17 +284,24 @@ commandMeter' progressparser oh meterupdate cmd params =
S.hPut stdout b
hFlush stdout
let s = decodeBS b
- let (mbytes, buf') = progressparser (buf++s)
+ let (mbytes, mtotalsize, buf') = progressparser (buf++s)
+ sendtotalsize' <- case (sendtotalsize, mtotalsize) of
+ (Just meter, Just t) -> do
+ setMeterTotalSize meter t
+ return Nothing
+ _ -> return sendtotalsize
case mbytes of
- Nothing -> feedprogress prev buf' h
+ Nothing -> feedprogress sendtotalsize' prev buf' h
(Just bytes) -> do
when (bytes /= prev) $
meterupdate bytes
- feedprogress bytes buf' h
+ feedprogress sendtotalsize' bytes buf' h
- handlestderr h = unlessM (hIsEOF h) $ do
- stderrHandler oh =<< hGetLine h
- handlestderr h
+ handlestderr ph h = hGetLineUntilExitOrEOF ph h >>= \case
+ Just l -> do
+ stderrHandler oh l
+ handlestderr ph h
+ Nothing -> return ()
{- Runs a command, that may display one or more progress meters on
- either stdout or stderr, and prevents the meters from being displayed.
@@ -281,46 +313,54 @@ demeterCommand oh cmd params = demeterCommandEnv oh cmd params Nothing
demeterCommandEnv :: OutputHandler -> FilePath -> [CommandParam] -> Maybe [(String, String)] -> IO Bool
demeterCommandEnv oh cmd params environ = do
- ret <- outputFilter cmd params environ
- (\outh -> avoidProgress True outh stdouthandler)
- (\errh -> avoidProgress True errh $ stderrHandler oh)
+ ret <- outputFilter cmd params id environ
+ (\ph outh -> avoidProgress True ph outh stdouthandler)
+ (\ph errh -> avoidProgress True ph errh $ stderrHandler oh)
return $ case ret of
Just ExitSuccess -> True
_ -> False
where
stdouthandler l =
unless (quietMode oh) $
- putStrLn l
+ putStrLn (safeOutput l)
{- To suppress progress output, while displaying other messages,
- filter out lines that contain \r (typically used to reset to the
- beginning of the line when updating a progress display).
-}
-avoidProgress :: Bool -> Handle -> (String -> IO ()) -> IO ()
-avoidProgress doavoid h emitter = unlessM (hIsEOF h) $ do
- s <- hGetLine h
- unless (doavoid && '\r' `elem` s) $
- emitter s
- avoidProgress doavoid h emitter
+avoidProgress :: Bool -> ProcessHandle -> Handle -> (String -> IO ()) -> IO ()
+avoidProgress doavoid ph h emitter = hGetLineUntilExitOrEOF ph h >>= \case
+ Just s -> do
+ unless (doavoid && '\r' `elem` s) $
+ emitter s
+ avoidProgress doavoid ph h emitter
+ Nothing -> return ()
outputFilter
:: FilePath
-> [CommandParam]
+ -> (CreateProcess -> CreateProcess)
-> Maybe [(String, String)]
- -> (Handle -> IO ())
- -> (Handle -> IO ())
+ -> (ProcessHandle -> Handle -> IO ())
+ -> (ProcessHandle -> Handle -> IO ())
-> IO (Maybe ExitCode)
-outputFilter cmd params environ outfilter errfilter = catchMaybeIO $ do
- (_, Just outh, Just errh, pid) <- createProcess p
- { std_out = CreatePipe
+outputFilter cmd params mkprocess environ outfilter errfilter =
+ catchMaybeIO $ withCreateProcess p go
+ where
+ go _ (Just outh) (Just errh) ph = do
+ outt <- async $ tryIO (outfilter ph outh) >> hClose outh
+ errt <- async $ tryIO (errfilter ph errh) >> hClose errh
+ ret <- waitForProcess ph
+ wait outt
+ wait errt
+ return ret
+ go _ _ _ _ = error "internal"
+
+ p = mkprocess (proc cmd (toCommand params))
+ { env = environ
+ , std_out = CreatePipe
, std_err = CreatePipe
}
- void $ async $ tryIO (outfilter outh) >> hClose outh
- void $ async $ tryIO (errfilter errh) >> hClose errh
- waitForProcess pid
- where
- p = (proc cmd (toCommand params))
- { env = environ }
-- | Limit a meter to only update once per unit of time.
--
@@ -333,7 +373,7 @@ rateLimitMeterUpdate delta (Meter totalsizev _ _ _) meterupdate = do
return $ mu lastupdate
where
mu lastupdate n@(BytesProcessed i) = readMVar totalsizev >>= \case
- Just t | i >= t -> meterupdate n
+ Just (TotalSize t) | i >= t -> meterupdate n
_ -> do
now <- getPOSIXTime
prev <- takeMVar lastupdate
@@ -343,46 +383,95 @@ rateLimitMeterUpdate delta (Meter totalsizev _ _ _) meterupdate = do
meterupdate n
else putMVar lastupdate prev
-data Meter = Meter (MVar (Maybe Integer)) (MVar MeterState) (MVar String) DisplayMeter
+-- | Bandwidth limiting by inserting a delay at the point that a meter is
+-- updated.
+--
+-- This will only work when the actions that use bandwidth are run in the
+-- same process and thread as the call to the MeterUpdate.
+--
+-- For example, if the desired bandwidth is 100kb/s, and over the past
+-- 1/10th of a second, 30kb was sent, then the current bandwidth is
+-- 300kb/s, 3x as fast as desired. So, after getting the next chunk,
+-- pause for twice as long as it took to get it.
+bwLimitMeterUpdate :: ByteSize -> Duration -> MeterUpdate -> IO MeterUpdate
+bwLimitMeterUpdate bwlimit duration meterupdate
+ | bwlimit <= 0 = return meterupdate
+ | otherwise = do
+ nowtime <- getPOSIXTime
+ mv <- newMVar (nowtime, Nothing)
+ return (mu mv)
+ where
+ mu mv n@(BytesProcessed i) = do
+ endtime <- getPOSIXTime
+ (starttime, mprevi) <- takeMVar mv
+
+ case mprevi of
+ Just previ -> do
+ let runtime = endtime - starttime
+ let currbw = fromIntegral (i - previ) / runtime
+ let pausescale = if currbw > bwlimit'
+ then (currbw / bwlimit') - 1
+ else 0
+ unboundDelay (floor (runtime * pausescale * msecs))
+ Nothing -> return ()
+
+ meterupdate n
-type MeterState = (BytesProcessed, POSIXTime)
+ nowtime <- getPOSIXTime
+ putMVar mv (nowtime, Just i)
-type DisplayMeter = MVar String -> Maybe Integer -> (BytesProcessed, POSIXTime) -> (BytesProcessed, POSIXTime) -> IO ()
+ bwlimit' = fromIntegral (bwlimit * durationSeconds duration)
+ msecs = fromIntegral oneSecond
-type RenderMeter = Maybe Integer -> (BytesProcessed, POSIXTime) -> (BytesProcessed, POSIXTime) -> String
+data Meter = Meter (MVar (Maybe TotalSize)) (MVar MeterState) (MVar String) DisplayMeter
+
+data MeterState = MeterState
+ { meterBytesProcessed :: BytesProcessed
+ , meterTimeStamp :: POSIXTime
+ } deriving (Show)
+
+type DisplayMeter = MVar String -> Maybe TotalSize -> MeterState -> MeterState -> IO ()
+
+type RenderMeter = Maybe TotalSize -> MeterState -> MeterState -> String
-- | Make a meter. Pass the total size, if it's known.
-mkMeter :: Maybe Integer -> DisplayMeter -> IO Meter
-mkMeter totalsize displaymeter = Meter
- <$> newMVar totalsize
- <*> ((\t -> newMVar (zeroBytesProcessed, t)) =<< getPOSIXTime)
- <*> newMVar ""
- <*> pure displaymeter
-
-setMeterTotalSize :: Meter -> Integer -> IO ()
+mkMeter :: Maybe TotalSize -> DisplayMeter -> IO Meter
+mkMeter totalsize displaymeter = do
+ ts <- getPOSIXTime
+ Meter
+ <$> newMVar totalsize
+ <*> newMVar (MeterState zeroBytesProcessed ts)
+ <*> newMVar ""
+ <*> pure displaymeter
+
+setMeterTotalSize :: Meter -> TotalSize -> IO ()
setMeterTotalSize (Meter totalsizev _ _ _) = void . swapMVar totalsizev . Just
-- | Updates the meter, displaying it if necessary.
updateMeter :: Meter -> MeterUpdate
updateMeter (Meter totalsizev sv bv displaymeter) new = do
now <- getPOSIXTime
- (old, before) <- swapMVar sv (new, now)
- when (old /= new) $ do
+ let curms = MeterState new now
+ oldms <- swapMVar sv curms
+ when (meterBytesProcessed oldms /= new) $ do
totalsize <- readMVar totalsizev
- displaymeter bv totalsize (old, before) (new, now)
+ displaymeter bv totalsize oldms curms
-- | Display meter to a Handle.
displayMeterHandle :: Handle -> RenderMeter -> DisplayMeter
displayMeterHandle h rendermeter v msize old new = do
+ olds <- takeMVar v
let s = rendermeter msize old new
- olds <- swapMVar v s
+ let padding = replicate (length olds - length s) ' '
+ let s' = s <> padding
+ putMVar v s'
-- Avoid writing when the rendered meter has not changed.
- when (olds /= s) $ do
- let padding = replicate (length olds - length s) ' '
- hPutStr h ('\r':s ++ padding)
+ when (olds /= s') $ do
+ hPutStr h ('\r':s')
hFlush h
--- | Clear meter displayed by displayMeterHandle.
+-- | Clear meter displayed by displayMeterHandle. May be called before
+-- outputting something else, followed by more calls to displayMeterHandle.
clearMeterHandle :: Meter -> Handle -> IO ()
clearMeterHandle (Meter _ _ v _) h = do
olds <- readMVar v
@@ -394,7 +483,7 @@ clearMeterHandle (Meter _ _ v _) h = do
-- or when total size is not known:
-- 1.3 MiB 300 KiB/s
bandwidthMeter :: RenderMeter
-bandwidthMeter mtotalsize (BytesProcessed old, before) (BytesProcessed new, now) =
+bandwidthMeter mtotalsize (MeterState (BytesProcessed old) before) (MeterState (BytesProcessed new) now) =
unwords $ catMaybes
[ Just percentamount
-- Pad enough for max width: "100% xxxx.xx KiB xxxx KiB/s"
@@ -403,22 +492,26 @@ bandwidthMeter mtotalsize (BytesProcessed old, before) (BytesProcessed new, now)
, estimatedcompletion
]
where
- amount = roughSize' memoryUnits True 2 new
+ amount = roughSize' committeeUnits True 2 new
percentamount = case mtotalsize of
- Just totalsize ->
+ Just (TotalSize totalsize) ->
let p = showPercentage 0 $
percentage totalsize (min new totalsize)
in p ++ replicate (6 - length p) ' ' ++ amount
Nothing -> amount
- rate = roughSize' memoryUnits True 0 bytespersecond ++ "/s"
+ rate = roughSize' committeeUnits True 0 bytespersecond ++ "/s"
bytespersecond
| duration == 0 = fromIntegral transferred
| otherwise = floor $ fromIntegral transferred / duration
transferred = max 0 (new - old)
duration = max 0 (now - before)
estimatedcompletion = case mtotalsize of
- Just totalsize
+ Just (TotalSize totalsize)
| bytespersecond > 0 ->
Just $ fromDuration $ Duration $
(totalsize - new) `div` bytespersecond
_ -> Nothing
+
+instance Proto.Serializable BytesProcessed where
+ serialize (BytesProcessed n) = show n
+ deserialize = BytesProcessed <$$> readish