Open
Description
Search before asking
- I had searched in the issues and found no similar issues.
Version
Java fury 0.10.2
<!-- 采用 jdk8 -->
[bolt-common-error.log](https://github.com/user-attachments/files/20453090/bolt-common-error.log)
[stable-6.2.130.60-bolt-common-error.log](https://github.com/user-attachments/files/20453089/stable-6.2.130.60-bolt-common-error.log)
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>sofa-rpc-all</artifactId>
<version>5.13.3</version>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
<version>1.6.10</version>
</dependency>
<dependency>
<groupId>org.apache.fury</groupId>
<artifactId>fury-core</artifactId>
<version>0.10.2</version>
</dependency>
<dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
<version>3.30.2-GA</version>
</dependency>
Component(s)
Java
Minimal reproduce step
- SofaResponse
package com.alipay.sofa.rpc.core.response;
import com.alipay.sofa.rpc.transport.AbstractByteBuf;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
* Sofa RPC Response class
*
* @author <a href=mailto:[email protected]>HongWei Yi</a>
*/
public final class SofaResponse implements Serializable {
private static final long serialVersionUID = -4364536436151723421L;
/**
* 框架异常
*/
private boolean isError = false;
/**
* 框架异常的消息
*/
private String errorMsg;
/**
* 业务返回或者业务异常
*/
private Object appResponse;
/**
* extensional properties
*/
private Map<String, String> responseProps;
//====================== 下面是非传递属性 ===============
/**
* 序列化类型
*/
private transient byte serializeType;
/**
* 数据
*/
private transient AbstractByteBuf data;
/**
* Gets app response.
*
* @return the app response
*/
public Object getAppResponse() {
return appResponse;
}
/**
* Sets app response.
*
* @param response the response
*/
public void setAppResponse(Object response) {
appResponse = response;
}
/**
* Is error boolean.
*
* @return the boolean
*/
public boolean isError() {
return isError;
}
/**
* Gets error msg.
*
* @return the error msg
*/
public String getErrorMsg() {
return errorMsg;
}
/**
* Sets error msg.
*
* @param error the error
*/
public void setErrorMsg(String error) {
if (error == null) {
return;
}
errorMsg = error;
isError = true;
}
/**
* Gets response prop.
*
* @param key the key
* @return the response prop
*/
public Object getResponseProp(String key) {
return responseProps == null ? null : responseProps.get(key);
}
/**
* Add response prop.
*
* @param key the key
* @param value the value
*/
public void addResponseProp(String key, String value) {
if (responseProps == null) {
responseProps = new HashMap<String, String>(16);
}
if (key != null && value != null) {
responseProps.put(key, value);
}
}
/**
* Remove response props.
*
* @param key the key
*/
public void removeResponseProp(String key) {
if (responseProps != null && key != null) {
responseProps.remove(key);
}
}
/**
* Gets response props.
*
* @return the response props
*/
public Map<String, String> getResponseProps() {
return responseProps;
}
/**
* Sets response props.
*
* @param responseProps the response props
*/
public void setResponseProps(Map<String, String> responseProps) {
this.responseProps = responseProps;
}
/**
* Gets serialize type.
*
* @return the serialize type
*/
public byte getSerializeType() {
return serializeType;
}
/**
* Sets serialize type.
*
* @param serializeType the serialize type
* @return the serialize type
*/
public SofaResponse setSerializeType(byte serializeType) {
this.serializeType = serializeType;
return this;
}
/**
* Gets data.
*
* @return the data
*/
public AbstractByteBuf getData() {
return data;
}
/**
* Sets data.
*
* @param data the data
* @return the data
*/
public SofaResponse setData(AbstractByteBuf data) {
this.data = data;
return this;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(128);
sb.append("SofaResponse[");
sb.append("sofa-rpc exception=").append(isError).append(", ");
sb.append("sofa-rpc errorMsg=").append(errorMsg).append(", ");
sb.append("appResponse=").append(appResponse).append("]");
return sb.toString();
}
}
- PollingResponse
public class PollingResponse implements Serializable {
public static int ERROR_CODE_NOT_READY_FOR_SERVICE = 1;
public static int ERROR_CODE_SERVER_REJECT = 2;
private List<String> nameList;
private List<VipDomain> vipDomains;
private Map<String, Object> extensionParams;
private long startTime;
private long acceptTime;
private int errorCode = 0;
private String errorMsg;
public void serialExtensionListToString() {
if (null == extensionParams || extensionParams.isEmpty()) {
extensionParams = new HashMap<String, Object>();
return;
}
for (Entry<String, Object> entry : extensionParams.entrySet()) {
if (entry.getValue() instanceof List<?>) {
extensionParams.put(entry.getKey(),
StringUtils.join((List<String>) entry.getValue(), ","));
}
}
}
public long getTransmissionTime() {
return this.acceptTime - this.startTime;
}
public boolean isSuccess() {
return this.errorCode == 0;
}
public void checkSuccess() throws AntVipResponseException {
if (!this.isSuccess()) {
throw new AntVipResponseException(this.getPrettyErrorMsg());
}
}
public String getPrettyErrorMsg() {
return this.errorMsg + "(errorCode:" + this.errorCode + ")";
}
@Override
public String toString() {
return String.format(
"PollingResponse [errorCode=%s, errorMsg=%s, nameList'size=%s, vipDomains'size=%s, startTime=%s, acceptTime=%s, extensionParams=%s]",
this.errorCode, this.errorMsg, (this.nameList != null ? this.nameList.size() : null),
(this.vipDomains != null ? this.vipDomains.size() : null), this.startTime,
this.acceptTime, this.extensionParams);
}
}
- VipDomain
public class VipDomain implements Serializable {
private String name;
private transient String app;
private transient String zone;
private transient String station;
private transient String env;
private int protectThreshold = 50;
private int idcDisasterProtect;
private transient boolean enable = false;
private HealthCheckType healthCheckType = HealthCheckType.TCP;
private int healthCheckDefaultPort = 8080;
private int healthCheckTimeout = 2000;
private int healthCheckInterval = 5000;
private int healthCheckRaise = 1;
private int healthCheckFall = 3;
private Map<String, Object> healthCheckPayload;
private Boolean healthCheckEnable;
private long version = 1;
private transient Date lastAccessTime;
private transient String checksumForClient;
private transient String checksumForClientNewVersion;
private transient String checksumForClientCompatible;
private transient String checksumForServer;
private List<RealNode> realNodes;
private Map<String, String> domainLabels;
private Map<String, Object> span;
public void addSpan(String spanKey, Object spanVal) {
if (span == null) {
this.span = new ConcurrentHashMap<>();
}
this.span.put(spanKey, spanVal);
}
@Override
public String toString() {
return "VipDomain [name=" + name + ", app=" + app + ", zone=" + zone + ", station="
+ station + ", env=" + env + ", protectThreshold=" + protectThreshold
+ ", idcDisasterProtect=" + idcDisasterProtect + ", enable=" + enable
+ ", healthCheckType=" + healthCheckType + ", healthCheckDefaultPort="
+ healthCheckDefaultPort + ", healthCheckTimeout=" + healthCheckTimeout
+ ", healthCheckInterval=" + healthCheckInterval + ", healthCheckRaise="
+ healthCheckRaise + ", healthCheckFall=" + healthCheckFall + ", healthCheckPayload="
+ healthCheckPayload + ", healthCheckEnable=" + healthCheckEnable + ", version="
+ version + ", lastAccessTime=" + lastAccessTime + ", checksumForClient="
+ checksumForClient + ", checksumForClientNewVersion=" + checksumForClientNewVersion
+ ", checksumForClientCompatible=" + checksumForClientCompatible
+ ", checksumForServer=" + checksumForServer + ", realNodes=" + realNodes
+ ", domainLabels=" + domainLabels + ", span=" + span + "]";
}
}
- RealNode
public class RealNode implements Serializable {
private transient String domainName;
private String ip;
private transient String fqdn;
private int weight = 1;
private Integer healthCheckPort = 0;
private transient boolean enable = false;
private String zone;
private volatile Boolean available;
private long roundTripTime = -1;
private String reason;
private Date lastHealthCheckTime;
private transient AtomicInteger raisingCount = new AtomicInteger(0);
private transient AtomicInteger fallingCount = new AtomicInteger(0);
private transient VipDomain vipDomain;
private Map<String, String> rnLabels;
public boolean isFalling() {
return this.fallingCount.get() > 0;
}
public boolean isRaising() {
return this.raisingCount.get() > 0;
}
public String getEffectiveHealtchCheckHost() {
return this.ip + ":" + this.getEffectiveHealthCheckPort();
}
public int getEffectiveHealthCheckPort() {
return this.healthCheckPort <= 0 ? this.getVipDomain().getHealthCheckDefaultPort()
: this.healthCheckPort;
}
// FIXME 此处不应该打印 vipDomain,否则会循环打印,报错 java.lang.StackOverflowError
@Override
public String toString() {
return "RealNode [domainName=" + domainName + ", ip=" + ip + ", fqdn=" + fqdn + ", weight="
+ weight + ", healthCheckPort=" + healthCheckPort + ", enable=" + enable + ", zone="
+ zone + ", available=" + available + ", roundTripTime=" + roundTripTime
+ ", reason=" + reason + ", lastHealthCheckTime=" + lastHealthCheckTime
+ ", raisingCount=" + raisingCount + ", fallingCount=" + fallingCount
+ ", vipDomain=" + vipDomain + ", rnLabels=" + rnLabels + "]";
}
}
- Fury010Serializer
import com.alipay.antvip.server.sofarpc.fury.serialize.SofaRequestFurySerializer;
import com.alipay.antvip.server.sofarpc.fury.serialize.SofaResponseFurySerializer;
import com.alipay.sofa.common.config.SofaConfigs;
import com.alipay.sofa.rpc.codec.AbstractSerializer;
import com.alipay.sofa.rpc.codec.CustomSerializer;
import com.alipay.sofa.rpc.codec.common.BlackAndWhiteListFileLoader;
import com.alipay.sofa.rpc.codec.common.SerializeCheckStatus;
import com.alipay.sofa.rpc.common.config.RpcConfigKeys;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
import com.alipay.sofa.rpc.ext.Extension;
import com.alipay.sofa.rpc.transport.AbstractByteBuf;
import com.alipay.sofa.rpc.transport.ByteArrayWrapperByteBuf;
import org.apache.fury.Fury;
import org.apache.fury.ThreadLocalFury;
import org.apache.fury.ThreadSafeFury;
import org.apache.fury.config.Language;
import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.resolver.AllowListChecker;
import java.util.List;
import java.util.Map;
import static org.apache.fury.config.CompatibleMode.COMPATIBLE;
/**
* Fury010Serializer.
* copy from sofa-rpc-all 5.13.3 and change `io.fury.*` to `org.apache.fury.*`
* <p>
* <li>自定义序列化 apache fury 0.10.0 版本,后续再升级 sofa-rpc 集成 apache fury 1.0(有 Hessian 作回切然后再切 fury 1.0 版本);</li>
* <li>修复 sofa-rpc 一个问题,它会清理掉 Fury 造成下次 ThreadLocalFury 又要构造 Fury 对象; fury.clearClassLoader(contextClassLoader); </li>
* </p>
*
* @link resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.codec.Serializer
* @see com.alipay.sofa.rpc.codec.fury.FurySerializer
*/
@Extension(value = "fury010", code = 122)
public class Fury010Serializer extends AbstractSerializer {
protected final ThreadSafeFury fury;
private final String checkerMode = SofaConfigs.getOrDefault(RpcConfigKeys.SERIALIZE_CHECKER_MODE);
public Fury010Serializer() {
fury = new ThreadLocalFury(classLoader -> {
Fury f = Fury.builder().withLanguage(Language.JAVA)
.withRefTracking(true)
.withCodegen(true)
.withNumberCompressed(true)
.withCompatibleMode(COMPATIBLE)
.requireClassRegistration(false)
.withClassLoader(classLoader)
.withAsyncCompilation(true)
.build();
// Do not use any configuration
if (checkerMode.equalsIgnoreCase(SerializeCheckStatus.DISABLE.name())) {
AllowListChecker noChecker = new AllowListChecker(AllowListChecker.CheckLevel.DISABLE);
f.getClassResolver().setClassChecker(noChecker);
return f;
} else if (checkerMode.equalsIgnoreCase(SerializeCheckStatus.WARN.name())) {
AllowListChecker blackListChecker = new AllowListChecker(AllowListChecker.CheckLevel.WARN);
List<String> blackList = BlackAndWhiteListFileLoader.SOFA_SERIALIZE_BLACK_LIST;
// To setting checker
f.getClassResolver().setClassChecker(blackListChecker);
blackListChecker.addListener(f.getClassResolver());
// BlackList classes use wildcards
for (String key : blackList) {
blackListChecker.disallowClass(key + "*");
}
} else if (checkerMode.equalsIgnoreCase(SerializeCheckStatus.STRICT.name())) {
AllowListChecker blackAndWhiteListChecker = new AllowListChecker(AllowListChecker.CheckLevel.STRICT);
List<String> whiteList = BlackAndWhiteListFileLoader.SOFA_SERIALIZER_WHITE_LIST;
// To setting checker
f.getClassResolver().setClassChecker(blackAndWhiteListChecker);
blackAndWhiteListChecker.addListener(f.getClassResolver());
// WhiteList classes use wildcards
for (String key : whiteList) {
blackAndWhiteListChecker.allowClass(key + "*");
}
List<String> blackList = BlackAndWhiteListFileLoader.SOFA_SERIALIZE_BLACK_LIST;
// To setting checker
f.getClassResolver().setClassChecker(blackAndWhiteListChecker);
blackAndWhiteListChecker.addListener(f.getClassResolver());
// BlackList classes use wildcards
for (String key : blackList) {
blackAndWhiteListChecker.disallowClass(key + "*");
}
}
f.register(SofaRequest.class);
f.register(SofaResponse.class);
f.register(SofaRpcException.class);
return f;
});
addCustomSerializer(SofaRequest.class, new SofaRequestFurySerializer(fury));
addCustomSerializer(SofaResponse.class, new SofaResponseFurySerializer(fury));
}
@Override
public AbstractByteBuf encode(final Object object, final Map<String, String> context) throws SofaRpcException {
if (object == null) {
throw buildSerializeError("Unsupported null message!");
}
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
fury.setClassLoader(contextClassLoader);
CustomSerializer customSerializer = getObjCustomSerializer(object);
if (customSerializer != null) {
return customSerializer.encodeObject(object, context);
} else {
MemoryBuffer writeBuffer = MemoryBuffer.newHeapBuffer(32);
writeBuffer.writerIndex(0);
fury.serialize(writeBuffer, object);
return new ByteArrayWrapperByteBuf(writeBuffer.getBytes(0, writeBuffer.writerIndex()));
}
} catch (Exception e) {
throw buildSerializeError(e.getMessage(), e);
//} finally {
//fury.clearClassLoader(contextClassLoader);
}
}
@Override
public Object decode(final AbstractByteBuf data, final Class clazz, final Map<String, String> context)
throws SofaRpcException {
if (data.readableBytes() <= 0 || clazz == null) {
throw buildDeserializeError("Deserialized array is empty.");
}
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
fury.setClassLoader(contextClassLoader);
CustomSerializer customSerializer = getCustomSerializer(clazz);
if (customSerializer != null) {
return customSerializer.decodeObject(data, context);
} else {
MemoryBuffer readBuffer = MemoryBuffer.fromByteArray(data.array());
return fury.deserialize(readBuffer);
}
} catch (Exception e) {
throw buildDeserializeError(e.getMessage(), e);
//} finally {
//fury.clearClassLoader(contextClassLoader);
}
}
@Override
public void decode(final AbstractByteBuf data, final Object template, final Map<String, String> context)
throws SofaRpcException {
if (template == null) {
throw buildDeserializeError("template is null!");
}
ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
fury.setClassLoader(contextClassLoader);
CustomSerializer customSerializer = getObjCustomSerializer(template);
if (customSerializer != null) {
customSerializer.decodeObjectByTemplate(data, context, template);
} else {
throw buildDeserializeError("Only support decode from SofaRequest and SofaResponse template");
}
} catch (Exception e) {
throw buildDeserializeError(e.getMessage(), e);
//} finally {
//fury.clearClassLoader(contextClassLoader);
}
}
}
- src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.codec.Serializer
fury010=com.alipay.antvip.server.sofarpc.fury.Fury010Serializer
- testFury010SerializerReproduce
@Test
public void testFury010SerializerReproduce() {
System.setProperty("sofa.rpc.codec.serialize.checkMode", "DISABLE");
System.setProperty("FURY_CODE_DIR", ".code");
Fury010Serializer serializer = new Fury010Serializer();
SofaResponse sofaResponse = new SofaResponse();
VipDomain vipDomain = new VipDomain();
vipDomain.setName("obcrmcore-dev-pool.stable.alipay.net");
vipDomain.setVersion(38350);
vipDomain.setProtectThreshold(30);
vipDomain.setHealthCheckDefaultPort(12200);
vipDomain.setHealthCheckPayload(Collections.emptyMap());
vipDomain.setHealthCheckEnable(false);
vipDomain.addSpan("dbTs", 1747817528000L);
vipDomain.addSpan("clientSendTs", 1747817529332L);
vipDomain.addSpan("healthCheckInitTs", 1747817529306L);
vipDomain.addSpan("action", "modify");
vipDomain.addSpan("webSyncRecvTs", 1747817529306L);
vipDomain.addSpan("type", "domainChange");
RealNode realNode1 = new RealNode();
realNode1.setIp("11.122.210.60");
realNode1.setEnable(false);
realNode1.setZone("gz00b");
realNode1.setAvailable(false);
realNode1.setRoundTripTime(2000L);
realNode1.setVipDomain(vipDomain);
realNode1.setRnLabels(Collections.singletonMap("sofa_group", "GROUP_20221025160042"));
RealNode realNode2 = new RealNode();
realNode2.setIp("6.0.124.11");
realNode2.setEnable(false);
realNode2.setZone("gz00b");
realNode2.setAvailable(false);
realNode2.setRoundTripTime(27L);
realNode2.setVipDomain(vipDomain);
vipDomain.setRealNodes(Arrays.asList(realNode1, realNode2));
PollingResponse response = new PollingResponse();
response.setStartTime(1747817529332L);
response.setVipDomains(Collections.singletonList(vipDomain));
sofaResponse.setAppResponse(response);
AbstractByteBuf encoded = serializer.encode(sofaResponse, null);
Object decodeObj = serializer.decode(encoded, SofaResponse.class, null);
System.out.println("SofaResponse decodeObj: " + decodeObj);
}
What did you expect to see?
Normal deserialization
What did you see instead?
deserialization exception
Anything Else?
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 4
at org.apache.fury.collection.IntArray.pop(IntArray.java:54)
at org.apache.fury.resolver.MapRefResolver.reference(MapRefResolver.java:192)
at com.alipay.antvip.common.model.VipDomainFuryRefCodecMetaShared4_0.read(VipDomainFuryRefCodecMetaShared4_0.java:352)
at com.alipay.antvip.common.transport.PollingResponseFuryRefCodecMetaShared3_0.readFields1$(PollingResponseFuryRefCodecMetaShared3_0.java:130)
at com.alipay.antvip.common.transport.PollingResponseFuryRefCodecMetaShared3_0.read(PollingResponseFuryRefCodecMetaShared3_0.java:245)
at com.alipay.sofa.rpc.core.response.SofaResponseFuryRefCodecMetaShared2_0.read(SofaResponseFuryRefCodecMetaShared2_0.java:110)
at org.apache.fury.Fury.readDataInternal(Fury.java:990)
at org.apache.fury.Fury.readRef(Fury.java:874)
at org.apache.fury.Fury.deserialize(Fury.java:806)
... 12 more
Caused by: java.lang.ClassCastException: class com.alipay.sofa.rpc.core.response.SofaResponseFuryRefCodecMetaShared2_0 cannot be cast to class org.apache.fury.serializer.collection.AbstractMapSerializer (com.alipay.sofa.rpc.core.response.SofaResponseFuryRefCodecMetaShared2_0 and org.apache.fury.serializer.collection.AbstractMapSerializer are in module java.base of loader sun.misc.Launcher$AppClassLoader)
at com.alipay.antvip.common.transport.PollingResponseFuryRefCodecMetaShared3_0.readFields2$(PollingResponseFuryRefCodecMetaShared3_0.java:197)
at com.alipay.antvip.common.transport.PollingResponseFuryRefCodecMetaShared3_0.read(PollingResponseFuryRefCodecMetaShared3_0.java:243)
at com.alipay.sofa.rpc.core.response.SofaResponseFuryRefCodecMetaShared2_0.read(SofaResponseFuryRefCodecMetaShared2_0.java:107)
at org.apache.fury.Fury.readDataInternal(Fury.java:990)
at org.apache.fury.Fury.readRef(Fury.java:874)
at org.apache.fury.Fury.deserialize(Fury.java:806)
... 12 more
Are you willing to submit a PR?
- I'm willing to submit a PR!