Affected Version
37.0.0 (the relevant code is unchanged on current master)
Description
Summary. A Kafka-type lookup (druid-kafka-extraction-namespace) is reported as started/loaded as soon as its Kafka consumer completes its first poll(), not when it has caught up to the end of the topic. Because KafkaLookupExtractorFactory generates a fresh random group.id (the factoryId) on every process start, the consumer finds no committed offsets and re-reads the entire (typically compacted) topic from offset 0. During that catch-up window the process announces itself and serves queries from a partially populated map: LOOKUP(...) returns NULL and SELECT * FROM lookup."X" returns no rows for keys that are durably present in the topic — and this affects all keys, not just recently added ones.
Where this bites hard: Kubernetes deployments with broker autoscaling. Every HPA scale-up adds a cold broker that immediately receives query traffic, so query results flap between complete and empty depending on which broker happens to serve them, then self-heal once the consumer catches up. With an aggressive HPA (we observed 2→4→5→4→2 within ~26 minutes, and hundreds of rescales over ~19 days) this produces user-visible intermittent data loss in dashboards several times per hour.
Root cause. In KafkaLookupExtractorFactory.start() the startup latch is released by the first poll, even if it returns zero records:
final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
startingReads.countDown();
and start() only awaits that latch (bounded by connectTimeout) — i.e. it waits for connection, not for catch-up. Consequently:
- LookupReferencesManager considers the lookup loaded and the lookup status APIs report loaded: true while the in-memory map is still mostly empty;
- the node announces itself (we run druid.lookup.enableLookupSyncOnStartup=true, which doesn't help because of the above);
- there is no API or metric exposing the catch-up state, so operators cannot gate a Kubernetes readiness probe on it either.
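To make the failure mode concrete, here is a stripped-down, Kafka-free model of the startup handshake described above. It is not Druid's code: StartupModel, SimulatedConsumer, compactedTopic and the batch sizes are hypothetical stand-ins. Only the "count the latch down after the first poll, then let start() await it with a timeout" shape mirrors the snippet quoted above. The first simulated poll returns no records (as when partitions have just been assigned), so start() reports success while the map is still empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, Kafka-free model of the reported startup handshake; not Druid code.
final class StartupModel {
    // Stand-in for a consumer re-reading a compacted topic from offset 0.
    static final class SimulatedConsumer {
        private final List<List<String>> batches;
        private int next = 0;

        SimulatedConsumer(List<List<String>> batches) { this.batches = batches; }

        // Returns the next batch of keys, or an empty batch once the topic is exhausted.
        synchronized List<String> poll() {
            return next < batches.size() ? batches.get(next++) : List.of();
        }
    }

    // First batch is empty (partitions just assigned), then n batches of perBatch keys.
    static List<List<String>> compactedTopic(int n, int perBatch) {
        List<List<String>> batches = new ArrayList<>();
        batches.add(List.of());
        for (int b = 0; b < n; b++) {
            List<String> batch = new ArrayList<>();
            for (int i = 0; i < perBatch; i++) batch.add("key-" + (b * perBatch + i));
            batches.add(batch);
        }
        return batches;
    }

    final Map<String, String> map = new ConcurrentHashMap<>();
    private final CountDownLatch startingReads = new CountDownLatch(1);

    // Same shape as the reported pattern: the latch is released after the FIRST poll,
    // however many records it returned and however far behind the consumer still is.
    boolean start(SimulatedConsumer consumer, long connectTimeoutMs) throws InterruptedException {
        Thread reader = new Thread(() -> {
            while (true) {
                for (String key : consumer.poll()) map.put(key, "value-" + key);
                startingReads.countDown();
                try {
                    Thread.sleep(100); // pause between polls so catching up spans many polls
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        reader.setDaemon(true);
        reader.start();
        // Waits for "connected", not for "caught up".
        return startingReads.await(connectTimeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        StartupModel lookup = new StartupModel();
        boolean started = lookup.start(new SimulatedConsumer(compactedTopic(10, 28)), 5_000);
        System.out.println("started=" + started + ", entries right after start: " + lookup.map.size());
        Thread.sleep(2_000);
        System.out.println("entries after catch-up: " + lookup.map.size());
    }
}
```

Running it shows start() returning true with far fewer than the 280 entries, and the map filling up only afterwards, which is the window in which a freshly started broker answers queries from a partial map.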
Cluster / configuration
- Druid 37.0.0 on Kubernetes (druid-operator), ZooKeeper-less (druid.discovery.type=k8s)
- Brokers: 2–5 replicas via HPA on CPU
- Lookup: type kafka, log-compacted topic (cleanup.policy=compact), 8 partitions, ~280 keys
Steps to reproduce
- Create a Kafka lookup over a compacted topic with a non-trivial number of keys.
- Start an additional broker and immediately run SELECT COUNT(*) FROM lookup."X" against it (port-forward to the pod to bypass load balancing).
- The count starts at or near 0 and grows until the consumer catches up, while the lookup is already reported as loaded on that broker. The broker log shows "Found no committed offset for partition ..." followed by "Resetting offset for partition ... to position FetchPosition{offset=0, ...}" for all partitions, confirming the full re-read under a fresh random group id.
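To watch the count grow on a single new broker, the query from the steps above can be repeated in a loop. The Java sketch below is a hypothetical helper, not part of Druid. It assumes the new broker has been port-forwarded to localhost:8082 (the default broker plaintext port), posts the statement to the Druid SQL HTTP endpoint /druid/v2/sql as a JSON object with a "query" field, and uses X as a placeholder lookup name. It prints the raw JSON response once per second so the growth can be lined up against the broker log.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.time.Instant;

// Hypothetical repro helper: polls one broker's SQL endpoint for the lookup's row count.
final class LookupCountProbe {
    // Druid SQL quotes identifiers with double quotes; an embedded quote is doubled.
    static String countSql(String lookupName) {
        return "SELECT COUNT(*) AS cnt FROM lookup.\"" + lookupName.replace("\"", "\"\"") + "\"";
    }

    // Minimal JSON string escaping, sufficient for the SQL text built above.
    static String jsonEscape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    static HttpRequest countRequest(URI broker, String lookupName) {
        String body = "{\"query\":\"" + jsonEscape(countSql(lookupName)) + "\"}";
        return HttpRequest.newBuilder(broker.resolve("/druid/v2/sql"))
                .header("Content-Type", "application/json")
                .timeout(Duration.ofSeconds(10))
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumes something like: kubectl port-forward pod/<new-broker-pod> 8082:8082
        URI broker = URI.create(args.length > 0 ? args[0] : "http://localhost:8082");
        String lookupName = args.length > 1 ? args[1] : "X";
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = countRequest(broker, lookupName);
        for (int i = 0; i < 120; i++) {
            try {
                HttpResponse<String> resp = client.send(request, HttpResponse.BodyHandlers.ofString());
                System.out.println(Instant.now() + " HTTP " + resp.statusCode() + " " + resp.body());
            } catch (IOException e) {
                // The broker may not accept connections yet; keep polling.
                System.out.println(Instant.now() + " request failed: " + e.getMessage());
            }
            Thread.sleep(1_000);
        }
    }
}
```

Usage: start it right after the new broker pod is scheduled, e.g. java LookupCountProbe.java http://localhost:8082 X.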
Expected behavior (either would solve it)
- start() waits, optionally behind a config flag and with a configurable timeout, until the consumer reaches the end offsets captured at subscription time before reporting started, so that "loaded" means "caught up"; or
- the factory exposes its catch-up state (e.g. via lookup introspection and/or a metric) so that the broker readiness endpoint /druid/broker/v1/readiness (or an operator-provided probe) can take it into account before the node receives queries.
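A minimal sketch of the first option, under stated assumptions: the target is a snapshot of the end offsets taken right after partition assignment (in kafka-clients, KafkaConsumer#endOffsets), and the consumer thread reports its position per partition after each poll (KafkaConsumer#position). CatchUpTracker, onPosition and awaitCaughtUp are hypothetical names, not existing Druid APIs, and partitions are plain ints to keep the example self-contained.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: "caught up" means every partition's position has reached
// the end offset snapshotted at subscription time.
final class CatchUpTracker {
    private final Map<Integer, Long> targetOffsets;
    private final Map<Integer, Long> positions = new ConcurrentHashMap<>();
    private final CountDownLatch caughtUp = new CountDownLatch(1);

    CatchUpTracker(Map<Integer, Long> endOffsetsAtSubscribe) {
        this.targetOffsets = Map.copyOf(endOffsetsAtSubscribe);
        // Nothing to read (e.g. an empty topic): already caught up.
        if (isCaughtUp()) caughtUp.countDown();
    }

    // Called by the consumer thread after each poll with the next offset to fetch.
    void onPosition(int partition, long nextOffset) {
        positions.merge(partition, nextOffset, Math::max);
        if (isCaughtUp()) caughtUp.countDown();
    }

    // Non-blocking check, usable by a readiness probe or a metric.
    boolean isCaughtUp() {
        for (Map.Entry<Integer, Long> e : targetOffsets.entrySet()) {
            if (positions.getOrDefault(e.getKey(), 0L) < e.getValue()) return false;
        }
        return true;
    }

    // What start() would await instead of the first-poll latch; false on timeout.
    boolean awaitCaughtUp(long timeoutMs) throws InterruptedException {
        return caughtUp.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CatchUpTracker tracker = new CatchUpTracker(Map.of(0, 120L, 1, 160L));
        tracker.onPosition(0, 120);
        System.out.println("after partition 0: caughtUp=" + tracker.isCaughtUp()); // partition 1 still behind
        tracker.onPosition(1, 160);
        System.out.println("after partition 1: caughtUp=" + tracker.awaitCaughtUp(1_000));
    }
}
```

Comparing against offsets snapshotted at subscription time, rather than live end offsets, keeps the wait bounded even while producers keep writing. What start() does when awaitCaughtUp returns false (fail, or start with a partial map and log a warning) is a policy choice that the config flag could control. Because isCaughtUp() is cheap and non-blocking, the same state could also back the second option.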
Debugging already done
- Verified at steady state that all warm brokers hold the full map (~280 entries each) and repeated queries return correct results — no intermittency once brokers are warm.
- Reproduced the empty results by querying a freshly started broker directly during a scale-up while warm brokers answered the same query correctly at the same time.
- Read KafkaLookupExtractorFactory.start() on current master and confirmed that the latch is counted down by the first poll regardless of consumer lag.