Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4646] Add condition for transforming non-data values #4637

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class LarkConnectServer {
public static void main(String[] args) throws Exception {

LarkConnectServerConfig larkConnectServerConfig = ConfigUtil.parse(LarkConnectServerConfig.class,
Constants.CONNECT_SERVER_CONFIG_FILE_NAME);
Constants.CONNECT_SERVER_CONFIG_FILE_NAME);

if (larkConnectServerConfig.isSinkEnable()) {
Application application = new Application();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,13 @@
ImServiceHandler imServiceHandler = new ImServiceHandler();
imServiceHandler.sinkConnectorConfig = sinkConnectorConfig;
imServiceHandler.imService = Client.newBuilder(sinkConnectorConfig.getAppId(), sinkConnectorConfig.getAppSecret())
.httpTransport(new OkHttpTransport(new OkHttpClient().newBuilder()
.callTimeout(3L, TimeUnit.SECONDS)
.build())
)
.disableTokenCache()
.requestTimeout(3, TimeUnit.SECONDS)
.build()
.im();
.httpTransport(new OkHttpTransport(new OkHttpClient().newBuilder()
.callTimeout(3L, TimeUnit.SECONDS)
.build()))
.disableTokenCache()
.requestTimeout(3, TimeUnit.SECONDS)
.build()
.im();

long fixedWait = Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
int maxRetryTimes = Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1;
Expand Down Expand Up @@ -128,24 +127,25 @@
});
} else {
imServiceHandler.retryer = RetryerBuilder.<ConnectRecord>newBuilder()
.retryIfException()
.retryIfResult(Objects::nonNull)
.withWaitStrategy(WaitStrategies.fixedWait(fixedWait, TimeUnit.MILLISECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
.withRetryListener(new RetryListener() {
@SneakyThrows
@Override
public <V> void onRetry(Attempt<V> attempt) {

long times = attempt.getAttemptNumber();
if (times > 1) {
redoSinkNum.increment();
log.info("Total redo sink task num : [{}]", redoSinkNum.sum());
log.warn("Retry sink event to lark | times=[{}]", attempt.getAttemptNumber() - 1);
}
.retryIfException()
.retryIfResult(Objects::nonNull)
.withWaitStrategy(WaitStrategies.fixedWait(fixedWait, TimeUnit.MILLISECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes))
.withRetryListener(new RetryListener() {

@SneakyThrows
@Override
public <V> void onRetry(Attempt<V> attempt) {

long times = attempt.getAttemptNumber();
if (times > 1) {
redoSinkNum.increment();
log.info("Total redo sink task num : [{}]", redoSinkNum.sum());
log.warn("Retry sink event to lark | times=[{}]", attempt.getAttemptNumber() - 1);
}
})
.build();
}
})
.build();
}

return imServiceHandler;
Expand All @@ -156,9 +156,9 @@
headers.put("Content-Type", Lists.newArrayList("application/json; charset=utf-8"));

RequestOptions requestOptions = RequestOptions.newBuilder()
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(), sinkConnectorConfig.getAppSecret()))
.headers(headers)
.build();
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(), sinkConnectorConfig.getAppSecret()))
.headers(headers)
.build();

retryer.call(() -> {
CreateMessageReq createMessageReq = convertCreateMessageReq(connectRecord);
Expand All @@ -176,9 +176,9 @@
headers.put("Content-Type", Lists.newArrayList("application/json; charset=utf-8"));

RequestOptions requestOptions = RequestOptions.newBuilder()
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(), sinkConnectorConfig.getAppSecret()))
.headers(headers)
.build();
.tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(), sinkConnectorConfig.getAppSecret()))
.headers(headers)
.build();

CreateMessageReq createMessageReq = convertCreateMessageReq(connectRecord);

Expand All @@ -187,30 +187,30 @@
LongAdder cnt = new LongAdder();
AtomicBoolean isAck = new AtomicBoolean(false);
Runnable task = () -> CompletableFuture
.supplyAsync(() -> {
try {
cnt.increment();
return imService.message().create(createMessageReq, requestOptions);
} catch (Exception e) {
throw new RuntimeException(e);
}
}, sinkAsyncWorker)
.whenCompleteAsync((resp, e) -> {
if (cnt.sum() > 1) {
redoSinkNum.increment();
log.info("Total redo sink task num : [{}]", redoSinkNum.sum());
log.warn("Retry sink event to lark | times=[{}]", cnt.sum() - 1);
}
if (Objects.nonNull(e)) {
log.error("eventmesh-connector-lark internal exception.", e);
return;
}
if (resp.getCode() != 0) {
log.warn("Sinking event to lark failure | code:[{}] | msg:[{}] | err:[{}]", resp.getCode(), resp.getMsg(), resp.getError());
return;
}
isAck.set(true);
});
.supplyAsync(() -> {
try {
cnt.increment();
return imService.message().create(createMessageReq, requestOptions);
} catch (Exception e) {
throw new RuntimeException(e);

Check warning on line 195 in eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java#L194-L195

Added lines #L194 - L195 were not covered by tests
}
}, sinkAsyncWorker)
.whenCompleteAsync((resp, e) -> {
if (cnt.sum() > 1) {
redoSinkNum.increment();
log.info("Total redo sink task num : [{}]", redoSinkNum.sum());
log.warn("Retry sink event to lark | times=[{}]", cnt.sum() - 1);
}
if (Objects.nonNull(e)) {
log.error("eventmesh-connector-lark internal exception.", e);
return;

Check warning on line 206 in eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java#L205-L206

Added lines #L205 - L206 were not covered by tests
}
if (resp.getCode() != 0) {
log.warn("Sinking event to lark failure | code:[{}] | msg:[{}] | err:[{}]", resp.getCode(), resp.getMsg(), resp.getError());
return;
}
isAck.set(true);
});

ScheduledFuture<?> future = retryWorker.scheduleAtFixedRate(task, 0L, fixedWait, TimeUnit.MILLISECONDS);
cleanerWorker.submit(() -> {
Expand All @@ -226,8 +226,8 @@

private CreateMessageReq convertCreateMessageReq(ConnectRecord connectRecord) {
CreateMessageReqBody.Builder bodyBuilder = CreateMessageReqBody.newBuilder()
.receiveId(sinkConnectorConfig.getReceiveId())
.uuid(UUID.randomUUID().toString());
.receiveId(sinkConnectorConfig.getReceiveId())
.uuid(UUID.randomUUID().toString());

String templateTypeKey = connectRecord.getExtension(ConnectRecordExtensionKeys.TEMPLATE_TYPE_4_LARK);
if (null == templateTypeKey || "null".equals(templateTypeKey)) {
Expand All @@ -236,18 +236,18 @@
LarkMessageTemplateType templateType = LarkMessageTemplateType.of(templateTypeKey);
if (LarkMessageTemplateType.PLAIN_TEXT == templateType) {
bodyBuilder.content(createTextContent(connectRecord))
.msgType(MsgTypeEnum.MSG_TYPE_TEXT.getValue());
.msgType(MsgTypeEnum.MSG_TYPE_TEXT.getValue());
} else if (LarkMessageTemplateType.MARKDOWN == templateType) {
String title = Optional.ofNullable(connectRecord.getExtension(ConnectRecordExtensionKeys.MARKDOWN_MESSAGE_TITLE_4_LARK))
.orElse("EventMesh-Message");
.orElse("EventMesh-Message");

Check warning on line 242 in eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java#L242

Added line #L242 was not covered by tests
bodyBuilder.content(createInteractiveContent(connectRecord, title))
.msgType(MsgTypeEnum.MSG_TYPE_INTERACTIVE.getValue());
.msgType(MsgTypeEnum.MSG_TYPE_INTERACTIVE.getValue());

Check warning on line 244 in eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java#L244

Added line #L244 was not covered by tests
}

return CreateMessageReq.newBuilder()
.receiveIdType(sinkConnectorConfig.getReceiveIdType())
.createMessageReqBody(bodyBuilder.build())
.build();
.receiveIdType(sinkConnectorConfig.getReceiveIdType())
.createMessageReqBody(bodyBuilder.build())
.build();
}

private String createTextContent(ConnectRecord connectRecord) {
Expand Down Expand Up @@ -287,26 +287,26 @@
sb.append(new String((byte[]) connectRecord.getData()));

MessageCardConfig config = MessageCardConfig.newBuilder()
.enableForward(true)
.wideScreenMode(true)
.updateMulti(true)
.build();
.enableForward(true)
.wideScreenMode(true)
.updateMulti(true)
.build();

Check warning on line 293 in eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java#L290-L293

Added lines #L290 - L293 were not covered by tests

// header
MessageCardHeader header = MessageCardHeader.newBuilder()
.template(MessageCardHeaderTemplateEnum.BLUE)
.title(MessageCardPlainText.newBuilder()
.content(title)
.build())
.build();
.template(MessageCardHeaderTemplateEnum.BLUE)
.title(MessageCardPlainText.newBuilder()
.content(title)
.build())
.build();

Check warning on line 301 in eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java#L297-L301

Added lines #L297 - L301 were not covered by tests

MessageCard content = MessageCard.newBuilder()
.config(config)
.header(header)
.elements(new MessageCardElement[]{
MessageCardMarkdown.newBuilder().content(sb.toString()).build()
})
.build();
.config(config)
.header(header)
.elements(new MessageCardElement[]{
MessageCardMarkdown.newBuilder().content(sb.toString()).build()

Check warning on line 307 in eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java#L304-L307

Added lines #L304 - L307 were not covered by tests
})
.build();

Check warning on line 309 in eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java#L309

Added line #L309 was not covered by tests

return content.String();
}
Expand All @@ -328,7 +328,7 @@
*/
private void atAll(StringBuilder sb) {
sb.append("<at id=all>")
.append("</at>");
.append("</at>");

Check warning on line 331 in eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java#L331

Added line #L331 was not covered by tests
}

/**
Expand All @@ -339,8 +339,8 @@
*/
private void atUser(StringBuilder sb, String userId) {
sb.append("<at id=")
.append(userId)
.append(">")
.append("</at>");
.append(userId)
.append(">")
.append("</at>");

Check warning on line 344 in eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java#L342-L344

Added lines #L342 - L344 were not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@

// validate receiveIdType
if (!StringUtils.containsAny(receiveIdType, ReceiveIdTypeEnum.CHAT_ID.getValue(),
ReceiveIdTypeEnum.EMAIL.getValue(),
ReceiveIdTypeEnum.OPEN_ID.getValue(),
ReceiveIdTypeEnum.USER_ID.getValue(),
ReceiveIdTypeEnum.UNION_ID.getValue())) {
ReceiveIdTypeEnum.EMAIL.getValue(),
ReceiveIdTypeEnum.OPEN_ID.getValue(),
ReceiveIdTypeEnum.USER_ID.getValue(),
ReceiveIdTypeEnum.UNION_ID.getValue())) {
throw new IllegalArgumentException(
String.format("sinkConnectorConfig.receiveIdType=[%s], Invalid.", receiveIdType));
String.format("sinkConnectorConfig.receiveIdType=[%s], Invalid.", receiveIdType));

Check warning on line 78 in eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/config/SinkConnectorConfig.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/config/SinkConnectorConfig.java#L78

Added line #L78 was not covered by tests
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public class LarkSinkConnector implements Sink {

Expand All @@ -60,11 +59,11 @@
* you can try to modify it.
*/
public static final Cache<String, String> AUTH_CACHE = CacheBuilder.newBuilder()
.initialCapacity(12)
.maximumSize(10)
.concurrencyLevel(5)
.expireAfterWrite(30, TimeUnit.MINUTES)
.build();
.initialCapacity(12)
.maximumSize(10)
.concurrencyLevel(5)
.expireAfterWrite(30, TimeUnit.MINUTES)
.build();

private LarkSinkConfig sinkConfig;

Expand All @@ -78,7 +77,8 @@
}

@Override
public void init(Config config) {}
public void init(Config config) {
}

Check warning on line 81 in eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java#L81

Added line #L81 was not covered by tests

@Override
public void init(ConnectorContext connectorContext) {
Expand Down Expand Up @@ -136,15 +136,15 @@
return AUTH_CACHE.get(TENANT_ACCESS_TOKEN, () -> {

Client client = Client.newBuilder(appId, appSecret)
.appType(AppType.SELF_BUILT)
.logReqAtDebug(true)
.build();
.appType(AppType.SELF_BUILT)
.logReqAtDebug(true)
.build();

Check warning on line 141 in eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java#L139-L141

Added lines #L139 - L141 were not covered by tests

TenantAccessTokenResp resp = client.ext().getTenantAccessTokenBySelfBuiltApp(
SelfBuiltTenantAccessTokenReq.newBuilder()
.appSecret(appSecret)
.appId(appId)
.build());
SelfBuiltTenantAccessTokenReq.newBuilder()
.appSecret(appSecret)
.appId(appId)
.build());

Check warning on line 147 in eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java#L144-L147

Added lines #L144 - L147 were not covered by tests
return resp.getTenantAccessToken();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ private void init() throws Exception {
when(message.create(any(), any())).thenReturn(new CreateMessageResp());
when(imService.message()).thenReturn(message);
Field imServiceField = ReflectionSupport.findFields(imServiceHandler.getClass(),
(f) -> f.getName().equals("imService"),
HierarchyTraversalMode.BOTTOM_UP).get(0);
(f) -> f.getName().equals("imService"),
HierarchyTraversalMode.BOTTOM_UP).get(0);
imServiceField.setAccessible(true);
imServiceField.set(imServiceHandler, imService);
}
Expand All @@ -106,7 +106,7 @@ private void regularSink() throws Exception {
RecordPartition partition = new RecordPartition();
RecordOffset offset = new RecordOffset();
ConnectRecord connectRecord = new ConnectRecord(partition, offset,
System.currentTimeMillis(), "test-lark".getBytes(StandardCharsets.UTF_8));
System.currentTimeMillis(), "test-lark".getBytes(StandardCharsets.UTF_8));
if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
imServiceHandler.sinkAsync(connectRecord);
long retryDelayInMills = Long.parseLong(sinkConnectorConfig.getRetryDelayInMills());
Expand Down Expand Up @@ -148,7 +148,7 @@ private void retrySink() throws Exception {
RecordPartition partition = new RecordPartition();
RecordOffset offset = new RecordOffset();
ConnectRecord connectRecord = new ConnectRecord(partition, offset,
System.currentTimeMillis(), "test-lark".getBytes(StandardCharsets.UTF_8));
System.currentTimeMillis(), "test-lark".getBytes(StandardCharsets.UTF_8));
if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) {
imServiceHandler.sinkAsync(connectRecord);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void testPut() throws Exception {
RecordPartition partition = new RecordPartition();
RecordOffset offset = new RecordOffset();
ConnectRecord connectRecord = new ConnectRecord(partition, offset,
System.currentTimeMillis(), "test-lark".getBytes(StandardCharsets.UTF_8));
System.currentTimeMillis(), "test-lark".getBytes(StandardCharsets.UTF_8));
connectRecords.add(connectRecord);
}
larkSinkConnector.put(connectRecords);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@
import okhttp3.Response;
import okhttp3.ResponseBody;



@ExtendWith(MockitoExtension.class)
public class WeChatSinkConnectorTest {

Expand Down Expand Up @@ -106,8 +104,8 @@ public void testSendMessageToWeChat() throws Exception {
.body(sendMessageBody)
.message("ok")
.build();
ArgumentMatcher<Request> sendMessageMatcher = (anyRequest) ->
sendMessageRequest.url().encodedPath().startsWith(anyRequest.url().encodedPath());
ArgumentMatcher<Request> sendMessageMatcher =
(anyRequest) -> sendMessageRequest.url().encodedPath().startsWith(anyRequest.url().encodedPath());
Call sendMessageRequestCall = Mockito.mock(Call.class);
Mockito.doReturn(sendMessageRequestCall).when(okHttpClient).newCall(Mockito.argThat(sendMessageMatcher));
Mockito.doReturn(sendMessageResponse).when(sendMessageRequestCall).execute();
Expand Down Expand Up @@ -137,8 +135,8 @@ public void testSendMessageToWeChatAbnormally() throws Exception {
.body(sendMessageBody)
.message("ok")
.build();
ArgumentMatcher<Request> sendMessageMatcher = (anyRequest) ->
sendMessageRequest.url().encodedPath().startsWith(anyRequest.url().encodedPath());
ArgumentMatcher<Request> sendMessageMatcher =
(anyRequest) -> sendMessageRequest.url().encodedPath().startsWith(anyRequest.url().encodedPath());
Call sendMessageRequestCall = Mockito.mock(Call.class);
Mockito.doReturn(sendMessageRequestCall).when(okHttpClient).newCall(Mockito.argThat(sendMessageMatcher));
Mockito.doReturn(sendMessageResponse).when(sendMessageRequestCall).execute();
Expand Down
Loading
Loading