diff --git a/airframe-codec/src/main/scala/wvlet/airframe/codec/MessageCodec.scala b/airframe-codec/src/main/scala/wvlet/airframe/codec/MessageCodec.scala
index 6060aab031..b5dd4cd6bf 100644
--- a/airframe-codec/src/main/scala/wvlet/airframe/codec/MessageCodec.scala
+++ b/airframe-codec/src/main/scala/wvlet/airframe/codec/MessageCodec.scala
@@ -41,6 +41,7 @@ trait MessageCodec[A] extends LogSupport {
     } catch {
       case e: Throwable => throw unpackError(e)
     }
+    // TODO: Check v.isNull
     v.getError match {
       case Some(err) =>
         throw unpackError(err)
diff --git a/airframe-codec/src/main/scala/wvlet/airframe/codec/PrimitiveCodec.scala b/airframe-codec/src/main/scala/wvlet/airframe/codec/PrimitiveCodec.scala
index d8cc633a49..c7a0c8293a 100644
--- a/airframe-codec/src/main/scala/wvlet/airframe/codec/PrimitiveCodec.scala
+++ b/airframe-codec/src/main/scala/wvlet/airframe/codec/PrimitiveCodec.scala
@@ -22,6 +22,7 @@ import wvlet.airframe.msgpack.spi._
 import wvlet.airframe.surface.{Primitive, Surface}
 import wvlet.airframe.ulid.ULID
 
+import java.util.concurrent.ConcurrentHashMap
 import scala.util.Try
 
 /**
@@ -46,7 +47,7 @@ object PrimitiveCodec {
     // JSON types
     Surface.of[JSONValue] -> JSONValueCodec,
     Surface.of[Json] -> RawJsonCodec,
-    Surface.of[Any] -> AnyCodec
+    Surface.of[Any] -> AnyCodec.default
   )
 
   val primitiveArrayCodec = Map(
@@ -872,7 +873,7 @@ object PrimitiveCodec {
     override def pack(p: Packer, v: Array[Any]): Unit = {
       p.packArrayHeader(v.length)
       v.foreach { x =>
-        AnyCodec.pack(p, x)
+        AnyCodec.default.pack(p, x)
       }
     }
     override def unpack(
@@ -884,7 +885,7 @@ object PrimitiveCodec {
       val b = Array.newBuilder[Any]
      b.sizeHint(len)
       (0 until len).foreach { i =>
-        AnyCodec.unpack(u, v)
+        AnyCodec.default.unpack(u, v)
         if (v.isNull) {
           b += null // or report error?
         } else {
@@ -909,6 +910,10 @@ object PrimitiveCodec {
     }
   }
 
+  object AnyCodec {
+    val default: AnyCodec = new AnyCodec()
+  }
+
   /**
    * Codec for Any values. This only supports very basic types to enable packing/unpacking collections like Seq[Any],
    * Map[Any, Any] at ease.
@@ -916,7 +921,10 @@ object PrimitiveCodec {
    * Another option to implement AnyCodec is packing pairs of (type, value), but we will not take this approach as this
    * will require many bytes to fully encode type names.
   */
-  object AnyCodec extends MessageCodec[Any] {
+  class AnyCodec(knownSurfaces: Seq[Surface] = Seq.empty) extends MessageCodec[Any] {
+
+    private val knownSurfaceTable = knownSurfaces.map(s => s.rawType -> s).toMap[Class[_], Surface]
+
     override def pack(p: Packer, v: Any): Unit = {
       v match {
         case null => p.packNil
@@ -982,12 +990,18 @@ object PrimitiveCodec {
           ThrowableCodec.pack(p, v)
         case _ =>
           val cl = v.getClass
-          wvlet.airframe.codec.Compat.codecOfClass(cl) match {
-            case Some(codec) =>
-              codec.asInstanceOf[MessageCodec[Any]].pack(p, v)
+          knownSurfaceTable.get(cl) match {
+            case Some(surface) =>
+              val codec = MessageCodec.ofSurface(surface).asInstanceOf[MessageCodec[Any]]
+              codec.pack(p, v)
             case None =>
-              // Pack as a string for unknown types
-              StringCodec.pack(p, v.toString)
+              wvlet.airframe.codec.Compat.codecOfClass(cl) match {
+                case Some(codec) =>
+                  codec.asInstanceOf[MessageCodec[Any]].pack(p, v)
+                case None =>
+                  // Pack as a string for unknown types
+                  StringCodec.pack(p, v.toString)
+              }
           }
       }
     }
diff --git a/airframe-codec/src/main/scala/wvlet/airframe/codec/ScalaStandardCodec.scala b/airframe-codec/src/main/scala/wvlet/airframe/codec/ScalaStandardCodec.scala
index 61ac4da088..cab0c3c83c 100644
--- a/airframe-codec/src/main/scala/wvlet/airframe/codec/ScalaStandardCodec.scala
+++ b/airframe-codec/src/main/scala/wvlet/airframe/codec/ScalaStandardCodec.scala
@@ -23,7 +23,8 @@ object ScalaStandardCodec {
   case class OptionCodec[A](elementCodec: MessageCodec[A]) extends MessageCodec[Option[A]] {
     override def pack(p: Packer, v: Option[A]): Unit = {
       v match {
-        case None => p.packNil
+        case None =>
+          p.packNil
         case Some(x) =>
           elementCodec.pack(p, x)
       }
diff --git a/airframe-control/src/main/scala/wvlet/airframe/control/Resource.scala b/airframe-control/src/main/scala/wvlet/airframe/control/Resource.scala
index 0a450df5ec..9c1cf973ff 100644
--- a/airframe-control/src/main/scala/wvlet/airframe/control/Resource.scala
+++ b/airframe-control/src/main/scala/wvlet/airframe/control/Resource.scala
@@ -24,6 +24,20 @@ trait Resource[A] extends AutoCloseable {
   def get: A
   def close(): Unit
 
+  /**
+    * Use the resource within a limited scope. After exiting the scope, the resource will be closed
+    * @param body
+    * @tparam U
+    * @return
+    */
+  def use[U](body: A => U): U = {
+    try {
+      body(get)
+    } finally {
+      close()
+    }
+  }
+
   /**
     * Wrap a Future with this resource. After the future completes, the resource will be closed
     */
diff --git a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/Parquet.scala b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/Parquet.scala
index 6f42882260..cce842bf6b 100644
--- a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/Parquet.scala
+++ b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/Parquet.scala
@@ -16,6 +16,8 @@ object Parquet extends ParquetCompat with LogSupport {
    * Create a Parquet writer that accepts records represented in Map, Array, JSON, MsgPack, etc.
   * @param path
   * @param schema
+  * @param knownSurfaces
+  *   surfaces of objects that will be used for writing records
   * @param hadoopConf
   * @param config
   * @return
@@ -23,11 +25,12 @@ object Parquet extends ParquetCompat with LogSupport {
   def newRecordWriter(
       path: String,
       schema: MessageType,
+      knownSurfaces: Seq[Surface] = Seq.empty,
       hadoopConf: Configuration = new Configuration(),
       config: ParquetWriterAdapter.RecordWriterBuilder => ParquetWriterAdapter.RecordWriterBuilder =
         identity[ParquetWriterAdapter.RecordWriterBuilder](_)
   ): ParquetWriter[Any] = {
-    val b = ParquetWriterAdapter.recordWriterBuilder(path, schema, hadoopConf)
+    val b = ParquetWriterAdapter.recordWriterBuilder(path, schema, knownSurfaces, hadoopConf)
     val builder = config(b)
     builder.build()
   }
diff --git a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetObjectWriter.scala b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetObjectWriter.scala
index 219aaf796b..c3ac903622 100644
--- a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetObjectWriter.scala
+++ b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetObjectWriter.scala
@@ -19,7 +19,7 @@ import org.apache.parquet.schema.{MessageType, Type}
 import wvlet.airframe.codec.{JSONCodec, MessageCodec, MessageCodecException}
 import wvlet.airframe.codec.PrimitiveCodec.{AnyCodec, ValueCodec}
 import wvlet.airframe.json.JSONParseException
-import wvlet.airframe.msgpack.spi.Value.{ArrayValue, BinaryValue, MapValue, StringValue}
+import wvlet.airframe.msgpack.spi.Value.{ArrayValue, BinaryValue, MapValue, NilValue, StringValue}
 import wvlet.airframe.msgpack.spi.{Code, MessagePack, MsgPack, Value}
 import wvlet.airframe.surface.{CName, Parameter, Surface}
 import wvlet.log.LogSupport
@@ -111,7 +111,7 @@ class ParquetSeqWriter(elementCodec: ParquetWriteCodec) extends ParquetWriteCode
         }
       case _ =>
         // Write unknown value as binary
-        val msgpack = AnyCodec.toMsgPack(v)
+        val msgpack = AnyCodec.default.toMsgPack(v)
         recordConsumer.addBinary(Binary.fromConstantByteArray(msgpack))
     }
   }
@@ -185,6 +185,8 @@ case class ParquetObjectWriter(paramWriters: Seq[ParquetFieldWriter], params: Se
     parquetCodecTable.get(columnName) match {
       case Some(parameterCodec) =>
         v match {
+          case NilValue =>
+          // skip
           case m: MapValue if m.isEmpty =>
           // skip
           case a: ArrayValue if a.isEmpty =>
diff --git a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetRecordWriter.scala b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetRecordWriter.scala
index 08322d76d5..4a6c4a369c 100644
--- a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetRecordWriter.scala
+++ b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetRecordWriter.scala
@@ -15,8 +15,9 @@ package wvlet.airframe.parquet
 
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.MessageType
-import wvlet.airframe.codec.PrimitiveCodec.ValueCodec
+import wvlet.airframe.codec.PrimitiveCodec.{AnyCodec, ValueCodec}
 import wvlet.airframe.codec.{MessageCodec, MessageCodecException}
+import wvlet.airframe.surface.Surface
 import wvlet.log.LogSupport
 
 /**
@@ -24,18 +25,18 @@ import wvlet.log.LogSupport
  *
  * @param schema
  */
-class ParquetRecordWriter(schema: MessageType) extends LogSupport {
+class ParquetRecordWriter(schema: MessageType, knownSurfaces: Seq[Surface] = Seq.empty) extends LogSupport {
   private val parquetCodec: ParquetWriteCodec = {
     val surface = ParquetSchema.buildSurfaceFromParquetSchema(schema)
     ParquetWriteCodec.parquetCodecOf(schema, surface, ValueCodec).asRoot
   }
 
-  private val anyCodec = MessageCodec.of[Any]
+  private val codec = new AnyCodec(knownSurfaces)
 
   def pack(obj: Any, recordConsumer: RecordConsumer): Unit = {
     val msgpack =
       try {
-        anyCodec.toMsgPack(obj)
+        codec.toMsgPack(obj)
       } catch {
         case e: MessageCodecException =>
           throw new IllegalArgumentException(s"Cannot convert the input into MsgPack: ${obj}", e)
diff --git a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetWriteCodec.scala b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetWriteCodec.scala
index 0696db745b..29a90d4425 100644
--- a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetWriteCodec.scala
+++ b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetWriteCodec.scala
@@ -19,7 +19,15 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.{MessageType, Type}
 import org.apache.parquet.schema.Type.Repetition
 import wvlet.airframe.codec.MessageCodec
-import wvlet.airframe.codec.PrimitiveCodec.{BooleanCodec, DoubleCodec, FloatCodec, IntCodec, LongCodec, StringCodec}
+import wvlet.airframe.codec.PrimitiveCodec.{
+  BooleanCodec,
+  DoubleCodec,
+  FloatCodec,
+  IntCodec,
+  LongCodec,
+  StringCodec,
+  ValueCodec
+}
 import wvlet.airframe.msgpack.spi.MsgPack
 import wvlet.airframe.surface.Surface
 import wvlet.log.LogSupport
diff --git a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetWriterAdapter.scala b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetWriterAdapter.scala
index 60d1ffc37c..249e07260f 100644
--- a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetWriterAdapter.scala
+++ b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetWriterAdapter.scala
@@ -47,18 +47,23 @@ object ParquetWriterAdapter extends LogSupport {
     }
   }
 
-  class RecordWriterBuilder(schema: MessageType, file: OutputFile)
+  class RecordWriterBuilder(schema: MessageType, file: OutputFile, knownSurfaces: Seq[Surface])
       extends ParquetWriter.Builder[Any, RecordWriterBuilder](file: OutputFile) {
     override def self(): RecordWriterBuilder = this
     override def getWriteSupport(conf: Configuration): WriteSupport[Any] = {
-      new ParquetRecordWriterSupportAdapter(schema)
+      new ParquetRecordWriterSupportAdapter(schema, knownSurfaces)
     }
   }
 
-  def recordWriterBuilder(path: String, schema: MessageType, conf: Configuration): RecordWriterBuilder = {
+  def recordWriterBuilder(
+      path: String,
+      schema: MessageType,
+      knownSurfaces: Seq[Surface],
+      conf: Configuration
+  ): RecordWriterBuilder = {
     val fsPath = new Path(path)
     val file = HadoopOutputFile.fromPath(fsPath, conf)
-    val b = new RecordWriterBuilder(schema, file).withConf(conf)
+    val b = new RecordWriterBuilder(schema, file, knownSurfaces).withConf(conf)
     // Use snappy by default
     b.withCompressionCodec(CompressionCodecName.SNAPPY)
       .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
@@ -90,7 +95,9 @@ class ParquetWriteSupportAdapter[A](surface: Surface) extends WriteSupport[A] wi
   }
 }
 
-class ParquetRecordWriterSupportAdapter(schema: MessageType) extends WriteSupport[Any] with LogSupport {
+class ParquetRecordWriterSupportAdapter(schema: MessageType, knownSurfaces: Seq[Surface])
+    extends WriteSupport[Any]
+    with LogSupport {
   private var recordConsumer: RecordConsumer = null
 
   override def init(configuration: Configuration): WriteContext = {
@@ -102,7 +109,7 @@ class ParquetRecordWriterSupportAdapter(schema: MessageType) extends WriteSuppor
     this.recordConsumer = recordConsumer
   }
 
-  private val codec = new ParquetRecordWriter(schema)
+  private val codec = new ParquetRecordWriter(schema, knownSurfaces)
 
   override def write(record: Any): Unit = {
     require(recordConsumer != null)
diff --git a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/RecordBuilder.scala b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/RecordBuilder.scala
index 72aebadffb..5e80e5d8a2 100644
--- a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/RecordBuilder.scala
+++ b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/RecordBuilder.scala
@@ -42,6 +42,7 @@ class RecordBuilderImpl extends RecordBuilder with LogSupport {
       }
     }
   }
+
   def toMap: Map[String, Any] = {
     holder
   }
diff --git a/airframe-parquet/src/test/scala/wvlet/airframe/parquet/NestedRecordWriteTest.scala b/airframe-parquet/src/test/scala/wvlet/airframe/parquet/NestedRecordWriteTest.scala
index 9a3fbf9da6..6f9631de0c 100644
--- a/airframe-parquet/src/test/scala/wvlet/airframe/parquet/NestedRecordWriteTest.scala
+++ b/airframe-parquet/src/test/scala/wvlet/airframe/parquet/NestedRecordWriteTest.scala
@@ -14,6 +14,7 @@ package wvlet.airframe.parquet
 
 import wvlet.airframe.control.Control.withResource
+import wvlet.airframe.control.Resource
 import wvlet.airframe.msgpack.spi.{Value, ValueFactory}
 import wvlet.airframe.surface.Surface
 import wvlet.airframe.ulid.ULID
@@ -42,7 +43,7 @@ object NestedRecordWriteTest extends AirSpec {
     debug(s"write target schema: ${schema}")
 
     IOUtil.withTempFile("target/tmp-nested-record", ".parquet") { file =>
-      withResource(Parquet.newRecordWriter(file.getPath, schema)) { writer =>
+      withResource(Parquet.newRecordWriter(file.getPath, schema, knownSurfaces = Seq(Surface.of[ColStats]))) { writer =>
         writer.write(m1)
         writer.write(m2)
       }
@@ -109,7 +110,13 @@ object NestedRecordWriteTest extends AirSpec {
   test("write records with Option/Seq using record writer") {
     val p0 = Partition(id = ULID.newULID, metadata = Map("xxx" -> "yyy"))
     IOUtil.withTempFile("target/tmp-nested-opt-record", ".parquet") { file =>
-      withResource(Parquet.newRecordWriter(file.getPath, Parquet.toParquetSchema(Surface.of[Partition]))) { writer =>
+      withResource(
+        Parquet.newRecordWriter(
+          file.getPath,
+          Parquet.toParquetSchema(Surface.of[Partition]),
+          knownSurfaces = Seq(Surface.of[Partition])
+        )
+      ) { writer =>
         writer.write(p0)
       }
       withResource(Parquet.newReader[Partition](file.getPath)) { reader =>
diff --git a/airframe-parquet/src/test/scala/wvlet/airframe/parquet/ParquetRecordWriterTest.scala b/airframe-parquet/src/test/scala/wvlet/airframe/parquet/ParquetRecordWriterTest.scala
index 43ab0c2dab..822f829fd9 100644
--- a/airframe-parquet/src/test/scala/wvlet/airframe/parquet/ParquetRecordWriterTest.scala
+++ b/airframe-parquet/src/test/scala/wvlet/airframe/parquet/ParquetRecordWriterTest.scala
@@ -35,7 +35,7 @@ object ParquetRecordWriterTest extends AirSpec {
         writer.write(Array(2, "yui"))
         writer.write("""{"id":3, "name":"aina"}""")
         writer.write("""[4, "ruri"]""")
-        writer.write(AnyCodec.toMsgPack(Map("id" -> 5, "name" -> "xxx")))
+        writer.write(AnyCodec.default.toMsgPack(Map("id" -> 5, "name" -> "xxx")))
       }
 
       withResource(Parquet.newReader[Map[String, Any]](file.getPath)) { reader =>
@@ -86,10 +86,11 @@ object ParquetRecordWriterTest extends AirSpec {
 
   test("write records with Option") {
     IOUtil.withTempFile("target/tmp-record-opt", ".parquet") { file =>
-      withResource(Parquet.newRecordWriter(file.getPath, schema2)) { writer =>
-        writer.write(RecordOpt(1, Some(1)))
-        writer.write(RecordOpt(2, None))
-        writer.write("""{"id":"3"}""")
+      withResource(Parquet.newRecordWriter(file.getPath, schema2, knownSurfaces = Seq(Surface.of[RecordOpt]))) {
+        writer =>
+          writer.write(RecordOpt(1, Some(1)))
+          writer.write(RecordOpt(2, None))
+          writer.write("""{"id":"3"}""")
       }
 
       withResource(Parquet.newReader[Map[String, Any]](file.getPath)) { reader =>
diff --git a/docs/airframe-parquet.md b/docs/airframe-parquet.md
index 893be04e5a..4866154651 100644
--- a/docs/airframe-parquet.md
+++ b/docs/airframe-parquet.md
@@ -66,7 +66,7 @@ val schema = new MessageType(
   Types.optional(PrimitiveTypeName.BINARY).as(stringType).named("name")
 )
 // Create a record writer for the given schema
-val recordWriter = Parquet.newRecordWriter(path = "record.parquet", schema)
+val recordWriter = Parquet.newRecordWriter(path = "record.parquet", schema = schema)
 // Write a record using Map (column name -> value)
 recordWriter.write(Map("id" -> 1, "name" -> "leo"))
 // Write a record using JSON object
@@ -75,10 +75,23 @@ recordWriter.write("""{"id":2, "name":"yui"}""")
 recordWriter.write(Seq(3, "aina"))
 // Write a record using JSON array
 recordWriter.write("""[4, "xxx"]""")
-// You can use case classes as input as well
-recordWriter.write(MyEntry(5, "yyy"))
-
 recordWriter.close()
+
+
+// If you need to write dynamic records containing case classes,
+// register the Surfaces of these classes
+case class Nested(id: Int, entry: MyEntry)
+val nestedRecordWriter = Parquet.newRecordWriter(
+  path = "nested.parquet",
+  // You can build a Parquet schema matching the Surface
+  schema = Parquet.toParquetSchema(Surface.of[Nested]),
+  knownSurfaces = Seq(Surface.of[MyEntry]) // required to serialize MyEntry
+)
+
+// Write dynamic records
+nestedRecordWriter.write(Map("id" -> 1, "entry" -> MyEntry(1, "yyy")))
+nestedRecordWriter.write(Map("id" -> 2, "entry" -> MyEntry(2, "zzz")))
+nestedRecordWriter.close()
 ```
 
 ### Using with AWS S3