Skip to content

[ANA-4718] Namespace pattern subscriber#1

Merged
vbabenkoru merged 4 commits intov4.2-iterablefrom
victor/ANA-4718-namespace-pattern-subscriber
Mar 26, 2026
Merged

[ANA-4718] Namespace pattern subscriber#1
vbabenkoru merged 4 commits intov4.2-iterablefrom
victor/ANA-4718-namespace-pattern-subscriber

Conversation

@vbabenkoru
Copy link
Copy Markdown

@vbabenkoru vbabenkoru commented Mar 26, 2026

Purpose of the change

Add a NamespacePatternSubscriber that dynamically discovers namespaces and topics using a pattern in the format tenant/namespace-pattern/topic-pattern. This enables consuming from topics across multiple namespaces without explicitly listing each one — useful when namespaces are created dynamically (e.g., per-organization).

Brief change log

  • Add NamespacePatternSubscriber that discovers namespaces matching a regex under a tenant via PulsarAdmin, then discovers topics matching a second regex within each namespace using Pulsar's lookup protocol.
  • Add RequiresPulsarAdmin capability interface so the enumerator can inject a PulsarAdmin instance into subscribers that need administrative operations (e.g., listing namespaces).
  • Add PulsarClientFactory.createAdmin() to create a PulsarAdmin from Flink configuration, with automatic URL derivation from the service URL when PULSAR_ADMIN_URL is not set.
  • Add PULSAR_ADMIN_URL config option to PulsarOptions.
  • Add setNamespaceTopicPattern() builder methods to PulsarSourceBuilder.
  • Wire up PulsarAdmin lifecycle in PulsarSourceEnumerator — created on start, closed on close, injected into subscribers that implement RequiresPulsarAdmin.
  • Fix PulsarAdmin resource leak by refactoring lifecycle management so the enumerator owns and closes the admin client.
  • Fix NPE when lastKnownNamespaces is null after deserialization (checkpoint recovery).

Verifying this change

This change added tests and can be verified as follows:

  • Added NamespacePatternSubscriberTest with unit tests covering namespace discovery, topic pattern matching, fallback to last-known namespaces on failure, deserialization safety, and pattern parsing.

Significant changes

  • Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • If yes, how is this documented? JavaDocs

…overy

Introduces NamespacePatternSubscriber which discovers both namespaces and topics dynamically using regex patterns. This enables consuming from patterns like tenant/namespace-pattern/topic-pattern where both namespace and topic names are discovered at runtime.

Key features:
- Pattern format: tenant/namespace-regex/topic-regex
- Auto-derives admin URL from service URL (pulsar://host:6650 -> http://host:8080)
- Uses existing partition discovery interval for both namespace and topic discovery
- Falls back to last known namespaces on discovery failures
- Minimal PulsarAdmin support restored for namespace listing only

This is essential for deployments where namespaces are created dynamically and need to be discovered automatically without manual configuration updates.
Moved PulsarAdmin creation from NamespacePatternSubscriber to
PulsarSourceEnumerator to ensure proper resource cleanup and follow
dependency injection pattern. Previously, PulsarAdmin instances were
never closed, causing connection pool exhaustion in long-running applications.

Key changes:
- Add RequiresPulsarAdmin interface for capability-based injection
- Enumerator now owns and manages both PulsarClient and PulsarAdmin lifecycles
- Fix resource leak: PulsarAdmin now properly closed in enumerator.close()
- Remove code duplication: deriveAdminUrl() consolidated in PulsarClientFactory
- Add missing TLS keystore options to PulsarClientFactory.createAdmin()
- Fix thread safety: use CopyOnWriteArrayList for namespace caching
- Fix bounds checking to prevent IndexOutOfBoundsException in topic/namespace parsing
The lastKnownNamespaces field is transient and becomes null after
deserialization. The error handling code was calling .isEmpty() without
checking for null first, causing a NullPointerException that masked the
original exception. This made debugging impossible when namespace
discovery failed after a restart or failover.

Added null check before isEmpty() to properly handle the deserialized state.
@vbabenkoru vbabenkoru marked this pull request as ready for review March 26, 2026 15:56
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@vbabenkoru vbabenkoru merged commit 5a9a1ee into v4.2-iterable Mar 26, 2026
0 of 12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants