Skip to content

Commit 223f713

Browse files
committed
out_kafka: document OTLP partition tradeoff
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
1 parent 35e338a commit 223f713

1 file changed

Lines changed: 28 additions & 23 deletions

File tree

plugins/out_kafka/kafka.c

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,18 @@ struct otlp_logs_resource_partition {
4848
msgpack_sbuffer buffer;
4949
};
5050

51+
static const char *default_logs_body_keys[] = {"log", "message"};
52+
53+
static void init_otlp_logs_options(struct flb_opentelemetry_otlp_logs_options *options)
54+
{
55+
memset(options, 0, sizeof(*options));
56+
options->logs_require_otel_metadata = FLB_FALSE;
57+
options->logs_body_keys = default_logs_body_keys;
58+
options->logs_body_key_count = sizeof(default_logs_body_keys) /
59+
sizeof(default_logs_body_keys[0]);
60+
options->logs_body_key_attributes = FLB_FALSE;
61+
}
62+
5163
void cb_kafka_msg(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage,
5264
void *opaque)
5365
{
@@ -555,7 +567,7 @@ static int produce_raw_payload_with_key_retry_control(const void *payload,
555567
message_key_len = 0;
556568
}
557569

558-
if (queue_full_retry_limit == 0 && allow_engine_retry == FLB_FALSE) {
570+
if (queue_full_retry_limit <= 0 && allow_engine_retry == FLB_FALSE) {
559571
queue_full_retry_limit = FLB_KAFKA_PARTIAL_QUEUE_FULL_RETRIES;
560572
}
561573

@@ -971,7 +983,6 @@ static int produce_partitioned_otlp_logs(struct flb_out_kafka *ctx,
971983
struct otlp_logs_resource_partition *partitions;
972984
struct flb_opentelemetry_otlp_logs_options options;
973985
size_t produced_count;
974-
static const char *default_logs_body_keys[] = {"log", "message"};
975986

976987
partitions = NULL;
977988
partition_count = 0;
@@ -1081,11 +1092,7 @@ static int produce_partitioned_otlp_logs(struct flb_out_kafka *ctx,
10811092
goto cleanup;
10821093
}
10831094

1084-
memset(&options, 0, sizeof(options));
1085-
options.logs_require_otel_metadata = FLB_FALSE;
1086-
options.logs_body_keys = default_logs_body_keys;
1087-
options.logs_body_key_count = 2;
1088-
options.logs_body_key_attributes = FLB_FALSE;
1095+
init_otlp_logs_options(&options);
10891096

10901097
for (index = 0; index < partition_count; index++) {
10911098
partition = &partitions[index];
@@ -1122,9 +1129,12 @@ static int produce_partitioned_otlp_logs(struct flb_out_kafka *ctx,
11221129
}
11231130

11241131
/*
1125-
* Once a partition has been accepted by librdkafka, returning FLB_RETRY
1126-
* for a later queue-full condition would replay the original chunk and
1127-
* duplicate the partitions already enqueued.
1132+
* Partitioned OTLP log sends are at-most-once after the first partition
1133+
* is accepted by librdkafka. If a later partition fails, return FLB_ERROR
1134+
* instead of FLB_RETRY so engine replay does not duplicate partitions
1135+
* already enqueued. Under sustained back-pressure this can partially
1136+
* deliver the original chunk; disable this option or reduce Kafka
1137+
* back-pressure when chunk-level retry durability is required.
11281138
*/
11291139
ret = produce_raw_payload_with_key_retry_control(payload,
11301140
flb_sds_len(payload),
@@ -1162,7 +1172,6 @@ static int produce_otlp_json(struct flb_out_kafka *ctx,
11621172
int result;
11631173
flb_sds_t payload;
11641174
struct flb_opentelemetry_otlp_logs_options options;
1165-
static const char *default_logs_body_keys[] = {"log", "message"};
11661175

11671176
payload = NULL;
11681177

@@ -1173,11 +1182,7 @@ static int produce_otlp_json(struct flb_out_kafka *ctx,
11731182
FLB_KAFKA_FMT_OTLP_JSON);
11741183
}
11751184

1176-
memset(&options, 0, sizeof(options));
1177-
options.logs_require_otel_metadata = FLB_FALSE;
1178-
options.logs_body_keys = default_logs_body_keys;
1179-
options.logs_body_key_count = 2;
1180-
options.logs_body_key_attributes = FLB_FALSE;
1185+
init_otlp_logs_options(&options);
11811186

11821187
payload = flb_opentelemetry_logs_to_otlp_json(event_chunk->data,
11831188
event_chunk->size,
@@ -1224,7 +1229,6 @@ static int produce_otlp_proto(struct flb_out_kafka *ctx,
12241229
struct ctrace *ctr;
12251230
flb_sds_t payload;
12261231
struct flb_opentelemetry_otlp_logs_options options;
1227-
static const char *default_logs_body_keys[] = {"log", "message"};
12281232

12291233
if (event_chunk->type == FLB_EVENT_TYPE_LOGS) {
12301234
if (ctx->otlp_logs_partition_by_resource == FLB_TRUE) {
@@ -1233,11 +1237,7 @@ static int produce_otlp_proto(struct flb_out_kafka *ctx,
12331237
FLB_KAFKA_FMT_OTLP_PROTO);
12341238
}
12351239

1236-
memset(&options, 0, sizeof(options));
1237-
options.logs_require_otel_metadata = FLB_FALSE;
1238-
options.logs_body_keys = default_logs_body_keys;
1239-
options.logs_body_key_count = 2;
1240-
options.logs_body_key_attributes = FLB_FALSE;
1240+
init_otlp_logs_options(&options);
12411241

12421242
payload = flb_opentelemetry_logs_to_otlp_proto(event_chunk->data,
12431243
event_chunk->size,
@@ -1428,7 +1428,12 @@ static struct flb_config_map config_map[] = {
14281428
FLB_CONFIG_MAP_BOOL, "otlp_logs_partition_by_resource", "false",
14291429
0, FLB_TRUE, offsetof(struct flb_out_kafka, otlp_logs_partition_by_resource),
14301430
"When using format otlp_json or otlp_proto, split OTLP log payloads by "
1431-
"resource and use a hash of the resource attributes as the Kafka message key."
1431+
"resource and use a hash of the resource attributes as the Kafka message key. "
1432+
"This supersedes message_key and message_key_field for those chunks; logs "
1433+
"without resource information are unkeyed. After partial partition delivery, "
1434+
"later produce failures are not retried by the engine to avoid duplicates; "
1435+
"disable this option or reduce Kafka back-pressure for chunk-level retry "
1436+
"durability."
14321437
},
14331438
{
14341439
FLB_CONFIG_MAP_STR, "message_key", (char *)NULL,

0 commit comments

Comments
 (0)