Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.Nulls;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -106,10 +114,16 @@ public static class SourceReport {
@Nullable
private Map<String, Double> ingestionHighStageSeconds;

@JsonDeserialize(contentUsing = LogEntryDeserializer.class)
@JsonSetter(contentNulls = Nulls.SKIP)
private List<LogEntry> warnings = Collections.emptyList();

@JsonDeserialize(contentUsing = LogEntryDeserializer.class)
@JsonSetter(contentNulls = Nulls.SKIP)
private List<LogEntry> failures = Collections.emptyList();

@JsonDeserialize(contentUsing = LogEntryDeserializer.class)
@JsonSetter(contentNulls = Nulls.SKIP)
private List<LogEntry> infos = Collections.emptyList();
}

Expand Down Expand Up @@ -143,6 +157,8 @@ public static class SinkReport {

@Nullable private String mode;

@JsonDeserialize(contentUsing = LogEntryDeserializer.class)
@JsonSetter(contentNulls = Nulls.SKIP)
private List<LogEntry> failures = Collections.emptyList();
}

Expand All @@ -160,4 +176,44 @@ public static class LogEntry {
@Nullable
private String logCategory;
}

/**
* Custom element deserializer for {@code List<LogEntry>} fields.
*
* <p>The Python ingestion framework's {@code LossyList} caps log-entry arrays at 10 items and
* appends a plain-string sentinel (e.g. {@code "... sampled of 1246 total elements"}) as the 11th
* element when the original list was larger. Without this deserializer, Jackson throws a {@code
* MismatchedInputException} when it encounters that sentinel string while trying to construct a
* {@link LogEntry} object.
*
* <p>Strategy: annotate every {@code List<LogEntry>} field with
* {@code @JsonDeserialize(contentUsing = LogEntryDeserializer.class)}. When Jackson processes
* each list element it dispatches here:
*
* <ul>
* <li>String token → return an empty {@link LogEntry} (silently dropped by {@code
* toLogEntryMaps} because all fields are null and the resulting map is empty).
* <li>Object token → delegate to standard bean deserialization via {@code ctxt.readValue(p,
* LogEntry.class)}, which is safe because {@link LogEntry} itself does <em>not</em> carry
* {@code @JsonDeserialize} — so there is no recursion.
* </ul>
*/
public static class LogEntryDeserializer extends StdDeserializer<LogEntry> {

public LogEntryDeserializer() {
super(LogEntry.class);
}

@Override
public LogEntry deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
if (p.currentToken() == JsonToken.VALUE_STRING) {
// Python LossyList sentinel string — return null so @JsonSetter(contentNulls = Nulls.SKIP)
// excludes it from the list entirely. This keeps list.size() == real entry count.
return null;
}
// Standard bean deserialization. No recursion: LogEntry itself has no @JsonDeserialize,
// so ctxt.readValue uses the default BeanDeserializer, not this class.
return ctxt.readValue(p, LogEntry.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,78 @@ public void testNonJsonContentTypeIgnored() throws Exception {
assertTrue(runsCounter == null || runsCounter.count() == 0.0);
}

/**
* Regression test for the LossyList sentinel bug.
*
* <p>When the Python ingestion framework's {@code LossyList} truncates a list that exceeds 10
* entries, it appends a plain-string sentinel as the 11th element (e.g. {@code "... sampled of
* 1246 total elements"}). Without {@link IngestionRunReport.LogEntryDeserializer}, Jackson throws
* a {@code MismatchedInputException} when it encounters that string while trying to construct a
* {@code LogEntry} object.
*
* <p>This test verifies that a report with more than 10 failures (triggering the sentinel) is
* deserialized successfully and that only the real log entries are counted/surfaced.
*/
@Test
public void testLossyListSentinelDoesNotThrow() throws Exception {
// Build a failures array with 10 real LogEntry objects followed by the LossyList sentinel
// string — exactly what Python emits when sampled=True.
StringBuilder failuresJson = new StringBuilder("[");
for (int i = 0; i < 10; i++) {
if (i > 0) failuresJson.append(",");
failuresJson.append(
"{\"title\":\"Extraction error\",\"message\":\"error " + i + "\",\"context\":[]}");
}
failuresJson.append(",\"... sampled of 1246 total elements\"");
failuresJson.append("]");

String reportJson =
"{"
+ "\"cli\": {\"cli_version\": \"1.2.0\"},"
+ "\"source\": {"
+ " \"type\": \"mysql\","
+ " \"report\": {"
+ " \"events_produced\": 500,"
+ " \"warnings\": [],"
+ " \"failures\": "
+ failuresJson
+ ","
+ " \"platform\": \"mysql\""
+ " }"
+ "},"
+ "\"sink\": {"
+ " \"type\": \"datahub-rest\","
+ " \"report\": {"
+ " \"total_records_written\": 400,"
+ " \"failures\": []"
+ " }"
+ "}"
+ "}";

ExecutionRequestResult result = new ExecutionRequestResult();
result.setStatus("FAILURE");
result.setDurationMs(3000L);

StructuredExecutionReport structuredReport = new StructuredExecutionReport();
structuredReport.setType("CLI_INGEST");
structuredReport.setSerializedValue(reportJson);
structuredReport.setContentType("application/json");
result.setStructuredReport(structuredReport);

// Must not throw MismatchedInputException
emitter.observeMCPs(List.of(createBatchItem(result, ChangeType.UPSERT)), retrieverContext);

// Run is counted
Counter runsCounter = meterRegistry.find("com.datahub.ingest.runs").counter();
assertNotNull(runsCounter);
assertEquals(runsCounter.count(), 1.0);

// Only the 10 real entries are counted — the sentinel is silently dropped
Counter failuresCounter = meterRegistry.find("com.datahub.ingest.failures").counter();
assertNotNull(failuresCounter);
assertEquals(failuresCounter.count(), 10.0);
}

// Helper methods

private BatchItem createBatchItem(ExecutionRequestResult result, ChangeType changeType)
Expand Down
Loading