-
Notifications
You must be signed in to change notification settings - Fork 5
Open
Labels
enhancementNew feature or requestNew feature or request
Description
The error here is triggered by a custom job parameter being an array instead of a string. It would break the deserialization process, but not drop the message. Therefore, it would crash again right away when it retried to read the message, creating an infinite loop of crashing.
Here is the error message:
Confluent.Kafka.ConsumeException: Local: Value deserialization error
---> System.Text.Json.JsonException: The JSON value could not be converted to System.String. Path: $.customJobParameters[1].value | LineNumber: 0 | BytePositionInLine: 1675.
---> System.InvalidOperationException: Cannot get the value of a token type 'StartArray' as a string.
at System.Text.Json.ThrowHelper.ThrowInvalidOperationException_ExpectedString(JsonTokenType tokenType)
at System.Text.Json.Utf8JsonReader.GetString()
at System.Text.Json.Serialization.Metadata.JsonPropertyInfo`1.ReadJsonAndSetMember(Object obj, ReadStack& state, Utf8JsonReader& reader)
at System.Text.Json.Serialization.Converters.ObjectDefaultConverter`1.OnTryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
at System.Text.Json.Serialization.JsonCollectionConverter`2.OnTryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, TCollection& value)
at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
at System.Text.Json.Serialization.Metadata.JsonPropertyInfo`1.ReadJsonAndSetMember(Object obj, ReadStack& state, Utf8JsonReader& reader)
at System.Text.Json.Serialization.Converters.ObjectDefaultConverter`1.OnTryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
at System.Text.Json.Serialization.JsonConverter`1.ReadCore(Utf8JsonReader& reader, JsonSerializerOptions options, ReadStack& state)
--- End of inner exception stack trace ---
at System.Text.Json.ThrowHelper.ReThrowWithPath(ReadStack& state, Utf8JsonReader& reader, Exception ex)
at System.Text.Json.Serialization.JsonConverter`1.ReadCore(Utf8JsonReader& reader, JsonSerializerOptions options, ReadStack& state)
at System.Text.Json.JsonSerializer.ReadFromSpan[TValue](ReadOnlySpan`1 utf8Json, JsonTypeInfo jsonTypeInfo, Nullable`1 actualByteCount)
at System.Text.Json.JsonSerializer.Deserialize[TValue](ReadOnlySpan`1 utf8Json, JsonSerializerOptions options)
at Orchestrator.Queue.JobsConsumer.JobSerializer`1.Deserialize(ReadOnlySpan`1 data, Boolean isNull, SerializationContext context) in /app/Orchestrator/Queue/JobsConsumer/JobSerializer.cs:line 29
at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
--- End of inner exception stack trace ---
at Confluent.Kafka.Consumer`2.Consume(Int32 millisecondsTimeout)
at Confluent.Kafka.Consumer`2.Consume(CancellationToken cancellationToken)
at Orchestrator.Queue.KafkaConsumer`1.<>c__DisplayClass1_1.<<-ctor>b__0>d.MoveNext() in /app/Orchestrator/Queue/KafkaConsumer.cs:line 34
A potential solution could be to catch the exception and ignore it to just ignore the message. I don't know if it works.
https://jonboulineau.me/blog/kafka/dealing-with-bad-records-in-kafka
Here is a draft of a potential implementation in JsonSerializer.cs:
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
if (isNull) return default;
try
{
return JsonSerializer.Deserialize<T>(data.ToArray()) ?? default;
}
catch (ConsumeException e)
{
Console.WriteLine("Error message here");
return default;
}
}Again, I don't know that it would work.
Metadata
Metadata
Assignees
Labels
enhancementNew feature or requestNew feature or request