Skip to content

Commit 93edcf1

Browse files
author
Francois
committed
consumer+producer: added name and counter to support toString
1 parent 62c591d commit 93edcf1

2 files changed

Lines changed: 127 additions & 98 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ Thanks a lot to these awesome contributors
5252
### 0.1.25
5353

5454
- Removal of the asynchronous façade, transducers should suffice
55+
- Consumer+Producer: added name and counter to support toString
5556
- Producer: added send-cb! function that takes a callback fn
5657
- Producer: added abort-transaction! function
5758
- Consumer: added seek-beginning! and seek-end!

src/kinsky/client.clj

Lines changed: 126 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,14 @@
443443
(instance? Pattern topics) topics
444444
:else (throw (ex-info "topics argument is invalid" {:topics topics}))))
445445

446+
(def consumer-counter (atom 0))
447+
(defn opts->consumer-id
448+
^String
449+
[opts]
450+
(str "kinsky-consumer-" (swap! consumer-counter inc) "-"
451+
(get opts "group.id" "") "-"
452+
(get opts "client.id" "") "-"))
453+
446454
(defn consumer->driver
447455
"Given a KafkaConsumer and an optional callback to call when stopping,
448456
yield a consumer driver.
@@ -454,6 +462,7 @@
454462
- `clojure.lang.IDeref`: `deref` to access underlying
455463
[KafkaConsumer](http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/KafkaConsumer.html)
456464
instance.
465+
- `java.lang.Object`: `.toString` produces the kinsky consumer `id`.
457466
458467
The consumer-driver can also take options:
459468
@@ -464,57 +473,61 @@
464473
(consumer->driver consumer nil))
465474
([^KafkaConsumer consumer
466475
{::keys [run-fn consumer-decoder-fn]
467-
:or {consumer-decoder-fn consumer-records->data}}]
468-
(reify
469-
ConsumerDriver
470-
(poll! [this timeout]
471-
(consumer-decoder-fn (.poll consumer (java.time.Duration/ofMillis timeout))))
472-
(stop! [this]
473-
(stop! this 0))
474-
(stop! [this timeout]
475-
(when (fn? run-fn)
476-
(run-fn))
477-
(.wakeup consumer))
478-
(pause! [this topic-partitions]
479-
(.pause consumer
480-
(map ->topic-partition topic-partitions)))
481-
(resume! [this topic-partitions]
482-
(.resume consumer
483-
(map ->topic-partition topic-partitions)))
484-
(subscribe! [this topics]
485-
(.subscribe consumer (->topics topics)))
486-
(subscribe! [this topics listener]
487-
(.subscribe consumer (->topics topics) (rebalance-listener listener)))
488-
(unsubscribe! [this]
489-
(.unsubscribe consumer))
490-
(wake-up! [this]
491-
(.wakeup consumer))
492-
(commit! [this]
493-
(.commitSync consumer))
494-
(commit! [this topic-offsets]
495-
(let [offsets (->> topic-offsets
496-
(map (juxt ->topic-partition ->offset-metadata))
497-
(reduce merge {}))]
498-
(.commitSync consumer ^Map offsets)))
499-
(seek! [this topic-partition offset]
500-
(.seek consumer (->topic-partition topic-partition) (long offset)))
501-
(seek-beginning! [this topic-partitions]
502-
(.seekToBeginning consumer (map ->topic-partition topic-partitions)))
503-
(seek-end! [this topic-partitions]
504-
(.seekToEnd consumer (map ->topic-partition topic-partitions)))
505-
(position! [this topic-partition]
506-
(.position consumer (->topic-partition topic-partition)))
507-
(subscription [this]
508-
(.subscription consumer))
509-
GenericDriver
510-
(close! [this]
511-
(.close consumer))
512-
MetadataDriver
513-
(partitions-for [this topic]
514-
(mapv partition-info->data (.partitionsFor consumer topic)))
515-
clojure.lang.IDeref
516-
(deref [this]
517-
consumer))))
476+
:or {consumer-decoder-fn consumer-records->data}
477+
:as config}]
478+
(let [id (opts->consumer-id config)]
479+
(reify
480+
ConsumerDriver
481+
(poll! [this timeout]
482+
(consumer-decoder-fn (.poll consumer (java.time.Duration/ofMillis timeout))))
483+
(stop! [this]
484+
(stop! this 0))
485+
(stop! [this timeout]
486+
(when (fn? run-fn)
487+
(run-fn))
488+
(.wakeup consumer))
489+
(pause! [this topic-partitions]
490+
(.pause consumer
491+
(map ->topic-partition topic-partitions)))
492+
(resume! [this topic-partitions]
493+
(.resume consumer
494+
(map ->topic-partition topic-partitions)))
495+
(subscribe! [this topics]
496+
(.subscribe consumer (->topics topics)))
497+
(subscribe! [this topics listener]
498+
(.subscribe consumer (->topics topics) (rebalance-listener listener)))
499+
(unsubscribe! [this]
500+
(.unsubscribe consumer))
501+
(wake-up! [this]
502+
(.wakeup consumer))
503+
(commit! [this]
504+
(.commitSync consumer))
505+
(commit! [this topic-offsets]
506+
(let [offsets (->> topic-offsets
507+
(map (juxt ->topic-partition ->offset-metadata))
508+
(reduce merge {}))]
509+
(.commitSync consumer ^Map offsets)))
510+
(seek! [this topic-partition offset]
511+
(.seek consumer (->topic-partition topic-partition) (long offset)))
512+
(seek-beginning! [this topic-partitions]
513+
(.seekToBeginning consumer (map ->topic-partition topic-partitions)))
514+
(seek-end! [this topic-partitions]
515+
(.seekToEnd consumer (map ->topic-partition topic-partitions)))
516+
(position! [this topic-partition]
517+
(.position consumer (->topic-partition topic-partition)))
518+
(subscription [this]
519+
(.subscription consumer))
520+
GenericDriver
521+
(close! [this]
522+
(.close consumer))
523+
MetadataDriver
524+
(partitions-for [this topic]
525+
(mapv partition-info->data (.partitionsFor consumer topic)))
526+
clojure.lang.IDeref
527+
(deref [this]
528+
consumer)
529+
Object
530+
(toString [this] id)))))
518531

519532
(defn safe-poll!
520533
"Implementation of poll which disregards wake-up exceptions"
@@ -589,6 +602,14 @@
589602
(onCompletion [_ record-metadata exception]
590603
(f (some-> record-metadata rm->data) exception)))))
591604

605+
(def producer-counter (atom 0))
606+
(defn opts->producer-id
607+
^String
608+
[opts]
609+
(str "kinsky-producer-" (swap! producer-counter inc) "-"
610+
(get opts "transaction.id" "") "-"
611+
(get opts "client.id" "") "-"))
612+
592613
(defn producer->driver
593614
"Yield a driver from a Kafka Producer.
594615
The producer driver implements the following protocols:
@@ -597,66 +618,73 @@
597618
- [MetadataDriver](#var-MetadataDriver)
598619
- `clojure.lang.IDeref`: `deref` to access underlying
599620
[KafkaProducer](http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html)
600-
instance."
601-
[^KafkaProducer producer]
602-
(reify
603-
GenericDriver
604-
(close! [this]
605-
(.close producer))
606-
(close! [this timeout]
607-
(if (nil? timeout)
608-
(.close producer)
609-
(.close producer (long timeout) TimeUnit/MILLISECONDS)))
610-
ProducerDriver
611-
(send! [this record]
612-
(.send producer (->record record)))
613-
(send! [this topic k v]
614-
(.send producer (->record {:key k :value v :topic topic})))
615-
(send! [this topic k v headers]
616-
(.send producer (->record {:key k :value v :topic topic :headers headers})))
617-
(send-cb! [this record cb]
618-
(.send producer (->record record) (->callback cb)))
619-
(send-cb! [this topic k v cb]
620-
(.send producer
621-
(->record {:key k :value v :topic topic})
622-
(->callback cb)))
623-
(send-cb! [this topic k v headers cb]
624-
(.send producer
625-
(->record {:key k :value v :topic topic :headers headers})
626-
(->callback cb)))
627-
(flush! [this]
628-
(.flush producer))
629-
(init-transactions! [this]
630-
(.initTransactions producer))
631-
(begin-transaction! [this]
632-
(.beginTransaction producer))
633-
(commit-transaction! [this]
634-
(.commitTransaction producer))
635-
(abort-transaction! [this]
636-
(.abortTransaction producer))
637-
MetadataDriver
638-
(partitions-for [this topic]
639-
(mapv partition-info->data (.partitionsFor producer topic)))
640-
clojure.lang.IDeref
641-
(deref [this]
642-
producer)))
621+
instance.
622+
- `java.lang.Object`: `.toString` produces the kinsky producer `id`."
623+
([^KafkaProducer producer]
624+
(producer->driver producer nil))
625+
([^KafkaProducer producer config]
626+
(let [id (opts->producer-id config)]
627+
(reify
628+
GenericDriver
629+
(close! [this]
630+
(.close producer))
631+
(close! [this timeout]
632+
(if (nil? timeout)
633+
(.close producer)
634+
(.close producer (long timeout) TimeUnit/MILLISECONDS)))
635+
ProducerDriver
636+
(send! [this record]
637+
(.send producer (->record record)))
638+
(send! [this topic k v]
639+
(.send producer (->record {:key k :value v :topic topic})))
640+
(send! [this topic k v headers]
641+
(.send producer (->record {:key k :value v :topic topic :headers headers})))
642+
(send-cb! [this record cb]
643+
(.send producer (->record record) (->callback cb)))
644+
(send-cb! [this topic k v cb]
645+
(.send producer
646+
(->record {:key k :value v :topic topic})
647+
(->callback cb)))
648+
(send-cb! [this topic k v headers cb]
649+
(.send producer
650+
(->record {:key k :value v :topic topic :headers headers})
651+
(->callback cb)))
652+
(flush! [this]
653+
(.flush producer))
654+
(init-transactions! [this]
655+
(.initTransactions producer))
656+
(begin-transaction! [this]
657+
(.beginTransaction producer))
658+
(commit-transaction! [this]
659+
(.commitTransaction producer))
660+
(abort-transaction! [this]
661+
(.abortTransaction producer))
662+
MetadataDriver
663+
(partitions-for [this topic]
664+
(mapv partition-info->data (.partitionsFor producer topic)))
665+
clojure.lang.IDeref
666+
(deref [this]
667+
producer)
668+
Object
669+
(toString [this] id)))))
643670

644671
(defn producer
645672
"Create a producer from a configuration and optional serializers.
646673
If a single serializer is provided, it will be used for both keys
647674
and values. If none are provided, the configuration is expected to
648675
hold serializer class names."
649676
([config]
650-
(producer->driver (KafkaProducer. (opts->props config))))
677+
(producer->driver (KafkaProducer. (opts->props config)) config))
651678
([config serializer]
652679
(producer->driver (KafkaProducer. (opts->props config)
653680
(->serializer serializer)
654-
(->serializer serializer))))
681+
(->serializer serializer))
682+
config))
655683
([config kserializer vserializer]
656684
(producer->driver (KafkaProducer. (opts->props config)
657685
(->serializer kserializer)
658-
(->serializer vserializer)))))
659-
686+
(->serializer vserializer))
687+
config)))
660688
(defn consumer
661689
"Create a consumer from a configuration and optional deserializers.
662690
If a callback is given, call it when stopping the consumer.

0 commit comments

Comments
 (0)