1 | {-# LANGUAGE BangPatterns #-} |
2 | {-# LANGUAGE CPP #-} |
3 | {-# LANGUAGE GADTs #-} |
4 | {-# LANGUAGE LambdaCase #-} |
5 | {-# LANGUAGE RankNTypes #-} |
6 | {-# LANGUAGE ViewPatterns #-} |
7 | |
8 | -- | |
9 | -- Module : Streaming.ByteString |
10 | -- Copyright : (c) Don Stewart 2006 |
11 | -- (c) Duncan Coutts 2006-2011 |
12 | -- (c) Michael Thompson 2015 |
13 | -- License : BSD-style |
14 | -- |
15 | -- Maintainer : what_is_it_to_do_anything@yahoo.com |
16 | -- Stability : experimental |
17 | -- Portability : portable |
18 | -- |
19 | -- See the simple examples of use <https://gist.github.com/michaelt/6c6843e6dd8030e95d58 here> |
20 | -- and the @ghci@ examples especially in "Streaming.ByteString.Char8". |
21 | -- We begin with a slight modification of the documentation to "Data.ByteString.Lazy": |
22 | -- |
23 | -- A time and space-efficient implementation of effectful byte streams using a |
24 | -- stream of packed 'Word8' arrays, suitable for high performance use, both in |
25 | -- terms of large data quantities, or high speed requirements. Streaming |
26 | -- ByteStrings are encoded as streams of strict chunks of bytes. |
27 | -- |
28 | -- A key feature of streaming ByteStrings is the means to manipulate large or |
29 | -- unbounded streams of data without requiring the entire sequence to be |
30 | -- resident in memory. To take advantage of this you have to write your |
31 | -- functions in a streaming style, e.g. classic pipeline composition. The |
32 | -- default I\/O chunk size is 32k, which should be good in most circumstances. |
33 | -- |
34 | -- Some operations, such as 'concat', 'append', and 'cons', have better |
35 | -- complexity than their "Data.ByteString" equivalents, due to optimisations |
36 | -- resulting from the list spine structure. For other operations streaming, like |
37 | -- lazy, ByteStrings are usually within a few percent of strict ones. |
38 | -- |
39 | -- This module is intended to be imported @qualified@, to avoid name clashes |
40 | -- with "Prelude" functions. eg. |
41 | -- |
42 | -- > import qualified Streaming.ByteString as Q |
43 | -- |
44 | -- Original GHC implementation by Bryan O\'Sullivan. Rewritten to use |
45 | -- 'Data.Array.Unboxed.UArray' by Simon Marlow. Rewritten to support slices and |
46 | -- use 'Foreign.ForeignPtr.ForeignPtr' by David Roundy. Rewritten again and |
47 | -- extended by Don Stewart and Duncan Coutts. Lazy variant by Duncan Coutts and |
48 | -- Don Stewart. Streaming variant by Michael Thompson, following the ideas of |
49 | -- Gabriel Gonzales' pipes-bytestring. |
50 | module Streaming.ByteString |
51 | ( -- * The @ByteStream@ type |
52 | ByteStream |
53 | , ByteString |
54 | |
55 | -- * Introducing and eliminating 'ByteStream's |
56 | , empty |
57 | , singleton |
58 | , pack |
59 | , unpack |
60 | , fromLazy |
61 | , toLazy |
62 | , toLazy_ |
63 | , fromChunks |
64 | , toChunks |
65 | , fromStrict |
66 | , toStrict |
67 | , toStrict_ |
68 | , effects |
69 | , copy |
70 | , drained |
71 | , mwrap |
72 | |
73 | -- * Transforming ByteStreams |
74 | , map |
75 | , for |
76 | , intercalate |
77 | , intersperse |
78 | |
79 | -- * Basic interface |
80 | , cons |
81 | , cons' |
82 | , snoc |
83 | , append |
84 | , filter |
85 | , uncons |
86 | , nextByte |
87 | |
88 | -- * Substrings |
89 | -- ** Breaking strings |
90 | , break |
91 | , drop |
92 | , dropWhile |
93 | , group |
94 | , groupBy |
95 | , span |
96 | , splitAt |
97 | , splitWith |
98 | , take |
99 | , takeWhile |
100 | |
101 | -- ** Breaking into many substrings |
102 | , split |
103 | |
104 | -- ** Special folds |
105 | , concat |
106 | , denull |
107 | |
108 | -- * Builders |
109 | , toStreamingByteString |
110 | |
111 | , toStreamingByteStringWith |
112 | |
113 | , toBuilder |
114 | , concatBuilders |
115 | |
116 | -- * Building ByteStreams |
117 | -- ** Infinite ByteStreams |
118 | , repeat |
119 | , iterate |
120 | , cycle |
121 | |
122 | -- ** Unfolding ByteStreams |
123 | , unfoldM |
124 | , unfoldr |
125 | , reread |
126 | |
127 | -- * Folds, including support for `Control.Foldl` |
128 | , foldr |
129 | , fold |
130 | , fold_ |
131 | , head |
132 | , head_ |
133 | , last |
134 | , last_ |
135 | , length |
136 | , length_ |
137 | , null |
138 | , null_ |
139 | , nulls |
140 | , testNull |
141 | , count |
142 | , count_ |
143 | |
144 | -- * I\/O with 'ByteStream's |
145 | -- ** Standard input and output |
146 | , getContents |
147 | , stdin |
148 | , stdout |
149 | , interact |
150 | |
151 | -- ** Files |
152 | , readFile |
153 | , writeFile |
154 | , appendFile |
155 | |
156 | -- ** I\/O with Handles |
157 | , fromHandle |
158 | , toHandle |
159 | , hGet |
160 | , hGetContents |
161 | , hGetContentsN |
162 | , hGetN |
163 | , hGetNonBlocking |
164 | , hGetNonBlockingN |
165 | , hPut |
166 | -- , hPutNonBlocking |
167 | |
168 | -- * Simple chunkwise operations |
169 | , unconsChunk |
170 | , nextChunk |
171 | , chunk |
172 | , foldrChunks |
173 | , foldlChunks |
174 | , chunkFold |
175 | , chunkFoldM |
176 | , chunkMap |
177 | , chunkMapM |
178 | , chunkMapM_ |
179 | |
180 | -- * Etc. |
181 | , dematerialize |
182 | , materialize |
183 | , distribute |
184 | , zipWithStream |
185 | ) where |
186 | |
187 | import Prelude hiding |
188 | (all, any, appendFile, break, concat, concatMap, cycle, drop, dropWhile, |
189 | elem, filter, foldl, foldl1, foldr, foldr1, getContents, getLine, head, |
190 | init, interact, iterate, last, length, lines, map, maximum, minimum, |
191 | notElem, null, putStr, putStrLn, readFile, repeat, replicate, reverse, |
192 | scanl, scanl1, scanr, scanr1, span, splitAt, tail, take, takeWhile, |
193 | unlines, unzip, writeFile, zip, zipWith) |
194 | |
195 | import qualified Data.ByteString as P (ByteString) |
196 | import qualified Data.ByteString as B |
197 | import Data.ByteString.Builder.Internal hiding |
198 | (append, defaultChunkSize, empty, hPut) |
199 | import qualified Data.ByteString.Internal as B |
200 | import qualified Data.ByteString.Lazy.Internal as BI |
201 | import qualified Data.ByteString.Unsafe as B |
202 | |
203 | import Streaming hiding (concats, distribute, unfold) |
204 | import Streaming.ByteString.Internal |
205 | import Streaming.Internal (Stream(..)) |
206 | import qualified Streaming.Prelude as SP |
207 | |
208 | import Control.Monad (forever) |
209 | import Control.Monad.Trans.Resource |
210 | import Data.Int (Int64) |
211 | import qualified Data.List as L |
212 | import Data.Word (Word8) |
213 | import Foreign.Ptr |
214 | import Foreign.Storable |
215 | import System.IO (Handle, IOMode(..), hClose, openBinaryFile) |
216 | import qualified System.IO as IO (stdin, stdout) |
217 | import System.IO.Error (illegalOperationErrorType, mkIOError) |
218 | |
-- | /O(n)/ Flatten a stream whose layers are byte streams into a single byte
-- stream. Each layer is spliced in with 'join', effects of the outer stream
-- become 'Go' steps, and the outer return value becomes the final 'Empty'.
concat :: Monad m => Stream (ByteStream m) m r -> ByteStream m r
concat segments = destroy segments join Go Empty
{-# INLINE concat #-}
223 | |
-- | Given a byte stream over a transformed monad @t m@, move the transformer
-- to the outside so that it can be \'run\' around a @ByteStream m@.
distribute
  :: (Monad m, MonadTrans t, MFunctor t, Monad (t m), Monad (t (ByteStream m)))
  => ByteStream (t m) a -> t (ByteStream m) a
distribute stream = dematerialize stream
  return
  -- Emit the chunk in the inner @ByteStream m@, then continue with the rest.
  (\chunkBytes rest -> join (lift (Chunk chunkBytes (Empty rest))))
  -- Re-home an effect of @t m@ as an effect of @t (ByteStream m)@.
  (\layer -> join (hoist (Go . fmap Empty) layer))
{-# INLINE distribute #-}
234 | |
-- | Run every effect contained in a byte stream, discarding all of the bytes
-- and keeping only the final return value.
effects :: Monad m => ByteStream m r -> m r
effects (Empty r)      = return r
effects (Go m)         = m >>= effects
effects (Chunk _ rest) = effects rest
{-# INLINABLE effects #-}
242 | |
-- | Run the outer action, then perform the effects of the byte stream it
-- returns, ignoring that stream's bytes. It would typically be used at the type
--
-- > ByteStream m (ByteStream m r) -> ByteStream m r
drained :: (Monad m, MonadTrans t, Monad (t m)) => t m (ByteStream m r) -> t m r
drained t = do
  remainder <- t
  lift (effects remainder)
249 | |
250 | -- ----------------------------------------------------------------------------- |
251 | -- Introducing and eliminating 'ByteStream's |
252 | |
-- | /O(1)/ The empty 'ByteStream' -- i.e. @return ()@. Note that
-- @ByteStream m w@ is generally a monoid for monoidal values of @w@, like @()@.
--
-- It is built directly from the 'Empty' constructor, which is why no
-- constraint on @m@ is required.
empty :: ByteStream m ()
empty = Empty ()
{-# INLINE empty #-}
258 | |
-- | /O(1)/ A 'ByteStream' consisting of exactly one one-byte chunk holding the
-- given 'Word8'.
singleton :: Monad m => Word8 -> ByteStream m ()
singleton w = Chunk oneByte done
  where
    oneByte = B.singleton w
    done    = Empty ()
{-# INLINE singleton #-}
263 | |
-- | /O(n)/ Pack a monadic stream of individual 'Word8's into a byte stream.
-- This is 'packBytes' under the conventional name.
pack :: Monad m => Stream (Of Word8) m r -> ByteStream m r
pack bytes = packBytes bytes
{-# INLINE pack #-}
268 | |
-- | /O(n)/ Unpack a byte stream into a stream of individual bytes. This is
-- 'unpackBytes' under the conventional name.
unpack :: Monad m => ByteStream m r -> Stream (Of Word8) m r
unpack bytes = unpackBytes bytes
272 | |
-- | /O(c)/ Turn a monadic stream of strict 'ByteString' chunks into a byte
-- stream; every yielded bytestring becomes one 'Chunk'.
fromChunks :: Monad m => Stream (Of P.ByteString) m r -> ByteStream m r
fromChunks chunks = destroy chunks step Go return
  where
    step (bs :> rest) = Chunk bs rest
{-# INLINE fromChunks #-}
278 | |
-- | /O(c)/ Turn a byte stream into a stream of its strict chunks. This of
-- course exposes the internal chunk structure.
toChunks :: Monad m => ByteStream m r -> Stream (Of P.ByteString) m r
toChunks bs = dematerialize bs return yieldChunk Effect
  where
    yieldChunk b rest = Step (b :> rest)
{-# INLINE toChunks #-}
284 | |
-- | /O(1)/ Yield a strict 'ByteString' as a single chunk. An empty bytestring
-- produces the empty stream rather than an empty chunk.
fromStrict :: P.ByteString -> ByteStream m ()
fromStrict bs =
  if B.null bs
    then Empty ()
    else Chunk bs (Empty ())
{-# INLINE fromStrict #-}
290 | |
-- | /O(n)/ Collect a byte stream into a single strict 'ByteString', discarding
-- the stream's return value.
--
-- Note that this is an /expensive/ operation: all chunks are gathered into a
-- list and then copied into one buffer by 'B.concat'. If possible, try to
-- avoid converting back and forth between streaming and strict bytestrings.
toStrict_ :: Monad m => ByteStream m r -> m B.ByteString
#if MIN_VERSION_streaming (0,2,2)
toStrict_ bs = B.concat <$> SP.toList_ (toChunks bs)
#else
toStrict_ bs = B.concat <$> SP.toList_ (void (toChunks bs))
#endif
{-# INLINE toStrict_ #-}
303 | |
-- | /O(n)/ Collect a monadic byte stream into a single strict 'ByteString',
-- paired with the stream's return value. This operation is for use with
-- 'mapped'.
--
-- > mapped R.toStrict :: Monad m => Stream (ByteStream m) m r -> Stream (Of ByteString) m r
--
-- It is subject to all the objections one makes to Data.ByteString.Lazy
-- 'toStrict'; all of these are devastating.
toStrict :: Monad m => ByteStream m r -> m (Of B.ByteString r)
toStrict bs = fmap gather (SP.toList (toChunks bs))
  where
    -- Concatenate the collected chunks, leaving the return value untouched.
    gather (bss :> r) = B.concat bss :> r
{-# INLINE toStrict #-}
317 | |
-- | /O(c)/ Transmute a pseudo-pure lazy bytestring to its representation as a
-- monadic stream of chunks, one 'Chunk' per lazy chunk.
--
-- >>> Q.putStrLn $ Q.fromLazy "hi"
-- hi
-- >>> Q.fromLazy "hi"
-- Chunk "hi" (Empty (()))  -- note: a 'show' instance works in the identity monad
-- >>> Q.fromLazy $ BL.fromChunks ["here", "are", "some", "chunks"]
-- Chunk "here" (Chunk "are" (Chunk "some" (Chunk "chunks" (Empty (())))))
fromLazy :: Monad m => BI.ByteString -> ByteStream m ()
fromLazy lbs = BI.foldrChunks (\c rest -> Chunk c rest) (Empty ()) lbs
{-# INLINE fromLazy #-}
330 | |
-- | /O(n)/ Convert an effectful byte stream into a single lazy 'ByteString'
-- with the same internal chunk structure, discarding the return value. See
-- 'toLazy', which preserves connectedness by keeping the return value of the
-- effectful bytestring.
toLazy_ :: Monad m => ByteStream m r -> m BI.ByteString
toLazy_ bs = dematerialize bs
  (const (return BI.Empty))
  (\c rest -> fmap (BI.Chunk c) rest)
  join
{-# INLINE toLazy_ #-}
337 | |
-- | /O(n)/ Convert an effectful byte stream into a single lazy 'ByteString'
-- with the same internal chunk structure, retaining the original return value.
--
-- This is the canonical way of breaking streaming (`toStrict` and the like are
-- far more demonic). Essentially one is dividing the interleaved layers of
-- effects and bytes into one immense layer of effects, followed by the memory
-- of the succession of bytes.
--
-- Because one preserves the return value, `toLazy` is a suitable argument for
-- 'Streaming.mapped':
--
-- > S.mapped Q.toLazy :: Stream (ByteStream m) m r -> Stream (Of L.ByteString) m r
--
-- >>> Q.toLazy "hello"
-- "hello" :> ()
-- >>> S.toListM $ traverses Q.toLazy $ Q.lines "one\ntwo\nthree\nfour\nfive\n"
-- ["one","two","three","four","five",""]  -- [L.ByteString]
toLazy :: Monad m => ByteStream m r -> m (Of BI.ByteString r)
toLazy bs0 = dematerialize bs0 finish prepend join
  where
    finish r = return (BI.Empty :> r)
    -- Put this chunk in front of the lazy bytestring built from the rest.
    prepend b mx = fmap (\(bs :> x) -> BI.Chunk b bs :> x) mx
{-# INLINE toLazy #-}
364 | |
365 | -- --------------------------------------------------------------------- |
366 | -- Basic interface |
367 | -- |
368 | |
-- | Test whether a `ByteStream` is empty, collecting its return value; to reach
-- the return value, this operation must run through the whole stream. Empty
-- chunks do not count as content.
--
-- >>> Q.null "one\ntwo\three\nfour\nfive\n"
-- False :> ()
-- >>> Q.null ""
-- True :> ()
-- >>> S.print $ mapped R.null $ Q.lines "yours,\nMeredith"
-- False
-- False
--
-- Suitable for use with `SP.mapped`:
--
-- @
-- S.mapped Q.null :: Streaming (ByteStream m) m r -> Stream (Of Bool) m r
-- @
null :: Monad m => ByteStream m r -> m (Of Bool r)
null stream = case stream of
  Empty r -> return (True :> r)
  Go m    -> m >>= null
  Chunk bs rest
    | B.null bs -> null rest
      -- A non-empty chunk settles the answer; drain the rest for its result.
    | otherwise -> (False :>) <$> SP.effects (toChunks rest)
{-# INLINABLE null #-}
394 | |
-- | Test whether a `ByteStream` is empty, stopping at the first non-empty
-- chunk. Effects and empty chunks before that point are run through; the
-- answer is of course in the monad of the effects.
--
-- >>> Q.null_ "one\ntwo\three\nfour\nfive\n"
-- False
-- >>> Q.null_ $ Q.take 0 Q.stdin
-- True
-- >>> :t Q.null_ $ Q.take 0 Q.stdin
-- Q.null_ $ Q.take 0 Q.stdin :: MonadIO m => m Bool
null_ :: Monad m => ByteStream m r -> m Bool
null_ stream = case stream of
  Empty _ -> return True
  Go m    -> m >>= null_
  Chunk bs rest
    | B.null bs -> null_ rest
    | otherwise -> return False
{-# INLINABLE null_ #-}
411 | |
-- | Similar to `null`, but instead of draining the stream it hands back the
-- remainder once an answer has been determined. Leading empty chunks are
-- dropped; the first non-empty chunk is kept in the returned stream.
testNull :: Monad m => ByteStream m r -> m (Of Bool (ByteStream m r))
testNull stream = case stream of
  Empty r -> return (True :> Empty r)
  Go m    -> m >>= testNull
  Chunk bs rest
    | B.null bs -> testNull rest
    | otherwise -> return (False :> stream)
{-# INLINABLE testNull #-}
421 | |
-- | Remove empty ByteStrings from a stream of bytestrings.
denull :: Monad m => Stream (ByteStream m) m r -> Stream (ByteStream m) m r
{-# INLINABLE denull #-}
denull = nextSegment
  where
    -- Two mutually recursive workers:
    --
    -- * 'nextSegment' advances the outer stream to its next substream.
    -- * 'scanSegment' walks one substream, dropping empty chunks. As soon as
    --   a non-empty chunk is found the substream is emitted as it stands,
    --   and 'nextSegment' is applied to the stream in its terminal value via
    --   fmap. If 'Empty' comes up first, the substream had no content and
    --   is skipped.
    --
    -- An older, shorter version was noticeably slower, especially when empty
    -- substreams are frequent:
    --
    -- denull = hoist (run . maps effects) . separate . mapped nulls
    --
    nextSegment = \case
      Step segment -> scanSegment segment
      Effect m     -> Effect (fmap nextSegment m)
      r@(Return _) -> r

    scanSegment = \case
      Chunk c cs
        | B.length c > 0 -> Step (Chunk c (fmap nextSegment cs))
        | otherwise      -> scanSegment cs
      Go m    -> Effect (fmap scanSegment m)
      Empty r -> nextSegment r
455 | |
{-| Distinguish empty from non-empty byte streams, while maintaining
  streaming; the empty ByteStrings are on the right ('InR'), the non-empty
  ones on the left ('InL'), starting at their first non-empty chunk.

> nulls :: ByteStream m r -> m (Sum (ByteStream m) (ByteStream m) r)

  There are many (generally slower) ways to remove null bytestrings from a
  @Stream (ByteStream m) m r@ (besides using @denull@). If we pass next to

> mapped nulls bs :: Stream (Sum (ByteStream m) (ByteStream m)) m r

  we can then apply @Streaming.separate@ to get

> separate (mapped nulls bs) :: Stream (ByteStream m) (Stream (ByteStream m) m) r

  The inner monad is now made of the empty bytestrings; we act on this
  with @hoist@, considering that

>>> :t Q.effects . Q.concat
Q.effects . Q.concat
  :: Monad m => Stream (Q.ByteStream m) m r -> m r

  we have

> hoist (Q.effects . Q.concat) . separate . mapped Q.nulls
>   :: Monad n => Stream (Q.ByteStream n) n b -> Stream (Q.ByteStream n) n b
-}
nulls :: Monad m => ByteStream m r -> m (Sum (ByteStream m) (ByteStream m) r)
nulls stream = case stream of
  Empty r -> return (InR (return r))
  Go m    -> m >>= nulls
  Chunk bs rest
    | B.null bs -> nulls rest
    | otherwise -> return (InL (Chunk bs rest))
{-# INLINABLE nulls #-}
489 | |
-- | Like `length`, report the length in bytes of the `ByteStream` by running
-- through its contents, but drop the stream's return value. Since the result
-- is in the effect @m@, this is one way to "get out" of the stream.
length_ :: Monad m => ByteStream m r -> m Int
length_ bs = do
  (total :> _) <- foldlChunks (\acc c -> acc + B.length c) 0 bs
  return total
{-# INLINE length_ #-}
496 | |
-- | /O(n\/c)/ 'length' returns the length of a byte stream as an 'Int' together
-- with the return value, by summing the lengths of its chunks. This makes
-- various maps possible.
--
-- >>> Q.length "one\ntwo\three\nfour\nfive\n"
-- 23 :> ()
-- >>> S.print $ S.take 3 $ mapped Q.length $ Q.lines "one\ntwo\three\nfour\nfive\n"
-- 3
-- 8
-- 4
length :: Monad m => ByteStream m r -> m (Of Int r)
length bs = foldlChunks addChunk 0 bs
  where
    addChunk acc c = acc + B.length c
{-# INLINE length #-}
509 | |
-- | /O(1)/ 'cons' is analogous to @(:)@ for lists: the byte is placed in a
-- fresh one-byte chunk in front of the stream.
cons :: Monad m => Word8 -> ByteStream m r -> ByteStream m r
cons w = Chunk (B.singleton w)
{-# INLINE cons #-}
514 | |
-- | /O(1)/ Unlike 'cons', 'cons\'' is strict in the ByteString that we are
-- consing onto. More precisely, it forces the head and the first chunk. It does
-- this because, for space efficiency, it may coalesce the new byte onto the
-- first \'chunk\' (when that chunk is shorter than 16 bytes) rather than
-- starting a new \'chunk\'.
--
-- So that means you can't use a lazy recursive construction like this:
--
-- > let xs = cons\' c xs in xs
--
-- You can however use 'cons', as well as 'repeat' and 'cycle', to build
-- infinite byte streams.
cons' :: Word8 -> ByteStream m r -> ByteStream m r
cons' w stream = case stream of
  Chunk c cs | B.length c < 16 -> Chunk (B.cons w c) cs
  _                            -> Chunk (B.singleton w) stream
{-# INLINE cons' #-}
530 | |
-- | /O(n\/c)/ Append a byte to the end of a 'ByteStream', keeping the stream's
-- original return value.
snoc :: Monad m => ByteStream m r -> Word8 -> ByteStream m r
snoc cs w = cs >>= \r -> r <$ singleton w  -- i.e. cs <* singleton w
{-# INLINE snoc #-}
538 | |
-- | Extract the first element of a 'ByteStream', which must be non-empty;
-- calls 'error' if the stream ends before any byte is found. Empty chunks and
-- effects in front of the first byte are run through.
head_ :: Monad m => ByteStream m r -> m Word8
head_ stream = case stream of
  Empty _ -> error "head"
  Go m    -> m >>= head_
  Chunk c rest
    | B.null c  -> head_ rest
    | otherwise -> return $ B.unsafeHead c
{-# INLINABLE head_ #-}
547 | |
-- | /O(c)/ Extract the first element of a 'ByteStream', if there is one, and
-- drain the rest of the stream to obtain its return value.
-- Suitable for use with `SP.mapped`:
--
-- @
-- S.mapped Q.head :: Stream (Q.ByteStream m) m r -> Stream (Of (Maybe Word8)) m r
-- @
head :: Monad m => ByteStream m r -> m (Of (Maybe Word8) r)
head stream = case stream of
  Empty r -> return (Nothing :> r)
  Go m    -> m >>= head
  Chunk c rest -> case B.uncons c of
    Nothing     -> head rest
    Just (w, _) -> SP.effects (toChunks rest) >>= \r -> return $! Just w :> r
{-# INLINABLE head #-}
563 | |
-- | /O(1)/ Extract the head and tail of a 'ByteStream', or its return value if
-- it is empty. This is the \'natural\' uncons for an effectful byte stream.
-- Empty chunks are skipped, and a chunk that is used up is not kept in the tail.
uncons :: Monad m => ByteStream m r -> m (Either r (Word8, ByteStream m r))
uncons stream = case stream of
  Empty r -> return (Left r)
  Go m    -> m >>= uncons
  Chunk c cs
    | B.null c  -> uncons cs
    | otherwise ->
        let !h = B.unsafeHead c
            !t = if B.length c > 1 then Chunk (B.unsafeTail c) cs else cs
        in  return $ Right (h, t)
{-# INLINABLE uncons #-}
575 | |
-- | Deprecated alias for `uncons`; will be removed in the next version.
nextByte :: Monad m => ByteStream m r -> m (Either r (Word8, ByteStream m r))
nextByte stream = uncons stream
{-# INLINABLE nextByte #-}
{-# DEPRECATED nextByte "Use uncons instead." #-}
581 | |
-- | Like `uncons`, but yields the entire first non-empty `B.ByteString` chunk
-- that the stream is holding onto. If there wasn't one, it runs effects until
-- one appears. Yields the final @r@ return value when the 'ByteStream' is
-- empty.
unconsChunk :: Monad m => ByteStream m r -> m (Either r (B.ByteString, ByteStream m r))
unconsChunk stream = case stream of
  Empty r -> return (Left r)
  Go m    -> m >>= unconsChunk
  Chunk c cs
    | B.null c  -> unconsChunk cs
    | otherwise -> return (Right (c, cs))
{-# INLINABLE unconsChunk #-}
592 | |
-- | Deprecated alias for `unconsChunk`; will be removed in the next version.
nextChunk :: Monad m => ByteStream m r -> m (Either r (B.ByteString, ByteStream m r))
nextChunk stream = unconsChunk stream
{-# INLINABLE nextChunk #-}
{-# DEPRECATED nextChunk "Use unconsChunk instead." #-}
598 | |
-- | /O(n\/c)/ Extract the last element of a 'ByteStream', which must be finite
-- and non-empty. Calls 'error' if the stream contains no bytes at all.
--
-- Empty chunks are ignored, so a trailing empty chunk does not hide the last
-- byte of an earlier non-empty chunk.
last_ :: Monad m => ByteStream m r -> m Word8
last_ (Empty _) = error "Streaming.ByteString.last: empty string"
last_ (Go m)    = m >>= last_
last_ (Chunk c0 cs0)
  | B.null c0 = last_ cs0
  | otherwise = go c0 cs0
  where
    -- Invariant: the first argument is the most recent /non-empty/ chunk seen,
    -- so applying 'unsafeLast' to it is safe.
    go c (Empty _) = return $ unsafeLast c
    go c (Chunk c' cs)
      | B.null c' = go c cs
      | otherwise = go c' cs
    go c (Go m) = m >>= go c
{-# INLINABLE last_ #-}
612 | |
-- | Extract the last element of a `ByteStream`, if possible, together with
-- the stream's return value. A stream with no bytes (including one made only
-- of empty chunks) gives 'Nothing'. Suitable for use with `SP.mapped`:
--
-- @
-- S.mapped Q.last :: Streaming (ByteStream m) m r -> Stream (Of (Maybe Word8)) m r
-- @
last :: Monad m => ByteStream m r -> m (Of (Maybe Word8) r)
last (Empty r) = return (Nothing :> r)
last (Go m)    = m >>= last
last (Chunk c0 cs0)
  | B.null c0 = last cs0
  | otherwise = go c0 cs0
  where
    -- Invariant: the first argument is the most recent /non-empty/ chunk seen,
    -- so 'unsafeLast' is never applied to an empty chunk.
    go c (Empty r) = return (Just (unsafeLast c) :> r)
    go c (Chunk c' cs)
      | B.null c' = go c cs
      | otherwise = go c' cs
    go c (Go m) = m >>= go c
{-# INLINABLE last #-}
628 | |
-- | /O(n\/c)/ Append two `ByteStream`s together. The return value of the
-- first stream is discarded; the result returns what the second one returns.
append :: Monad m => ByteStream m r -> ByteStream m s -> ByteStream m s
append xs ys = dematerialize xs (\_ -> ys) Chunk Go
{-# INLINE append #-}
633 | |
634 | -- --------------------------------------------------------------------- |
635 | -- Transformations |
636 | |
-- | /O(n)/ 'map' @f xs@ is the ByteStream obtained by applying @f@ to each
-- element of @xs@. The chunk structure, effects and return value are kept.
map :: Monad m => (Word8 -> Word8) -> ByteStream m r -> ByteStream m r
map f z = dematerialize z Empty mapChunk Go
  where
    mapChunk c rest = Chunk (B.map f c) rest
{-# INLINE map #-}
642 | |
-- | @'for' xs f@ applies @f@ to each chunk in the stream, and
-- concatenates the resulting streams. The return value of @xs@ is kept.
--
-- Generalised in 0.2.4 to match @streaming@: the callback's (ignored)
-- return value can be of any type.
--
-- @since 0.2.3
for :: Monad m => ByteStream m r -> (P.ByteString -> ByteStream m x) -> ByteStream m r
for (Empty r)      _ = Empty r
for (Chunk bs bss) f = f bs *> for bss f
for (Go m)         f = Go (fmap (\next -> for next f) m)
{-# INLINE for #-}
656 | |
657 | -- -- | /O(n)/ 'reverse' @xs@ returns the elements of @xs@ in reverse order. |
658 | -- reverse :: ByteString -> ByteString |
659 | -- reverse cs0 = rev Empty cs0 |
660 | -- where rev a Empty = a |
661 | -- rev a (Chunk c cs) = rev (Chunk (B.reverse c) a) cs |
662 | -- {-# INLINE reverse #-} |
663 | |
-- | The 'intersperse' function takes a 'Word8' and a 'ByteStream' and
-- \`intersperses\' that byte between the elements of the 'ByteStream'. It is
-- analogous to the intersperse function on Streams.
--
-- Leading empty chunks are dropped. The first non-empty chunk is handled by
-- 'B.intersperse'; every later chunk is rewritten by the local helper so that
-- it starts with the separator, which supplies the separator across the chunk
-- boundary.
intersperse :: Monad m => Word8 -> ByteStream m r -> ByteStream m r
intersperse _ (Empty r)    = Empty r
intersperse w (Go m)       = Go (fmap (intersperse w) m)
intersperse w (Chunk c cs) | B.null c = intersperse w cs
                           | otherwise =
                               Chunk (B.intersperse w c)
                                 (dematerialize cs Empty (Chunk . intersperse') Go)
  where intersperse' :: P.ByteString -> P.ByteString
        -- For a chunk of length l, allocate 2*l bytes: the separator is poked
        -- into the first byte, and c_intersperse fills the remainder starting
        -- one byte in, reading from the chunk's payload at offset o.
        -- An empty chunk stays empty, so no stray separator is emitted for it.
        intersperse' (B.PS fp o l)
          | l > 0 = B.unsafeCreate (2*l) $ \p' -> unsafeWithForeignPtr fp $ \p -> do
              poke p' w
              B.c_intersperse (p' `plusPtr` 1) (p `plusPtr` o) (fromIntegral l) w
          | otherwise = B.empty
{-# INLINABLE intersperse #-}
681 | |
-- | 'foldr', applied to a binary operator, a starting value (typically the
-- right-identity of the operator), and a ByteStream, reduces the ByteStream
-- using the binary operator, from right to left. It is implemented as a
-- right fold over the chunks, with 'B.foldr' applied inside each chunk.
--
foldr :: Monad m => (Word8 -> a -> a) -> a -> ByteStream m () -> m a
foldr k z bs = foldrChunks (\c acc -> B.foldr k acc c) z bs
{-# INLINE foldr #-}
689 | |
-- | 'fold_', applied to a binary operator, a starting value (typically the
-- left-identity of the operator), and a ByteStream, reduces the ByteStream
-- using the binary operator, from left to right. We use the style of the foldl
-- library for left folds: a step function, an initial accumulator and an
-- extraction function. The accumulator is forced after every chunk.
fold_ :: Monad m => (x -> Word8 -> x) -> x -> (x -> b) -> ByteStream m () -> m b
fold_ step0 begin finish p0 = go begin p0
  where
    go !acc stream = case stream of
      Empty _      -> return (finish acc)
      Go m         -> m >>= go acc
      Chunk bs bss -> let !acc' = B.foldl' step0 acc bs in go acc' bss
{-# INLINABLE fold_ #-}
702 | |
-- | 'fold' is like 'fold_' but keeps the return value of the left-folded
-- bytestring, pairing it with the extracted result. Useful for simultaneous
-- folds over a segmented bytestream.
fold :: Monad m => (x -> Word8 -> x) -> x -> (x -> b) -> ByteStream m r -> m (Of b r)
fold step0 begin finish p0 = go begin p0
  where
    go !acc stream = case stream of
      Empty r      -> return (finish acc :> r)
      Go m         -> m >>= go acc
      Chunk bs bss -> let !acc' = B.foldl' step0 acc bs in go acc' bss
{-# INLINABLE fold #-}
713 | |
714 | -- --------------------------------------------------------------------- |
715 | -- Special folds |
716 | |
717 | -- /O(n)/ Concatenate a list of ByteStreams. |
718 | -- concat :: (Monad m) => [ByteStream m ()] -> ByteStream m () |
719 | -- concat css0 = to css0 |
720 | -- where |
721 | -- go css (Empty m') = to css |
722 | -- go css (Chunk c cs) = Chunk c (go css cs) |
723 | -- go css (Go m) = Go (fmap (go css) m) |
724 | -- to [] = Empty () |
725 | -- to (cs:css) = go css cs |
726 | |
727 | -- --------------------------------------------------------------------- |
728 | -- Unfolds and replicates |
729 | |
{-| @'iterate' f x@ returns an infinite ByteStream of repeated applications
    of @f@ to @x@. Note that the seed @x@ itself is not emitted; the stream
    starts at @f x@:

> iterate f x == [f x, f (f x), f (f (f x)), ...]

>>> R.stdout $ R.take 50 $ R.iterate succ 39
()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXY
>>> Q.putStrLn $ Q.take 50 $ Q.iterate succ '\''
()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXY
-}
iterate :: (Word8 -> Word8) -> Word8 -> ByteStream m r
iterate f = unfoldr step
  where
    -- The freshly computed byte is both the next output and the next seed;
    -- it is forced before being handed on.
    step x = let !next = f x in Right (next, next)
{-# INLINABLE iterate #-}
743 | |
{- | @'repeat' x@ is an infinite ByteStream, with @x@ the value of every
     element. It is a single chunk of 'BI.smallChunkSize' copies of @x@ that
     is followed by itself.

>>> R.stdout $ R.take 50 $ R.repeat 60
<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
>>> Q.putStrLn $ Q.take 50 $ Q.repeat 'z'
zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
-}
repeat :: Word8 -> ByteStream m r
repeat w =
  let block  = B.replicate BI.smallChunkSize w
      cyclic = Chunk block cyclic
  in  cyclic
{-# INLINABLE repeat #-}
755 | |
{- | 'cycle' ties a finite ByteStream into a circular one, or equivalently,
     the infinite repetition of the original ByteStream. It is 'forever'
     specialised to byte streams. For an empty bytestring (like @return 17@)
     it of course makes an unproductive loop

>>> Q.putStrLn $ Q.take 7 $ Q.cycle "y\n"
y
y
y
y
-}
cycle :: Monad m => ByteStream m r -> ByteStream m s
cycle bs = forever bs
{-# INLINE cycle #-}
769 | |
-- | /O(n)/ The 'unfoldM' function is analogous to the Stream @unfoldr@.
-- 'unfoldM' builds a ByteStream from a seed value. The function takes the
-- seed and returns 'Nothing' if it is done producing the ByteStream or
-- returns @'Just' (a,b)@, in which case, @a@ is the next byte of the
-- ByteStream and @b@ is used as the seed in the recursive call.
--
-- Bytes are produced in strict chunks. The chunk size starts at 32 bytes and
-- doubles with every chunk up to 'BI.defaultChunkSize', so that a long or
-- infinite unfold never allocates an unboundedly large chunk.
unfoldM :: Monad m => (a -> Maybe (Word8, a)) -> a -> ByteStream m ()
unfoldM f s0 = unfoldChunk 32 s0
  where
    unfoldChunk n s =
      case B.unfoldrN n f s of
        (c, Nothing)
          | B.null c  -> Empty ()
          | otherwise -> Chunk c (Empty ())
        (c, Just s') -> Chunk c (unfoldChunk (nextSize n) s')
    -- Grow geometrically, but never beyond the default I/O chunk size.
    nextSize n = min (n * 2) BI.defaultChunkSize
{-# INLINABLE unfoldM #-}
784 | |
-- | Like `unfoldM`, but yields a final @r@ when the `Word8` generation is
-- complete: the generator returns @'Left' r@ to stop, or @'Right' (byte, seed)@
-- to emit a byte and continue.
--
-- Bytes are produced in strict chunks. The chunk size starts at 32 bytes and
-- doubles with every chunk up to 'BI.defaultChunkSize', so that a long or
-- infinite unfold never allocates an unboundedly large chunk.
unfoldr :: (a -> Either r (Word8, a)) -> a -> ByteStream m r
unfoldr f s0 = unfoldChunk 32 s0
  where
    unfoldChunk n s =
      case unfoldrNE n f s of
        (c, Left r)
          | B.null c  -> Empty r
          | otherwise -> Chunk c (Empty r)
        (c, Right s') -> Chunk c (unfoldChunk (nextSize n) s')
    -- Grow geometrically, but never beyond the default I/O chunk size.
    nextSize n = min (n * 2) BI.defaultChunkSize
{-# INLINABLE unfoldr #-}
796 | |
797 | -- --------------------------------------------------------------------- |
798 | -- Substrings |
799 | |
{-| /O(n\/c)/ 'take' @n xs@ keeps at most the first @n@ bytes of @xs@; a
    stream shorter than @n@ is passed through in full. A non-positive @n@
    gives the empty stream without inspecting @xs@.

    The return value of @xs@ is discarded. 'splitAt' keeps it (together with
    the rest of the stream) and is sometimes the better choice.

>>> Q.putStrLn $ Q.take 8 $ "Is there a God?" >> return True
Is there
>>> Q.putStrLn $ "Is there a God?" >> return True
Is there a God?
True
>>> rest <- Q.putStrLn $ Q.splitAt 8 $ "Is there a God?" >> return True
Is there
>>> Q.effects rest
True
-}
take :: Monad m => Int64 -> ByteStream m r -> ByteStream m ()
take i cs0
  | i <= 0    = Empty ()
  | otherwise = go i cs0
  where
    -- @go n@ is only ever called with @n >= 0@; at 0 the rest of the stream
    -- is abandoned without running any further effects.
    go 0 _  = Empty ()
    go n bs = case bs of
      Empty _ -> Empty ()
      Go m    -> Go (fmap (go n) m)
      Chunk c cs
        | n < len   -> Chunk (B.take (fromIntegral n) c) (Empty ())
        | otherwise -> Chunk c (go (n - len) cs)
        where
          len = fromIntegral (B.length c)
{-# INLINABLE take #-}
827 | |
{-| /O(n\/c)/ 'drop' @n xs@ discards the first @n@ bytes of @xs@ and returns
    what follows. If @xs@ holds fewer than @n@ bytes the result is an empty
    stream that still carries the return value of @xs@. A non-positive @n@
    returns @xs@ unchanged.

>>> Q.putStrLn $ Q.drop 6 "Wisconsin"
sin
>>> Q.putStrLn $ Q.drop 16 "Wisconsin"
<BLANKLINE>
-}
drop :: Monad m => Int64 -> ByteStream m r -> ByteStream m r
drop i p
  | i <= 0    = p
  | otherwise = go i p
  where
    go 0 bs = bs
    go n bs = case bs of
      Empty r -> Empty r
      Go m    -> Go (fmap (go n) m)
      Chunk c cs
        -- The cut falls inside this chunk: keep its tail and the rest.
        | n < len   -> Chunk (B.drop (fromIntegral n) c) cs
        -- The whole chunk goes; carry the remaining count forward.
        | otherwise -> go (n - len) cs
        where
          len = fromIntegral (B.length c)
{-# INLINABLE drop #-}
847 | |
{-| /O(n\/c)/ 'splitAt' @n xs@ streams the first @n@ bytes of @xs@ and returns
    the remainder of the stream as its result, so it does the work of both
    @'take' n xs@ and @'drop' n xs@ in a single pass.

>>> rest <- Q.putStrLn $ Q.splitAt 3 "therapist is a danger to good hyphenation, as Knuth notes"
the
>>> Q.putStrLn $ Q.splitAt 19 rest
rapist is a danger
-}
splitAt :: Monad m => Int64 -> ByteStream m r -> ByteStream m (ByteStream m r)
splitAt i cs0
  | i <= 0    = Empty cs0
  | otherwise = go i cs0
  where
    go 0 bs = Empty bs
    go n bs = case bs of
      Empty r -> Empty (Empty r)
      Go m    -> Go (fmap (go n) m)
      Chunk c cs
        -- The split point is inside this chunk: emit the front part and
        -- return the back part, followed by the untouched rest.
        | n < len ->
            let k = fromIntegral n
             in Chunk (B.take k c) (Empty (Chunk (B.drop k c) cs))
        | otherwise -> Chunk c (go (n - len) cs)
        where
          len = fromIntegral (B.length c)
{-# INLINABLE splitAt #-}
867 | |
-- | 'takeWhile', applied to a predicate @p@ and a ByteStream @xs@, returns the
-- longest prefix (possibly empty) of @xs@ of elements that satisfy @p@. The
-- return value of @xs@ is discarded.
--
-- Empty chunks are skipped. Substreams produced by 'split' and 'splitWith'
-- can contain them, and an empty chunk must not be mistaken for a byte that
-- fails the predicate.
takeWhile :: Monad m => (Word8 -> Bool) -> ByteStream m r -> ByteStream m ()
takeWhile f cs0 = takeWhile' cs0
  where
    takeWhile' (Empty _)    = Empty ()
    takeWhile' (Go m)       = Go $ fmap takeWhile' m
    takeWhile' (Chunk c cs)
      | B.null c  = takeWhile' cs
      | otherwise =
          -- n is the position of the first byte that fails @f@ (as used here,
          -- 'findIndexOrEnd' yields the chunk length when there is none).
          case findIndexOrEnd (not . f) c of
            0                  -> Empty ()
            n | n < B.length c -> Chunk (B.take n c) (Empty ())
              | otherwise      -> Chunk c (takeWhile' cs)
{-# INLINABLE takeWhile #-}
881 | |
-- | 'dropWhile' @p xs@ returns the suffix remaining after 'takeWhile' @p xs@,
-- keeping the return value of @xs@.
--
-- Empty chunks are skipped. Substreams produced by 'split' and 'splitWith'
-- can contain them, and an empty chunk must not stop the dropping early.
dropWhile :: Monad m => (Word8 -> Bool) -> ByteStream m r -> ByteStream m r
dropWhile p = drop' where
  drop' bs = case bs of
    Empty r    -> Empty r
    Go m       -> Go (fmap drop' m)
    Chunk c cs
      | B.null c  -> drop' cs
      | otherwise ->
          -- n is the position of the first byte that fails @p@ (as used here,
          -- 'findIndexOrEnd' yields the chunk length when there is none).
          case findIndexOrEnd (not . p) c of
            0                  -> Chunk c cs
            n | n < B.length c -> Chunk (B.drop n c) cs
              | otherwise      -> drop' cs
{-# INLINABLE dropWhile #-}
893 | |
-- | 'break' @p@ is equivalent to @'span' ('not' . p)@: it streams the bytes up
-- to (but not including) the first one that satisfies @p@, and returns the
-- rest of the stream as its result.
--
-- Empty chunks are skipped. Substreams produced by 'split' and 'splitWith'
-- can contain them, and an empty chunk must not be mistaken for a match.
break :: Monad m => (Word8 -> Bool) -> ByteStream m r -> ByteStream m (ByteStream m r)
break f cs0 = break' cs0
  where
    break' (Empty r)    = Empty (Empty r)
    break' (Go m)       = Go (fmap break' m)
    break' (Chunk c cs)
      | B.null c  = break' cs
      | otherwise =
          -- n is the position of the first byte that satisfies @f@ (as used
          -- here, 'findIndexOrEnd' yields the chunk length when there is
          -- none).
          case findIndexOrEnd f c of
            0                  -> Empty (Chunk c cs)
            n | n < B.length c -> Chunk (B.take n c) $
                                      Empty (Chunk (B.drop n c) cs)
              | otherwise      -> Chunk c (break' cs)
{-# INLINABLE break #-}
906 | |
-- | 'span' @p xs@ streams the longest prefix of @xs@ whose bytes all satisfy
-- @p@ and returns the remainder of the stream as its result. It covers both
-- @'takeWhile' p xs@ and @'dropWhile' p xs@ in one pass.
span :: Monad m => (Word8 -> Bool) -> ByteStream m r -> ByteStream m (ByteStream m r)
span p = break (\w -> not (p w))
{-# INLINE span #-}
912 | |
-- | /O(n)/ Cut a 'ByteStream' into the components that lie between separator
-- bytes, a separator being any byte for which the predicate returns True. The
-- separators themselves are not part of the output, and two adjacent
-- separators give an empty component. eg.
--
-- > splitWith (=='a') "aabbaca" == ["","","bb","c",""]
-- > splitWith (=='a') []        == []
splitWith :: Monad m => (Word8 -> Bool) -> ByteStream m r -> Stream (ByteStream m) m r
splitWith p bs0 = case bs0 of
  Empty r      -> Return r
  Go m         -> Effect $ fmap (splitWith p) m
  Chunk c0 cs0 -> comb [] (B.splitWith p c0) cs0
  where
    -- @comb acc pieces rest@: @acc@ holds, newest first, the slices of the
    -- component currently being assembled; @pieces@ are the slices of the
    -- current chunk still to be handled; @rest@ is the unread stream. The
    -- last slice of a chunk may continue into the next chunk, so it is only
    -- emitted once the following chunk (or the end) has been seen.
    comb acc pieces rest = case (pieces, rest) of
      ([s], Empty r)    -> Step $ revChunks (s : acc) (Return r)
      ([s], Chunk c cs) -> comb (s : acc) (B.splitWith p c) cs
      (_, Go m)         -> Effect (fmap (comb acc pieces) m)
      (s : ss, _)       -> Step $ revChunks (s : acc) (comb [] ss rest)
      ([], Empty r)     -> Step $ revChunks acc (Return r)
      ([], Chunk c cs)  -> comb acc (B.splitWith p c) cs

{-# INLINABLE splitWith #-}
942 | |
-- | /O(n)/ Cut a 'ByteStream' into the pieces that lie between occurrences of
-- the given byte; the delimiter itself is consumed. I.e.
--
-- > split '\n' "a\nb\nd\ne" == ["a","b","d","e"]
-- > split 'a'  "aXaXaXa"    == ["","X","X","X",""]
-- > split 'x'  "x"          == ["",""]
--
-- and
--
-- > intercalate [c] . split c == id
-- > split == splitWith . (==)
--
-- Like the other splitting functions in this library, this one does not copy
-- the substrings: the resulting 'ByteStream's are built from slices of the
-- original chunks.
split :: Monad m => Word8 -> ByteStream m r -> Stream (ByteStream m) m r
split w = start
  where
    start !bs = case bs of
      Empty r      -> Return r
      Go m         -> Effect $ fmap start m
      Chunk c0 cs0 -> comb [] (B.split w c0) cs0

    -- @acc@ collects, newest first, the slices of the piece being assembled;
    -- @pieces@ are the slices of the current chunk not yet emitted. The last
    -- slice of a chunk may run on into the next chunk, so it is held back
    -- until the next chunk (or the end of the stream) is known.
    comb !acc pieces rest = case pieces of
      [] -> case rest of
        Empty r    -> Step $ revChunks acc (Return r)
        Chunk c cs -> comb acc (B.split w c) cs
        Go m       -> suspend m
      [s] -> case rest of
        Empty r    -> Step $ revChunks (s : acc) (Return r)
        Chunk c cs -> comb (s : acc) (B.split w c) cs
        Go m       -> suspend m
      s : ss -> case rest of
        Go m -> suspend m
        _    -> Step $ revChunks (s : acc) (comb [] ss rest)
      where
        -- Run the pending effect, then resume in exactly the same state.
        suspend m = Effect (fmap (comb acc pieces) m)
{-# INLINABLE split #-}
972 | |
-- | The 'group' function takes a ByteStream and returns a stream of
-- ByteStreams such that the concatenation of the result is equal to the
-- argument. Moreover, each substream in the result contains only equal
-- elements. For example,
--
-- > group "Mississippi" = ["M","i","ss","i","ss","i","pp","i"]
--
-- It is a special case of 'groupBy', which allows the programmer to supply
-- their own equality test.
--
-- Empty chunks in the input are skipped. Substreams produced by 'split' and
-- 'splitWith' can contain them; they must neither reach the unchecked
-- 'B.unsafeHead' \/ 'B.unsafeTail' calls below nor cut a run of equal bytes
-- in two.
group :: Monad m => ByteStream m r -> Stream (ByteStream m) m r
group = go
  where
    -- Start a new group at the first byte of the next non-empty chunk.
    go (Empty r)        = Return r
    go (Go m)           = Effect $ fmap go m
    go (Chunk c cs)
      | B.null c        = go cs
      | B.length c == 1 = Step $ to [c] (B.unsafeHead c) cs
      | otherwise       = Step $ to [B.unsafeTake 1 c] (B.unsafeHead c) (Chunk (B.unsafeTail c) cs)

    -- Extend the current group (slices in @acc@, newest first) with bytes
    -- equal to @w@, then hand the remainder back to 'go'.
    to acc !_ (Empty r) = revNonEmptyChunks acc (Empty (Return r))
    to acc !w (Go m)    = Go $ to acc w <$> m
    to acc !w (Chunk c cs)
      | B.null c  = to acc w cs
      | otherwise = case findIndexOrEnd (/= w) c of
          0 -> revNonEmptyChunks acc (Empty (go (Chunk c cs)))
          n | n == B.length c -> to (B.unsafeTake n c : acc) w cs
            | otherwise       -> revNonEmptyChunks (B.unsafeTake n c : acc) (Empty (go (Chunk (B.unsafeDrop n c) cs)))
{-# INLINABLE group #-}
997 | |
-- | The 'groupBy' function is a generalized version of 'group': a group is
-- the first byte @w@ of the remaining input together with the following bytes
-- @x@ for which @rel w x@ holds.
--
-- Empty chunks in the input are skipped. Substreams produced by 'split' and
-- 'splitWith' can contain them; they must neither reach the unchecked
-- 'B.unsafeHead' \/ 'B.unsafeTail' calls below nor cut a group in two.
groupBy :: Monad m => (Word8 -> Word8 -> Bool) -> ByteStream m r -> Stream (ByteStream m) m r
groupBy rel = go
  where
    -- go :: ByteStream m r -> Stream (ByteStream m) m r
    -- Start a new group at the first byte of the next non-empty chunk.
    go (Empty r)        = Return r
    go (Go m)           = Effect $ fmap go m
    go (Chunk c cs)
      | B.null c        = go cs
      | B.length c == 1 = Step $ to [c] (B.unsafeHead c) cs
      | otherwise       = Step $ to [B.unsafeTake 1 c] (B.unsafeHead c) (Chunk (B.unsafeTail c) cs)

    -- to :: [B.ByteString] -> Word8 -> ByteStream m r -> ByteStream m (Stream (ByteStream m) m r)
    -- Extend the current group (slices in @acc@, newest first) with bytes
    -- related to its first byte @w@, then hand the remainder back to 'go'.
    to acc !_ (Empty r) = revNonEmptyChunks acc (Empty (Return r))
    to acc !w (Go m)    = Go $ to acc w <$> m
    to acc !w (Chunk c cs)
      | B.null c  = to acc w cs
      | otherwise = case findIndexOrEnd (not . rel w) c of
          0 -> revNonEmptyChunks acc (Empty (go (Chunk c cs)))
          n | n == B.length c -> to (B.unsafeTake n c : acc) w cs
            | otherwise       -> revNonEmptyChunks (B.unsafeTake n c : acc) (Empty (go (Chunk (B.unsafeDrop n c) cs)))
{-# INLINABLE groupBy #-}
1017 | |
-- | /O(n)/ The 'intercalate' function takes a 'ByteStream' and a stream of
-- 'ByteStream's and concatenates the substreams after interspersing the first
-- argument between each pair of consecutive substreams.
--
-- The separator is emitted only once another substream is known to follow.
-- Effects that sit between two substreams therefore run before the separator
-- is emitted, and a stream whose tail is an effect that merely returns does
-- not get a trailing separator.
intercalate :: Monad m => ByteStream m () -> Stream (ByteStream m) m r -> ByteStream m r
intercalate s = loop
  where
    loop (Return r) = Empty r
    loop (Effect m) = Go $ fmap loop m
    loop (Step bs)  = bs >>= afterSubstream

    -- Decide what follows a finished substream. Effects are run first, so the
    -- decision is always made on a 'Return' or a 'Step'.
    afterSubstream (Return r)   = Empty r  -- not between final substream and stream end
    afterSubstream (Effect m)   = Go $ fmap afterSubstream m
    afterSubstream x@(Step _)   = s >> loop x
{-# INLINABLE intercalate #-}
1030 | |
-- | Count how many times the given byte occurs in the `ByteStream`. The
-- return value of the stream is discarded; see 'count' for a variant that
-- keeps it.
--
-- > count = length . elemIndices
count_ :: Monad m => Word8 -> ByteStream m r -> m Int
count_ w = fmap total . foldlChunks tally 0
  where
    -- Add the occurrences found in one chunk to the running total.
    tally n c = n + fromIntegral (B.count w c)
    -- Keep the count, drop the stream's return value.
    total (n :> _) = n
{-# INLINE count_ #-}
1037 | |
-- | Count how many times the given byte occurs in the `ByteStream`, pairing
-- the count with the stream's return value. The result shape makes it
-- suitable for use with `SP.mapped`:
--
-- @
-- S.mapped (Q.count 37) :: Stream (Q.ByteStream m) m r -> Stream (Of Int) m r
-- @
count :: Monad m => Word8 -> ByteStream m r -> m (Of Int r)
count w cs = foldlChunks tally 0 cs
  where
    -- Add the occurrences found in one chunk to the running total.
    tally n c = n + fromIntegral (B.count w c)
{-# INLINE count #-}
1047 | |
1048 | -- --------------------------------------------------------------------- |
1049 | -- Searching ByteStreams |
1050 | |
-- | /O(n)/ 'filter' @p xs@ keeps exactly those bytes of @xs@ that satisfy
-- @p@, in their original order, and preserves the return value of @xs@.
filter :: Monad m => (Word8 -> Bool) -> ByteStream m r -> ByteStream m r
filter p s = keep s
  where
    keep bs = case bs of
      -- The filtered chunk may be empty, hence 'consChunk' rather than the
      -- bare constructor (original note: "should inspect for null").
      Chunk x xs -> consChunk (B.filter p x) (keep xs)
      Go m       -> Go (fmap keep m)
      Empty r    -> Empty r
{-# INLINABLE filter #-}
1061 | |
1062 | -- --------------------------------------------------------------------- |
1063 | -- ByteStream IO |
1064 | -- |
1065 | -- Rule for when to close: is it expected to read the whole file? |
1066 | -- If so, close when done. |
1067 | -- |
1068 | |
-- | Stream the whole contents of a handle as a 'ByteStream'. Nothing is read
-- until the stream is consumed; each step then reads a chunk of at most @k@
-- bytes. A read does not wait for a full @k@ bytes: if fewer are available
-- they are delivered at once as a smaller chunk. The stream ends at the first
-- empty read.
--
-- Note: the 'Handle' should be placed in binary mode with
-- 'System.IO.hSetBinaryMode' for 'hGetContentsN' to work correctly.
hGetContentsN :: MonadIO m => Int -> Handle -> ByteStream m ()
hGetContentsN k h = loop -- TODO close on exceptions
  where
    -- only blocks if there is no data available
    loop = liftIO (B.hGetSome h k) >>= next

    next c
      | B.null c  = Empty ()
      | otherwise = Chunk c loop
{-# INLINABLE hGetContentsN #-} -- very effective inline pragma
1086 | |
-- | Read up to @n@ bytes from the 'Handle' into a 'ByteStream', requesting at
-- most @k@ bytes per read. The stream ends once @n@ bytes have been delivered
-- or a read comes back empty. @n == 0@ gives the empty stream without
-- touching the handle; a negative @n@ raises the IO error built by
-- 'illegalBufferSize'.
hGetN :: MonadIO m => Int -> Handle -> Int -> ByteStream m ()
hGetN k h n
  | n > 0     = readChunks n
  | n == 0    = Empty ()
  -- TODO(review): the original author flagged this error branch for repair.
  | otherwise = liftIO $ illegalBufferSize h "hGet" n
  where
    -- @remaining@ is the number of bytes still wanted.
    readChunks !remaining = Go $ do
      c <- liftIO $ B.hGet h (min k remaining)
      return $ case B.length c of
        0   -> Empty ()
        got -> Chunk c (readChunks (remaining - got))
{-# INLINABLE hGetN #-}
1100 | |
-- | hGetNonBlockingN reads up to @n@ bytes from the 'Handle' like 'hGetN',
-- but through 'B.hGetNonBlocking': it never waits for data to arrive and
-- delivers only what is available. Each read requests at most @k@ bytes, and
-- the stream ends once @n@ bytes have been delivered or a read comes back
-- empty. A negative @n@ raises the IO error built by 'illegalBufferSize'.
hGetNonBlockingN :: MonadIO m => Int -> Handle -> Int -> ByteStream m ()
hGetNonBlockingN k h n
  | n > 0     = readChunks n
  | n == 0    = Empty ()
  | otherwise = liftIO $ illegalBufferSize h "hGetNonBlocking" n
  where
    -- @remaining@ is the number of bytes still wanted.
    readChunks !remaining = Go $ do
      c <- liftIO $ B.hGetNonBlocking h (min k remaining)
      return $ case B.length c of
        0   -> Empty ()
        got -> Chunk c (readChunks (remaining - got))
{-# INLINABLE hGetNonBlockingN #-}
1115 | |
-- Raise an illegal-operation 'IOError' for a rejected size argument. The
-- error is tagged with the handle, and its message names the calling function
-- @fn@ and the offending size @sz@.
illegalBufferSize :: Handle -> String -> Int -> IO a
illegalBufferSize handle fn sz = ioError err
  where
    --TODO: System.IO uses InvalidArgument here, but it's not exported :-(
    err = mkIOError illegalOperationErrorType msg (Just handle) Nothing
    msg = concat [fn, ": illegal ByteStream size ", showsPrec 9 sz []]
{-# INLINABLE illegalBufferSize #-}
1123 | |
-- | Stream the whole contents of a handle as a 'ByteStream'. Chunks are read
-- on demand, as the stream is consumed, using the default chunk size (see
-- 'hGetContentsN').
--
-- Note: the 'Handle' should be placed in binary mode with
-- 'System.IO.hSetBinaryMode' for 'hGetContents' to work correctly.
hGetContents :: MonadIO m => Handle -> ByteStream m ()
hGetContents h = hGetContentsN defaultChunkSize h
{-# INLINE hGetContents #-}
1132 | |
-- | The same function as 'hGetContents', under its pipes-style name.
fromHandle :: MonadIO m => Handle -> ByteStream m ()
fromHandle h = hGetContents h
{-# INLINE fromHandle #-}
1137 | |
-- | Standard input as a 'ByteStream', read on demand in chunks of the default
-- size. This is the pipes-style name for 'getContents'.
stdin :: MonadIO m => ByteStream m ()
stdin = hGetContentsN defaultChunkSize IO.stdin
{-# INLINE stdin #-}
1142 | |
-- | Read up to @n@ bytes from the given 'Handle' into a 'ByteStream', in
-- chunks of at most the default chunk size (see 'hGetN').
hGet :: MonadIO m => Handle -> Int -> ByteStream m ()
hGet h n = hGetN defaultChunkSize h n
{-# INLINE hGet #-}
1147 | |
-- | hGetNonBlocking is the non-blocking counterpart of 'hGet': it never waits
-- for data to become available and delivers only what can be read right away,
-- in chunks of at most the default chunk size. If nothing is available it
-- returns 'empty'.
--
-- Note: on Windows and with Haskell implementation other than GHC, this
-- function does not work correctly; it behaves identically to 'hGet'.
hGetNonBlocking :: MonadIO m => Handle -> Int -> ByteStream m ()
hGetNonBlocking h n = hGetNonBlockingN defaultChunkSize h n
{-# INLINE hGetNonBlocking #-}
1158 | |
-- | Write a 'ByteStream' to a file, opened in binary 'WriteMode', and return
-- the stream's final value. The handle is registered with the enclosing
-- resource scope and released as soon as the stream has been written; run the
-- action with 'Control.Monad.Trans.Resource.runResourceT' to ensure that the
-- handle is closed.
--
-- >>> :set -XOverloadedStrings
-- >>> runResourceT $ Q.writeFile "hello.txt" "Hello world.\nGoodbye world.\n"
-- >>> :! cat "hello.txt"
-- Hello world.
-- Goodbye world.
-- >>> runResourceT $ Q.writeFile "hello2.txt" $ Q.readFile "hello.txt"
-- >>> :! cat hello2.txt
-- Hello world.
-- Goodbye world.
writeFile :: MonadResource m => FilePath -> ByteStream m r -> m r
writeFile f str = do
  (releaseKey, out) <- allocate (openBinaryFile f WriteMode) hClose
  -- Write everything, then release the handle, keeping the stream's result.
  hPut out str <* release releaseKey
{-# INLINE writeFile #-}
1179 | |
-- | Stream an entire file, opened in binary 'ReadMode', as a chunked
-- 'ByteStream'. The handle will be held open until EOF is encountered. The
-- block governed by 'Control.Monad.Trans.Resource.runResourceT' will end with
-- the closing of any handles opened.
--
-- >>> :! cat hello.txt
-- Hello world.
-- Goodbye world.
-- >>> runResourceT $ Q.stdout $ Q.readFile "hello.txt"
-- Hello world.
-- Goodbye world.
readFile :: MonadResource m => FilePath -> ByteStream m ()
readFile f = bracketByteString acquire hClose hGetContents
  where
    acquire = openBinaryFile f ReadMode
{-# INLINE readFile #-}
1194 | |
-- | Append a 'ByteStream' to a file, opened in binary 'AppendMode', and
-- return the stream's final value. The handle is registered with the
-- enclosing resource scope and released as soon as the stream has been
-- written; run the action with 'Control.Monad.Trans.Resource.runResourceT'
-- to ensure that the handle is closed.
--
-- >>> runResourceT $ Q.writeFile "hello.txt" "Hello world.\nGoodbye world.\n"
-- >>> runResourceT $ Q.stdout $ Q.readFile "hello.txt"
-- Hello world.
-- Goodbye world.
-- >>> runResourceT $ Q.appendFile "hello.txt" "sincerely yours,\nArthur\n"
-- >>> runResourceT $ Q.stdout $ Q.readFile "hello.txt"
-- Hello world.
-- Goodbye world.
-- sincerely yours,
-- Arthur
appendFile :: MonadResource m => FilePath -> ByteStream m r -> m r
appendFile f str = do
  (releaseKey, out) <- allocate (openBinaryFile f AppendMode) hClose
  -- Write everything, then release the handle, keeping the stream's result.
  hPut out str <* release releaseKey
{-# INLINE appendFile #-}
1216 | |
-- | Equivalent to @hGetContents stdin@: standard input as a 'ByteStream'.
-- Will read /lazily/, that is, only as the stream is consumed.
getContents :: MonadIO m => ByteStream m ()
getContents = fromHandle IO.stdin
{-# INLINE getContents #-}
1221 | |
-- | Write a 'ByteStream' to the given 'Handle', chunk by chunk, and return the
-- stream's final value.
hPut :: MonadIO m => Handle -> ByteStream m r -> m r
hPut h cs = dematerialize cs return writeChunk joinEffect
  where
    -- Write one chunk, then carry on with the rest of the stream.
    writeChunk c rest = liftIO (B.hPut h c) >> rest
    -- Run an effect that yields the continuation, then the continuation.
    joinEffect act = act >>= id
{-# INLINE hPut #-}
1226 | |
-- | The same function as 'hPut', under its pipes-style name.
toHandle :: MonadIO m => Handle -> ByteStream m r -> m r
toHandle h bs = hPut h bs
{-# INLINE toHandle #-}
1231 | |
-- | Write a 'ByteStream' to standard output and return its final value. This
-- is the pipes-style name for @putStr@.
stdout :: MonadIO m => ByteStream m r -> m r
stdout = toHandle IO.stdout
{-# INLINE stdout #-}
1236 | |
1237 | -- -- | Similar to 'hPut' except that it will never block. Instead it returns |
1238 | -- any tail that did not get written. This tail may be 'empty' in the case that |
1239 | -- the whole string was written, or the whole original string if nothing was |
1240 | -- written. Partial writes are also possible. |
1241 | -- |
1242 | -- Note: on Windows and with Haskell implementation other than GHC, this |
1243 | -- function does not work correctly; it behaves identically to 'hPut'. |
1244 | -- |
1245 | -- hPutNonBlocking :: MonadIO m => Handle -> ByteStream m r -> ByteStream m r |
1246 | -- hPutNonBlocking _ (Empty r) = Empty r |
1247 | -- hPutNonBlocking h (Go m) = Go $ fmap (hPutNonBlocking h) m |
1248 | -- hPutNonBlocking h bs@(Chunk c cs) = do |
1249 | -- c' <- lift $ B.hPutNonBlocking h c |
1250 | -- case B.length c' of |
1251 | -- l' | l' == B.length c -> hPutNonBlocking h cs |
1252 | -- 0 -> bs |
1253 | -- _ -> Chunk c' cs |
1254 | -- {-# INLINABLE hPutNonBlocking #-} |
1255 | |
-- -- | A synonym for @hPut@, for compatibility
1257 | -- |
1258 | -- hPutStr :: Handle -> ByteStream IO r -> IO r |
1259 | -- hPutStr = hPut |
1260 | -- |
1261 | -- -- | Write a ByteStream to stdout |
1262 | -- putStr :: ByteStream IO r -> IO r |
1263 | -- putStr = hPut IO.stdout |
1264 | |
-- | 'interact' takes a function of type @ByteStream -> ByteStream@. The whole
-- of standard input is handed to that function as a stream, and the stream it
-- produces is written to standard output; the result is the final value of
-- the produced stream.
--
-- > interact morph = stdout (morph stdin)
interact :: (ByteStream IO () -> ByteStream IO r) -> IO r
interact f = stdout output
  where
    output = f stdin
{-# INLINE interact #-}
1274 | |
1275 | -- -- --------------------------------------------------------------------- |
1276 | -- -- Internal utilities |
1277 | |
-- | Used in `group` and `groupBy`. Puts a list of chunks, in reverse order, in
-- front of the given stream: the last element of the list becomes the first
-- chunk of the result.
revNonEmptyChunks :: [P.ByteString] -> ByteStream m r -> ByteStream m r
revNonEmptyChunks = L.foldl' prepend id
  where
    -- Put @bs@ in front of everything the builder so far produces.
    prepend build bs rest = Chunk bs (build rest)
{-# INLINE revNonEmptyChunks #-}
1282 | |
-- | Turn a list of possibly-empty chunks, given in reverse order, into a
-- 'ByteStream' that ends with the return value @r@: the last element of the
-- list becomes the first chunk of the stream.
revChunks :: Monad m => [P.ByteString] -> r -> ByteStream m r
revChunks cs r = L.foldl' push (Empty r) cs
  where
    -- Put one more chunk in front of the stream built so far.
    push rest c = Chunk c rest
{-# INLINE revChunks #-}
1287 | |
-- | Zip a list and a stream-of-byte-streams together: the function is applied
-- to the first list element and the first substream, to the second element
-- and the second substream, and so on. When the list runs out it starts again
-- from its beginning, so every substream is paired with an element.
--
-- With an empty list there is nothing to pair the substreams with, and the
-- stream is returned unchanged (restarting an empty list would never
-- terminate).
zipWithStream
  :: Monad m
  => (forall x . a -> ByteStream m x -> ByteStream m x)
  -> [a]
  -> Stream (ByteStream m) m r
  -> Stream (ByteStream m) m r
zipWithStream _  [] = id
zipWithStream op zs = loop zs
  where
    -- The list is exhausted: start over from the full list.
    loop [] !ls = loop zs ls
    loop a@(x:xs) ls = case ls of
      Return r   -> Return r
      Step fls   -> Step $ fmap (loop xs) (op x fls)
      -- An effect consumes no list element.
      Effect mls -> Effect $ fmap (loop a) mls
{-# INLINABLE zipWithStream #-}
1303 | |
-- | Run a builder, however it was constructed, as a genuine streaming
-- bytestring. Uses 'toStreamingByteStringWith' with a 'safeStrategy' of
-- @BI.smallChunkSize@ and @BI.defaultChunkSize@.
--
-- >>> Q.putStrLn $ Q.toStreamingByteString $ stringUtf8 "哈斯克尔" <> stringUtf8 " " <> integerDec 98
-- 哈斯克尔 98
--
-- <https://gist.github.com/michaelt/6ea89ca95a77b0ef91f3 This benchmark> shows
-- its performance is indistinguishable from @toLazyByteString@
toStreamingByteString :: MonadIO m => Builder -> ByteStream m ()
toStreamingByteString = toStreamingByteStringWith strategy
  where
    strategy = safeStrategy BI.smallChunkSize BI.defaultChunkSize
{-# INLINE toStreamingByteString #-}
1316 | |
-- | Run a builder as a genuine streaming bytestring, with buffers allocated
-- according to the given strategy. Each buffer the builder fills becomes one
-- chunk of the stream; the next buffer is only filled when the rest of the
-- stream is demanded.
toStreamingByteStringWith :: MonadIO m => AllocationStrategy -> Builder -> ByteStream m ()
toStreamingByteStringWith strategy builder0 =
  liftIO (buildStepToCIOS strategy (runBuilder builder0)) >>= loop
  where
    -- Emit a finished chunk and resume the builder on demand.
    loop (Yield1 bs io)   = Chunk bs (liftIO io >>= loop)
    loop (Finished buf r) = trimmedChunkFromBuffer buf (Empty r)

    -- Emit what is left in the final buffer. A chunk that fills less than
    -- half of its buffer is copied, so the stream does not hold on to the
    -- mostly unused buffer.
    trimmedChunkFromBuffer buffer k
      | B.null bs                           = k
      | 2 * B.length bs < bufferSize buffer = Chunk (B.copy bs) k
      | otherwise                           = Chunk bs k
      where
        bs = byteStringFromBuffer buffer
{-# INLINABLE toStreamingByteStringWith #-}
{-# SPECIALIZE toStreamingByteStringWith ::  AllocationStrategy -> Builder -> ByteStream IO () #-}
1336 | |
-- | Combine a stream of builders (not a streaming bytestring!) into one
-- builder. The effects of the stream are run as the resulting builder is
-- executed, each one just before the builders that follow it.
--
-- >>> let aa = yield (integerDec 10000) >> yield (string8 " is a number.") >> yield (char8 '\n')
-- >>> hPutBuilder IO.stdout $  concatBuilders aa
-- 10000 is a number.
concatBuilders :: Stream (Of Builder) IO () -> Builder
concatBuilders p = builder $ \bstep range ->
  -- Run a builder against the continuation step and buffer range at hand.
  let run b = runBuilderWith b bstep range
   in case p of
        Return _         -> run mempty
        Step (b :> rest) -> run (b `mappend` concatBuilders rest)
        Effect m         -> m >>= run . concatBuilders
{-# INLINABLE concatBuilders #-}
1350 | |
-- | Build a 'Builder' from a 'ByteStream': each chunk of the stream is turned
-- into a builder with 'byteString', and the results are joined by
-- 'concatBuilders'.
--
-- >>> let aaa = "10000 is a number\n" :: Q.ByteStream IO ()
-- >>> hPutBuilder IO.stdout $ toBuilder aaa
-- 10000 is a number
toBuilder :: ByteStream IO () -> Builder
toBuilder bs = concatBuilders (SP.map byteString (toChunks bs))
{-# INLINABLE toBuilder #-}