Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
Expand All @@ -38,6 +40,7 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
Expand Down Expand Up @@ -180,6 +183,81 @@ public static PulsarClient createClient(PulsarConfiguration configuration)
return builder.build();
}

/**
 * Build a {@link PulsarAdmin} from the given connector configuration.
 *
 * <p>The admin HTTP URL is taken from {@code PULSAR_ADMIN_URL} when that option is present.
 * Otherwise it is derived from {@code PULSAR_SERVICE_URL} via {@code deriveAdminUrl}, e.g.
 * {@code pulsar://host:6650} becomes {@code http://host:8080} and {@code
 * pulsar+ssl://host:6651} becomes {@code https://host:8443}.
 *
 * <p>Authentication and every TLS related option found in the configuration are applied to
 * the admin builder before the instance is built.
 *
 * @param configuration the connector configuration to read the options from.
 * @return the newly built admin client.
 * @throws PulsarClientException declared by the underlying builder calls.
 */
public static PulsarAdmin createAdmin(PulsarConfiguration configuration)
        throws PulsarClientException {
    // An explicitly configured admin URL always wins over the derived one.
    final String adminUrl =
            configuration.contains(PULSAR_ADMIN_URL)
                    ? configuration.get(PULSAR_ADMIN_URL)
                    : deriveAdminUrl(configuration.get(PULSAR_SERVICE_URL));

    // Endpoint first, then the authentication instance built from the same configuration.
    PulsarAdminBuilder builder =
            PulsarAdmin.builder()
                    .serviceHttpUrl(adminUrl)
                    .authentication(createAuthentication(configuration));

    // PEM based TLS material.
    configuration.useOption(PULSAR_TLS_KEY_FILE_PATH, builder::tlsKeyFilePath);
    configuration.useOption(PULSAR_TLS_CERTIFICATE_FILE_PATH, builder::tlsCertificateFilePath);
    configuration.useOption(PULSAR_TLS_TRUST_CERTS_FILE_PATH, builder::tlsTrustCertsFilePath);

    // TLS verification switches.
    configuration.useOption(
            PULSAR_TLS_ALLOW_INSECURE_CONNECTION, builder::allowTlsInsecureConnection);
    configuration.useOption(
            PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE, builder::enableTlsHostnameVerification);

    // Key store / trust store based TLS material.
    configuration.useOption(PULSAR_USE_KEY_STORE_TLS, builder::useKeyStoreTls);
    configuration.useOption(PULSAR_SSL_PROVIDER, builder::sslProvider);
    configuration.useOption(PULSAR_TLS_KEY_STORE_TYPE, builder::tlsKeyStoreType);
    configuration.useOption(PULSAR_TLS_KEY_STORE_PATH, builder::tlsKeyStorePath);
    configuration.useOption(PULSAR_TLS_KEY_STORE_PASSWORD, builder::tlsKeyStorePassword);
    configuration.useOption(PULSAR_TLS_TRUST_STORE_TYPE, builder::tlsTrustStoreType);
    configuration.useOption(PULSAR_TLS_TRUST_STORE_PATH, builder::tlsTrustStorePath);
    configuration.useOption(PULSAR_TLS_TRUST_STORE_PASSWORD, builder::tlsTrustStorePassword);

    // The configured values are copied into a TreeSet before being handed to the builder.
    configuration.useOption(PULSAR_TLS_CIPHERS, TreeSet::new, builder::tlsCiphers);
    configuration.useOption(PULSAR_TLS_PROTOCOLS, TreeSet::new, builder::tlsProtocols);

    return builder.build();
}

/**
 * Derive the admin (HTTP) URL from the broker service URL.
 *
 * <p>Only the first broker of a comma separated list is considered. A service URL that already
 * uses {@code http://} or {@code https://} is returned unchanged (first broker only), because
 * it already points at the HTTP endpoint. For the binary protocol the scheme and port are
 * replaced with the default admin ones; any configured port or trailing path is dropped.
 *
 * <p>Examples:
 *
 * <ul>
 *   <li>{@code pulsar://localhost:6650} -> {@code http://localhost:8080}
 *   <li>{@code pulsar+ssl://localhost:6651} -> {@code https://localhost:8443}
 *   <li>{@code pulsar://broker1:6650,broker2:6650} -> {@code http://broker1:8080}
 *   <li>{@code pulsar://[::1]} -> {@code http://[::1]:8080}
 *   <li>{@code http://broker1:8080} -> {@code http://broker1:8080}
 * </ul>
 *
 * @param serviceUrl the configured Pulsar service URL, must not be null.
 * @return the admin URL derived from the first broker in {@code serviceUrl}.
 */
private static String deriveAdminUrl(String serviceUrl) {
    // Handle multiple brokers - take the first one. indexOf/substring (instead of split)
    // also copes with an empty first entry without throwing.
    int comma = serviceUrl.indexOf(',');
    String firstBroker = comma >= 0 ? serviceUrl.substring(0, comma) : serviceUrl;

    // An HTTP(S) service URL must not be rewritten: prefixing it with another scheme
    // would yield something like http://http://host:8080.
    if (firstBroker.startsWith("http://") || firstBroker.startsWith("https://")) {
        return firstBroker;
    }

    boolean useTls = firstBroker.startsWith("pulsar+ssl://");
    String protocol = useTls ? "https" : "http";
    int defaultPort = useTls ? 8443 : 8080;

    // Remove protocol prefix
    String hostPart = firstBroker.replaceFirst("pulsar(\\+ssl)?://", "");

    // Drop a trailing slash or path so it does not end up in front of the port.
    int slash = hostPart.indexOf('/');
    if (slash >= 0) {
        hostPart = hostPart.substring(0, slash);
    }

    String host;
    int closingBracket = hostPart.indexOf(']');
    if (hostPart.startsWith("[") && closingBracket > 0) {
        // Bracketed IPv6 literal: the colons inside the brackets are part of the address,
        // so the host ends at the closing bracket regardless of whether a port follows.
        host = hostPart.substring(0, closingBracket + 1);
    } else {
        // Strip the broker port if one is specified.
        int colon = hostPart.lastIndexOf(':');
        host = colon >= 0 ? hostPart.substring(0, colon) : hostPart;
    }

    return protocol + "://" + host + ":" + defaultPort;
}

/**
* Create the {@link Authentication} instance for {@code PulsarClient}. If the user didn't
* provide configuration, a {@link AuthenticationDisabled} instance would be returned.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,22 @@ private PulsarOptions() {
"pulsar+ssl://pulsar.us-west.example.com:6651")))
.build());

/**
 * Optional HTTP(S) URL of the Pulsar admin endpoint, registered under the client config prefix
 * as {@code adminUrl}. The option has no default value; as its description states, the admin
 * URL is derived from the service URL when this option is not set.
 */
public static final ConfigOption<String> PULSAR_ADMIN_URL =
ConfigOptions.key(CLIENT_CONFIG_PREFIX + "adminUrl")
.stringType()
.noDefaultValue()
.withDescription(
Description.builder()
.text("The Pulsar Admin URL for HTTP API access.")
.linebreak()
.text(
"If not specified, it will be automatically derived from serviceUrl by replacing the protocol and port.")
.linebreak()
.list(
text("Example: %s", code("http://localhost:8080")),
text("For TLS: %s", code("https://localhost:8443")))
.build());

public static final ConfigOption<String> PULSAR_AUTH_PLUGIN_CLASS_NAME =
ConfigOptions.key(CLIENT_CONFIG_PREFIX + "authPluginClassName")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,54 @@ public PulsarSourceBuilder<OUT> setTopicPatterns(
return this;
}

/**
 * Set a pattern for dynamic namespace and topic discovery, subscribing to both persistent and
 * non-persistent topics ({@link RegexSubscriptionMode#AllTopics}). The topics can be set only
 * once on this builder, with exactly one of {@link #setTopics}, {@link #setTopicPattern},
 * {@link #setTopicPatterns} or {@link #setNamespaceTopicPattern}.
 *
 * <p>The pattern must be in the format: {@code tenant/namespace-pattern/topic-pattern}
 *
 * <p>Example: the pattern {@code eventbus/org-[0-9]+/post-[a-z]+} will discover all namespaces
 * matching {@code org-[0-9]+} under the tenant {@code eventbus}, then discover all topics
 * matching {@code post-[a-z]+} in each of these namespaces.
 *
 * @param fullPattern the full pattern in format tenant/namespace-pattern/topic-pattern.
 * @return this PulsarSourceBuilder.
 */
public PulsarSourceBuilder<OUT> setNamespaceTopicPattern(Pattern fullPattern) {
// Delegate to the two-argument overload with the AllTopics mode.
return setNamespaceTopicPattern(fullPattern, RegexSubscriptionMode.AllTopics);
}

/**
 * Configure dynamic namespace and topic discovery from a single combined pattern. The topics
 * can be set only once on this builder, with exactly one of {@link #setTopics}, {@link
 * #setTopicPattern}, {@link #setTopicPatterns} or {@link #setNamespaceTopicPattern}.
 *
 * <p>The pattern must be in the format: {@code tenant/namespace-pattern/topic-pattern}
 *
 * <p>Example: the pattern {@code eventbus/org-[0-9]+/post-[a-z]+} will discover all namespaces
 * matching {@code org-[0-9]+} under the tenant {@code eventbus}, then discover all topics
 * matching {@code post-[a-z]+} in each of these namespaces.
 *
 * @param fullPattern the full pattern in format tenant/namespace-pattern/topic-pattern.
 * @param regexSubscriptionMode the type of topics to pick when subscribing by regular
 *     expression.
 *     <ul>
 *       <li>PersistentOnly: only subscribe to persistent topics.
 *       <li>NonPersistentOnly: only subscribe to non-persistent topics.
 *       <li>AllTopics: subscribe to both persistent and non-persistent topics.
 *     </ul>
 *
 * @return this PulsarSourceBuilder.
 */
public PulsarSourceBuilder<OUT> setNamespaceTopicPattern(
        Pattern fullPattern, RegexSubscriptionMode regexSubscriptionMode) {
    // Check that no other topic setter was used before building the new subscriber.
    ensureSubscriberIsNull("namespace topic pattern");

    PulsarSubscriber namespaceSubscriber =
            PulsarSubscriber.getNamespacePatternSubscriber(fullPattern, regexSubscriptionMode);
    this.subscriber = namespaceSubscriber;

    return this;
}

/**
* The consumer name is informative, and it can be used to identify a particular consumer
* instance from the topic stats.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.RequiresPulsarAdmin;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
Expand All @@ -45,6 +47,7 @@
import java.util.Set;

import static java.util.Collections.singletonList;
import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createClient;
import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState;
import static org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner.createAssigner;
Expand All @@ -65,6 +68,8 @@ public class PulsarSourceEnumerator
private final SplitAssigner splitAssigner;
private final SplitEnumeratorMetricGroup metricGroup;

// Admin client created in start() only when the subscriber implements RequiresPulsarAdmin;
// it stays null otherwise. Closed in close() when it was created.
private PulsarAdmin pulsarAdmin;

public PulsarSourceEnumerator(
PulsarSubscriber subscriber,
StartCursor startCursor,
Expand Down Expand Up @@ -105,6 +110,17 @@ public PulsarSourceEnumerator(
@Override
public void start() {
subscriber.open(pulsarClient);

// Inject PulsarAdmin if subscriber requires it
if (subscriber instanceof RequiresPulsarAdmin) {
try {
this.pulsarAdmin = createAdmin(sourceConfiguration);
((RequiresPulsarAdmin) subscriber).setAdmin(this.pulsarAdmin);
} catch (PulsarClientException e) {
throw new FlinkRuntimeException("Failed to create PulsarAdmin", e);
}
}

rangeGenerator.open(sourceConfiguration);

// Expose the split assignment metrics if Flink has supported.
Expand Down Expand Up @@ -170,6 +186,9 @@ public PulsarSourceEnumState snapshotState(long checkpointId) {

@Override
public void close() throws PulsarClientException {
if (pulsarAdmin != null) {
pulsarAdmin.close();
}
if (pulsarClient != null) {
pulsarClient.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl.MultiPatternSubscriber;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl.NamespacePatternSubscriber;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl.TopicListSubscriber;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl.TopicPatternSubscriber;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
Expand Down Expand Up @@ -81,4 +82,9 @@ static PulsarSubscriber getMultiTopicPatternSubscriber(
List<Pattern> topicPatterns, RegexSubscriptionMode subscriptionMode) {
return new MultiPatternSubscriber(topicPatterns, subscriptionMode);
}

/**
 * Create a {@link NamespacePatternSubscriber} for the given combined pattern.
 *
 * @param fullPattern the pattern handed to the subscriber's constructor.
 * @param subscriptionMode the regex subscription mode handed to the subscriber's constructor.
 * @return the new subscriber instance.
 */
static PulsarSubscriber getNamespacePatternSubscriber(
Pattern fullPattern, RegexSubscriptionMode subscriptionMode) {
return new NamespacePatternSubscriber(fullPattern, subscriptionMode);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.pulsar.source.enumerator.subscriber;

import org.apache.flink.annotation.Internal;

import org.apache.pulsar.client.admin.PulsarAdmin;

/**
 * Capability interface for {@link PulsarSubscriber} implementations that require a {@link
 * PulsarAdmin} for their operations.
 *
 * <p>Subscribers that need to perform administrative operations (such as listing namespaces or
 * querying tenant metadata) should implement this interface. The PulsarSourceEnumerator detects
 * this capability and injects a PulsarAdmin instance during initialization.
 *
 * <p>The PulsarAdmin instance is owned and managed by the enumerator; subscribers must not close
 * it.
 *
 * <p>Example usage: {@link
 * org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl.NamespacePatternSubscriber}
 * implements this interface to perform dynamic namespace discovery.
 */
@Internal
public interface RequiresPulsarAdmin {

/**
 * Inject the PulsarAdmin instance to use for administrative operations.
 *
 * <p>This method is called by the PulsarSourceEnumerator during initialization, after {@link
 * PulsarSubscriber#open(org.apache.pulsar.client.api.PulsarClient)} has been called.
 *
 * @param admin the PulsarAdmin instance to use for administrative operations. Must not be null.
 */
void setAdmin(PulsarAdmin admin);
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ public Set<TopicPartition> getSubscribedTopicPartitions(
}

/**
* Query topics for a single pattern using Pulsar's internal protocol. This method is similar
* to TopicPatternSubscriber's implementation.
* Query topics for a single pattern using Pulsar's internal protocol. This method is similar to
* TopicPatternSubscriber's implementation.
*/
private Set<String> queryTopicsForPattern(PatternInfo patternInfo)
throws PulsarClientException {
Expand Down
Loading
Loading