Skip to content

Commit 3256a96

Browse files
authored
Fix null pointer exception in deserialize blob (#585)
1 parent 14e523a commit 3256a96

File tree

3 files changed

+159
-14
lines changed

3 files changed

+159
-14
lines changed

src/main/java/com/uber/cadence/internal/common/InternalUtils.java

+50-11
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.uber.cadence.internal.common;
1919

2020
import com.google.common.base.Defaults;
21+
import com.google.common.collect.Lists;
2122
import com.uber.cadence.DataBlob;
2223
import com.uber.cadence.History;
2324
import com.uber.cadence.HistoryEvent;
@@ -31,7 +32,6 @@
3132
import com.uber.cadence.workflow.WorkflowMethod;
3233
import java.lang.reflect.Method;
3334
import java.nio.ByteBuffer;
34-
import java.util.ArrayList;
3535
import java.util.Arrays;
3636
import java.util.HashMap;
3737
import java.util.List;
@@ -153,23 +153,41 @@ public static SearchAttributes convertMapToSearchAttributes(
153153
return new SearchAttributes().setIndexedFields(mapOfByteBuffer);
154154
}
155155

156-
// This method deserialize the DataBlob data to the HistoriyEvent data
157-
public static History DeserializeFromBlobToHistoryEvents(
156+
// This method serializes history to blob data
157+
public static DataBlob SerializeFromHistoryToBlobData(History history) {
158+
159+
// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
160+
TSerializer serializer = new TSerializer();
161+
DataBlob blob = new DataBlob();
162+
try {
163+
blob.setData(serializer.serialize(history));
164+
} catch (org.apache.thrift.TException err) {
165+
throw new RuntimeException("Serialize history to blob data failed", err);
166+
}
167+
168+
return blob;
169+
}
170+
171+
// This method deserialize the DataBlob data to the History data
172+
public static History DeserializeFromBlobDataToHistory(
158173
List<DataBlob> blobData, HistoryEventFilterType historyEventFilterType) throws TException {
159174

160-
List<HistoryEvent> events = new ArrayList<HistoryEvent>();
175+
// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
176+
TDeserializer deSerializer = new TDeserializer();
177+
List<HistoryEvent> events = Lists.newArrayList();
161178
for (DataBlob data : blobData) {
162179
History history = new History();
163180
try {
164181
byte[] dataByte = data.getData();
165-
dataByte = Arrays.copyOfRange(dataByte, 1, dataByte.length);
182+
// TODO: verify the beginning index
183+
dataByte = Arrays.copyOfRange(dataByte, 0, dataByte.length);
166184
deSerializer.deserialize(history, dataByte);
167185

168186
if (history == null || history.getEvents() == null || history.getEvents().size() == 0) {
169187
return null;
170188
}
171189
} catch (org.apache.thrift.TException err) {
172-
throw new TException("Deserialize blob data to history event failed with unknown error");
190+
throw new TException("Deserialize blob data to history failed with unknown error");
173191
}
174192

175193
events.addAll(history.getEvents());
@@ -184,22 +202,43 @@ public static History DeserializeFromBlobToHistoryEvents(
184202

185203
// This method serializes history event to blob data
186204
public static List<DataBlob> SerializeFromHistoryEventToBlobData(List<HistoryEvent> events) {
187-
List<DataBlob> blobs = new ArrayList<>(events.size());
205+
206+
// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
207+
TSerializer serializer = new TSerializer();
208+
List<DataBlob> blobs = Lists.newArrayListWithCapacity(events.size());
188209
for (HistoryEvent event : events) {
189210
DataBlob blob = new DataBlob();
190211
try {
191212
blob.setData(serializer.serialize(event));
192213
} catch (org.apache.thrift.TException err) {
193-
throw new RuntimeException("Serialize to blob data failed", err);
214+
throw new RuntimeException("Serialize history event to blob data failed", err);
194215
}
195216
blobs.add(blob);
196217
}
197-
198218
return blobs;
199219
}
200220

201-
private static final TDeserializer deSerializer = new TDeserializer();
202-
private static final TSerializer serializer = new TSerializer();
221+
// This method serializes blob data to history event
222+
public static List<HistoryEvent> DeserializeFromBlobDataToHistoryEvents(List<DataBlob> blobData)
223+
throws TException {
224+
225+
// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
226+
TDeserializer deSerializer = new TDeserializer();
227+
List<HistoryEvent> events = Lists.newArrayList();
228+
for (DataBlob data : blobData) {
229+
try {
230+
HistoryEvent event = new HistoryEvent();
231+
byte[] dataByte = data.getData();
232+
// TODO: verify the beginning index
233+
dataByte = Arrays.copyOfRange(dataByte, 0, dataByte.length);
234+
deSerializer.deserialize(event, dataByte);
235+
events.add(event);
236+
} catch (org.apache.thrift.TException err) {
237+
throw new TException("Deserialize blob data to history event failed with unknown error");
238+
}
239+
}
240+
return events;
241+
}
203242

204243
/** Prohibit instantiation */
205244
private InternalUtils() {}

src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,7 @@ private GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
574574
GetWorkflowExecutionHistoryResponse res = result.getSuccess();
575575
if (res.getRawHistory() != null) {
576576
History history =
577-
InternalUtils.DeserializeFromBlobToHistoryEvents(
577+
InternalUtils.DeserializeFromBlobDataToHistory(
578578
res.getRawHistory(), getRequest.getHistoryEventFilterType());
579579
res.setHistory(history);
580580
}
@@ -2154,7 +2154,7 @@ private void getWorkflowExecutionHistory(
21542154
GetWorkflowExecutionHistoryResponse res = result.getSuccess();
21552155
if (res.getRawHistory() != null) {
21562156
History history =
2157-
InternalUtils.DeserializeFromBlobToHistoryEvents(
2157+
InternalUtils.DeserializeFromBlobDataToHistory(
21582158
res.getRawHistory(), getRequest.getHistoryEventFilterType());
21592159
res.setHistory(history);
21602160
}

src/test/java/com/uber/cadence/internal/common/InternalUtilsTest.java

+107-1
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,23 @@
1717

1818
package com.uber.cadence.internal.common;
1919

20+
import static com.uber.cadence.EventType.WorkflowExecutionStarted;
2021
import static junit.framework.TestCase.assertEquals;
22+
import static org.junit.Assert.assertNotNull;
2123

22-
import com.uber.cadence.SearchAttributes;
24+
import com.google.common.collect.Lists;
25+
import com.googlecode.junittoolbox.MultithreadingTester;
26+
import com.googlecode.junittoolbox.RunnableAssert;
27+
import com.uber.cadence.*;
2328
import com.uber.cadence.converter.DataConverterException;
2429
import com.uber.cadence.workflow.WorkflowUtils;
2530
import java.io.FileOutputStream;
31+
import java.time.LocalDateTime;
32+
import java.time.ZoneOffset;
2633
import java.util.HashMap;
34+
import java.util.List;
2735
import java.util.Map;
36+
import junit.framework.TestCase;
2837
import org.junit.Test;
2938

3039
public class InternalUtilsTest {
@@ -47,4 +56,101 @@ public void testConvertMapToSearchAttributesException() throws Throwable {
4756
attr.put("InvalidValue", new FileOutputStream("dummy"));
4857
InternalUtils.convertMapToSearchAttributes(attr);
4958
}
59+
60+
@Test
61+
public void testSerialization_History() {
62+
63+
RunnableAssert r =
64+
new RunnableAssert("history_serialization") {
65+
@Override
66+
public void run() {
67+
HistoryEvent event =
68+
new HistoryEvent()
69+
.setEventId(1)
70+
.setVersion(1)
71+
.setEventType(WorkflowExecutionStarted)
72+
.setTimestamp(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC))
73+
.setWorkflowExecutionStartedEventAttributes(
74+
new WorkflowExecutionStartedEventAttributes()
75+
.setAttempt(1)
76+
.setFirstExecutionRunId("test"));
77+
78+
List<HistoryEvent> historyEvents = Lists.newArrayList(event);
79+
History history = new History().setEvents(historyEvents);
80+
DataBlob blob = InternalUtils.SerializeFromHistoryToBlobData(history);
81+
assertNotNull(blob);
82+
83+
try {
84+
History result =
85+
InternalUtils.DeserializeFromBlobDataToHistory(
86+
Lists.newArrayList(blob), HistoryEventFilterType.ALL_EVENT);
87+
assertNotNull(result);
88+
assertEquals(1, result.events.size());
89+
assertEquals(event.getEventId(), result.events.get(0).getEventId());
90+
assertEquals(event.getVersion(), result.events.get(0).getVersion());
91+
assertEquals(event.getEventType(), result.events.get(0).getEventType());
92+
assertEquals(event.getTimestamp(), result.events.get(0).getTimestamp());
93+
assertEquals(
94+
event.getWorkflowExecutionStartedEventAttributes(),
95+
result.events.get(0).getWorkflowExecutionStartedEventAttributes());
96+
} catch (Exception e) {
97+
TestCase.fail("Received unexpected error during deserialization");
98+
}
99+
}
100+
};
101+
102+
try {
103+
new MultithreadingTester().add(r).numThreads(50).numRoundsPerThread(10).run();
104+
} catch (Exception e) {
105+
TestCase.fail("Received unexpected error during concurrent deserialization");
106+
}
107+
}
108+
109+
@Test
110+
public void testSerialization_HistoryEvent() {
111+
112+
RunnableAssert r =
113+
new RunnableAssert("history_event_serialization") {
114+
@Override
115+
public void run() {
116+
HistoryEvent event =
117+
new HistoryEvent()
118+
.setEventId(1)
119+
.setVersion(1)
120+
.setEventType(WorkflowExecutionStarted)
121+
.setTimestamp(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC))
122+
.setWorkflowExecutionStartedEventAttributes(
123+
new WorkflowExecutionStartedEventAttributes()
124+
.setAttempt(1)
125+
.setFirstExecutionRunId("test"));
126+
127+
List<HistoryEvent> historyEvents = Lists.newArrayList(event);
128+
List<DataBlob> blobList =
129+
InternalUtils.SerializeFromHistoryEventToBlobData(historyEvents);
130+
assertEquals(1, blobList.size());
131+
132+
try {
133+
List<HistoryEvent> result =
134+
InternalUtils.DeserializeFromBlobDataToHistoryEvents(blobList);
135+
assertNotNull(result);
136+
assertEquals(1, result.size());
137+
assertEquals(event.getEventId(), result.get(0).getEventId());
138+
assertEquals(event.getVersion(), result.get(0).getVersion());
139+
assertEquals(event.getEventType(), result.get(0).getEventType());
140+
assertEquals(event.getTimestamp(), result.get(0).getTimestamp());
141+
assertEquals(
142+
event.getWorkflowExecutionStartedEventAttributes(),
143+
result.get(0).getWorkflowExecutionStartedEventAttributes());
144+
} catch (Exception e) {
145+
TestCase.fail("Received unexpected error during deserialization");
146+
}
147+
}
148+
};
149+
150+
try {
151+
new MultithreadingTester().add(r).numThreads(50).numRoundsPerThread(10).run();
152+
} catch (Exception e) {
153+
TestCase.fail("Received unexpected error during concurrent deserialization");
154+
}
155+
}
50156
}

0 commit comments

Comments
 (0)