In my last blog post, I made the case that automatic termination is the cause of a lot of problems in both pipes and conduit. In this blog post, I'd like to:
Derive a simple core datatype that does not have automatic termination.
Discuss the desired behavior of this datatype.
Assess what convenience functionality we lost along the way, and see if we can get it back with an added layer on top of the core.
Note that this blog post is meant to demonstrate an idea; it's not to be taken as a working implementation. There are many details that still need to be worked out. I'm hoping that presenting my thoughts in this manner will give enough of a basis for an informed discussion.
Deriving the core datatype
I want to start as simple as possible. Let's begin with the core datatype from pipes 1.0. This leaves out any extra complexity we don't want to deal with in this process, like leftovers, finalizers, or bidirectionality.
data Pipe i o m r
= Pure r
| M (m (Pipe i o m r))
| Yield (Pipe i o m r) o
| Await (i -> Pipe i o m r)
Quick recap: M
is for performing monadic actions. Yield
passes a value
downstream, and gives up control of the flow of execution to downstream. If
downstream meanwhile returns Pure
, the current Pipe
will never get control
of execution again. Similarly, Await
asks for a value from upstream, and will
similarly terminate of upstream returns Pure
.
Let's implement the identity pipe in terms of the raw constructors:
idP :: Monad m => Pipe i i m r
idP = Await (Yield idP)
We'll also need some kind of function to compose two pipes together. The type for this function is:
fuse :: Monad m
=> Pipe a b m r
-> Pipe b c m r
-> Pipe a c m r
You can experiment with some sample code here:
import Control.Monad
data Pipe i o m r
= Pure r
| M (m (Pipe i o m r))
| Yield (Pipe i o m r) o
| Await (i -> Pipe i o m r)
fuse :: Monad m
=> Pipe a b m r
-> Pipe b c m r
-> Pipe a c m r
fuse _ (Pure r) = Pure r
fuse up (M m) = M (liftM (fuse up) m)
fuse up (Yield down o) = Yield (fuse up down) o
fuse up0 (Await down) =
go up0
where
go (Pure r) = Pure r
go (M m) = M (liftM go m)
go (Yield up b) = fuse up (down b)
go (Await up) = Await (go . up)
(>->) = fuse
idP :: Monad m => Pipe i i m r
idP = Await (Yield idP)
consume :: Monad m => Int -> Pipe i o m [i]
consume =
go id
where
go front 0 = Pure (front [])
go front count = Await $ \i -> go (front . (i:)) (count - 1)
yieldMany :: Monad m => [o] -> Pipe i o m r
yieldMany [] = error "FIXME"
yieldMany (o:os) = Yield (yieldMany os) o
runPipe :: Monad m => Pipe () () m r -> m r
runPipe (Pure r) = return r
runPipe (M m) = m >>= runPipe
runPipe (Await f) = runPipe (f ())
runPipe (Yield f ()) = runPipe f
main = runPipe (yieldMany [1..] >-> idP >-> consume 10) >>= print
In this blog post, I want to modify the core datatype to allow for
non-termination. We'll start with the Await
side of the equation, and allow
the identity pipe guide our design for the rest of the other constructors.
(Note: we're going to target simplicity here, not efficiency. Efficiency can be
addressed another time.)
Await and Yield: add Maybe
The simplest way to allow for non-termination is to modify Await
to include a
Maybe
wrapper:
Await (Maybe i -> Pipe i o m r)
This means that, when awaiting for a response from upstream, we can be informed
via Nothing
that no values are available. However, our idP
no longer
compiles. That's because we're getting a Maybe i
, but Yield
expects an i
.
Let's fix this by modifying our Yield
constructor also:
Yield (Pipe i o m r) (Maybe o)
Now our original idP
continues to compile. This change now allows us to write a fold, e.g.:
fold :: Monad m => (r -> i -> r) -> r -> Pipe i o m r
fold f =
loop
where
loop r = Await $ \mi ->
case mi of
Just i -> loop $! f r i
Nothing -> Pure r
Finalization
However, we still haven't actually fixed the non-termination problem. As soon as one Pipe
terminates, the whole Pipe
terminates. The practical issue is that prompt termination is still not achieved. Try out the following example:
import Control.Monad
data Pipe i o m r
= Pure r
| M (m (Pipe i o m r))
| Yield (Pipe i o m r) (Maybe o)
| Await (Maybe i -> Pipe i o m r)
fuse :: Monad m
=> Pipe a b m r
-> Pipe b c m r
-> Pipe a c m r
fuse _ (Pure r) = Pure r
fuse up (M m) = M (liftM (fuse up) m)
fuse up (Yield down o) = Yield (fuse up down) o
fuse up0 (Await down) =
go up0
where
go (Pure r) = Pure r
go (M m) = M (liftM go m)
go (Yield up b) = fuse up (down b)
go (Await up) = Await (go . up)
(>->) :: Monad m
=> Pipe a b m r
-> Pipe b c m r
-> Pipe a c m r
(>->) = fuse
idP :: Monad m => Pipe i i m r
idP = Await (Yield idP)
consume :: Monad m => Int -> Pipe i o m [i]
consume =
go id
where
go front 0 = Pure (front [])
go front count = Await $ \mi ->
case mi of
Just i -> go (front . (i:)) (count - 1)
Nothing -> Pure (front [])
-- show Data producer with finalization
yieldMany :: [o] -> Pipe i o IO r
yieldMany [] = M (putStrLn "Finalization" >> return (Yield (yieldMany []) Nothing))
yieldMany (o:os) = Yield (yieldMany os) (Just o)
-- /show
fold :: Monad m => (r -> i -> r) -> r -> Pipe i o m r
fold f =
loop
where
loop r = Await $ \mi ->
case mi of
Just i -> loop $! f r i
Nothing -> Pure r
runPipe :: Monad m => Pipe () () m r -> m r
runPipe (Pure r) = return r
runPipe (M m) = m >>= runPipe
runPipe (Await f) = runPipe (f Nothing)
runPipe (Yield f _) = runPipe f
-- show When does the finalizer get called?
main = do
runPipe (yieldMany [1..10] >-> idP >-> fold (+) 0) >>= print
runPipe (yieldMany [1..10] >-> idP >-> consume 5) >>= print
-- /show
If the input stream it fully consumed, then Finalization
is printed.
Otherwise, it's not. We need to change our semantics so that we don't exit until all Pipe
s exit.
Downstream is already notified when upstream is done producing data, via Nothing
getting passed with Yield
. We need a similar mechanism on the upstream side. In this case, we want it to be a Maybe
result value. This value needs to be present as soon as the Pipe
begins execution. To allow for this, we're going to refactor our type a bit as follows:
data Step i o m r
= Pure r
| M (m (Step i o m r))
| Yield (Pipe i o m r) (Maybe o)
| Await (Maybe i -> Step i o m r)
type Pipe i o m r = Maybe r -> Step i o m r
Now a Pipe
is notified if downstream has already completed execution. I'd like to focus on one important distinction in the Step
type's constructors: whereas M
and Await
represent the next thing to be done via a Step
value, Yield
represents it with a Pipe
value. The reason for this distinction is that, when you call Yield
, downstream has a chance to continue processing, and may provide a return value if it hadn't provided one previously. In the M
and Await
cases, downstream never gets a chance to provide a new result value.
Let's think about the identity pipe. Its semantics should be that, if downstream is done processing, it's also done processing. If downstream is not done processing, it should await for a new value from upstream and yield it downstream. This turns out to be pretty easy to implement:
idP :: Monad m => Pipe i i m r
idP Nothing = Await (Yield idP)
idP (Just r) = Pure r
Below is our full running example.
import Control.Monad
data Step i o m r
= Pure r
| M (m (Step i o m r))
| Yield (Pipe i o m r) (Maybe o)
| Await (Maybe i -> Step i o m r)
type Pipe i o m r = Maybe r -> Step i o m r
-- show Fusion is a bit more complicated now
fuse :: Monad m
=> Pipe a b m r
-> Pipe b c m r
-> Pipe a c m r
fuse up down mr = fuse' up (down mr)
fuse' :: Monad m
=> (Maybe r -> Step a b m r)
-> Step b c m r
-> Step a c m r
fuse' up0 (Pure r) =
go $ up0 $ Just r
where
go (Pure r') = Pure r'
go (M m) = M (liftM go m)
go (Yield up _) = go $ up $ Just r
go (Await up) = Await $ \ma -> go $ up ma
fuse' up (M m) = M (liftM (fuse' up) m)
fuse' up (Yield down o) = Yield (fuse up down) o
fuse' up0 (Await down) =
go $ up0 Nothing
where
-- It's easy to get this next clause wrong.
-- We need to make sure that we give downstream
-- a chance to finish processing, not just terminate
-- immediately.
go (Pure r) = fuse' (\_ -> Pure r) (down Nothing)
go (M m) = M (liftM go m)
go (Yield up b) = fuse' up (down b)
go (Await up) = Await (go . up)
-- /show
(>->) :: Monad m
=> Pipe a b m r
-> Pipe b c m r
-> Pipe a c m r
(>->) = fuse
idP :: Monad m => Pipe i i m r
idP Nothing = Await (Yield idP)
idP (Just r) = Pure r
-- show Consumers can just ignore downstream results
consume :: Monad m => Int -> Pipe i o m [i]
consume count0 _ =
go id count0
where
go front 0 = Pure (front [])
go front count = Await $ \mi ->
case mi of
Just i -> go (front . (i:)) (count - 1)
Nothing -> Pure (front [])
-- /show
-- show Finalization
-- Note that we only finalize once downstream completes.
-- We could instead finalize as soon as our input is empty,
-- which would allow for more promptness. Try implementing that
-- change. Make sure that the finalizer only gets called once!
yieldMany :: [o] -> Pipe i o IO r
yieldMany _ (Just r) = M (putStrLn "Finalization" >> return (Pure r))
yieldMany [] Nothing = Yield (yieldMany []) Nothing
yieldMany (o:os) Nothing = Yield (yieldMany os) (Just o)
-- /show
fold :: Monad m => (r -> i -> r) -> r -> Pipe i o m r
fold f r0 _ =
loop r0
where
loop r = Await $ \mi ->
case mi of
Just i -> loop $! f r i
Nothing -> Pure r
runPipe :: Monad m => Pipe () () m r -> m r
runPipe f = runStep (f Nothing)
runStep :: Monad m => Step () () m r -> m r
runStep (Pure r) = return r
runStep (M m) = m >>= runStep
runStep (Await f) = runStep (f Nothing)
runStep (Yield f _) = runPipe f
main = do
runPipe (yieldMany [1..10] >-> idP >-> fold (+) 0) >>= print
runPipe (yieldMany [1..10] >-> idP >-> consume 5) >>= print
That's it, we've moved from automatic termination to manual termination. We can now fold, get prompt finalization, and even modify result values from downstream. Let's play with a few more changes.
Draining upstream
Consider writing a Pipe
that takes precisely 5 values from upstream and passes them downstream. If downstream finishes early, it still takes those 5 values. This is not currently possible in pipes or conduit. Let's try it out in our new framework:
import Control.Monad
data Step i o m r
= Pure r
| M (m (Step i o m r))
| Yield (Pipe i o m r) (Maybe o)
| Await (Maybe i -> Step i o m r)
type Pipe i o m r = Maybe r -> Step i o m r
fuse :: Monad m
=> Pipe a b m r
-> Pipe b c m r
-> Pipe a c m r
fuse up down mr = fuse' up (down mr)
fuse' :: Monad m
=> (Maybe r -> Step a b m r)
-> Step b c m r
-> Step a c m r
fuse' up0 (Pure r) =
go $ up0 $ Just r
where
go (Pure r') = Pure r'
go (M m) = M (liftM go m)
go (Yield up _) = go $ up $ Just r
go (Await up) = Await $ \ma -> go $ up ma
fuse' up (M m) = M (liftM (fuse' up) m)
fuse' up (Yield down o) = Yield (fuse up down) o
fuse' up0 (Await down) =
go $ up0 Nothing
where
go (Pure r) = fuse' (\_ -> Pure r) (down Nothing)
go (M m) = M (liftM go m)
go (Yield up b) = fuse' up (down b)
go (Await up) = Await (go . up)
(>->) :: Monad m
=> Pipe a b m r
-> Pipe b c m r
-> Pipe a c m r
(>->) = fuse
idP :: Monad m => Pipe i i m r
idP Nothing = Await (Yield idP)
idP (Just r) = Pure r
consume :: Monad m => Int -> Pipe i o m [i]
consume count0 _ =
go id count0
where
go front 0 = Pure (front [])
go front count = Await $ \mi ->
case mi of
Just i -> go (front . (i:)) (count - 1)
Nothing -> Pure (front [])
fold :: Monad m => (r -> i -> r) -> r -> Pipe i o m r
fold f r0 _ =
loop r0
where
loop r = Await $ \mi ->
case mi of
Just i -> loop $! f r i
Nothing -> Pure r
runPipe :: Monad m => Pipe () () m r -> m r
runPipe f = runStep (f Nothing)
-- show
takeExactly :: Monad m => Int -> Pipe i i m r
takeExactly 0 (Just r) = Pure r
takeExactly 0 Nothing = Yield (takeExactly 0) Nothing
takeExactly count _ = Await $ \mi -> Yield (takeExactly (count - 1)) mi
-- We can optimize the above a bit by skipping
-- extra Awaits, give it a shot.
yieldMany :: Show o => [o] -> Pipe i o IO r
yieldMany rest (Just r) = M $ do
putStrLn $ "Finalization: " ++ show rest
return (Pure r)
yieldMany [] Nothing = Yield (yieldMany []) Nothing
yieldMany (o:os) Nothing = Yield (yieldMany os) (Just o)
main = runPipe (yieldMany [1..10] >-> takeExactly 5 >-> fold (+) 0) >>= print
-- /show
runStep :: Monad m => Step () () m r -> m r
runStep (Pure r) = return r
runStep (M m) = m >>= runStep
runStep (Await f) = runStep (f Nothing)
runStep (Yield f _) = runPipe f
Downstream result type
There's a nice generalization we can provide here. There's no reason that the upstream and downstream result types need to be the same. In fact, if we just add an extra type parameter (d
for downstream result), the rest of our code can remain the same. Here's an example:
import Control.Monad
-- show
data Step i o d m r
= Pure r
| M (m (Step i o d m r))
| Yield (Pipe i o d m r) (Maybe o)
| Await (Maybe i -> Step i o d m r)
type Pipe i o d m r = Maybe d -> Step i o d m r
-- Notice the way that a->b->c flows downstream.
-- On the other hand, the result values x->y->z
-- "bubble up" from downstream.
fuse :: Monad m
=> Pipe a b y m z
-> Pipe b c x m y
-> Pipe a c x m z
-- /show
fuse up down mr = fuse' up (down mr)
fuse' :: Monad m
=> (Maybe y -> Step a b y m z)
-> Step b c x m y
-> Step a c x m z
fuse' up0 (Pure r) =
go $ up0 $ Just r
where
go (Pure r') = Pure r'
go (M m) = M (liftM go m)
go (Yield up _) = go $ up $ Just r
go (Await up) = Await $ \ma -> go $ up ma
fuse' up (M m) = M (liftM (fuse' up) m)
fuse' up (Yield down o) = Yield (fuse up down) o
fuse' up0 (Await down) =
go $ up0 Nothing
where
go (Pure r) = fuse' (\_ -> Pure r) (down Nothing)
go (M m) = M (liftM go m)
go (Yield up b) = fuse' up (down b)
go (Await up) = Await (go . up)
(>->) :: Monad m
=> Pipe a b y m z
-> Pipe b c x m y
-> Pipe a c x m z
(>->) = fuse
-- show
-- Identity keeps the same stream value (i) and result
-- value (r) for both upstream and downstream.
idP :: Monad m => Pipe i i r m r
idP Nothing = Await (Yield idP)
idP (Just r) = Pure r
-- /show
-- show
-- Consumers can just ignore the downstream result.
consume :: Monad m => Int -> Pipe i o d m [i]
consume count0 _ =
go id count0
where
go front 0 = Pure (front [])
go front count = Await $ \mi ->
case mi of
Just i -> go (front . (i:)) (count - 1)
Nothing -> Pure (front [])
-- /show
fold :: Monad m => (r -> i -> r) -> r -> Pipe i o d m r
fold f r0 _ =
loop r0
where
loop r = Await $ \mi ->
case mi of
Just i -> loop $! f r i
Nothing -> Pure r
runPipe :: Monad m => Pipe () () d m r -> m r
runPipe f = runStep (f Nothing)
runStep :: Monad m => Step () () d m r -> m r
runStep (Pure r) = return r
runStep (M m) = m >>= runStep
runStep (Await f) = runStep (f Nothing)
runStep (Yield f _) = runPipe f
-- show
-- Producers can simply return the downstream result
-- as its own result value.
takeExactly :: Monad m => Int -> Pipe i i r m r
takeExactly 0 (Just r) = Pure r
takeExactly 0 Nothing = Yield (takeExactly 0) Nothing
takeExactly count _ = Await $ \mi -> Yield (takeExactly (count - 1)) mi
yieldMany :: Show o => [o] -> Pipe i o r IO r
yieldMany rest (Just r) = M $ do
putStrLn $ "Finalization: " ++ show rest
return (Pure r)
yieldMany [] Nothing = Yield (yieldMany []) Nothing
yieldMany (o:os) Nothing = Yield (yieldMany os) (Just o)
-- /show
main = runPipe (yieldMany [1..10] >-> takeExactly 5 >-> fold (+) 0) >>= print
However, we can also use this functionality to modify the result type. For example, we may want to return any unused values from our input list in yieldMany
. This would look like:
import Control.Monad
data Step i o d m r
= Pure r
| M (m (Step i o d m r))
| Yield (Pipe i o d m r) (Maybe o)
| Await (Maybe i -> Step i o d m r)
type Pipe i o d m r = Maybe d -> Step i o d m r
fuse :: Monad m
=> Pipe a b y m z
-> Pipe b c x m y
-> Pipe a c x m z
fuse up down mr = fuse' up (down mr)
fuse' :: Monad m
=> (Maybe y -> Step a b y m z)
-> Step b c x m y
-> Step a c x m z
fuse' up0 (Pure r) =
go $ up0 $ Just r
where
go (Pure r') = Pure r'
go (M m) = M (liftM go m)
go (Yield up _) = go $ up $ Just r
go (Await up) = Await $ \ma -> go $ up ma
fuse' up (M m) = M (liftM (fuse' up) m)
fuse' up (Yield down o) = Yield (fuse up down) o
fuse' up0 (Await down) =
go $ up0 Nothing
where
go (Pure r) = fuse' (\_ -> Pure r) (down Nothing)
go (M m) = M (liftM go m)
go (Yield up b) = fuse' up (down b)
go (Await up) = Await (go . up)
(>->) :: Monad m
=> Pipe a b y m z
-> Pipe b c x m y
-> Pipe a c x m z
(>->) = fuse
idP :: Monad m => Pipe i i r m r
idP Nothing = Await (Yield idP)
idP (Just r) = Pure r
consume :: Monad m => Int -> Pipe i o d m [i]
consume count0 _ =
go id count0
where
go front 0 = Pure (front [])
go front count = Await $ \mi ->
case mi of
Just i -> go (front . (i:)) (count - 1)
Nothing -> Pure (front [])
fold :: Monad m => (r -> i -> r) -> r -> Pipe i o d m r
fold f r0 _ =
loop r0
where
loop r = Await $ \mi ->
case mi of
Just i -> loop $! f r i
Nothing -> Pure r
runPipe :: Monad m => Pipe () () d m r -> m r
runPipe f = runStep (f Nothing)
runStep :: Monad m => Step () () d m r -> m r
runStep (Pure r) = return r
runStep (M m) = m >>= runStep
runStep (Await f) = runStep (f Nothing)
runStep (Yield f _) = runPipe f
takeExactly :: Monad m => Int -> Pipe i i r m r
takeExactly 0 (Just r) = Pure r
takeExactly 0 Nothing = Yield (takeExactly 0) Nothing
takeExactly count _ = Await $ \mi -> Yield (takeExactly (count - 1)) mi
-- show
yieldMany :: Monad m => [o] -> Pipe i o r m ([o], r)
yieldMany rest (Just r) = Pure (rest, r)
yieldMany [] Nothing = Yield (yieldMany []) Nothing
yieldMany (o:os) Nothing = Yield (yieldMany os) (Just o)
main = runPipe (yieldMany [1..10] >-> takeExactly 5 >-> fold (+) 0) >>= print
-- /show
-- IGNORED
Leftovers/chunked data
So I promised a solution to leftovers as well. Actually, we can base a solution on that preceding example. When we return a result, we should also return any unconsumed input with it. When we monadically compose two Pipe
s, we want the leftovers from the first to be injected into the second to be used any time the second awaits. We haven't actually shown a Monad
instance yet, and without creating a newtype
in place of our type synonym, we can't. But let's play around with some pipeReturn
and pipeBind
functions:
import Control.Monad
-- show
data Step i o d m r
= Pure [i] r
| M (m (Step i o d m r))
| Yield (Pipe i o d m r) (Maybe o)
| Await (Maybe i -> Step i o d m r)
type Pipe i o d m r = Maybe ([o], d) -> Step i o d m r
pipeReturn :: Monad m => r -> Pipe i o d m r
pipeReturn r _downRes = Pure [] r
pipeBind :: Monad m
=> Pipe i o d m a
-> (a -> Pipe i o d m b)
-> Pipe i o d m b
pipeBind f g mr =
go $ f mr
where
go (Pure is a) = inject is (g a) mr
go (M m) = M (liftM go m)
go (Yield p o) = Yield (pipeBind p g) o
go (Await f) = Await (go . f)
inject :: Monad m => [i] -> Pipe i o d m r -> Pipe i o d m r
inject [] p = p
inject is0 p0 =
go is0 . p0
where
go [] p = p
go is (Pure is' r) = Pure (is' ++ is) r
go is (M m) = M (liftM (go is) m)
go is (Yield p o) = Yield (go is . p) o
go (i:is) (Await p) = go is (p $ Just i)
peek :: Monad m => Pipe i o d m (Maybe i)
peek _ = Await $ \mi ->
case mi of
Nothing -> Pure [] Nothing
Just i -> Pure [i] (Just i)
peekFold :: Monad m => (r -> i -> r) -> r -> Pipe i o d m (Maybe i, r)
peekFold f r = peek `pipeBind` \a -> fold f r `pipeBind` \b -> pipeReturn (a, b)
main = do
runPipe (yieldMany [1..10] >-> peek) >>= print
runPipe (yieldMany [1..10] >-> peekFold (+) 0) >>= print
-- /show
fuse :: Monad m
=> Pipe a b y m z
-> Pipe b c x m y
-> Pipe a c x m z
fuse up down mr = fuse' up (down mr)
fuse' :: Monad m
=> (Maybe ([b], y) -> Step a b y m z)
-> Step b c x m y
-> Step a c x m z
fuse' up0 (Pure cs z) =
go $ up0 $ Just (cs, z)
where
go (Pure bs y) = Pure bs y
go (M m) = M (liftM go m)
go (Yield up _) = go $ up $ Just ([], z)
go (Await up) = Await $ \ma -> go $ up ma
fuse' up (M m) = M (liftM (fuse' up) m)
fuse' up (Yield down o) = Yield (fuse up down) o
fuse' up0 (Await down) =
go $ up0 Nothing
where
go (Pure bs y) = fuse' (\_ -> Pure bs y) (down Nothing)
go (M m) = M (liftM go m)
go (Yield up b) = fuse' up (down b)
go (Await up) = Await (go . up)
(>->) :: Monad m
=> Pipe a b y m z
-> Pipe b c x m y
-> Pipe a c x m z
(>->) = fuse
idP :: Monad m => Pipe i i r m r
idP Nothing = Await (Yield idP)
idP (Just (is, r)) = Pure is r
consume :: Monad m => Int -> Pipe i o d m [i]
consume count0 _ =
go id count0
where
go front 0 = Pure [] (front [])
go front count = Await $ \mi ->
case mi of
Just i -> go (front . (i:)) (count - 1)
Nothing -> Pure [] (front [])
fold :: Monad m => (r -> i -> r) -> r -> Pipe i o d m r
fold f r0 _ =
loop r0
where
loop r = Await $ \mi ->
case mi of
Just i -> loop $! f r i
Nothing -> Pure [] r
runPipe :: Monad m => Pipe () () d m r -> m r
runPipe f = runStep (f Nothing)
runStep :: Monad m => Step () () d m r -> m r
runStep (Pure _ r) = return r
runStep (M m) = m >>= runStep
runStep (Await f) = runStep (f Nothing)
runStep (Yield f _) = runPipe f
takeExactly :: Monad m => Int -> Pipe i i r m r
-- We specifically ignore leftovers, since we want to ensure
-- that we consumed exactly the given number of elements
-- from the stream.
takeExactly 0 (Just (_, r)) = Pure [] r
takeExactly 0 Nothing = Yield (takeExactly 0) Nothing
takeExactly count _ = Await $ \mi -> Yield (takeExactly (count - 1)) mi
yieldMany :: Monad m => [o] -> Pipe i o r m ([o], [o], r)
yieldMany rest (Just (os, r)) = Pure [] (rest, os, r)
yieldMany [] Nothing = Yield (yieldMany []) Nothing
yieldMany (o:os) Nothing = Yield (yieldMany os) (Just o)
Early termination
So we've done all of this work to get rid of early termination. But in practice, it's often very convenient to have early termination. For example, a common pipes and conduit idiom to create a producer from an infinite list is:
mapM_ yield [1..]
Without some kind of early termination, this will never exit. Fortunately, having early exit from a Monad
is a solved problem. We could either layer something like MaybeT
in our transformer stack, or add a new constructor to our Step
datatype. There are a few complications introduced by this, notably that we have an extra type parameter (t
for termination) which must be unified with our result type, but the approach works.
import Control.Monad
-- show
data Step i o d t m r
= Pure [i] r
| M (m (Step i o d t m r))
| Yield (Pipe i o d t m r) (Maybe o)
| Await (Maybe i -> Step i o d t m r)
| Stop [i] t
type Pipe i o d t m r = Maybe ([o], d) -> Step i o d t m r
yield :: Monad m => o -> Pipe i o d d m ()
yield _ (Just (_, d)) = Stop [] d
yield o Nothing = Yield (pipeReturn ()) (Just o)
stop :: Monad m => Pipe i o d d m r
stop (Just (_, r)) = Stop [] r
stop Nothing = Yield stop Nothing
pipeMapM_ :: Monad m
=> (a -> Pipe i o d t m ())
-> [a]
-> Pipe i o d t m ()
pipeMapM_ _ [] = pipeReturn ()
pipeMapM_ f (x:xs) = f x `pipeBind` (const (pipeMapM_ f xs))
main :: IO ()
main = do
let producer =
pipeMapM_ yield [1..] `pipeBind`
const stop
runPipe (producer >-> takeExactly 10 >-> fold (+) 0)
>>= print
-- /show
pipeReturn :: Monad m => r -> Pipe i o d t m r
pipeReturn r _downRes = Pure [] r
pipeBind :: Monad m
=> Pipe i o d t m a
-> (a -> Pipe i o d t m b)
-> Pipe i o d t m b
pipeBind f g mr =
go $ f mr
where
go (Pure is a) = inject is (g a) mr
go (M m) = M (liftM go m)
go (Yield p o) = Yield (pipeBind p g) o
go (Await p) = Await (go . p)
go (Stop is t) = Stop is t
inject :: Monad m => [i] -> Pipe i o d t m r -> Pipe i o d t m r
inject [] p = p
inject is0 p0 =
go is0 . p0
where
go [] p = p
go is (Pure is' r) = Pure (is' ++ is) r
go is (M m) = M (liftM (go is) m)
go is (Yield p o) = Yield (go is . p) o
go (i:is) (Await p) = go is (p $ Just i)
go is (Stop is' r) = Stop (is' ++ is) r
peek :: Monad m => Pipe i o d t m (Maybe i)
peek _ = Await $ \mi ->
case mi of
Nothing -> Pure [] Nothing
Just i -> Pure [i] (Just i)
peekFold :: Monad m => (r -> i -> r) -> r -> Pipe i o d t m (Maybe i, r)
peekFold f r = peek `pipeBind` \a -> fold f r `pipeBind` \b -> pipeReturn (a, b)
fuse :: Monad m
=> Pipe a b y z m z
-> Pipe b c x y m y
-> Pipe a c x z m z
fuse up down mr = fuse' up (down mr)
fuse' :: Monad m
=> (Maybe ([b], y) -> Step a b y z m z)
-> Step b c x y m y
-> Step a c x z m z
fuse' up0 (Pure cs z) =
go $ up0 $ Just (cs, z)
where
go (Pure bs y) = Pure bs y
go (M m) = M (liftM go m)
go (Yield up _) = go $ up $ Just ([], z)
go (Await up) = Await $ \ma -> go $ up ma
go (Stop bs y) = Stop bs y
fuse' up (M m) = M (liftM (fuse' up) m)
fuse' up (Yield down o) = Yield (fuse up down) o
fuse' up0 (Await down) =
go $ up0 Nothing
where
go (Pure bs y) = fuse' (\_ -> Pure bs y) (down Nothing)
go (M m) = M (liftM go m)
go (Yield up b) = fuse' up (down b)
go (Await up) = Await (go . up)
go (Stop bs y) = fuse' (\_ -> Stop bs y) (down Nothing)
fuse' up0 (Stop cs z) = fuse' up0 (Pure cs z)
(>->) :: Monad m
=> Pipe a b y z m z
-> Pipe b c x y m y
-> Pipe a c x z m z
(>->) = fuse
idP :: Monad m => Pipe i i r t m r
idP Nothing = Await (Yield idP)
idP (Just (is, r)) = Pure is r
consume :: Monad m => Int -> Pipe i o d t m [i]
consume count0 _ =
go id count0
where
go front 0 = Pure [] (front [])
go front count = Await $ \mi ->
case mi of
Just i -> go (front . (i:)) (count - 1)
Nothing -> Pure [] (front [])
fold :: Monad m => (r -> i -> r) -> r -> Pipe i o d t m r
fold f r0 _ =
loop r0
where
loop r = Await $ \mi ->
case mi of
Just i -> loop $! f r i
Nothing -> Pure [] r
runPipe :: Monad m => Pipe () () d r m r -> m r
runPipe f = runStep (f Nothing)
runStep :: Monad m => Step () () d r m r -> m r
runStep (Pure _ r) = return r
runStep (M m) = m >>= runStep
runStep (Await f) = runStep (f Nothing)
runStep (Yield f _) = runPipe f
runStep (Stop _ r) = return r
takeExactly :: Monad m => Int -> Pipe i i r t m r
-- We specifically ignore leftovers, since we want to ensure
-- that we consumed exactly the given number of elements
-- from the stream.
takeExactly 0 (Just (_, r)) = Pure [] r
takeExactly 0 Nothing = Yield (takeExactly 0) Nothing
takeExactly count _ = Await $ \mi -> Yield (takeExactly (count - 1)) mi
yieldMany :: Monad m => [o] -> Pipe i o r t m ([o], [o], r)
yieldMany rest (Just (os, r)) = Pure [] (rest, os, r)
yieldMany [] Nothing = Yield (yieldMany []) Nothing
yieldMany (o:os) Nothing = Yield (yieldMany os) (Just o)
Conclusion
The approach I've shown here is not optimized. A faster way of implementing this is to collapse Pipe
and Step
back into a single data type, and have a dedicated constructor to asking for downstream results. It's also more efficient to skip all of the Maybe
wrappers. Instead of Await
taking a Maybe i
, Await
can have two fields: one for when a value is available from upstream, and the other for when no such value is available.
Though it's currently highly experimental, I've implemented these ideas in an experimental branch of conduit. A quick translation from our approach is:
- Stop is called Terminate.
- Await has been deconstructed into two fields.
- Instead of
Yield
providing aMaybe o
, it provides ano
. The functionality provided by yieldingNothing
is now handled by theEmpty
constructor. Check
is used for checking for downstream results.Yield
andEmpty
are also able to check for downstream results. In the case ofEmpty
, we are guaranteed that downstream must provide a result. In the case ofYield
, we include the extra field as an optimization, since composingYield
andCheck
is otherwise so common.
I've been able to replicate the current conduit API on top of this. The full test suite passes, including some tests for identity and associativity that failed previously. And the benchmarks show this approach as comparable to previous versions.
This approach seems very promising to me, but I also think we need to take time to analyze it properly. I look forward to hearing what the rest of the community thinks.