configProps;
@@ -229,6 +235,11 @@ public class MQSourceConnector extends SourceConnector {
CONFIG_DOCUMENTATION_MQ_SSL_PEER_NAME, CONFIG_GROUP_MQ, 13, Width.MEDIUM,
CONFIG_DISPLAY_MQ_SSL_PEER_NAME);
+ config.define(CONFIG_NAME_MQ_BATCH_SIZE, Type.INT, CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT,
+ ConfigDef.Range.atLeast(CONFIG_VALUE_MQ_BATCH_SIZE_MINIMUM), Importance.LOW,
+ CONFIG_DOCUMENTATION_MQ_BATCH_SIZE, CONFIG_GROUP_MQ, 14, Width.MEDIUM,
+ CONFIG_DISPLAY_MQ_BATCH_SIZE);
+
config.define(CONFIG_NAME_TOPIC, Type.STRING, null, Importance.HIGH,
CONFIG_DOCUMENTATION_TOPIC, null, 0, Width.MEDIUM,
CONFIG_DISPLAY_TOPIC);
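As a rough illustration of the new batch-size option (not part of the diff itself), the sketch below shows how Kafka Connect's ConfigDef validates such a setting via Range.atLeast. The key name "mq.batch.size", the default of 250 and the minimum of 1 are assumptions for this example; the real values come from the CONFIG_NAME_MQ_BATCH_SIZE / CONFIG_VALUE_MQ_BATCH_SIZE_* constants defined elsewhere in MQSourceConnector.

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.ConfigException;

public class BatchSizeConfigSketch {
    public static void main(String[] args) {
        ConfigDef config = new ConfigDef();

        // Assumed key, default and minimum; mirrors the shape of the config.define call added above.
        config.define("mq.batch.size", Type.INT, 250,
                ConfigDef.Range.atLeast(1), Importance.LOW,
                "The maximum number of messages polled from MQ in a single batch",
                "mq", 14, Width.MEDIUM, "Batch size");

        // A valid value is parsed and returned typed as an Integer.
        Map<String, Object> parsed =
                config.parse(Collections.singletonMap("mq.batch.size", "100"));
        System.out.println(parsed.get("mq.batch.size")); // 100

        // A value below the minimum fails validation.
        try {
            config.parse(Collections.singletonMap("mq.batch.size", "0"));
        } catch (ConfigException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}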
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
index 53c394a..b62a73a 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
@@ -32,10 +32,12 @@
public class MQSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(MQSourceTask.class);
- private static int BATCH_SIZE = 250; // The maximum number of records returned per call to poll()
+ // The maximum number of records returned per call to poll()
+ private int batchSize = MQSourceConnector.CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT;
private CountDownLatch batchCompleteSignal = null; // Used to signal completion of a batch
private AtomicInteger pollCycle = new AtomicInteger(1); // Incremented each time poll() is called
private int lastCommitPollCycle = 0; // The value of pollCycle the last time commit() was called
+ private AtomicBoolean receivingMessages = new AtomicBoolean(); // Whether currently receiving messages
private AtomicBoolean stopNow = new AtomicBoolean(); // Whether stop has been requested
private JMSReader reader;
@@ -69,6 +71,11 @@ public MQSourceTask() {
log.debug("Task props entry {} : {}", entry.getKey(), value);
}
+ String strBatchSize = props.get(MQSourceConnector.CONFIG_NAME_MQ_BATCH_SIZE);
+ if (strBatchSize != null) {
+ batchSize = Integer.parseInt(strBatchSize);
+ }
+
// Construct a reader to interface with MQ
reader = new JMSReader();
reader.configure(props);
@@ -106,22 +113,40 @@ public MQSourceTask() {
int currentPollCycle = pollCycle.incrementAndGet();
log.debug("Starting poll cycle {}", currentPollCycle);
- if (!stopNow.get()) {
- log.info("Polling for records");
- SourceRecord src;
- do {
- // For the first message in the batch, wait a while if no message
- src = reader.receive(messageCount == 0);
- if (src != null) {
- msgs.add(src);
- messageCount++;
- }
- } while ((src != null) && (messageCount < BATCH_SIZE) && !stopNow.get());
+ try {
+ receivingMessages.set(true);
+
+ if (!stopNow.get()) {
+ log.info("Polling for records");
+ SourceRecord src;
+ do {
+ // For the first message in the batch, wait a while if no message
+ src = reader.receive(messageCount == 0);
+ if (src != null) {
+ msgs.add(src);
+ messageCount++;
+ }
+ } while ((src != null) && (messageCount < batchSize) && !stopNow.get());
+ }
+ else {
+ log.info("Stopping polling for records");
+ }
+ }
+ finally {
+ receivingMessages.set(false);
}
synchronized(this) {
if (messageCount > 0) {
- batchCompleteSignal = new CountDownLatch(messageCount);
+ if (!stopNow.get()) {
+ batchCompleteSignal = new CountDownLatch(messageCount);
+ }
+ else {
+                    // Discard this batch - the messages were rolled back when the connection to MQ was closed in stop()
+ log.debug("Discarding a batch of {} records as task is stopping", messageCount);
+ msgs.clear();
+ batchCompleteSignal = null;
+ }
}
else {
batchCompleteSignal = null;
@@ -157,7 +182,6 @@ public void commit() throws InterruptedException {
// batch complete signal directly.
int currentPollCycle = pollCycle.get();
log.debug("Commit starting in poll cycle {}", currentPollCycle);
- boolean willShutdown = false;
if (lastCommitPollCycle == currentPollCycle)
{
@@ -171,25 +195,10 @@ public void commit() throws InterruptedException {
batchCompleteSignal.countDown();
}
}
- else if (stopNow.get()) {
- log.debug("Shutting down with empty batch after delay");
- willShutdown = true;
- }
}
}
else {
lastCommitPollCycle = currentPollCycle;
-
- synchronized (this) {
- if ((batchCompleteSignal == null) && stopNow.get()) {
- log.debug("Shutting down with empty batch");
- willShutdown = true;
- }
- }
- }
-
- if (willShutdown) {
- shutdown();
}
log.trace("[{}] Exit {}.commit", Thread.currentThread().getId(), this.getClass().getName());
@@ -210,16 +219,20 @@ else if (stopNow.get()) {
stopNow.set(true);
- boolean willShutdown = false;
+ boolean willClose = false;
synchronized(this) {
- if (batchCompleteSignal == null) {
- willShutdown = true;
+ if (receivingMessages.get()) {
+ log.debug("Will close connection");
+ willClose = true;
}
}
- if (willShutdown) {
- shutdown();
+ if (willClose) {
+ // Close the connection to MQ to clean up
+ if (reader != null) {
+ reader.close();
+ }
}
log.trace("[{}] Exit {}.stop", Thread.currentThread().getId(), this.getClass().getName());
@@ -247,20 +260,4 @@ else if (stopNow.get()) {
log.trace("[{}] Exit {}.commitRecord", Thread.currentThread().getId(), this.getClass().getName());
}
-
- /**
- *
- * Shuts down the task, releasing any resource held by the task.
- *
- */
- private void shutdown() {
- log.trace("[{}] Entry {}.shutdown", Thread.currentThread().getId(), this.getClass().getName());
-
- // Close the connection to MQ to clean up
- if (reader != null) {
- reader.close();
- }
-
- log.trace("[{}] Exit {}.shutdown", Thread.currentThread().getId(), this.getClass().getName());
- }
}
\ No newline at end of file
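Outside the diff, a minimal sketch of how the new setting would flow from the connector configuration into the task's batchSize field. The property keys shown ("mq.batch.size", "mq.queue.manager", "mq.queue") and the default of 250 are assumptions for this example.

import java.util.HashMap;
import java.util.Map;

public class BatchSizePropSketch {
    public static void main(String[] args) {
        // Properties as the Connect worker might hand them to the task (assumed keys).
        Map<String, String> props = new HashMap<>();
        props.put("mq.queue.manager", "QM1");
        props.put("mq.queue", "DEV.QUEUE.1");
        props.put("mq.batch.size", "100");

        // Same pattern as the change to the task's start-up code: keep the
        // assumed default of 250 unless the property is present, otherwise parse it.
        int batchSize = 250;
        String strBatchSize = props.get("mq.batch.size");
        if (strBatchSize != null) {
            batchSize = Integer.parseInt(strBatchSize);
        }

        System.out.println("poll() will return at most " + batchSize + " records per batch");
    }
}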
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java
index 839b3cd..fbcc13f 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/BaseRecordBuilder.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2018 IBM Corporation
+ * Copyright 2018, 2019 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -115,6 +115,8 @@ SchemaAndValue getKey(JMSContext context, String topic, Message message) throws
keySchema = Schema.OPTIONAL_BYTES_SCHEMA;
key = message.getJMSCorrelationIDAsBytes();
break;
+ default:
+ break;
}
return new SchemaAndValue(keySchema, key);
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilder.java
index b7d1847..142726c 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilder.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/DefaultRecordBuilder.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2017, 2018 IBM Corporation
+ * Copyright 2017, 2018, 2019 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,7 +24,6 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java
index 593123b..0b1691d 100644
--- a/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java
+++ b/src/main/java/com/ibm/eventstreams/connect/mqsource/builders/JsonRecordBuilder.java
@@ -1,5 +1,5 @@
/**
- * Copyright 2017, 2018 IBM Corporation
+ * Copyright 2017, 2018, 2019 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,11 +24,9 @@
import javax.jms.Message;
import javax.jms.TextMessage;
-import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
-import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;