我有两个管道源A和B,我想将它们合并为一个产量:
data Result = Left Int | Value Int | Right Int
merge :: Monad m => Source m Int -> Source m Int -> Source Result
merge a b = undefined
如:
a
和的值b
Value Int
a
或者b
可能具有leftover
Left
或Right
值,(取决于哪个原始源仍具有值),直到两个源都被耗尽为止我尝试用以下方式实现它ZipSource
:
getZipSource (ZipSource (a =$= CL.map Left) <* ZipSource (b =$= CL.map Right))
但是我无法弄清楚如何使其在两个源之间交替(当我执行两个await
s时)以及如何以我上面描述的方式处理剩余物。
我也看了看,sequenceSources
但似乎也无济于事。
可以用Conduit构建类似的东西吗?
一个具体的例子是:
Int
来源min
值,从最大值中减去该值,然后将其余值放回其流中预期的输出将是:
runConduit $ merge (CL.sourceList [10, 20, 30]) (CL.sourceList [6, 4, 20]) $$ CL.take 10
Value 6 -- 10-6 = 4, 6 yielded, 4 goes back to "a"
Value 4 -- 4-4 = 0, both values are fully consumed
Value 20 -- 20-20 = 0, both values are fully consumed
Left 30 -- "b" has no values, but "a" still yielding
[更新]到目前为止,我发现的最好方法是编写类似于以下内容的东西zipSources
:
go (Done ()) (HaveOutput src close y) = HaveOutput (go (Done ()) src) close (Nothing, Just y)
go (HaveOutput src close x) (Done ()) = HaveOutput (go src (Done ())) close (Just x, Nothing)
这是正确的方法吗?
我最终这样做:
data MergedValue a v b = BackL a v | MergedValue v | BackR v b
data JoinResult a v b = LeftoverL a | JoinValue v | LeftoverR b
joinSources :: Monad m
=> (a -> b -> MergedValue a v b)
-> Source m a
-> Source m b
-> Source m (JoinResult a v b)
joinSources f as bs =
go (newResumableSource as) (newResumableSource bs)
where
go ras rbs = do
(ras', ma) <- lift $ ras $$++ await
(rbs', mb) <- lift $ rbs $$++ await
case (ma, mb) of
(Nothing, Nothing) -> pure ()
(Nothing, Just b) -> yield (LeftoverR b) >> go ras' rbs'
(Just a, Nothing) -> yield (LeftoverL a) >> go ras' rbs'
(Just a, Just b) -> case f a b of
BackL x v -> do
yield (JoinValue v)
(nxt, _) <- lift $ ras' $$++ leftover x
go nxt rbs'
BackR v x -> do
yield (JoinValue v)
(nxt, _) <- lift $ rbs' $$++ leftover x
go ras' nxt
MergedValue v -> yield (JoinValue v) >> go ras' rbs'
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句