Skip to content

Commit c7bb4f6

Browse files
committed
fix lossy list
1 parent 9a7ed32 commit c7bb4f6

2 files changed

Lines changed: 128 additions & 0 deletions

File tree

metadata-io/src/main/java/com/linkedin/metadata/ingestion/IngestionRunReport.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,14 @@
44

55
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
66
import com.fasterxml.jackson.annotation.JsonProperty;
7+
import com.fasterxml.jackson.annotation.JsonSetter;
8+
import com.fasterxml.jackson.annotation.Nulls;
9+
import com.fasterxml.jackson.core.JsonParser;
10+
import com.fasterxml.jackson.core.JsonToken;
11+
import com.fasterxml.jackson.databind.DeserializationContext;
12+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
13+
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
14+
import java.io.IOException;
715
import java.util.Collections;
816
import java.util.List;
917
import java.util.Map;
@@ -106,10 +114,16 @@ public static class SourceReport {
106114
@Nullable
107115
private Map<String, Double> ingestionHighStageSeconds;
108116

117+
@JsonDeserialize(contentUsing = LogEntryDeserializer.class)
118+
@JsonSetter(contentNulls = Nulls.SKIP)
109119
private List<LogEntry> warnings = Collections.emptyList();
110120

121+
@JsonDeserialize(contentUsing = LogEntryDeserializer.class)
122+
@JsonSetter(contentNulls = Nulls.SKIP)
111123
private List<LogEntry> failures = Collections.emptyList();
112124

125+
@JsonDeserialize(contentUsing = LogEntryDeserializer.class)
126+
@JsonSetter(contentNulls = Nulls.SKIP)
113127
private List<LogEntry> infos = Collections.emptyList();
114128
}
115129

@@ -143,6 +157,8 @@ public static class SinkReport {
143157

144158
@Nullable private String mode;
145159

160+
@JsonDeserialize(contentUsing = LogEntryDeserializer.class)
161+
@JsonSetter(contentNulls = Nulls.SKIP)
146162
private List<LogEntry> failures = Collections.emptyList();
147163
}
148164

@@ -160,4 +176,44 @@ public static class LogEntry {
160176
@Nullable
161177
private String logCategory;
162178
}
179+
180+
/**
181+
* Custom element deserializer for {@code List<LogEntry>} fields.
182+
*
183+
* <p>The Python ingestion framework's {@code LossyList} caps log-entry arrays at 10 items and
184+
* appends a plain-string sentinel (e.g. {@code "... sampled of 1246 total elements"}) as the 11th
185+
* element when the original list was larger. Without this deserializer, Jackson throws a {@code
186+
* MismatchedInputException} when it encounters that sentinel string while trying to construct a
187+
* {@link LogEntry} object.
188+
*
189+
* <p>Strategy: annotate every {@code List<LogEntry>} field with
190+
* {@code @JsonDeserialize(contentUsing = LogEntryDeserializer.class)}. When Jackson processes
191+
* each list element it dispatches here:
192+
*
193+
* <ul>
194+
* <li>String token → return an empty {@link LogEntry} (silently dropped by {@code
195+
* toLogEntryMaps} because all fields are null and the resulting map is empty).
196+
* <li>Object token → delegate to standard bean deserialization via {@code ctxt.readValue(p,
197+
* LogEntry.class)}, which is safe because {@link LogEntry} itself does <em>not</em> carry
198+
* {@code @JsonDeserialize} — so there is no recursion.
199+
* </ul>
200+
*/
201+
public static class LogEntryDeserializer extends StdDeserializer<LogEntry> {
202+
203+
public LogEntryDeserializer() {
204+
super(LogEntry.class);
205+
}
206+
207+
@Override
208+
public LogEntry deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
209+
if (p.currentToken() == JsonToken.VALUE_STRING) {
210+
// Python LossyList sentinel string — return null so @JsonSetter(contentNulls = Nulls.SKIP)
211+
// excludes it from the list entirely. This keeps list.size() == real entry count.
212+
return null;
213+
}
214+
// Standard bean deserialization. No recursion: LogEntry itself has no @JsonDeserialize,
215+
// so ctxt.readValue uses the default BeanDeserializer, not this class.
216+
return ctxt.readValue(p, LogEntry.class);
217+
}
218+
}
163219
}

metadata-io/src/test/java/com/linkedin/metadata/ingestion/IngestionMetricsEmitterTest.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,78 @@ public void testNonJsonContentTypeIgnored() throws Exception {
394394
assertTrue(runsCounter == null || runsCounter.count() == 0.0);
395395
}
396396

397+
/**
398+
* Regression test for the LossyList sentinel bug.
399+
*
400+
* <p>When the Python ingestion framework's {@code LossyList} truncates a list that exceeds 10
401+
* entries, it appends a plain-string sentinel as the 11th element (e.g. {@code "... sampled of
402+
* 1246 total elements"}). Without {@link IngestionRunReport.LogEntryDeserializer}, Jackson throws
403+
* a {@code MismatchedInputException} when it encounters that string while trying to construct a
404+
* {@code LogEntry} object.
405+
*
406+
* <p>This test verifies that a report with more than 10 failures (triggering the sentinel) is
407+
* deserialized successfully and that only the real log entries are counted/surfaced.
408+
*/
409+
@Test
410+
public void testLossyListSentinelDoesNotThrow() throws Exception {
411+
// Build a failures array with 10 real LogEntry objects followed by the LossyList sentinel
412+
// string — exactly what Python emits when sampled=True.
413+
StringBuilder failuresJson = new StringBuilder("[");
414+
for (int i = 0; i < 10; i++) {
415+
if (i > 0) failuresJson.append(",");
416+
failuresJson.append(
417+
"{\"title\":\"Extraction error\",\"message\":\"error " + i + "\",\"context\":[]}");
418+
}
419+
failuresJson.append(",\"... sampled of 1246 total elements\"");
420+
failuresJson.append("]");
421+
422+
String reportJson =
423+
"{"
424+
+ "\"cli\": {\"cli_version\": \"1.2.0\"},"
425+
+ "\"source\": {"
426+
+ " \"type\": \"mysql\","
427+
+ " \"report\": {"
428+
+ " \"events_produced\": 500,"
429+
+ " \"warnings\": [],"
430+
+ " \"failures\": "
431+
+ failuresJson
432+
+ ","
433+
+ " \"platform\": \"mysql\""
434+
+ " }"
435+
+ "},"
436+
+ "\"sink\": {"
437+
+ " \"type\": \"datahub-rest\","
438+
+ " \"report\": {"
439+
+ " \"total_records_written\": 400,"
440+
+ " \"failures\": []"
441+
+ " }"
442+
+ "}"
443+
+ "}";
444+
445+
ExecutionRequestResult result = new ExecutionRequestResult();
446+
result.setStatus("FAILURE");
447+
result.setDurationMs(3000L);
448+
449+
StructuredExecutionReport structuredReport = new StructuredExecutionReport();
450+
structuredReport.setType("CLI_INGEST");
451+
structuredReport.setSerializedValue(reportJson);
452+
structuredReport.setContentType("application/json");
453+
result.setStructuredReport(structuredReport);
454+
455+
// Must not throw MismatchedInputException
456+
emitter.observeMCPs(List.of(createBatchItem(result, ChangeType.UPSERT)), retrieverContext);
457+
458+
// Run is counted
459+
Counter runsCounter = meterRegistry.find("com.datahub.ingest.runs").counter();
460+
assertNotNull(runsCounter);
461+
assertEquals(runsCounter.count(), 1.0);
462+
463+
// Only the 10 real entries are counted — the sentinel is silently dropped
464+
Counter failuresCounter = meterRegistry.find("com.datahub.ingest.failures").counter();
465+
assertNotNull(failuresCounter);
466+
assertEquals(failuresCounter.count(), 10.0);
467+
}
468+
397469
// Helper methods
398470

399471
private BatchItem createBatchItem(ExecutionRequestResult result, ChangeType changeType)

0 commit comments

Comments
 (0)