diff --git a/dubbo-rpc-extensions/dubbo-rpc-rocketmq/pom.xml b/dubbo-rpc-extensions/dubbo-rpc-rocketmq/pom.xml index fe7aefb4b..b1641a49b 100644 --- a/dubbo-rpc-extensions/dubbo-rpc-rocketmq/pom.xml +++ b/dubbo-rpc-extensions/dubbo-rpc-rocketmq/pom.xml @@ -75,5 +75,10 @@ rocketmq-client ${rocketmq.version} + + org.apache.dubbo + dubbo + true + diff --git a/dubbo-rpc-extensions/dubbo-rpc-rocketmq/src/main/java/org/apache/dubbo/rpc/rocketmq/codec/DecodeableRpcInvocation.java b/dubbo-rpc-extensions/dubbo-rpc-rocketmq/src/main/java/org/apache/dubbo/rpc/rocketmq/codec/DecodeableRpcInvocation.java index 28610b567..f894e9212 100644 --- a/dubbo-rpc-extensions/dubbo-rpc-rocketmq/src/main/java/org/apache/dubbo/rpc/rocketmq/codec/DecodeableRpcInvocation.java +++ b/dubbo-rpc-extensions/dubbo-rpc-rocketmq/src/main/java/org/apache/dubbo/rpc/rocketmq/codec/DecodeableRpcInvocation.java @@ -18,15 +18,7 @@ package org.apache.dubbo.rpc.rocketmq.codec; -import static org.apache.dubbo.common.BaseServiceMetadata.keyWithoutGroup; -import static org.apache.dubbo.common.URL.buildKey; -import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY; -import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; -import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY; -import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY; -import static org.apache.dubbo.rpc.Constants.SERIALIZATION_ID_KEY; -import static org.apache.dubbo.rpc.Constants.SERIALIZATION_SECURITY_CHECK_KEY; - +import org.apache.dubbo.common.URL; import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.serialize.Cleanable; @@ -40,6 +32,7 @@ import org.apache.dubbo.remoting.Decodeable; import org.apache.dubbo.remoting.exchange.Request; import org.apache.dubbo.remoting.transport.CodecSupport; +import org.apache.dubbo.remoting.utils.UrlUtils; import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.model.ApplicationModel; import org.apache.dubbo.rpc.model.FrameworkModel; @@ -48,6 +41,7 @@ import org.apache.dubbo.rpc.model.ModuleModel; import org.apache.dubbo.rpc.model.ProviderModel; import org.apache.dubbo.rpc.model.ServiceDescriptor; +import org.apache.dubbo.rpc.protocol.tri.TripleConstant; import org.apache.dubbo.rpc.support.RpcUtils; import java.io.IOException; @@ -56,6 +50,15 @@ import java.util.List; import java.util.Map; +import static org.apache.dubbo.common.BaseServiceMetadata.keyWithoutGroup; +import static org.apache.dubbo.common.URL.buildKey; +import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY; +import static org.apache.dubbo.rpc.Constants.SERIALIZATION_ID_KEY; +import static org.apache.dubbo.rpc.Constants.SERIALIZATION_SECURITY_CHECK_KEY; + @SuppressWarnings({"deprecation", "serial"}) public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable { @@ -104,8 +107,9 @@ public void encode(Channel channel, OutputStream output, Object message) throws @Override public Object decode(Channel channel, InputStream input) throws IOException { - ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) - .deserialize(channel.getUrl(), input); + URL url = channel.getUrl(); + ObjectInput in = CodecSupport.getSerialization(url) + .deserialize(url, input); this.put(SERIALIZATION_ID_KEY, serializationType); String dubboVersion = in.readUTF(); @@ -124,9 +128,6 @@ public Object decode(Channel channel, InputStream input) throws IOException { ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader(); try { - if (Boolean.parseBoolean(System.getProperty(SERIALIZATION_SECURITY_CHECK_KEY, "true"))) { - CodecSupport.checkSerialization(frameworkModel.getServiceRepository(), path, version, serializationType); - } Object[] args = RocketMQCodec.EMPTY_OBJECT_ARRAY; Class[] pts = RocketMQCodec.EMPTY_CLASS_ARRAY; if (desc.length() > 0) { @@ -222,5 +223,11 @@ public Object decode(Channel channel, InputStream input) throws IOException { } return this; } + private static String convertHessianFromWrapper(String serializeType) { + if (TripleConstant.HESSIAN4.equals(serializeType)) { + return TripleConstant.HESSIAN2; + } + return serializeType; + } } diff --git a/dubbo-rpc-extensions/dubbo-rpc-rocketmq/src/main/java/org/apache/dubbo/rpc/rocketmq/codec/DecodeableRpcResult.java b/dubbo-rpc-extensions/dubbo-rpc-rocketmq/src/main/java/org/apache/dubbo/rpc/rocketmq/codec/DecodeableRpcResult.java index a4e5205f6..c21f72fc4 100644 --- a/dubbo-rpc-extensions/dubbo-rpc-rocketmq/src/main/java/org/apache/dubbo/rpc/rocketmq/codec/DecodeableRpcResult.java +++ b/dubbo-rpc-extensions/dubbo-rpc-rocketmq/src/main/java/org/apache/dubbo/rpc/rocketmq/codec/DecodeableRpcResult.java @@ -84,7 +84,7 @@ public Object decode(Channel channel, InputStream input) throws IOException { if (invocation != null && invocation.getServiceModel() != null) { Thread.currentThread().setContextClassLoader(invocation.getServiceModel().getClassLoader()); } - ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) + ObjectInput in = CodecSupport.getSerialization(channel.getUrl()) .deserialize(channel.getUrl(), input); byte flag = in.readByte(); diff --git a/dubbo-rpc-extensions/dubbo-rpc-rocketmq/src/main/java/org/apache/dubbo/rpc/rocketmq/codec/RocketMQCodec.java b/dubbo-rpc-extensions/dubbo-rpc-rocketmq/src/main/java/org/apache/dubbo/rpc/rocketmq/codec/RocketMQCodec.java index edfb32dd3..0db43c884 100644 --- a/dubbo-rpc-extensions/dubbo-rpc-rocketmq/src/main/java/org/apache/dubbo/rpc/rocketmq/codec/RocketMQCodec.java +++ b/dubbo-rpc-extensions/dubbo-rpc-rocketmq/src/main/java/org/apache/dubbo/rpc/rocketmq/codec/RocketMQCodec.java @@ -17,11 +17,6 @@ package org.apache.dubbo.rpc.rocketmq.codec; -import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY; -import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY; -import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY; -import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY; - import org.apache.dubbo.common.Version; import org.apache.dubbo.common.io.Bytes; import org.apache.dubbo.common.logger.Logger; @@ -45,6 +40,11 @@ import java.io.IOException; import java.io.InputStream; +import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY; + /** * Dubbo codec. @@ -95,7 +95,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro data = decodeEventData(channel, in, eventPayload); } } else { - DecodeableRpcResult result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto); + DecodeableRpcResult result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(channel, res, id), proto); result.decode(); data = result; }