Skip to content

Commit

Permalink
⬆️ opt dubbo version to 3.2.9 fix rpc rocketmq
Browse files Browse the repository at this point in the history
  • Loading branch information
songxiaosheng committed Jan 28, 2024
1 parent a2f5096 commit dcfd1cf
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 21 deletions.
5 changes: 5 additions & 0 deletions dubbo-rpc-extensions/dubbo-rpc-rocketmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,10 @@
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,7 @@
package org.apache.dubbo.rpc.rocketmq.codec;


import static org.apache.dubbo.common.BaseServiceMetadata.keyWithoutGroup;
import static org.apache.dubbo.common.URL.buildKey;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.rpc.Constants.SERIALIZATION_ID_KEY;
import static org.apache.dubbo.rpc.Constants.SERIALIZATION_SECURITY_CHECK_KEY;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.serialize.Cleanable;
Expand All @@ -40,6 +32,7 @@
import org.apache.dubbo.remoting.Decodeable;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.transport.CodecSupport;
import org.apache.dubbo.remoting.utils.UrlUtils;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
Expand All @@ -48,6 +41,7 @@
import org.apache.dubbo.rpc.model.ModuleModel;
import org.apache.dubbo.rpc.model.ProviderModel;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
import org.apache.dubbo.rpc.support.RpcUtils;

import java.io.IOException;
Expand All @@ -56,6 +50,15 @@
import java.util.List;
import java.util.Map;

import static org.apache.dubbo.common.BaseServiceMetadata.keyWithoutGroup;
import static org.apache.dubbo.common.URL.buildKey;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.rpc.Constants.SERIALIZATION_ID_KEY;
import static org.apache.dubbo.rpc.Constants.SERIALIZATION_SECURITY_CHECK_KEY;

@SuppressWarnings({"deprecation", "serial"})
public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {

Expand Down Expand Up @@ -104,8 +107,9 @@ public void encode(Channel channel, OutputStream output, Object message) throws

@Override
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
URL url = channel.getUrl();
ObjectInput in = CodecSupport.getSerialization(url)
.deserialize(url, input);
this.put(SERIALIZATION_ID_KEY, serializationType);

String dubboVersion = in.readUTF();
Expand All @@ -124,9 +128,6 @@ public Object decode(Channel channel, InputStream input) throws IOException {

ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader();
try {
if (Boolean.parseBoolean(System.getProperty(SERIALIZATION_SECURITY_CHECK_KEY, "true"))) {
CodecSupport.checkSerialization(frameworkModel.getServiceRepository(), path, version, serializationType);
}
Object[] args = RocketMQCodec.EMPTY_OBJECT_ARRAY;
Class<?>[] pts = RocketMQCodec.EMPTY_CLASS_ARRAY;
if (desc.length() > 0) {
Expand Down Expand Up @@ -222,5 +223,11 @@ public Object decode(Channel channel, InputStream input) throws IOException {
}
return this;
}
private static String convertHessianFromWrapper(String serializeType) {
if (TripleConstant.HESSIAN4.equals(serializeType)) {
return TripleConstant.HESSIAN2;
}
return serializeType;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public Object decode(Channel channel, InputStream input) throws IOException {
if (invocation != null && invocation.getServiceModel() != null) {
Thread.currentThread().setContextClassLoader(invocation.getServiceModel().getClassLoader());
}
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
ObjectInput in = CodecSupport.getSerialization(channel.getUrl())
.deserialize(channel.getUrl(), input);

byte flag = in.readByte();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@

package org.apache.dubbo.rpc.rocketmq.codec;

import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;

import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.io.Bytes;
import org.apache.dubbo.common.logger.Logger;
Expand All @@ -45,6 +40,11 @@
import java.io.IOException;
import java.io.InputStream;

import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;


/**
* Dubbo codec.
Expand Down Expand Up @@ -95,7 +95,7 @@ protected Object decodeBody(Channel channel, InputStream is, byte[] header) thro
data = decodeEventData(channel, in, eventPayload);
}
} else {
DecodeableRpcResult result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto);
DecodeableRpcResult result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(channel, res, id), proto);
result.decode();
data = result;
}
Expand Down

0 comments on commit dcfd1cf

Please sign in to comment.