Skip to content

Commit

Permalink
Merge pull request #18 from ibm-messaging/1.0.2-beta
Browse files Browse the repository at this point in the history
Beta release of 1.0.2
  • Loading branch information
AndrewJSchofield authored Jan 21, 2019
2 parents e14f8d9 + a63bcee commit 0ca9325
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 49 deletions.
18 changes: 3 additions & 15 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
* Copyright 2017, 2018 IBM Corporation
* Copyright 2017, 2018, 2019 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,7 +20,7 @@
<groupId>com.ibm.eventstreams.connect</groupId>
<artifactId>kafka-connect-mq-source</artifactId>
<packaging>jar</packaging>
<version>1.0.1</version>
<version>1.0.2-beta</version>
<name>kafka-connect-mq-source</name>
<organization>
<name>IBM Corporation</name>
Expand Down Expand Up @@ -63,7 +63,7 @@
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>com.ibm.mq.allclient</artifactId>
<version>9.1.0.0</version>
<version>9.1.1.0</version>
</dependency>

<dependency>
Expand All @@ -78,18 +78,6 @@
<version>1.7.25</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<version>1.60</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.60</version>
</dependency>

</dependencies>

<build>
Expand Down
14 changes: 3 additions & 11 deletions src/main/java/com/ibm/eventstreams/connect/mqsource/JMSReader.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017, 2018 IBM Corporation
* Copyright 2017, 2018, 2019 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -302,16 +302,8 @@ public void commit() {
public void close() {
log.trace("[{}] Entry {}.close", Thread.currentThread().getId(), this.getClass().getName());

try {
JMSContext ctxt = jmsCtxt;
closeNow.set(true);
if (ctxt != null) {
ctxt.close();
}
}
catch (JMSRuntimeException jmse) {
;
}
closeNow.set(true);
closeInternal();

log.trace("[{}] Exit {}.close", Thread.currentThread().getId(), this.getClass().getName());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017, 2018 IBM Corporation
* Copyright 2017, 2018, 2019 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -96,7 +96,7 @@ public class MQSourceConnector extends SourceConnector {
public static final String CONFIG_DOCUMENTATION_TOPIC = "The name of the target Kafka topic.";
public static final String CONFIG_DISPLAY_TOPIC = "Target Kafka topic";

public static String VERSION = "1.0.1";
public static String VERSION = "1.0.2-beta";

private Map<String, String> configProps;

Expand Down
150 changes: 129 additions & 21 deletions src/main/java/com/ibm/eventstreams/connect/mqsource/MQSourceTask.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2017, 2018 IBM Corporation
* Copyright 2017, 2018, 2019 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,8 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.connect.source.SourceRecord;
Expand All @@ -30,12 +32,13 @@
public class MQSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(MQSourceTask.class);

private static int BATCH_SIZE = 100;
private static int MAX_UNCOMMITTED_MSGS = 10000;
private static int MAX_UNCOMMITTED_MSGS_DELAY_MS = 500;
private static int BATCH_SIZE = 250; // The maximum number of records returned per call to poll()
private CountDownLatch batchCompleteSignal = null; // Used to signal completion of a batch
private AtomicInteger pollCycle = new AtomicInteger(1); // Incremented each time poll() is called
private int lastCommitPollCycle = 0; // The value of pollCycle the last time commit() was called
private AtomicBoolean stopNow = new AtomicBoolean(); // Whether stop has been requested

private JMSReader reader;
private AtomicInteger uncommittedMessages = new AtomicInteger(0);

public MQSourceTask() {
}
Expand Down Expand Up @@ -87,35 +90,51 @@ public MQSourceTask() {

final List<SourceRecord> msgs = new ArrayList<>();
int messageCount = 0;
int uncommittedMessagesInt = this.uncommittedMessages.get();

if (uncommittedMessagesInt < MAX_UNCOMMITTED_MSGS) {
log.info("Polling for records");
// Resolve any in-flight transaction, committing unless there has been an error between
// receiving the message from MQ and converting it
if (batchCompleteSignal != null) {
log.debug("Awaiting batch completion signal");
batchCompleteSignal.await();

log.debug("Committing records");
reader.commit();
}

// Increment the counter for the number of times poll is called so we can ensure we don't get stuck waiting for
// commitRecord callbacks to trigger the batch complete signal
int currentPollCycle = pollCycle.incrementAndGet();
log.debug("Starting poll cycle {}", currentPollCycle);

if (!stopNow.get()) {
log.info("Polling for records");
SourceRecord src;
do {
// For the first message in the batch, wait a while if no message
src = reader.receive(messageCount == 0);
if (src != null) {
msgs.add(src);
messageCount++;
uncommittedMessagesInt = this.uncommittedMessages.incrementAndGet();
}
} while ((src != null) && (messageCount < BATCH_SIZE) && (uncommittedMessagesInt < MAX_UNCOMMITTED_MSGS));

log.debug("Poll returning {} records", messageCount);
} while ((src != null) && (messageCount < BATCH_SIZE) && !stopNow.get());
}
else {
log.info("Uncommitted message limit reached");
Thread.sleep(MAX_UNCOMMITTED_MSGS_DELAY_MS);

synchronized(this) {
if (messageCount > 0) {
batchCompleteSignal = new CountDownLatch(messageCount);
}
else {
batchCompleteSignal = null;
}
}

log.debug("Poll returning {} records", messageCount);

log.trace("[{}] Exit {}.poll, retval={}", Thread.currentThread().getId(), this.getClass().getName(), messageCount);
return msgs;
}


/**
/**
* <p>
* Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
* method should block until the commit is complete.
Expand All @@ -129,9 +148,49 @@ public MQSourceTask() {
public void commit() throws InterruptedException {
log.trace("[{}] Entry {}.commit", Thread.currentThread().getId(), this.getClass().getName());

log.debug("Committing records");
reader.commit();
this.uncommittedMessages.set(0);
// This callback is simply used to ensure that the mechanism to use commitRecord callbacks
// to check that all messages in a batch are complete is not getting stuck. If this callback
// is being called, it means that Kafka Connect believes that all outstanding messages have
// been completed. That should mean that commitRecord has been called for all of them too.
// However, if too few calls to commitRecord are received, the connector could wait indefinitely.
// If this commit callback is called twice without the poll cycle increasing, trigger the
// batch complete signal directly.
int currentPollCycle = pollCycle.get();
log.debug("Commit starting in poll cycle {}", currentPollCycle);
boolean willShutdown = false;

if (lastCommitPollCycle == currentPollCycle)
{
synchronized (this) {
if (batchCompleteSignal != null) {
log.debug("Bumping batch complete signal by {}", batchCompleteSignal.getCount());

// This means we're waiting for the signal in the poll() method and it's been
// waiting for at least two calls to this commit callback. It's stuck.
while (batchCompleteSignal.getCount() > 0) {
batchCompleteSignal.countDown();
}
}
else if (stopNow.get()) {
log.debug("Shutting down with empty batch after delay");
willShutdown = true;
}
}
}
else {
lastCommitPollCycle = currentPollCycle;

synchronized (this) {
if ((batchCompleteSignal == null) && stopNow.get()) {
log.debug("Shutting down with empty batch");
willShutdown = true;
}
}
}

if (willShutdown) {
shutdown();
}

log.trace("[{}] Exit {}.commit", Thread.currentThread().getId(), this.getClass().getName());
}
Expand All @@ -149,10 +208,59 @@ public void commit() throws InterruptedException {
@Override public void stop() {
log.trace("[{}] Entry {}.stop", Thread.currentThread().getId(), this.getClass().getName());

stopNow.set(true);

boolean willShutdown = false;

synchronized(this) {
if (batchCompleteSignal == null) {
willShutdown = true;
}
}

if (willShutdown) {
shutdown();
}

log.trace("[{}] Exit {}.stop", Thread.currentThread().getId(), this.getClass().getName());
}

/**
* <p>
* Commit an individual {@link SourceRecord} when the callback from the producer client is received, or if a record is filtered by a transformation.
* </p>
* <p>
* SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
* automatically. This hook is provided for systems that also need to store offsets internally
* in their own system.
* </p>
*
* @param record {@link SourceRecord} that was successfully sent via the producer.
* @throws InterruptedException
*/
@Override public void commitRecord(SourceRecord record) throws InterruptedException {
log.trace("[{}] Entry {}.commitRecord, record={}", Thread.currentThread().getId(), this.getClass().getName(), record);

synchronized (this) {
batchCompleteSignal.countDown();
}

log.trace("[{}] Exit {}.commitRecord", Thread.currentThread().getId(), this.getClass().getName());
}

/**
* <p>
* Shuts down the task, releasing any resource held by the task.
* </p>
*/
private void shutdown() {
log.trace("[{}] Entry {}.shutdown", Thread.currentThread().getId(), this.getClass().getName());

// Close the connection to MQ to clean up
if (reader != null) {
reader.close();
}

log.trace("[{}] Exit {}.stop", Thread.currentThread().getId(), this.getClass().getName());
log.trace("[{}] Exit {}.shutdown", Thread.currentThread().getId(), this.getClass().getName());
}
}

0 comments on commit 0ca9325

Please sign in to comment.