Add separate writeChunk/writeStream methods
grouzen committed Dec 12, 2023
1 parent f1a42f7 commit d424c64
Showing 2 changed files with 36 additions and 11 deletions.
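
In short: the single ParquetWriter#write method is split into writeChunk (for in-memory Chunks) and writeStream (for ZStream sources), and the round-trip test is split in two to cover both code paths.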
File 1 of 2 (the ParquetWriter trait and its live implementation):

@@ -11,10 +11,13 @@ import org.apache.parquet.io.OutputFile
 import org.apache.parquet.schema.{ MessageType, Type }
 import zio._
 import zio.schema.Schema
+import zio.stream._
 
 trait ParquetWriter[-A <: Product] {
 
-  def write(data: Chunk[A]): Task[Unit]
+  def writeChunk(data: Chunk[A]): Task[Unit]
+
+  def writeStream[R](data: ZStream[R, Throwable, A]): RIO[R, Unit]
 
   def close: Task[Unit]
 
@@ -25,14 +28,22 @@ final class ParquetWriterLive[A <: Product](
 )(implicit encoder: ValueEncoder[A])
     extends ParquetWriter[A] {
 
-  override def write(data: Chunk[A]): Task[Unit] =
+  override def writeChunk(data: Chunk[A]): Task[Unit] =
     ZIO.foreachDiscard(data) { value =>
       for {
         record <- encoder.encodeZIO(value)
         _      <- ZIO.attemptBlockingIO(underlying.write(record.asInstanceOf[RecordValue]))
       } yield ()
     }
 
+  override def writeStream[R](stream: ZStream[R, Throwable, A]): RIO[R, Unit] =
+    stream.runForeach { value =>
+      for {
+        record <- encoder.encodeZIO(value)
+        _      <- ZIO.attemptBlockingIO(underlying.write(record.asInstanceOf[RecordValue]))
+      } yield ()
+    }
+
   override def close: Task[Unit] =
     ZIO.attemptBlockingIO(underlying.close())
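
Both methods encode each value with encoder.encodeZIO and hand it to the blocking underlying.write, so writeStream simply avoids materializing the whole dataset before writing. A minimal usage sketch follows; the Record case class (with hypothetical field names mirroring the shape used in ParquetIOSpec below) and the records stream are placeholders for illustration, while layer wiring via ParquetWriter.configured is shown in the test file:

  import zio._
  import zio.stream._

  // Hypothetical record type; any A <: Product with a ValueEncoder in scope works.
  final case class Record(id: Int, name: String, opt: Option[Long], nums: List[Int], tags: Map[String, Int])

  // Placeholder source; any ZStream[R, Throwable, Record] will do.
  def records: ZStream[Any, Throwable, Record] = ???

  // Write elements as they arrive, then close to flush the file.
  val program: RIO[ParquetWriter[Record], Unit] =
    for {
      writer <- ZIO.service[ParquetWriter[Record]]
      _      <- writer.writeStream(records)
      _      <- writer.close
    } yield ()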
File 2 of 2 (ParquetIOSpec):

@@ -3,6 +3,7 @@ package me.mnedokushev.zio.apache.parquet.core.hadoop
 import me.mnedokushev.zio.apache.parquet.core.codec._
 import zio._
 import zio.schema._
+import zio.stream._
 import zio.test.TestAspect._
 import zio.test._
 
@@ -29,7 +30,21 @@ object ParquetIOSpec extends ZIOSpecDefault {
 
   override def spec: Spec[TestEnvironment with Scope, Any] =
     suite("ParquetIOSpec")(
-      test("write and read") {
+      test("write and read - chunk") {
+        val payload = Chunk(
+          Record(1, "foo", None, List(1, 2), Map("first" -> 1, "second" -> 2)),
+          Record(2, "bar", Some(3L), List.empty, Map("third" -> 3))
+        )
+
+        for {
+          writer <- ZIO.service[ParquetWriter[Record]]
+          reader <- ZIO.service[ParquetReader[Record]]
+          _      <- writer.writeChunk(payload)
+          _      <- writer.close // force to flush parquet data on disk
+          result <- ZIO.scoped[Any](reader.readChunk(tmpPath))
+        } yield assertTrue(result == payload)
+      } @@ after(cleanTmpFile(tmpDir)),
+      test("write and read - stream") {
         val payload = Chunk(
           Record(1, "foo", None, List(1, 2), Map("first" -> 1, "second" -> 2)),
           Record(2, "bar", Some(3L), List.empty, Map("third" -> 3))
@@ -38,16 +53,15 @@ object ParquetIOSpec extends ZIOSpecDefault {
         for {
           writer       <- ZIO.service[ParquetWriter[Record]]
           reader       <- ZIO.service[ParquetReader[Record]]
-          _            <- writer.write(payload)
+          _            <- writer.writeStream(ZStream.fromChunk(payload))
           _            <- writer.close // force to flush parquet data on disk
           resultStream <- ZIO.scoped[Any](reader.readStream(tmpPath).runCollect)
-          resultChunk  <- ZIO.scoped[Any](reader.readChunk(tmpPath))
-        } yield assertTrue(resultStream == payload, resultChunk == payload)
-      }.provide(
-        ParquetWriter.configured[Record](tmpPath),
-        ParquetReader.configured[Record]()
-      ) @@ after(cleanTmpFile(tmpDir))
-    )
+        } yield assertTrue(resultStream == payload)
+      } @@ after(cleanTmpFile(tmpDir))
+    ).provide(
+      ParquetWriter.configured[Record](tmpPath),
+      ParquetReader.configured[Record]()
+    ) @@ sequential
 
   private def cleanTmpFile(path: Path) =
     for {
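
A note on the suite-level changes: .provide now wires the writer and reader layers for the whole suite rather than a single test, and since both tests write to the same tmpPath, the added @@ sequential aspect prevents them from racing on the same file.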
