Skip to content

Commit

Permalink
Merge pull request #221 from Gabriel439/gabriel/restore_2
Browse files Browse the repository at this point in the history
Correctly unmask exceptions when running subprocesses
  • Loading branch information
Gabriella439 authored Mar 12, 2017
2 parents 0a04c7a + 5f747ae commit 7f12314
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 17 deletions.
32 changes: 25 additions & 7 deletions src/Turtle/Bytes.hs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,9 @@ system p s = liftIO (do
feedIn restore =
restore (ignoreSIGPIPE (outhandle hIn s))
`Exception.finally` close hIn
Exception.mask_ (Async.withAsyncWithUnmask feedIn (\a -> Process.waitForProcess ph <* halt a) )
Exception.mask (\restore ->
Async.withAsync (feedIn restore) (\a ->
restore (Process.waitForProcess ph) <* halt a ) )
handle (Nothing , ph) = do
Process.waitForProcess ph

Expand Down Expand Up @@ -390,7 +392,9 @@ systemStrict p s = liftIO (do
`Exception.finally` close hIn

Async.concurrently
(Exception.mask_ (Async.withAsyncWithUnmask feedIn (\a -> liftIO (Process.waitForProcess ph) <* halt a)))
(Exception.mask (\restore ->
Async.withAsync (feedIn restore) (\a ->
restore (Process.waitForProcess ph) <* halt a ) ))
(Data.ByteString.hGetContents hOut) ) )

systemStrictWithErr
Expand Down Expand Up @@ -428,7 +432,9 @@ systemStrictWithErr p s = liftIO (do
`Exception.finally` close hIn

runConcurrently $ (,,)
<$> Concurrently (Exception.mask_ (Async.withAsyncWithUnmask feedIn (\a -> liftIO (Process.waitForProcess ph) <* halt a)))
<$> Concurrently (Exception.mask (\restore ->
Async.withAsync (feedIn restore) (\a ->
restore (Process.waitForProcess ph) <* halt a ) ))
<*> Concurrently (Data.ByteString.hGetContents hOut)
<*> Concurrently (Data.ByteString.hGetContents hErr) ) )

Expand Down Expand Up @@ -499,7 +505,10 @@ stream p s = do
liftIO (Data.ByteString.hPut hIn bytes) ) )
`Exception.finally` close hIn

a <- using (Managed.managed (Exception.mask_ . Async.withAsyncWithUnmask feedIn))
a <- using
(Managed.managed (\k ->
Exception.mask (\restore ->
Async.withAsync (feedIn restore) k ) ))
inhandle hOut <|> (liftIO (Process.waitForProcess ph *> halt a) *> empty)

streamWithErr
Expand Down Expand Up @@ -563,9 +572,18 @@ streamWithErr p s = do
x1 <- loop x0 (0 :: Int)
done x1 )

a <- using (Managed.managed (Exception.mask_ . Async.withAsyncWithUnmask feedIn ))
b <- using (Managed.managed (Exception.mask_ . Async.withAsyncWithUnmask forwardOut))
c <- using (Managed.managed (Exception.mask_ . Async.withAsyncWithUnmask forwardErr))
a <- using
(Managed.managed (\k ->
Exception.mask (\restore ->
Async.withAsync (feedIn restore) k ) ))
b <- using
(Managed.managed (\k ->
Exception.mask (\restore ->
Async.withAsync (forwardOut restore) k ) ))
c <- using
(Managed.managed (\k ->
Exception.mask (\restore ->
Async.withAsync (forwardErr restore) k ) ))
let l `also` r = do
_ <- l <|> (r *> STM.retry)
_ <- r
Expand Down
32 changes: 23 additions & 9 deletions src/Turtle/Prelude.hs
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,13 @@ module Turtle.Prelude (
import Control.Applicative
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
(Async, withAsync, withAsyncWithUnmask, waitSTM, concurrently,
(Async, withAsync, waitSTM, concurrently,
Concurrently(..))
import qualified Control.Concurrent.Async
import Control.Concurrent.MVar (newMVar, modifyMVar_)
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TQueue as TQueue
import Control.Exception (Exception, bracket, bracket_, finally, mask_, throwIO)
import Control.Exception (Exception, bracket, bracket_, finally, mask, throwIO)
import Control.Foldl (Fold, FoldM(..), genericLength, handles, list, premap)
import qualified Control.Foldl
import qualified Control.Foldl.Text
Expand Down Expand Up @@ -536,7 +536,9 @@ system p s = liftIO (do
let feedIn :: (forall a. IO a -> IO a) -> IO ()
feedIn restore =
restore (ignoreSIGPIPE (outhandle hIn s)) `finally` close hIn
mask_ (withAsyncWithUnmask feedIn (\a -> Process.waitForProcess ph <* halt a) )
mask (\restore ->
withAsync (feedIn restore) (\a ->
restore (Process.waitForProcess ph) `finally` halt a) )
handle (Nothing , ph) = do
Process.waitForProcess ph

Expand Down Expand Up @@ -575,7 +577,9 @@ systemStrict p s = liftIO (do
restore (ignoreSIGPIPE (outhandle hIn s)) `finally` close hIn

concurrently
(mask_ (withAsyncWithUnmask feedIn (\a -> liftIO (Process.waitForProcess ph) <* halt a)))
(mask (\restore ->
withAsync (feedIn restore) (\a ->
restore (liftIO (Process.waitForProcess ph)) `finally` halt a ) ))
(Text.hGetContents hOut) ) )

systemStrictWithErr
Expand Down Expand Up @@ -611,7 +615,9 @@ systemStrictWithErr p s = liftIO (do
restore (ignoreSIGPIPE (outhandle hIn s)) `finally` close hIn

runConcurrently $ (,,)
<$> Concurrently (mask_ (withAsyncWithUnmask feedIn (\a -> liftIO (Process.waitForProcess ph) <* halt a)))
<$> Concurrently (mask (\restore ->
withAsync (feedIn restore) (\a ->
restore (liftIO (Process.waitForProcess ph)) `finally` halt a ) ))
<*> Concurrently (Text.hGetContents hOut)
<*> Concurrently (Text.hGetContents hErr) ) )

Expand Down Expand Up @@ -676,7 +682,9 @@ stream p s = do
let feedIn :: (forall a. IO a -> IO a) -> IO ()
feedIn restore = restore (outhandle hIn s) `finally` close hIn

a <- using (managed (mask_ . withAsyncWithUnmask feedIn))
a <- using
(managed (\k ->
mask (\restore -> withAsync (feedIn restore) (restore . k))))
inhandle hOut <|> (liftIO (Process.waitForProcess ph *> halt a) *> empty)

streamWithErr
Expand Down Expand Up @@ -736,9 +744,15 @@ streamWithErr p s = do
x1 <- loop x0 (0 :: Int)
done x1 )

a <- using (managed (mask_ . withAsyncWithUnmask feedIn ))
b <- using (managed (mask_ . withAsyncWithUnmask forwardOut))
c <- using (managed (mask_ . withAsyncWithUnmask forwardErr))
a <- using
(managed (\k ->
mask (\restore -> withAsync (feedIn restore) (restore . k)) ))
b <- using
(managed (\k ->
mask (\restore -> withAsync (forwardOut restore) (restore . k)) ))
c <- using
(managed (\k ->
mask (\restore -> withAsync (forwardErr restore) (restore . k)) ))
let l `also` r = do
_ <- l <|> (r *> STM.retry)
_ <- r
Expand Down
12 changes: 12 additions & 0 deletions test/RegressionMaskingException.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{-# LANGUAGE OverloadedStrings #-}

import Turtle

import qualified System.Timeout

-- This test fails by hanging
main :: IO ()
main = runManaged (do
_ <- fork (shells "while true; do sleep 1; done" empty)
sleep 1
return () )
12 changes: 11 additions & 1 deletion turtle.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,17 @@ test-suite regression-broken-pipe
GHC-Options: -Wall -threaded
Default-Language: Haskell2010
Build-Depends:
base >= 4 && < 5 ,
base >= 4 && < 5,
turtle

test-suite regression-masking-exception
Type: exitcode-stdio-1.0
HS-Source-Dirs: test
Main-Is: RegressionMaskingException.hs
GHC-Options: -Wall -threaded
Default-Language: Haskell2010
Build-Depends:
base >= 4 && < 5,
turtle

benchmark bench
Expand Down

0 comments on commit 7f12314

Please sign in to comment.