airframe-parquet: Scala 3 support (#2211)
- AnyCodec now accepts a knownSurfaces parameter to resolve a Surface from a Class[_] at runtime
- Add Resource.use to support the loan pattern
- Parquet.newRecordWriter accepts a knownSurfaces parameter to support nested objects (a usage sketch follows below)
xerial authored Jun 1, 2022
1 parent 90931d9 commit 8ef9535
Showing 13 changed files with 108 additions and 35 deletions.
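
As a quick usage sketch (not part of the diff below): writing nested case classes with the new knownSurfaces parameter. MyEntry and Nested are hypothetical record types, mirroring the docs example added at the end of this commit.

```scala
import org.apache.parquet.schema.MessageType
import wvlet.airframe.parquet.Parquet
import wvlet.airframe.surface.Surface

case class MyEntry(id: Int, name: String)  // hypothetical record type
case class Nested(id: Int, entry: MyEntry) // hypothetical nested record

// Build a Parquet schema from the Surface of the nested type
val schema: MessageType = Parquet.toParquetSchema(Surface.of[Nested])

// Register the Surface of MyEntry so that nested case-class values can be
// serialized at runtime (see the AnyCodec change below)
val writer = Parquet.newRecordWriter(
  path = "target/nested.parquet",
  schema = schema,
  knownSurfaces = Seq(Surface.of[MyEntry])
)
writer.write(Map("id" -> 1, "entry" -> MyEntry(1, "hello")))
writer.close()
```

Without the registered Surface, MyEntry values found inside the Map may fall back to their string representation, as shown in the AnyCodec fallback path below.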
@@ -41,6 +41,7 @@ trait MessageCodec[A] extends LogSupport {
} catch {
case e: Throwable => throw unpackError(e)
}
// TODO: Check v.isNull
v.getError match {
case Some(err) =>
throw unpackError(err)
@@ -22,6 +22,7 @@ import wvlet.airframe.msgpack.spi._
import wvlet.airframe.surface.{Primitive, Surface}
import wvlet.airframe.ulid.ULID

import java.util.concurrent.ConcurrentHashMap
import scala.util.Try

/**
@@ -46,7 +47,7 @@ object PrimitiveCodec {
// JSON types
Surface.of[JSONValue] -> JSONValueCodec,
Surface.of[Json] -> RawJsonCodec,
Surface.of[Any] -> AnyCodec
Surface.of[Any] -> AnyCodec.default
)

val primitiveArrayCodec = Map(
@@ -872,7 +873,7 @@ object PrimitiveCodec {
override def pack(p: Packer, v: Array[Any]): Unit = {
p.packArrayHeader(v.length)
v.foreach { x =>
AnyCodec.pack(p, x)
AnyCodec.default.pack(p, x)
}
}
override def unpack(
@@ -884,7 +885,7 @@
val b = Array.newBuilder[Any]
b.sizeHint(len)
(0 until len).foreach { i =>
AnyCodec.unpack(u, v)
AnyCodec.default.unpack(u, v)
if (v.isNull) {
b += null // or report error?
} else {
@@ -909,14 +910,21 @@
}
}

object AnyCodec {
val default: AnyCodec = new AnyCodec()
}

/**
* Codec for Any values. This only supports very basic types to enable packing/unpacking collections like Seq[Any] and
* Map[Any, Any] with ease.
*
* Another option for implementing AnyCodec is packing (type, value) pairs, but we do not take this approach because it
* would require many bytes to fully encode type names.
*/
object AnyCodec extends MessageCodec[Any] {
class AnyCodec(knownSurfaces: Seq[Surface] = Seq.empty) extends MessageCodec[Any] {

private val knownSurfaceTable = knownSurfaces.map(s => s.rawType -> s).toMap[Class[_], Surface]

override def pack(p: Packer, v: Any): Unit = {
v match {
case null => p.packNil
@@ -982,12 +990,18 @@ object PrimitiveCodec {
ThrowableCodec.pack(p, v)
case _ =>
val cl = v.getClass
wvlet.airframe.codec.Compat.codecOfClass(cl) match {
case Some(codec) =>
codec.asInstanceOf[MessageCodec[Any]].pack(p, v)
knownSurfaceTable.get(cl) match {
case Some(surface) =>
val codec = MessageCodec.ofSurface(surface).asInstanceOf[MessageCodec[Any]]
codec.pack(p, v)
case None =>
// Pack as a string for unknown types
StringCodec.pack(p, v.toString)
wvlet.airframe.codec.Compat.codecOfClass(cl) match {
case Some(codec) =>
codec.asInstanceOf[MessageCodec[Any]].pack(p, v)
case None =>
// Pack as a string for unknown types
StringCodec.pack(p, v.toString)
}
}
}
}
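
A sketch (not part of this diff) of how the new knownSurfaces parameter changes codec resolution at runtime; Metric is a hypothetical class used only for illustration.

```scala
import wvlet.airframe.codec.PrimitiveCodec.AnyCodec
import wvlet.airframe.surface.Surface

case class Metric(name: String, value: Double) // hypothetical type, not in the diff

// Where Compat.codecOfClass cannot resolve a codec for the runtime class,
// unknown values are packed as strings (v.toString). Registering the Surface
// up front lets AnyCodec build a proper MessageCodec via MessageCodec.ofSurface
// for Metric instances found inside Map[Any, Any] / Seq[Any] values.
val codec   = new AnyCodec(knownSurfaces = Seq(Surface.of[Metric]))
val msgpack = codec.toMsgPack(Map("metric" -> Metric("latency_ms", 1.5)))
```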
@@ -23,7 +23,8 @@ object ScalaStandardCodec {
case class OptionCodec[A](elementCodec: MessageCodec[A]) extends MessageCodec[Option[A]] {
override def pack(p: Packer, v: Option[A]): Unit = {
v match {
case None => p.packNil
case None =>
p.packNil
case Some(x) =>
elementCodec.pack(p, x)
}
@@ -24,6 +24,20 @@ trait Resource[A] extends AutoCloseable {
def get: A
def close(): Unit

/**
* Use the resource within a limited scope. After exiting the scope, the resource will be closed
* @param body
* @tparam U
* @return
*/
def use[U](body: A => U): U = {
try {
body(get)
} finally {
close()
}
}

/**
* Wrap a Future with this resource. After the future completes, the resource will be closed
*/
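
A minimal sketch (not part of this diff) of the loan pattern with the new Resource.use, assuming get and close are the only abstract members of the trait; the file path and hand-rolled Resource are illustrative only.

```scala
import wvlet.airframe.control.Resource
import java.io.{BufferedWriter, FileWriter}

// A hand-rolled Resource built directly from the trait shown above.
// use(...) runs the body with get and always calls close() afterwards,
// even if the body throws.
val logFile: Resource[BufferedWriter] = new Resource[BufferedWriter] {
  private val writer = new BufferedWriter(new FileWriter("target/example.log"))
  override def get: BufferedWriter = writer
  override def close(): Unit       = writer.close()
}

logFile.use { w =>
  w.write("the resource is closed automatically after this block")
}
```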
@@ -16,18 +16,21 @@ object Parquet extends ParquetCompat with LogSupport {
* Create a Parquet writer that accepts records represented in Map, Array, JSON, MsgPack, etc.
* @param path
* @param schema
* @param knownSurfaces
* surfaces of objects that will be used for writing records
* @param hadoopConf
* @param config
* @return
*/
def newRecordWriter(
path: String,
schema: MessageType,
knownSurfaces: Seq[Surface] = Seq.empty,
hadoopConf: Configuration = new Configuration(),
config: ParquetWriterAdapter.RecordWriterBuilder => ParquetWriterAdapter.RecordWriterBuilder =
identity[ParquetWriterAdapter.RecordWriterBuilder](_)
): ParquetWriter[Any] = {
val b = ParquetWriterAdapter.recordWriterBuilder(path, schema, hadoopConf)
val b = ParquetWriterAdapter.recordWriterBuilder(path, schema, knownSurfaces, hadoopConf)
val builder = config(b)
builder.build()
}
@@ -19,7 +19,7 @@ import org.apache.parquet.schema.{MessageType, Type}
import wvlet.airframe.codec.{JSONCodec, MessageCodec, MessageCodecException}
import wvlet.airframe.codec.PrimitiveCodec.{AnyCodec, ValueCodec}
import wvlet.airframe.json.JSONParseException
import wvlet.airframe.msgpack.spi.Value.{ArrayValue, BinaryValue, MapValue, StringValue}
import wvlet.airframe.msgpack.spi.Value.{ArrayValue, BinaryValue, MapValue, NilValue, StringValue}
import wvlet.airframe.msgpack.spi.{Code, MessagePack, MsgPack, Value}
import wvlet.airframe.surface.{CName, Parameter, Surface}
import wvlet.log.LogSupport
@@ -111,7 +111,7 @@ class ParquetSeqWriter(elementCodec: ParquetWriteCodec) extends ParquetWriteCode
}
case _ =>
// Write unknown value as binary
val msgpack = AnyCodec.toMsgPack(v)
val msgpack = AnyCodec.default.toMsgPack(v)
recordConsumer.addBinary(Binary.fromConstantByteArray(msgpack))
}
}
@@ -185,6 +185,8 @@ case class ParquetObjectWriter(paramWriters: Seq[ParquetFieldWriter], params: Se
parquetCodecTable.get(columnName) match {
case Some(parameterCodec) =>
v match {
case NilValue =>
// skip
case m: MapValue if m.isEmpty =>
// skip
case a: ArrayValue if a.isEmpty =>
@@ -15,27 +15,28 @@ package wvlet.airframe.parquet

import org.apache.parquet.io.api.RecordConsumer
import org.apache.parquet.schema.MessageType
import wvlet.airframe.codec.PrimitiveCodec.ValueCodec
import wvlet.airframe.codec.PrimitiveCodec.{AnyCodec, ValueCodec}
import wvlet.airframe.codec.{MessageCodec, MessageCodecException}
import wvlet.airframe.surface.Surface
import wvlet.log.LogSupport

/**
* Adjust any input objects into the shape of the Parquet schema
*
* @param schema
*/
class ParquetRecordWriter(schema: MessageType) extends LogSupport {
class ParquetRecordWriter(schema: MessageType, knownSurfaces: Seq[Surface] = Seq.empty) extends LogSupport {
private val parquetCodec: ParquetWriteCodec = {
val surface = ParquetSchema.buildSurfaceFromParquetSchema(schema)
ParquetWriteCodec.parquetCodecOf(schema, surface, ValueCodec).asRoot
}

private val anyCodec = MessageCodec.of[Any]
private val codec = new AnyCodec(knownSurfaces)

def pack(obj: Any, recordConsumer: RecordConsumer): Unit = {
val msgpack =
try {
anyCodec.toMsgPack(obj)
codec.toMsgPack(obj)
} catch {
case e: MessageCodecException =>
throw new IllegalArgumentException(s"Cannot convert the input into MsgPack: ${obj}", e)
@@ -19,7 +19,15 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.{MessageType, Type}
import org.apache.parquet.schema.Type.Repetition
import wvlet.airframe.codec.MessageCodec
import wvlet.airframe.codec.PrimitiveCodec.{BooleanCodec, DoubleCodec, FloatCodec, IntCodec, LongCodec, StringCodec}
import wvlet.airframe.codec.PrimitiveCodec.{
BooleanCodec,
DoubleCodec,
FloatCodec,
IntCodec,
LongCodec,
StringCodec,
ValueCodec
}
import wvlet.airframe.msgpack.spi.MsgPack
import wvlet.airframe.surface.Surface
import wvlet.log.LogSupport
@@ -47,18 +47,23 @@ object ParquetWriterAdapter extends LogSupport {
}
}

class RecordWriterBuilder(schema: MessageType, file: OutputFile)
class RecordWriterBuilder(schema: MessageType, file: OutputFile, knownSurfaces: Seq[Surface])
extends ParquetWriter.Builder[Any, RecordWriterBuilder](file: OutputFile) {
override def self(): RecordWriterBuilder = this
override def getWriteSupport(conf: Configuration): WriteSupport[Any] = {
new ParquetRecordWriterSupportAdapter(schema)
new ParquetRecordWriterSupportAdapter(schema, knownSurfaces)
}
}

def recordWriterBuilder(path: String, schema: MessageType, conf: Configuration): RecordWriterBuilder = {
def recordWriterBuilder(
path: String,
schema: MessageType,
knownSurfaces: Seq[Surface],
conf: Configuration
): RecordWriterBuilder = {
val fsPath = new Path(path)
val file = HadoopOutputFile.fromPath(fsPath, conf)
val b = new RecordWriterBuilder(schema, file).withConf(conf)
val b = new RecordWriterBuilder(schema, file, knownSurfaces).withConf(conf)
// Use snappy by default
b.withCompressionCodec(CompressionCodecName.SNAPPY)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
@@ -90,7 +95,9 @@ class ParquetWriteSupportAdapter[A](surface: Surface) extends WriteSupport[A] wi
}
}

class ParquetRecordWriterSupportAdapter(schema: MessageType) extends WriteSupport[Any] with LogSupport {
class ParquetRecordWriterSupportAdapter(schema: MessageType, knownSurfaces: Seq[Surface])
extends WriteSupport[Any]
with LogSupport {
private var recordConsumer: RecordConsumer = null

override def init(configuration: Configuration): WriteContext = {
Expand All @@ -102,7 +109,7 @@ class ParquetRecordWriterSupportAdapter(schema: MessageType) extends WriteSuppor
this.recordConsumer = recordConsumer
}

private val codec = new ParquetRecordWriter(schema)
private val codec = new ParquetRecordWriter(schema, knownSurfaces)

override def write(record: Any): Unit = {
require(recordConsumer != null)
@@ -42,6 +42,7 @@ class RecordBuilderImpl extends RecordBuilder with LogSupport {
}
}
}

def toMap: Map[String, Any] = {
holder
}
@@ -14,6 +14,7 @@
package wvlet.airframe.parquet

import wvlet.airframe.control.Control.withResource
import wvlet.airframe.control.Resource
import wvlet.airframe.msgpack.spi.{Value, ValueFactory}
import wvlet.airframe.surface.Surface
import wvlet.airframe.ulid.ULID
@@ -42,7 +43,7 @@ object NestedRecordWriteTest extends AirSpec {
debug(s"write target schema: ${schema}")

IOUtil.withTempFile("target/tmp-nested-record", ".parquet") { file =>
withResource(Parquet.newRecordWriter(file.getPath, schema)) { writer =>
withResource(Parquet.newRecordWriter(file.getPath, schema, knownSurfaces = Seq(Surface.of[ColStats]))) { writer =>
writer.write(m1)
writer.write(m2)
}
@@ -109,7 +110,13 @@ object NestedRecordWriteTest extends AirSpec {
test("write records with Option/Seq using record writer") {
val p0 = Partition(id = ULID.newULID, metadata = Map("xxx" -> "yyy"))
IOUtil.withTempFile("target/tmp-nested-opt-record", ".parquet") { file =>
withResource(Parquet.newRecordWriter(file.getPath, Parquet.toParquetSchema(Surface.of[Partition]))) { writer =>
withResource(
Parquet.newRecordWriter(
file.getPath,
Parquet.toParquetSchema(Surface.of[Partition]),
knownSurfaces = Seq(Surface.of[Partition])
)
) { writer =>
writer.write(p0)
}
withResource(Parquet.newReader[Partition](file.getPath)) { reader =>
@@ -35,7 +35,7 @@ object ParquetRecordWriterTest extends AirSpec {
writer.write(Array(2, "yui"))
writer.write("""{"id":3, "name":"aina"}""")
writer.write("""[4, "ruri"]""")
writer.write(AnyCodec.toMsgPack(Map("id" -> 5, "name" -> "xxx")))
writer.write(AnyCodec.default.toMsgPack(Map("id" -> 5, "name" -> "xxx")))
}

withResource(Parquet.newReader[Map[String, Any]](file.getPath)) { reader =>
@@ -86,10 +86,11 @@

test("write records with Option") {
IOUtil.withTempFile("target/tmp-record-opt", ".parquet") { file =>
withResource(Parquet.newRecordWriter(file.getPath, schema2)) { writer =>
writer.write(RecordOpt(1, Some(1)))
writer.write(RecordOpt(2, None))
writer.write("""{"id":"3"}""")
withResource(Parquet.newRecordWriter(file.getPath, schema2, knownSurfaces = Seq(Surface.of[RecordOpt]))) {
writer =>
writer.write(RecordOpt(1, Some(1)))
writer.write(RecordOpt(2, None))
writer.write("""{"id":"3"}""")
}

withResource(Parquet.newReader[Map[String, Any]](file.getPath)) { reader =>
21 changes: 17 additions & 4 deletions docs/airframe-parquet.md
@@ -66,7 +66,7 @@ val schema = new MessageType(
Types.optional(PrimitiveTypeName.BINARY).as(stringType).named("name")
)
// Create a record writer for the given schema
val recordWriter = Parquet.newRecordWriter(path = "record.parquet", schema)
val recordWriter = Parquet.newRecordWriter(path = "record.parquet", schema = schema)
// Write a record using Map (column name -> value)
recordWriter.write(Map("id" -> 1, "name" -> "leo"))
// Write a record using JSON object
@@ -75,10 +75,23 @@ recordWriter.write("""{"id":2, "name":"yui"}""")
recordWriter.write(Seq(3, "aina"))
// Write a record using JSON array
recordWriter.write("""[4, "xxx"]""")
// You can use case classes as input as well
recordWriter.write(MyEntry(5, "yyy"))

recordWriter.close()


// In case you need to write dynamic records containing case classes,
// register the Surfaces of these classes
case class Nested(id:Int, entry:MyEntry)
val nestedRecordWriter = Parquet.newRecordWriter(
path = "nested.parquet",
// You can build a Parquet schema matching to Surface
schema = Parquet.toParquetSchema(Surface.of[Nested]),
knownSurfaces = Seq(Surface.of[MyEntry]) // required to serialize MyEntry
)

// Write dynamic records
nestedRecordWriter.write(Map("id" -> 1, "entry" -> MyEntry(1, "yyy"))
nestedRecordWriter.write(Map("id" -> 2, "entry" -> MyEntry(2, "zzz"))
nestedRecordWriter.close()
```

### Using with AWS S3
