From fdd87969e34e6e831a420aeb9cfbc6776ecaeec7 Mon Sep 17 00:00:00 2001 From: risk <399284508@qq.com> Date: Mon, 11 Dec 2023 23:28:06 +0800 Subject: [PATCH 1/3] Add condition for transforming non-data values --- .../eventmesh/transformer/JsonPathParser.java | 14 ++++++++++---- .../eventmesh/transformer/TransformTest.java | 10 ++++++++++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/JsonPathParser.java b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/JsonPathParser.java index a0ebde12d2..86b825696f 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/JsonPathParser.java +++ b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/JsonPathParser.java @@ -48,12 +48,18 @@ public JsonPathParser(String jsonPathString) { Map.Entry entry = fields.next(); String name = entry.getKey(); JsonNode valueNode = entry.getValue(); - if (valueNode.isValueNode()) { - variablesList.add(new Variable(name, valueNode.asText())); - } else { - throw new TransformException("invalid config:" + jsonPathString); + + if (!valueNode.isValueNode()) { + throw new TransformException("invalid config: " + jsonPathString); } + String valueText = valueNode.asText(); + if (valueText.startsWith("$") && !valueText.startsWith("$.data")) { + throw new TransformException("invalid config: unsupported value " + jsonPathString); + } + + variablesList.add(new Variable(name, valueText)); + } } diff --git a/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java b/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java index a55cde0baf..904671cdac 100644 --- a/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java +++ b/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java @@ -139,4 +139,14 @@ public void testTemplateTransFormerWithConstant() throws JsonProcessingException output); } + @Test + public void testTemplateTransFormerWithNonDataItem() { + String extractJson = "{\"name\":\"$.data.name\",\"time\":\"$.time\"" + "}"; + String template = "Transformers test:data name is ${name}, time is ${time}"; + Throwable exception = Assertions.assertThrows(TransformException.class, ()->TransformerBuilder.buildTemplateTransFormer(extractJson, template)); + Assertions.assertEquals("invalid config: unsupported value {\"name\":\"$.data.name\",\"time\":\"$.time\"}" , exception.getMessage()); + } + + + } From a2d7da225f9fe88f937b501c6c01dd9b6adb2f6e Mon Sep 17 00:00:00 2001 From: pmupkin <399284508@qq.com> Date: Thu, 14 Dec 2023 00:04:19 +0800 Subject: [PATCH 2/3] fix CI issue v1 --- .../java/org/apache/eventmesh/transformer/TransformTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java b/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java index 904671cdac..80022d8571 100644 --- a/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java +++ b/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java @@ -143,6 +143,7 @@ public void testTemplateTransFormerWithConstant() throws JsonProcessingException public void testTemplateTransFormerWithNonDataItem() { String extractJson = "{\"name\":\"$.data.name\",\"time\":\"$.time\"" + "}"; String template = "Transformers test:data name is ${name}, time is ${time}"; + Throwable exception = Assertions.assertThrows(TransformException.class, ()->TransformerBuilder.buildTemplateTransFormer(extractJson, template)); Assertions.assertEquals("invalid config: unsupported value {\"name\":\"$.data.name\",\"time\":\"$.time\"}" , exception.getMessage()); } From f9cd80447577a4471e07414b5fa18985fd0fbe2d Mon Sep 17 00:00:00 2001 From: pmupkin <399284508@qq.com> Date: Sun, 17 Dec 2023 23:45:16 +0800 Subject: [PATCH 3/3] fix CI issue v2 --- .../lark/server/LarkConnectServer.java | 2 +- .../connector/lark/sink/ImServiceHandler.java | 164 +++++++++--------- .../lark/sink/config/SinkConnectorConfig.java | 10 +- .../sink/connector/LarkSinkConnector.java | 28 +-- .../lark/sink/ImServiceHandlerTest.java | 8 +- .../lark/sink/LarkSinkConnectorTest.java | 2 +- .../connector/WeChatSinkConnectorTest.java | 10 +- .../eventmesh/openconnect/SourceWorker.java | 4 +- .../transformer/TransformerBuilder.java | 1 - .../transformer/TransformerType.java | 1 + .../eventmesh/transformer/TransformTest.java | 7 +- 11 files changed, 117 insertions(+), 120 deletions(-) diff --git a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/server/LarkConnectServer.java b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/server/LarkConnectServer.java index 656beb8ccf..c6ec73c5a4 100644 --- a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/server/LarkConnectServer.java +++ b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/server/LarkConnectServer.java @@ -28,7 +28,7 @@ public class LarkConnectServer { public static void main(String[] args) throws Exception { LarkConnectServerConfig larkConnectServerConfig = ConfigUtil.parse(LarkConnectServerConfig.class, - Constants.CONNECT_SERVER_CONFIG_FILE_NAME); + Constants.CONNECT_SERVER_CONFIG_FILE_NAME); if (larkConnectServerConfig.isSinkEnable()) { Application application = new Application(); diff --git a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java index f97452e675..e61940ce3d 100644 --- a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java +++ b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandler.java @@ -93,14 +93,13 @@ public static ImServiceHandler create(SinkConnectorConfig sinkConnectorConfig) { ImServiceHandler imServiceHandler = new ImServiceHandler(); imServiceHandler.sinkConnectorConfig = sinkConnectorConfig; imServiceHandler.imService = Client.newBuilder(sinkConnectorConfig.getAppId(), sinkConnectorConfig.getAppSecret()) - .httpTransport(new OkHttpTransport(new OkHttpClient().newBuilder() - .callTimeout(3L, TimeUnit.SECONDS) - .build()) - ) - .disableTokenCache() - .requestTimeout(3, TimeUnit.SECONDS) - .build() - .im(); + .httpTransport(new OkHttpTransport(new OkHttpClient().newBuilder() + .callTimeout(3L, TimeUnit.SECONDS) + .build())) + .disableTokenCache() + .requestTimeout(3, TimeUnit.SECONDS) + .build() + .im(); long fixedWait = Long.parseLong(sinkConnectorConfig.getRetryDelayInMills()); int maxRetryTimes = Integer.parseInt(sinkConnectorConfig.getMaxRetryTimes()) + 1; @@ -128,24 +127,25 @@ public static ImServiceHandler create(SinkConnectorConfig sinkConnectorConfig) { }); } else { imServiceHandler.retryer = RetryerBuilder.newBuilder() - .retryIfException() - .retryIfResult(Objects::nonNull) - .withWaitStrategy(WaitStrategies.fixedWait(fixedWait, TimeUnit.MILLISECONDS)) - .withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes)) - .withRetryListener(new RetryListener() { - @SneakyThrows - @Override - public void onRetry(Attempt attempt) { - - long times = attempt.getAttemptNumber(); - if (times > 1) { - redoSinkNum.increment(); - log.info("Total redo sink task num : [{}]", redoSinkNum.sum()); - log.warn("Retry sink event to lark | times=[{}]", attempt.getAttemptNumber() - 1); - } + .retryIfException() + .retryIfResult(Objects::nonNull) + .withWaitStrategy(WaitStrategies.fixedWait(fixedWait, TimeUnit.MILLISECONDS)) + .withStopStrategy(StopStrategies.stopAfterAttempt(maxRetryTimes)) + .withRetryListener(new RetryListener() { + + @SneakyThrows + @Override + public void onRetry(Attempt attempt) { + + long times = attempt.getAttemptNumber(); + if (times > 1) { + redoSinkNum.increment(); + log.info("Total redo sink task num : [{}]", redoSinkNum.sum()); + log.warn("Retry sink event to lark | times=[{}]", attempt.getAttemptNumber() - 1); } - }) - .build(); + } + }) + .build(); } return imServiceHandler; @@ -156,9 +156,9 @@ public void sink(ConnectRecord connectRecord) throws ExecutionException, RetryEx headers.put("Content-Type", Lists.newArrayList("application/json; charset=utf-8")); RequestOptions requestOptions = RequestOptions.newBuilder() - .tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(), sinkConnectorConfig.getAppSecret())) - .headers(headers) - .build(); + .tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(), sinkConnectorConfig.getAppSecret())) + .headers(headers) + .build(); retryer.call(() -> { CreateMessageReq createMessageReq = convertCreateMessageReq(connectRecord); @@ -176,9 +176,9 @@ public void sinkAsync(ConnectRecord connectRecord) { headers.put("Content-Type", Lists.newArrayList("application/json; charset=utf-8")); RequestOptions requestOptions = RequestOptions.newBuilder() - .tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(), sinkConnectorConfig.getAppSecret())) - .headers(headers) - .build(); + .tenantAccessToken(getTenantAccessToken(sinkConnectorConfig.getAppId(), sinkConnectorConfig.getAppSecret())) + .headers(headers) + .build(); CreateMessageReq createMessageReq = convertCreateMessageReq(connectRecord); @@ -187,30 +187,30 @@ public void sinkAsync(ConnectRecord connectRecord) { LongAdder cnt = new LongAdder(); AtomicBoolean isAck = new AtomicBoolean(false); Runnable task = () -> CompletableFuture - .supplyAsync(() -> { - try { - cnt.increment(); - return imService.message().create(createMessageReq, requestOptions); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, sinkAsyncWorker) - .whenCompleteAsync((resp, e) -> { - if (cnt.sum() > 1) { - redoSinkNum.increment(); - log.info("Total redo sink task num : [{}]", redoSinkNum.sum()); - log.warn("Retry sink event to lark | times=[{}]", cnt.sum() - 1); - } - if (Objects.nonNull(e)) { - log.error("eventmesh-connector-lark internal exception.", e); - return; - } - if (resp.getCode() != 0) { - log.warn("Sinking event to lark failure | code:[{}] | msg:[{}] | err:[{}]", resp.getCode(), resp.getMsg(), resp.getError()); - return; - } - isAck.set(true); - }); + .supplyAsync(() -> { + try { + cnt.increment(); + return imService.message().create(createMessageReq, requestOptions); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, sinkAsyncWorker) + .whenCompleteAsync((resp, e) -> { + if (cnt.sum() > 1) { + redoSinkNum.increment(); + log.info("Total redo sink task num : [{}]", redoSinkNum.sum()); + log.warn("Retry sink event to lark | times=[{}]", cnt.sum() - 1); + } + if (Objects.nonNull(e)) { + log.error("eventmesh-connector-lark internal exception.", e); + return; + } + if (resp.getCode() != 0) { + log.warn("Sinking event to lark failure | code:[{}] | msg:[{}] | err:[{}]", resp.getCode(), resp.getMsg(), resp.getError()); + return; + } + isAck.set(true); + }); ScheduledFuture future = retryWorker.scheduleAtFixedRate(task, 0L, fixedWait, TimeUnit.MILLISECONDS); cleanerWorker.submit(() -> { @@ -226,8 +226,8 @@ public void sinkAsync(ConnectRecord connectRecord) { private CreateMessageReq convertCreateMessageReq(ConnectRecord connectRecord) { CreateMessageReqBody.Builder bodyBuilder = CreateMessageReqBody.newBuilder() - .receiveId(sinkConnectorConfig.getReceiveId()) - .uuid(UUID.randomUUID().toString()); + .receiveId(sinkConnectorConfig.getReceiveId()) + .uuid(UUID.randomUUID().toString()); String templateTypeKey = connectRecord.getExtension(ConnectRecordExtensionKeys.TEMPLATE_TYPE_4_LARK); if (null == templateTypeKey || "null".equals(templateTypeKey)) { @@ -236,18 +236,18 @@ private CreateMessageReq convertCreateMessageReq(ConnectRecord connectRecord) { LarkMessageTemplateType templateType = LarkMessageTemplateType.of(templateTypeKey); if (LarkMessageTemplateType.PLAIN_TEXT == templateType) { bodyBuilder.content(createTextContent(connectRecord)) - .msgType(MsgTypeEnum.MSG_TYPE_TEXT.getValue()); + .msgType(MsgTypeEnum.MSG_TYPE_TEXT.getValue()); } else if (LarkMessageTemplateType.MARKDOWN == templateType) { String title = Optional.ofNullable(connectRecord.getExtension(ConnectRecordExtensionKeys.MARKDOWN_MESSAGE_TITLE_4_LARK)) - .orElse("EventMesh-Message"); + .orElse("EventMesh-Message"); bodyBuilder.content(createInteractiveContent(connectRecord, title)) - .msgType(MsgTypeEnum.MSG_TYPE_INTERACTIVE.getValue()); + .msgType(MsgTypeEnum.MSG_TYPE_INTERACTIVE.getValue()); } return CreateMessageReq.newBuilder() - .receiveIdType(sinkConnectorConfig.getReceiveIdType()) - .createMessageReqBody(bodyBuilder.build()) - .build(); + .receiveIdType(sinkConnectorConfig.getReceiveIdType()) + .createMessageReqBody(bodyBuilder.build()) + .build(); } private String createTextContent(ConnectRecord connectRecord) { @@ -287,26 +287,26 @@ private String createInteractiveContent(ConnectRecord connectRecord, String titl sb.append(new String((byte[]) connectRecord.getData())); MessageCardConfig config = MessageCardConfig.newBuilder() - .enableForward(true) - .wideScreenMode(true) - .updateMulti(true) - .build(); + .enableForward(true) + .wideScreenMode(true) + .updateMulti(true) + .build(); // header MessageCardHeader header = MessageCardHeader.newBuilder() - .template(MessageCardHeaderTemplateEnum.BLUE) - .title(MessageCardPlainText.newBuilder() - .content(title) - .build()) - .build(); + .template(MessageCardHeaderTemplateEnum.BLUE) + .title(MessageCardPlainText.newBuilder() + .content(title) + .build()) + .build(); MessageCard content = MessageCard.newBuilder() - .config(config) - .header(header) - .elements(new MessageCardElement[]{ - MessageCardMarkdown.newBuilder().content(sb.toString()).build() - }) - .build(); + .config(config) + .header(header) + .elements(new MessageCardElement[]{ + MessageCardMarkdown.newBuilder().content(sb.toString()).build() + }) + .build(); return content.String(); } @@ -328,7 +328,7 @@ private String needAtUser(ConnectRecord connectRecord) { */ private void atAll(StringBuilder sb) { sb.append("") - .append(""); + .append(""); } /** @@ -339,8 +339,8 @@ private void atAll(StringBuilder sb) { */ private void atUser(StringBuilder sb, String userId) { sb.append("") - .append(""); + .append(userId) + .append(">") + .append(""); } } \ No newline at end of file diff --git a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/config/SinkConnectorConfig.java b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/config/SinkConnectorConfig.java index 84d4cc64ef..cde3aa6737 100644 --- a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/config/SinkConnectorConfig.java +++ b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/config/SinkConnectorConfig.java @@ -70,12 +70,12 @@ public void validateSinkConfiguration() { // validate receiveIdType if (!StringUtils.containsAny(receiveIdType, ReceiveIdTypeEnum.CHAT_ID.getValue(), - ReceiveIdTypeEnum.EMAIL.getValue(), - ReceiveIdTypeEnum.OPEN_ID.getValue(), - ReceiveIdTypeEnum.USER_ID.getValue(), - ReceiveIdTypeEnum.UNION_ID.getValue())) { + ReceiveIdTypeEnum.EMAIL.getValue(), + ReceiveIdTypeEnum.OPEN_ID.getValue(), + ReceiveIdTypeEnum.USER_ID.getValue(), + ReceiveIdTypeEnum.UNION_ID.getValue())) { throw new IllegalArgumentException( - String.format("sinkConnectorConfig.receiveIdType=[%s], Invalid.", receiveIdType)); + String.format("sinkConnectorConfig.receiveIdType=[%s], Invalid.", receiveIdType)); } } } diff --git a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java index b5c134f667..d1ee1caa40 100644 --- a/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-lark/src/main/java/org/apache/eventmesh/connector/lark/sink/connector/LarkSinkConnector.java @@ -44,7 +44,6 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; - @Slf4j public class LarkSinkConnector implements Sink { @@ -60,11 +59,11 @@ public class LarkSinkConnector implements Sink { * you can try to modify it. */ public static final Cache AUTH_CACHE = CacheBuilder.newBuilder() - .initialCapacity(12) - .maximumSize(10) - .concurrencyLevel(5) - .expireAfterWrite(30, TimeUnit.MINUTES) - .build(); + .initialCapacity(12) + .maximumSize(10) + .concurrencyLevel(5) + .expireAfterWrite(30, TimeUnit.MINUTES) + .build(); private LarkSinkConfig sinkConfig; @@ -78,7 +77,8 @@ public Class configClass() { } @Override - public void init(Config config) {} + public void init(Config config) { + } @Override public void init(ConnectorContext connectorContext) { @@ -136,15 +136,15 @@ public static String getTenantAccessToken(String appId, String appSecret) { return AUTH_CACHE.get(TENANT_ACCESS_TOKEN, () -> { Client client = Client.newBuilder(appId, appSecret) - .appType(AppType.SELF_BUILT) - .logReqAtDebug(true) - .build(); + .appType(AppType.SELF_BUILT) + .logReqAtDebug(true) + .build(); TenantAccessTokenResp resp = client.ext().getTenantAccessTokenBySelfBuiltApp( - SelfBuiltTenantAccessTokenReq.newBuilder() - .appSecret(appSecret) - .appId(appId) - .build()); + SelfBuiltTenantAccessTokenReq.newBuilder() + .appSecret(appSecret) + .appId(appId) + .build()); return resp.getTenantAccessToken(); }); } diff --git a/eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandlerTest.java b/eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandlerTest.java index 1b5fefa42f..f72232a533 100644 --- a/eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandlerTest.java +++ b/eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/ImServiceHandlerTest.java @@ -80,8 +80,8 @@ private void init() throws Exception { when(message.create(any(), any())).thenReturn(new CreateMessageResp()); when(imService.message()).thenReturn(message); Field imServiceField = ReflectionSupport.findFields(imServiceHandler.getClass(), - (f) -> f.getName().equals("imService"), - HierarchyTraversalMode.BOTTOM_UP).get(0); + (f) -> f.getName().equals("imService"), + HierarchyTraversalMode.BOTTOM_UP).get(0); imServiceField.setAccessible(true); imServiceField.set(imServiceHandler, imService); } @@ -106,7 +106,7 @@ private void regularSink() throws Exception { RecordPartition partition = new RecordPartition(); RecordOffset offset = new RecordOffset(); ConnectRecord connectRecord = new ConnectRecord(partition, offset, - System.currentTimeMillis(), "test-lark".getBytes(StandardCharsets.UTF_8)); + System.currentTimeMillis(), "test-lark".getBytes(StandardCharsets.UTF_8)); if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) { imServiceHandler.sinkAsync(connectRecord); long retryDelayInMills = Long.parseLong(sinkConnectorConfig.getRetryDelayInMills()); @@ -148,7 +148,7 @@ private void retrySink() throws Exception { RecordPartition partition = new RecordPartition(); RecordOffset offset = new RecordOffset(); ConnectRecord connectRecord = new ConnectRecord(partition, offset, - System.currentTimeMillis(), "test-lark".getBytes(StandardCharsets.UTF_8)); + System.currentTimeMillis(), "test-lark".getBytes(StandardCharsets.UTF_8)); if (Boolean.parseBoolean(sinkConnectorConfig.getSinkAsync())) { imServiceHandler.sinkAsync(connectRecord); diff --git a/eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/LarkSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/LarkSinkConnectorTest.java index 566bc1f273..658fa89223 100644 --- a/eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/LarkSinkConnectorTest.java +++ b/eventmesh-connectors/eventmesh-connector-lark/src/test/java/org/apache/eventmesh/connector/lark/sink/LarkSinkConnectorTest.java @@ -85,7 +85,7 @@ public void testPut() throws Exception { RecordPartition partition = new RecordPartition(); RecordOffset offset = new RecordOffset(); ConnectRecord connectRecord = new ConnectRecord(partition, offset, - System.currentTimeMillis(), "test-lark".getBytes(StandardCharsets.UTF_8)); + System.currentTimeMillis(), "test-lark".getBytes(StandardCharsets.UTF_8)); connectRecords.add(connectRecord); } larkSinkConnector.put(connectRecords); diff --git a/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java b/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java index 5920417ea7..d993468c18 100644 --- a/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java +++ b/eventmesh-connectors/eventmesh-connector-wechat/src/test/java/org/apache/eventmesh/connector/wechat/sink/connector/WeChatSinkConnectorTest.java @@ -55,8 +55,6 @@ import okhttp3.Response; import okhttp3.ResponseBody; - - @ExtendWith(MockitoExtension.class) public class WeChatSinkConnectorTest { @@ -106,8 +104,8 @@ public void testSendMessageToWeChat() throws Exception { .body(sendMessageBody) .message("ok") .build(); - ArgumentMatcher sendMessageMatcher = (anyRequest) -> - sendMessageRequest.url().encodedPath().startsWith(anyRequest.url().encodedPath()); + ArgumentMatcher sendMessageMatcher = + (anyRequest) -> sendMessageRequest.url().encodedPath().startsWith(anyRequest.url().encodedPath()); Call sendMessageRequestCall = Mockito.mock(Call.class); Mockito.doReturn(sendMessageRequestCall).when(okHttpClient).newCall(Mockito.argThat(sendMessageMatcher)); Mockito.doReturn(sendMessageResponse).when(sendMessageRequestCall).execute(); @@ -137,8 +135,8 @@ public void testSendMessageToWeChatAbnormally() throws Exception { .body(sendMessageBody) .message("ok") .build(); - ArgumentMatcher sendMessageMatcher = (anyRequest) -> - sendMessageRequest.url().encodedPath().startsWith(anyRequest.url().encodedPath()); + ArgumentMatcher sendMessageMatcher = + (anyRequest) -> sendMessageRequest.url().encodedPath().startsWith(anyRequest.url().encodedPath()); Call sendMessageRequestCall = Mockito.mock(Call.class); Mockito.doReturn(sendMessageRequestCall).when(okHttpClient).newCall(Mockito.argThat(sendMessageMatcher)); Mockito.doReturn(sendMessageResponse).when(sendMessageRequestCall).execute(); diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java index 2445382e72..bf9878e486 100644 --- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java +++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/SourceWorker.java @@ -330,7 +330,7 @@ public boolean commitOffsets() { log.info("{} Committing offsets for {} acknowledged messages", this, committableOffsets.numCommittableMessages()); if (committableOffsets.hasPending()) { log.debug("{} There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. " - + "The source partition with the most pending messages is {}, with {} pending messages", + + "The source partition with the most pending messages is {}, with {} pending messages", this, committableOffsets.numUncommittableMessages(), committableOffsets.numDeques(), @@ -338,7 +338,7 @@ public boolean commitOffsets() { committableOffsets.largestDequeSize()); } else { log.debug("{} There are currently no pending messages for this offset commit; " - + "all messages dispatched to the task's producer since the last commit have been acknowledged", + + "all messages dispatched to the task's producer since the last commit have been acknowledged", this); } } diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerBuilder.java b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerBuilder.java index 6e007b34b3..e7277af73c 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerBuilder.java +++ b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerBuilder.java @@ -32,7 +32,6 @@ public static Transformer buildTransformer(TransformerParam transformerParam) { } } - public static Transformer buildTemplateTransFormer(String jsonContent, String template) { JsonPathParser jsonPathParser = new JsonPathParser(jsonContent); Template templateEntry = new Template(template); diff --git a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerType.java b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerType.java index 8ad06e48bb..2dc7809478 100644 --- a/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerType.java +++ b/eventmesh-transformer/src/main/java/org/apache/eventmesh/transformer/TransformerType.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonValue; public enum TransformerType { + ORIGINAL(1, "original"), CONSTANT(2, "constant"), TEMPLATE(3, "template"); diff --git a/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java b/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java index 80022d8571..25419a2379 100644 --- a/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java +++ b/eventmesh-transformer/src/test/java/org/apache/eventmesh/transformer/TransformTest.java @@ -144,10 +144,9 @@ public void testTemplateTransFormerWithNonDataItem() { String extractJson = "{\"name\":\"$.data.name\",\"time\":\"$.time\"" + "}"; String template = "Transformers test:data name is ${name}, time is ${time}"; - Throwable exception = Assertions.assertThrows(TransformException.class, ()->TransformerBuilder.buildTemplateTransFormer(extractJson, template)); - Assertions.assertEquals("invalid config: unsupported value {\"name\":\"$.data.name\",\"time\":\"$.time\"}" , exception.getMessage()); + Throwable exception = + Assertions.assertThrows(TransformException.class, () -> TransformerBuilder.buildTemplateTransFormer(extractJson, template)); + Assertions.assertEquals("invalid config: unsupported value {\"name\":\"$.data.name\",\"time\":\"$.time\"}", exception.getMessage()); } - - }