Skip to content

Commit

Permalink
Merge branch 'release/0.12.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed May 13, 2020
2 parents aa72b16 + 1d6bbd9 commit 003f24b
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 8 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Version 0.12.1 (2020-05-13)
---------------------------
Fix NullPointerException before resharding (#156)

Version 0.12.0 (2020-03-17)
---------------------------
Add badges to README (#73)
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ $ sbt compile
The Snowplow Elasticsearch Loader has the following command-line interface:

```
snowplow-elasticsearch-loader 0.12.0
snowplow-elasticsearch-loader 0.12.1
Usage: snowplow-elasticsearch-loader [options]
Expand All @@ -52,7 +52,7 @@ aws {
Next, start the loader, making sure to specify your new config file:

```bash
$ java -jar snowplow-elasticsearch-loader-http-0.12.0.jar --config my.conf
$ java -jar snowplow-elasticsearch-loader-http-0.12.1.jar --config my.conf
```

## Find out more
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ lazy val commonDependencies = Seq(
lazy val buildSettings = Seq(
organization := "com.snowplowanalytics",
name := "snowplow-elasticsearch-loader",
version := "0.12.0",
version := "0.12.1",
description := "Load the contents of a Kinesis stream or NSQ topic to Elasticsearch",
scalaVersion := "2.12.10",
scalacOptions := BuildSettings.compilerOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,16 @@ class Emitter(
@throws[IOException]
private def attemptEmit(records: List[EmitterJsonInput]): List[EmitterJsonInput] = {
if (records.isEmpty) {
null
Nil
} else {
val (validRecords: List[EmitterJsonInput], invalidRecords: List[EmitterJsonInput]) =
records.partition(_._2.isValid)
// Send all valid records to stdout / Sink and return those rejected by it
val rejects = goodSink match {
case Some(s) =>
validRecords.foreach {
case (_, record) => record.map(r => s.store(r.json.toString, None, true))
case (_, Validated.Valid(r)) => s.store(r.json.toString, None, true)
case _ => ()
}
Nil
case None if validRecords.isEmpty => Nil
Expand Down Expand Up @@ -153,7 +154,10 @@ class Emitter(
/**
* Closes the Sink client when the KinesisConnectorRecordProcessor is shut down
*/
override def shutdown(): Unit = bulkSender.close()
override def shutdown(): Unit = {
println("Shutting down emitter")
bulkSender.close()
}

/**
* Handles records rejected by the JsonTransformer or by Sink
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ trait BulkSender[A] {
* Terminate the application in a way the KCL cannot stop, prevents shutdown hooks from running
*/
protected def forceShutdown(): Unit = {
log.info("BulkSender force shutdown")
tracker.foreach { t =>
// TODO: Instead of waiting a fixed time, use synchronous tracking or futures (when the tracker supports futures)
SnowplowTracking.trackApplicationShutdown(t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ class ElasticsearchBulkSender(
}

// do not close the es client, otherwise it will fail when resharding
override def close(): Unit = ()
override def close(): Unit =
log.info("Closing BulkSender")

override def send(records: List[EmitterJsonInput]): List[EmitterJsonInput] = {
val connectionAttemptStartTime = System.currentTimeMillis()
Expand Down Expand Up @@ -170,7 +171,7 @@ class ElasticsearchBulkSender(

/** Logs the cluster health */
override def logHealth(): Unit =
client.execute(clusterHealth) onComplete {
client.execute(clusterHealth).onComplete {
case SSuccess(health) =>
health match {
case response =>
Expand Down

0 comments on commit 003f24b

Please sign in to comment.