Here a simplification of my code:
import Database.PostgreSQL.Simple (Connection)
import qualified Streaming.Prelude as S
import Streaming.ByteString.Char8 as C
import Streaming.Zip (gunzip)
import Streaming
main :: IO ()
main = do
res <- runResourceT $ calculateA myLinesStream
return ()
type MyLinesStream m r = S.Stream (S.Of String) m r
connect :: IO Connection
connect = undefined
close :: Connection -> IO ()
close = undefined
calculateA :: MonadIO m => MyLinesStream m r -> m ()
calculateA stream = liftIO (bracket connect close (go stream))
where
go :: MonadIO m => MyLinesStream m r -> Connection -> m ()
go stream conn = stream & S.length_ >>= liftIO . print
myLinesStream :: (MonadIO m, MonadResource m) => MyLinesStream m ()
myLinesStream = do
S.each ["1.zip", "2.zip"]
& S.mapM (\fileName -> C.readFile fileName & gunzip)
& S.mconcat
& C.lines
& mapsM (S.toList . C.unpack)
& void
There is a type error on the following line on the go stream:
calculateA stream = liftIO (bracket connect close (go stream))
The error says:
Couldn't match type ‘m’ with ‘IO’
‘m’ is a rigid type variable bound by
the type signature for:
calculateA :: forall (m :: * -> *) r.
MonadIO m =>
MyLinesStream m r -> m ()
Expected type: Connection -> IO ()
Actual type: Connection -> m ()
Questions
- What to do to make this code typecheck and still make it secure for releasing resources in the
calculateAfunction? - I'm reading multiple files using
C.readFileand then wrapping it insiderunResourceT. Will this properly release all the file handles? - Is the composition good? (Note that I need the
calculateAfunction separately from themyLinesStream)
The problem is that you are trying to use
bracketwith a monad that is too general.brackethas signature:It takes
IOactions as parameters. However, thegofunction that you pass tobracketneeds to work in a generic base monadmchosen by the caller ofcalculateA(you later make that monad beResourceT IOinmain).The
bracketfrom base andResourceTdon't mix very well. Instead, you need to turn to special functions from the resourcet package likeallocateandrelease, and use them to define a helper like:How does it work? If you have a stream of
Xs, it prepends an allocation action at the beginning of the stream (registering the corresponding cleanup action to be called in case of abnormal termination, like exceptions) and it also adds an explicit call to the cleanup action when the stream is exhausted:You wrote:
Yes. With
ResourceT, resources are freed either when an explicit cleanup action is performed, or when when we "exit" theResourceTwithrunResourceT(perhaps abnormally, with an exception).So, if we read a stream of Xs followed by a stream of Ys, we would have:
That is, the resource that produces Xs would be released before allocating the resource that produces Ys.