Skip to content

Commit

Permalink
Merge pull request zio#40 from regiskuckaertz/rk-partraverse
Browse files Browse the repository at this point in the history
Add IO.parTraverse
  • Loading branch information
jdegoes authored Jun 21, 2018
2 parents a8dd7c3 + cc55bc3 commit 329a19e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
8 changes: 8 additions & 0 deletions core/jvm/src/test/scala/scalaz/zio/IOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class IOSpec extends Specification with GenIO with RTS with ScalaCheck {
`IO.traverse` returns the list of Ints in the same order. $t2
Create a list of String and pass an f: String => IO[String, Int]:
`IO.traverse` fails with a NumberFormatException exception. $t3
Create a list of Strings and pass an f: String => IO[String, Int]:
`IO.parTraverse` returns the list of Ints in any order. $t4
"""

def functionIOGen: Gen[String => IO[Throwable, Int]] =
Expand All @@ -42,4 +44,10 @@ class IOSpec extends Specification with GenIO with RTS with ScalaCheck {
res must beAFailedTry.withThrowable[NumberFormatException]
}

def t4 = {
val list = List("1", "2", "3")
val res = unsafePerformIO(IO.parTraverse(list)(x => IO.point[String, Int](x.toInt)))
res must containTheSameElementsAs(List(1, 2, 3))
}

}
23 changes: 23 additions & 0 deletions core/shared/src/main/scala/scalaz/zio/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package scalaz.zio

import scala.annotation.switch
import scala.annotation.tailrec
import scala.concurrent.duration._
import Errors._

Expand Down Expand Up @@ -792,6 +793,28 @@ object IO {
in.foldLeft(point[E, mutable.Builder[B, M[B]]](cbf(in)))((io, b) => io.zipWith(fn(b))(_ += _))
.map(_.result())

/**
* Evaluate the elements of a traversable data structure in parallel
* and collect the results.
*
* _Note_: ordering in the input collection is not preserved
*/
def parTraverse[E, A, B, M[X] <: TraversableOnce[X]](
in: M[A]
)(fn: A => IO[E, B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): IO[E, M[B]] = {
@tailrec def parTraverse_rec(as: Iterator[A], ioref: IO[E, IORef[List[B]]]): IO[E, IORef[List[B]]] =
if (!as.hasNext)
ioref
else {
val a = as.next
parTraverse_rec(as, ioref.par(fn(a)).flatMap { case (ref, b) => ref.modify(b :: _) *> point(ref) })
}

parTraverse_rec(in.toIterator, IORef[E, List[B]](Nil))
.flatMap(_.read)
.map(_.foldLeft(cbf(in))(_ += _).result())
}

/**
* Evaluate each effect in the structure from left to right, and collect
* the results.
Expand Down

0 comments on commit 329a19e

Please sign in to comment.