Skip to content

Commit

Permalink
WIP split
Browse files Browse the repository at this point in the history
  • Loading branch information
turion committed Jan 6, 2025
1 parent 71111a0 commit d350ac8
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 18 deletions.
14 changes: 10 additions & 4 deletions automaton/src/Data/Automaton.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,12 @@ import Data.These (these)
import Witherable (Filterable (..))

-- automaton
import Data.Stream (StreamT (..))
import Data.Stream (StreamT (..), hoist', runTraversableS, snapshotCompose)
import Data.Stream.Internal (JointState (..))
import Data.Stream.Optimized (
OptimizedStreamT (..),
catMaybeS,
concatS,
hoist',
stepOptimizedStream,
)
import Data.Stream.Optimized qualified as StreamOptimized
Expand Down Expand Up @@ -377,8 +376,8 @@ withAutomaton f = Automaton . StreamOptimized.mapOptimizedStreamT (ReaderT . f .
{-# INLINE withAutomaton #-}

instance (Functor m) => Profunctor (Automaton m) where
dimap f g Automaton {getAutomaton} = Automaton $ g <$> hoist' (withReaderT f) getAutomaton
lmap f Automaton {getAutomaton} = Automaton $ hoist' (withReaderT f) getAutomaton
dimap f g Automaton {getAutomaton} = Automaton $ g <$> StreamOptimized.hoist' (withReaderT f) getAutomaton
lmap f Automaton {getAutomaton} = Automaton $ StreamOptimized.hoist' (withReaderT f) getAutomaton
rmap = fmap

instance (Applicative m) => Choice (Automaton m) where
Expand Down Expand Up @@ -452,6 +451,7 @@ traverseS = traverse'
traverseS_ :: (Monad m, Traversable f) => Automaton m a b -> Automaton m (f a) ()
traverseS_ automaton = traverse' automaton >>> arr (const ())

-- FIXME It's also conceivable to have Automaton (Compose m t) a b -> Automaton m a (t b)
-- TODO But should we use parallelism?
-- https://hackage.haskell.org/package/parallel-3.1.0.1/docs/Control-Parallel-Strategies.html#v:parTraversable

Expand Down Expand Up @@ -564,6 +564,12 @@ then the next 9 inputs will be ignored.
concatS :: (Monad m) => Automaton m a [b] -> Automaton m a b
concatS (Automaton automaton) = Automaton $ Data.Stream.Optimized.concatS automaton

runTraversableS :: (Monad m, Traversable t, Monad t) => Automaton (Compose m t) a b -> Automaton m a (t b)
runTraversableS = handleAutomaton $ Data.Stream.runTraversableS . Data.Stream.hoist' (Compose . ReaderT . fmap getCompose . runReaderT)

snapshot :: Functor m => Automaton m a b -> Automaton m a (m b)
snapshot = handleAutomaton $ hoist' (ReaderT . getCompose) . Data.Stream.snapshotCompose . hoist' (Compose . runReaderT)

-- * Examples

-- | Pass through a value unchanged, and perform a side effect depending on it
Expand Down
59 changes: 46 additions & 13 deletions automaton/src/Data/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ module Data.Stream where

-- base
import Control.Applicative (Alternative (..), Applicative (..), liftA2)
import Control.Monad (forM, (<$!>))
import Control.Monad (join, (<$!>))
import Data.Bifunctor (bimap)
import Data.Function ((&))
import Data.Functor ((<&>))
import Data.Functor.Compose (Compose (..))
import Data.Monoid (Ap (..))
import Prelude hiding (Applicative (..))
Expand Down Expand Up @@ -34,6 +35,9 @@ import Data.Align
import Witherable (Filterable (..), Witherable)

-- automaton

import Control.Arrow ((>>>))
import Control.Monad.Trans.Maybe (MaybeT (..))
import Data.Stream.Internal
import Data.Stream.Recursive (Recursive (..))
import Data.Stream.Result
Expand Down Expand Up @@ -524,33 +528,62 @@ fixA StreamT {state, step} = fixStream (JointState state) $

-- FIXME Generalisation in []
runListS :: (Monad m) => StreamT (Compose m []) a -> StreamT m [a]
runListS StreamT {state, step} =
runListS = runTraversableS

runTraversableS :: (Monad m, Traversable t, Monad t) => StreamT (Compose m t) a -> StreamT m (t a)
runTraversableS StreamT {state, step} =
StreamT
{ state = [state]
{ state = pure state
, step = \states -> do
results <- forM states $ getCompose . step
let flatResults = concat results
return $ Result (resultState <$> flatResults) (output <$> flatResults)
results <- traverse (getCompose . step) states
return $ unzipResult $ join results
}

-- FIXME maybe rewrite with Iso somehow?
handleCompose :: (Functor f, Monad m, Monad composed) => (forall s. s -> f s) -> (forall x. composed x -> m (f x)) -> (forall x. m (f x) -> composed x) -> StreamT composed a -> StreamT m (f a)
handleCompose :: (Functor f, Applicative m, Monad composed) => (forall s. s -> f s) -> (forall x. composed x -> m (f x)) -> (forall x. m (f x) -> composed x) -> StreamT composed a -> StreamT m (f a)
handleCompose pure_ uncompose compose StreamT {state, step} =
StreamT
{ state = pure_ state
, step = \s -> do
results <- uncompose $ do
states <- compose $ pure s
step states
return $! Result (fmap resultState results) (fmap output results)
, step = \s ->
uncompose (compose (pure s) >>= step) <&>
(\results -> Result (fmap resultState results) (fmap output results))
}

-- FIXME all these should go to a separate module
handleExceptT :: (Monad m) => StreamT (ExceptT e m) a -> StreamT m (Either e a)
handleExceptT = handleCompose pure runExceptT ExceptT

-- FIXME handleMaybeT
-- handleExceptT' :: (Monad m) => StreamT (ExceptT e m) a -> StreamT m (Either e a)
-- handleExceptT' = hoist' _ . snapshotCompose . hoist (Compose . runExceptT)

handleMaybeT :: (Monad m) => StreamT (MaybeT m) a -> StreamT m (Maybe a)
handleMaybeT = handleCompose pure runMaybeT MaybeT

{- | Snapshot part of the side effect that was performed at this step.
-}
snapshotCompose :: (Functor m, Functor f) => StreamT (Compose m f) a -> StreamT (Compose m f) (f a)
snapshotCompose StreamT {state, step} =
StreamT
{ state
, step =
step
>>> getCompose
>>> fmap (\result -> flip Result (output <$> result) . resultState <$> result)
>>> Compose
}

-- snapshotCompose' :: (Monad m, Functor f) => StreamT (Compose m f) a -> StreamT m (f a)
-- snapshotCompose' StreamT {state, step} =
-- StreamT
-- { state = pure state
-- , step = pure
-- >>> Compose
-- >>> _
-- }


{- | Snapshot the side effect that was performed at this step.
-}
snapshot :: (Functor m) => StreamT m a -> StreamT m (m a)
snapshot StreamT {state, step} =
StreamT
Expand Down
14 changes: 13 additions & 1 deletion automaton/test/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ import Test.Tasty.HUnit (testCase, (@?=))

-- automaton
import Automaton
import Data.Stream (streamToList, unfold)
import Data.Stream (streamToList, unfold, unfold_, mmap, handleExceptT, handleCompose, snapshotCompose, hoist')
import Data.Stream.Result
import Control.Monad.Trans.Except (throwE)
import Control.Monad (when)
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as NonEmpty
import Data.Functor.Compose (Compose(..))

tests =
testGroup
Expand All @@ -32,4 +37,11 @@ tests =
automaton2 = unfold 1 (\n -> Result (n + 2) (* n))
in take 10 (runIdentity (streamToList (automaton1 <*? automaton2))) @?= [0, 1, 2, 9, 4, 25, 6, 49, 8, 81]
]
, testCase
"handleExceptT" $ let exceptionAfter2 = mmap (\n -> when (n == 2) $ throwE ()) $ unfold_ 0 (+1)
in take 5 (runIdentity (streamToList (handleExceptT exceptionAfter2))) @?= [Right (),Left (),Left (),Left (),Left ()]
, testCase
"snapshotCompose" $ let asManyAsN = hoist' (Compose . Identity) $ mmap (\n -> NonEmpty.fromList [0..n]) $ unfold_ 0 (+1)
in take 5 (runIdentity (streamToList (hoist' (fmap NonEmpty.head . getCompose) (snapshotCompose asManyAsN)))) @?= [0 :| [1],0 :| [1,2],0 :| [1,2,3],0 :| [1,2,3,4],0 :| [1,2,3,4,5]]

]

0 comments on commit d350ac8

Please sign in to comment.