Skip to content

Kafka source#966

Open
MrAnno wants to merge 32 commits into
axoflow:mainfrom
MrAnno:kafka-source
Open

Kafka source#966
MrAnno wants to merge 32 commits into
axoflow:mainfrom
MrAnno:kafka-source

Conversation

@MrAnno
Copy link
Copy Markdown
Contributor

@MrAnno MrAnno commented Feb 25, 2026

Original Kafka source implementation sourced from syslog-ng/syslog-ng#5564.

The license has been upgraded from GPL-2.0-or-later to GPL-3.0-or-later, which is permitted under the terms of GPL-2.0-or-later.

The original 103 commits have been squashed and extensively restructured to make them suitable for our review process.

HofiOne and others added 30 commits February 25, 2026 19:28
Signed-off-by: László Várady <laszlo.varady@anno.io>
Signed-off-by: Hofi <hofione@gmail.com>
Signed-off-by: Hofi <hofione@gmail.com>
Signed-off-by: László Várady <laszlo.varady@axoflow.com>
Signed-off-by: Hofi <hofione@gmail.com>
Signed-off-by: Hofi <hofione@gmail.com>
Signed-off-by: László Várady <laszlo.varady@axoflow.com>
Signed-off-by: Hofi <hofione@gmail.com>
Signed-off-by: Hofi <hofione@gmail.com>
Signed-off-by: Hofi <hofione@gmail.com>
Signed-off-by: Hofi <hofione@gmail.com>
- implement connected state detection and logging
- have common kafka callback handlers

Signed-off-by: Hofi <hofione@gmail.com>
Signed-off-by: Hofi <hofione@gmail.com>
Signed-off-by: László Várady <laszlo.varady@axoflow.com>
Signed-off-by: Hofi <hofione@gmail.com>
Signed-off-by: László Várady <laszlo.varady@axoflow.com>
Signed-off-by: Hofi <hofione@gmail.com>
Signed-off-by: László Várady <laszlo.varady@axoflow.com>
Signed-off-by: Hofi <hofione@gmail.com>
…d none-linear consuming

Signed-off-by: Hofi <hofione@gmail.com>
…amed to ignore_saved_bookmarks and synced its handling in the persist state maintenance

Signed-off-by: Hofi <hofione@gmail.com>
…ka bug) at startup without a connection

Signed-off-by: Hofi <hofione@gmail.com>
…t, store owner references

Signed-off-by: Hofi <hofione@gmail.com>
Signed-off-by: Hofi <hofione@gmail.com>
… the already exiting nameing convention

Signed-off-by: Hofi <hofione@gmail.com>
…the rebalance event correctly, synced instead

Signed-off-by: Hofi <hofione@gmail.com>
…fully reinitiation of the consumer (will not always work)

Signed-off-by: Hofi <hofione@gmail.com>
…he persist is invalidated, as those are not affected by the remote connection/partition assignment state

Signed-off-by: Hofi <hofione@gmail.com>
Signed-off-by: Hofi <hofione@gmail.com>
HofiOne and others added 2 commits February 25, 2026 19:28
Signed-off-by: László Várady <laszlo.varady@axoflow.com>
@github-actions
Copy link
Copy Markdown
Contributor

This Pull Request introduces config grammar changes

axoflow/89f829398effb61fa2ec185f3599025e206e60e7 -> MrAnno/kafka-source

Details
--- a/destination
+++ b/destination

+kafka(
+    batch-lines(<nonnegative-integer>)
+    batch-timeout(<nonnegative-integer>)
+    bootstrap-servers(<string>)
+    config(
+        <empty>
+        <string> <string-or-number>
+        <string> => <string-or-number>
+        <string>(<string-or-number>)
+    )
+    disk-buffer(
+        capacity-bytes(<number>)
+        compaction(<yesno>)
+        dir(<string>)
+        disk-buf-size(<number>)
+        flow-control-window-bytes(<nonnegative-integer>)
+        flow-control-window-size(<nonnegative-integer>)
+        front-cache-size(<nonnegative-integer>)
+        mem-buf-length(<nonnegative-integer>)
+        mem-buf-size(<nonnegative-integer>)
+        prealloc(<yesno>)
+        qout-size(<nonnegative-integer>)
+        reliable(<yesno>)
+        truncate-size-ratio(<nonnegative-float>)
+    )
+    fallback-topic(<string>)
+    flags(
+        <empty>
+        <string>
+    )
+    flush-timeout-on-reload(<nonnegative-integer>)
+    flush-timeout-on-shutdown(<nonnegative-integer>)
+    frac-digits(<nonnegative-integer>)
+    hook-commands(
+        setup(<string>)
+        shutdown(<string>)
+        startup(<string>)
+        teardown(<string>)
+    )
+    internal(<yesno>)
+    kafka-logging(<string>)
+    key(<template-content>)
+    local-time-zone(<string>)
+    log-fifo-size(<positive-integer>)
+    message(
+        <template-content>
+        <template-reference>
+    )
+    on-error(<string>)
+    partition-buckets(<template-content>)
+    partition-key(<template-content>)
+    partitions(<positive-integer>)
+    persist-name(<string>)
+    poll-timeout(<nonnegative-integer>)
+    retries(<positive-integer>)
+    send-time-zone(<string>)
+    sync-send(<yesno>)
+    template-escape(<yesno>)
+    throttle(<nonnegative-integer>)
+    time-reopen(<positive-integer>)
+    time-zone(<string>)
+    topic(<template-content>)
+    ts-format(<string>)
+    worker-partition-autoscaling(<yesno>)
+    worker-partition-autoscaling-wfo(<positive-integer>)
+    worker-partition-buckets(<template-content>)
+    worker-partition-key(<template-content>)
+    workers(<positive-integer>)
+)

 kafka-c(
+    kafka-logging(<string>)
 )

--- a/source
+++ b/source

+kafka(
+    bootstrap-servers(<string>)
+    chain-hostnames(<yesno>)
+    config(
+        <empty>
+        <string> <string-or-number>
+        <string> => <string-or-number>
+        <string>(<string-or-number>)
+    )
+    default-facility(
+        <string>
+        KW_SYSLOG
+    )
+    default-level(<string>)
+    default-priority(<string>)
+    default-severity(<string>)
+    disable-bookmarks(<yesno>)
+    dns-cache(<yesno>)
+    flags(
+        <empty>
+        <string>
+    )
+    format(<string>)
+    hook-commands(
+        setup(<string>)
+        shutdown(<string>)
+        startup(<string>)
+        teardown(<string>)
+    )
+    host-override(<string>)
+    ignore-saved-bookmarks(<yesno>)
+    internal(<yesno>)
+    kafka-logging(<string>)
+    keep-hostname(<yesno>)
+    keep-timestamp(<yesno>)
+    log-fetch-delay(<nonnegative-integer>)
+    log-fetch-limit(<positive-integer>)
+    log-fetch-queue-full-delay(<positive-integer>)
+    log-fetch-retry-delay(<nonnegative-integer>)
+    log-iw-size(<positive-integer>)
+    log-prefix(<string>)
+    long-hostnames(<yesno>)
+    normalize-hostnames(<yesno>)
+    partitions(<positive-integer>)
+    persist-name(<string>)
+    persist-store(<string>)
+    poll-timeout(<positive-integer>)
+    program-override(<string>)
+    read-old-records(<yesno>)
+    sdata-prefix(<string>)
+    separate-worker-queues(<yesno>)
+    state-update-timeout(<positive-integer>)
+    store-metadata(<yesno>)
+    strategy-hint(<string>)
+    tags(<string-list>)
+    time-reopen(<positive-integer>)
+    time-zone(<string>)
+    topic(
+        <empty>
+        <string> <string-or-number>
+        <string> => <string-or-number>
+        <string>(<string-or-number>)
+    )
+    use-dns(
+        <yesno>
+        persist-only
+    )
+    use-fqdn(<yesno>)
+    use-syslogng-pid(<yesno>)
+    workers(<positive-integer>)
+)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants