Support .offsetsForTimes of the consumer API #40

@raycheung

Currently I'm doing something like the code below to look up the offsets at a given timestamp and then reset each topic/partition to them. I have to go through a fair amount of Java/Clojure interop to match the API. It would be great if this could all be done natively in Clojure.

(require '[kinsky.client :as k])

;; Assumes `consumer` is a kinsky consumer, `timestamp` is an epoch-millisecond
;; long, and `t` aliases a library whose `duration` returns a java.time.Duration.
(let [topic-partitions
      (map k/->topic-partition [{:topic "topic" :partition 0} {:topic "topic" :partition 1}])

      ;; offsetsForTimes expects a map of TopicPartition -> timestamp (ms)
      topic-partition-times-map
      (into {} (for [tp topic-partitions] [tp timestamp]))

      topic-partition-offsets-map
      (.offsetsForTimes @consumer topic-partition-times-map (t/duration 10 :seconds))

      ;; a nil value means no offset was found for that partition
      topic-offsets
      (for [[topic-partition offset-and-timestamp] topic-partition-offsets-map
            :when (some? offset-and-timestamp)]
        (merge (k/topic-partition->data topic-partition) {:offset (.offset offset-and-timestamp)}))]
  (k/commit! consumer topic-offsets))
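
For illustration, a wrapper could look roughly like the sketch below. `offsets-for-times` and `offsets->data` are hypothetical names, not existing kinsky functions. The sketch assumes a Kafka client version that offers the `offsetsForTimes` overload taking a `java.time.Duration`, that timestamps are epoch milliseconds, and that dereferencing the kinsky consumer yields the underlying Kafka consumer, as in the snippet above.

```clojure
(ns example.offsets
  (:require [kinsky.client :as k])
  (:import (java.time Duration)))

(defn offsets->data
  "Hypothetical helper: turn the map returned by .offsetsForTimes into a
  seq of Clojure maps with an added :offset, skipping partitions for
  which Kafka returned nil."
  [topic-partition-offsets]
  (for [[tp offset-and-timestamp] topic-partition-offsets
        :when (some? offset-and-timestamp)]
    (assoc (k/topic-partition->data tp)
           :offset (.offset offset-and-timestamp))))

(defn offsets-for-times
  "Hypothetical wrapper: takes a map of {:topic .. :partition ..} to an
  epoch-millisecond timestamp, plus a timeout in milliseconds."
  [consumer topic-partition->timestamp timeout-ms]
  (let [search (into {}
                     (for [[tp ts] topic-partition->timestamp]
                       ;; Kafka expects TopicPartition keys and Long values
                       [(k/->topic-partition tp) (long ts)]))]
    (offsets->data
     (.offsetsForTimes @consumer search (Duration/ofMillis timeout-ms)))))

(comment
  ;; Usage, assuming `consumer` and `timestamp` are defined as above:
  (k/commit! consumer
             (offsets-for-times consumer
                                {{:topic "topic" :partition 0} timestamp
                                 {:topic "topic" :partition 1} timestamp}
                                10000)))
```

With something like this in the library, callers would only deal with plain maps and numbers, and the `TopicPartition` and `OffsetAndTimestamp` interop would stay internal.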
