Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #6205] Asynchronize DefaultPullConsumer pull method #6341

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
/**
* Pull thread number
*/
private int pullThreadNums = 20;
private int pullThreadNums = 1;

/**
* Minimum commit offset interval time in milliseconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
Expand Down Expand Up @@ -857,7 +858,7 @@ public long searchOffset(MessageQueue mq, long timestamp) throws MQClientExcepti
return this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
}

public class PullTaskImpl implements Runnable {
private class PullTaskImpl implements Runnable {
private final MessageQueue messageQueue;
private volatile boolean cancelled = false;
private Thread currentThread;
Expand Down Expand Up @@ -963,27 +964,40 @@ public void run() {
subscriptionData = FilterAPI.buildSubscriptionData(topic, subExpression4Assign);
}

PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
if (this.isCancelled() || processQueue.isDropped()) {
return;
}
switch (pullResult.getPullStatus()) {
case FOUND:
final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
synchronized (objLock) {
if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
processQueue.putMessage(pullResult.getMsgFoundList());
submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
}
pullAsync(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize(), new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
DefaultLitePullConsumerImpl.this.pullAPIWrapper.processPullResult(messageQueue, pullResult, subscriptionData);
if (PullTaskImpl.this.isCancelled() || processQueue.isDropped()) {
return;
}
break;
case OFFSET_ILLEGAL:
log.warn("The pull request offset illegal, {}", pullResult.toString());
break;
default:
break;
}
updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);
switch (pullResult.getPullStatus()) {
case FOUND:
final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
synchronized (objLock) {
if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
processQueue.putMessage(pullResult.getMsgFoundList());
submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
}
}
break;
case OFFSET_ILLEGAL:
log.warn("The pull request offset illegal, {}", pullResult.toString());
break;
default:
break;
}
updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue);
DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(PullTaskImpl.this, 0L, TimeUnit.MILLISECONDS);
}

@Override
public void onException(Throwable e) {
log.warn("execute the pull request exception", e);
DefaultLitePullConsumerImpl.this.scheduledThreadPoolExecutor.schedule(PullTaskImpl.this, pullTimeDelayMillsWhenException, TimeUnit.MILLISECONDS);
}
});

} catch (InterruptedException interruptedException) {
log.warn("Polling thread was interrupted.", interruptedException);
} catch (Throwable e) {
Expand Down Expand Up @@ -1016,19 +1030,20 @@ public MessageQueue getMessageQueue() {
}
}

private PullResult pull(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums)
private PullResult pullAsync(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums,
PullCallback pullCallback)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return pull(mq, subscriptionData, offset, maxNums, this.defaultLitePullConsumer.getConsumerPullTimeoutMillis());
return pull(mq, subscriptionData, offset, maxNums, this.defaultLitePullConsumer.getConsumerPullTimeoutMillis(), CommunicationMode.ASYNC, pullCallback);
}

private PullResult pull(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, long timeout)
private PullResult pull(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, long timeout,
CommunicationMode communicationMode, PullCallback pullCallback)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, timeout);
return this.pullImpl(mq, subscriptionData, offset, maxNums, true, timeout, communicationMode, pullCallback);
}

private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums,
boolean block,
long timeout)
private PullResult pullImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums,
boolean block, long timeout, CommunicationMode communicationMode, PullCallback pullCallback)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

if (null == mq) {
Expand All @@ -1043,6 +1058,10 @@ private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionDa
throw new MQClientException("maxNums <= 0", null);
}

if (CommunicationMode.ASYNC == communicationMode && pullCallback == null) {
throw new MQClientException("Async communication mode but callback is null", null);
}

int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false, true);

long timeoutMillis = block ? this.defaultLitePullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
Expand All @@ -1059,8 +1078,8 @@ private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionDa
0,
this.defaultLitePullConsumer.getBrokerSuspendMaxTimeMillis(),
timeoutMillis,
CommunicationMode.SYNC,
null
communicationMode,
pullCallback
);
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
return pullResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -725,7 +724,7 @@ private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsume
field.set(litePullConsumerImpl, offsetStore);

when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
anyLong(), any(CommunicationMode.class), any(PullCallback.class)))
.thenAnswer(new Answer<PullResult>() {
@Override
public PullResult answer(InvocationOnMock mock) throws Throwable {
Expand All @@ -739,6 +738,7 @@ public PullResult answer(InvocationOnMock mock) throws Throwable {
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
Expand Down Expand Up @@ -785,7 +785,7 @@ private void initDefaultLitePullConsumerWithTag(DefaultLitePullConsumer litePull
field.set(litePullConsumerImpl, offsetStore);

when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
anyLong(), any(CommunicationMode.class), any(PullCallback.class)))
.thenAnswer(new Answer<PullResult>() {
@Override
public PullResult answer(InvocationOnMock mock) throws Throwable {
Expand All @@ -800,6 +800,7 @@ public PullResult answer(InvocationOnMock mock) throws Throwable {
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -226,7 +225,7 @@ private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsume
traceProducer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());

when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
anyLong(), any(CommunicationMode.class), any(PullCallback.class)))
.thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
Expand All @@ -240,6 +239,7 @@ public Object answer(InvocationOnMock mock) throws Throwable {
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
Expand Down