record : records) {
+ if (!isPolling.get()) break;
+
+ try {
+ // Convert Kafka record to PubsubMessage
+ PubsubMessage message = convertToPubsubMessage(record);
+
+ // Generate a unique message ID
+ String messageId =
+ String.format(
+ "%s:%d:%d", record.topic(), record.partition(), record.offset());
+
+ // Store offset info for later acknowledgment
+ pendingAcks.put(
+ messageId,
+ new OffsetInfo(
+ new TopicPartition(record.topic(), record.partition()),
+ record.offset(),
+ record.timestamp()));
+
+ // Create AckReplyConsumer for this message
+ AckReplyConsumer ackReplyConsumer =
+ new AckReplyConsumer() {
+ private final AtomicBoolean acked = new AtomicBoolean(false);
+
+ @Override
+ public void ack() {
+ if (acked.compareAndSet(false, true)) {
+ commitOffset(messageId);
+ }
+ }
+
+ @Override
+ public void nack() {
+ if (acked.compareAndSet(false, true)) {
+ // In Kafka, nack simply means we don't commit the offset; the message will be
+ // redelivered when this position is read again (e.g., after a rebalance or consumer restart)
+ log.info("Message nacked, will be redelivered: " + messageId);
+ pendingAcks.remove(messageId);
+ }
+ }
+ };
+
+ // Deliver message to receiver
+ receiver.receiveMessage(message, ackReplyConsumer);
+
+ } catch (Exception e) {
+ log.log(Level.WARNING, "Error processing message from Kafka", e);
+ }
+ }
+
+ } catch (WakeupException e) {
+ // This is expected when consumer.wakeup() is called
+ break;
+ } catch (Exception e) {
+ log.log(Level.SEVERE, "Error in Kafka poll loop", e);
+ if (!isPolling.get()) break;
+
+ // Sleep briefly before retrying
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+ } finally {
+ isPolling.set(false);
+ }
+ });
+ }
+
+ /**
+ * Converts a Kafka ConsumerRecord to a PubsubMessage.
+ *
+ * <p>Translation rules:
+ *
+ * <ul>
+ *   <li>Data: bytes -> bytes (direct pass-through)
+ *   <li>Key: Kafka key -> Pub/Sub orderingKey (preserves ordering logic)
+ *   <li>Headers: Kafka headers -> Pub/Sub attributes (multi-value headers are flattened)
+ *   <li>Timestamp: Kafka timestamp (Unix epoch millis) -> protobuf Timestamp
+ * </ul>
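+ *
+ * <p>For example (hypothetical values), a record at topic {@code t}, partition {@code 2}, offset
+ * {@code 17} with key {@code user-1} maps to a message with {@code orderingKey="user-1"},
+ * {@code messageId="t:2:17"}, and attributes such as {@code x-kafka-partition="2"}.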
+ */
+ private PubsubMessage convertToPubsubMessage(ConsumerRecord<byte[], byte[]> record) {
+ PubsubMessage.Builder builder = PubsubMessage.newBuilder();
+
+ // Data: direct bytes pass-through
+ if (record.value() != null) {
+ builder.setData(ByteString.copyFrom(record.value()));
+ }
+
+ // Key: Kafka key becomes ordering key (preserves partitioning/ordering)
+ if (record.key() != null) {
+ builder.setOrderingKey(new String(record.key(), java.nio.charset.StandardCharsets.UTF_8));
+ }
+
+ // Convert Kafka timestamp (Unix epoch milliseconds) to Protobuf Timestamp
+ long timestampMs = record.timestamp();
+ if (timestampMs > 0) {
+ long seconds = timestampMs / 1000;
+ int nanos = (int) ((timestampMs % 1000) * 1_000_000);
+ builder.setPublishTime(Timestamp.newBuilder().setSeconds(seconds).setNanos(nanos).build());
+ }
+
+ // Headers: Convert Kafka headers to PubSub attributes
+ // For multi-value headers with the same key, we use indexed suffixes
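+ // (e.g., a header "trace" appearing three times yields attributes "trace", "trace.1", "trace.2")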
+ Map<String, String> attributes = new HashMap<>();
+ Map<String, Integer> headerCounts = new HashMap<>();
+
+ for (Header header : record.headers()) {
+ if (header.value() != null) {
+ String key = header.key();
+ String value = new String(header.value(), java.nio.charset.StandardCharsets.UTF_8);
+
+ // Handle special headers that map to PubsubMessage fields
+ if (key.equals("pubsublite.publish_time")) {
+ // Already handled via record.timestamp()
+ continue;
+ }
+
+ // Handle multi-value attributes by appending index suffix
+ int count = headerCounts.getOrDefault(key, 0);
+ if (count == 0) {
+ attributes.put(key, value);
+ } else {
+ attributes.put(key + "." + count, value);
+ }
+ headerCounts.put(key, count + 1);
+ }
+ }
+
+ // Add Kafka-specific metadata as special attributes
+ attributes.put("x-kafka-topic", record.topic());
+ attributes.put("x-kafka-partition", String.valueOf(record.partition()));
+ attributes.put("x-kafka-offset", String.valueOf(record.offset()));
+ attributes.put("x-kafka-timestamp-ms", String.valueOf(record.timestamp()));
+ attributes.put("x-kafka-timestamp-type", record.timestampType().name());
+
+ builder.putAllAttributes(attributes);
+
+ // Set message ID in format: topic:partition:offset
+ builder.setMessageId(
+ String.format("%s:%d:%d", record.topic(), record.partition(), record.offset()));
+
+ return builder.build();
+ }
+
+ private void commitOffset(String messageId) {
+ OffsetInfo info = pendingAcks.remove(messageId);
+ if (info == null) {
+ return;
+ }
+
+ // Skip commit if we're shutting down
+ if (!isPolling.get()) {
+ log.fine("Skipping offset commit during shutdown for message: " + messageId);
+ return;
+ }
+
+ try {
+ // Commit the offset for this message
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(info.partition, new OffsetAndMetadata(info.offset + 1));
+ kafkaConsumer.commitSync(offsets);
+
+ log.fine("Committed offset for message: " + messageId);
+ } catch (WakeupException e) {
+ // This is expected during shutdown - consumer.wakeup() was called
+ log.fine("Offset commit interrupted by shutdown for message: " + messageId);
+ } catch (Exception e) {
+ log.log(Level.WARNING, "Failed to commit offset for message: " + messageId, e);
+ }
+ }
+
+ public String getSubscriptionNameString() {
+ return topicName + "/" + groupId;
+ }
+
+ @Override
+ public ApiService startAsync() {
+ // Start parent service first
+ super.startAsync();
+ return this;
+ }
+
+ @Override
+ protected void doStart() {
+ try {
+ startPolling();
+ notifyStarted();
+ } catch (Exception e) {
+ notifyFailed(e);
+ }
+ }
+
+ @Override
+ protected void doStop() {
+ try {
+ // Stop polling
+ isPolling.set(false);
+
+ // Wake up the consumer if it's blocked in poll()
+ kafkaConsumer.wakeup();
+
+ // Shutdown executor
+ pollExecutor.shutdown();
+ try {
+ if (!pollExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
+ pollExecutor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ pollExecutor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+
+ kafkaConsumer.close();
+
+ notifyStopped();
+ } catch (Exception e) {
+ notifyFailed(e);
+ }
+ }
+}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/KafkaAdminClient.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/KafkaAdminClient.java
new file mode 100644
index 000000000..fedcd4e27
--- /dev/null
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/KafkaAdminClient.java
@@ -0,0 +1,491 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.internal;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.cloud.pubsublite.AdminClient;
+import com.google.cloud.pubsublite.BacklogLocation;
+import com.google.cloud.pubsublite.CloudRegion;
+import com.google.cloud.pubsublite.LocationPath;
+import com.google.cloud.pubsublite.ReservationPath;
+import com.google.cloud.pubsublite.SeekTarget;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.proto.OperationMetadata;
+import com.google.cloud.pubsublite.proto.Reservation;
+import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;
+import com.google.cloud.pubsublite.proto.Subscription;
+import com.google.cloud.pubsublite.proto.Topic;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.protobuf.FieldMask;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+
+/**
+ * An AdminClient implementation that wraps Kafka's AdminClient.
+ *
+ * <p>This maps Pub/Sub Lite admin operations to Kafka admin operations:
+ *
+ * <ul>
+ *   <li>Topics: create/delete/list/get mapped to Kafka topic operations
+ *   <li>Subscriptions: mapped to Kafka consumer groups
+ *   <li>Reservations: no-ops (a PSL-specific feature that doesn't exist in Kafka)
+ * </ul>
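+ *
+ * <p>A minimal usage sketch (the broker address, region, and topic below are hypothetical and not
+ * part of this change):
+ *
+ * <pre>{@code
+ * Map<String, Object> props = new HashMap<>();
+ * props.put("bootstrap.servers", "localhost:9092");
+ * KafkaAdminClient admin =
+ *     new KafkaAdminClient(CloudRegion.of("us-central1"), props, 4, (short) 3);
+ * // topicPath is an existing TopicPath identifying the topic to create
+ * Topic created =
+ *     admin.createTopic(Topic.newBuilder().setName(topicPath.toString()).build()).get();
+ * }</pre>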
+ */
+public class KafkaAdminClient implements AdminClient {
+ private static final Logger log = Logger.getLogger(KafkaAdminClient.class.getName());
+
+ private final CloudRegion region;
+ private final org.apache.kafka.clients.admin.AdminClient kafkaAdmin;
+ private final int defaultPartitions;
+ private final short defaultReplicationFactor;
+ private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+ private final AtomicBoolean isTerminated = new AtomicBoolean(false);
+
+ /**
+ * Creates a new KafkaAdminClient.
+ *
+ * @param region The cloud region for this client.
+ * @param kafkaProperties Kafka connection properties (must include bootstrap.servers).
+ * @param defaultPartitions Default number of partitions for new topics.
+ * @param defaultReplicationFactor Default replication factor for new topics.
+ */
+ public KafkaAdminClient(
+ CloudRegion region,
+ Map<String, Object> kafkaProperties,
+ int defaultPartitions,
+ short defaultReplicationFactor) {
+ this.region = region;
+ this.defaultPartitions = defaultPartitions;
+ this.defaultReplicationFactor = defaultReplicationFactor;
+
+ Properties props = new Properties();
+ props.putAll(kafkaProperties);
+ this.kafkaAdmin = org.apache.kafka.clients.admin.AdminClient.create(props);
+ }
+
+ @Override
+ public CloudRegion region() {
+ return region;
+ }
+
+ // Topic Operations
+
+ @Override
+ public ApiFuture<Topic> createTopic(Topic topic) {
+ String topicName = extractTopicName(topic.getName());
+ int partitions =
+ topic.getPartitionConfig().getCount() > 0
+ ? (int) topic.getPartitionConfig().getCount()
+ : defaultPartitions;
+
+ NewTopic newTopic = new NewTopic(topicName, partitions, defaultReplicationFactor);
+
+ return toApiFuture(
+ kafkaAdmin.createTopics(Collections.singleton(newTopic)).all(),
+ v -> topic,
+ e -> {
+ if (e instanceof TopicExistsException) {
+ throw new CheckedApiException(
+ "Topic already exists: " + topicName, StatusCode.Code.ALREADY_EXISTS)
+ .underlying;
+ }
+ throw new RuntimeException("Failed to create topic: " + topicName, e);
+ });
+ }
+
+ @Override
+ public ApiFuture<Topic> getTopic(TopicPath path) {
+ String topicName = path.name().value();
+
+ return toApiFuture(
+ kafkaAdmin.describeTopics(Collections.singleton(topicName)).allTopicNames(),
+ descriptions -> {
+ TopicDescription desc = descriptions.get(topicName);
+ if (desc == null) {
+ throw new CheckedApiException(
+ "Topic not found: " + topicName, StatusCode.Code.NOT_FOUND)
+ .underlying;
+ }
+ return buildTopic(path, desc);
+ },
+ e -> {
+ if (e instanceof UnknownTopicOrPartitionException) {
+ throw new CheckedApiException(
+ "Topic not found: " + topicName, StatusCode.Code.NOT_FOUND)
+ .underlying;
+ }
+ throw new RuntimeException("Failed to get topic: " + topicName, e);
+ });
+ }
+
+ @Override
+ public ApiFuture<Long> getTopicPartitionCount(TopicPath path) {
+ String topicName = path.name().value();
+
+ return toApiFuture(
+ kafkaAdmin.describeTopics(Collections.singleton(topicName)).allTopicNames(),
+ descriptions -> {
+ TopicDescription desc = descriptions.get(topicName);
+ if (desc == null) {
+ throw new CheckedApiException(
+ "Topic not found: " + topicName, StatusCode.Code.NOT_FOUND)
+ .underlying;
+ }
+ return (long) desc.partitions().size();
+ },
+ e -> {
+ if (e instanceof UnknownTopicOrPartitionException) {
+ throw new CheckedApiException(
+ "Topic not found: " + topicName, StatusCode.Code.NOT_FOUND)
+ .underlying;
+ }
+ throw new RuntimeException("Failed to get partition count: " + topicName, e);
+ });
+ }
+
+ @Override
+ public ApiFuture<List<Topic>> listTopics(LocationPath path) {
+ return toApiFuture(
+ kafkaAdmin.listTopics().names(),
+ topicNames -> {
+ List<Topic> topics = new ArrayList<>();
+ for (String name : topicNames) {
+ // Skip internal Kafka topics
+ if (!name.startsWith("__")) {
+ TopicPath topicPath =
+ TopicPath.newBuilder()
+ .setProject(path.project())
+ .setLocation(path.location())
+ .setName(com.google.cloud.pubsublite.TopicName.of(name))
+ .build();
+ topics.add(Topic.newBuilder().setName(topicPath.toString()).build());
+ }
+ }
+ return topics;
+ },
+ e -> {
+ throw new RuntimeException("Failed to list topics", e);
+ });
+ }
+
+ @Override
+ public ApiFuture<Topic> updateTopic(Topic topic, FieldMask mask) {
+ // Kafka doesn't support most topic updates without recreation
+ // For now, just return the topic as-is (no-op for updates)
+ log.warning(
+ "Topic updates are not fully supported in Kafka backend. "
+ + "Some fields may not be updated: "
+ + mask);
+ return ApiFutures.immediateFuture(topic);
+ }
+
+ @Override
+ public ApiFuture<Void> deleteTopic(TopicPath path) {
+ String topicName = path.name().value();
+
+ return toApiFuture(
+ kafkaAdmin.deleteTopics(Collections.singleton(topicName)).all(),
+ v -> null,
+ e -> {
+ if (e instanceof UnknownTopicOrPartitionException) {
+ throw new CheckedApiException(
+ "Topic not found: " + topicName, StatusCode.Code.NOT_FOUND)
+ .underlying;
+ }
+ throw new RuntimeException("Failed to delete topic: " + topicName, e);
+ });
+ }
+
+ @Override
+ public ApiFuture<List<SubscriptionPath>> listTopicSubscriptions(TopicPath path) {
+ // In Kafka, "subscriptions" are consumer groups
+ // This lists consumer groups that have committed offsets for this topic
+ String topicName = path.name().value();
+
+ return toApiFuture(
+ kafkaAdmin.listConsumerGroups().all(),
+ groups -> {
+ List<SubscriptionPath> subscriptions = new ArrayList<>();
+ for (org.apache.kafka.clients.admin.ConsumerGroupListing group : groups) {
+ // Create a subscription path from the consumer group
+ SubscriptionPath subPath =
+ SubscriptionPath.newBuilder()
+ .setProject(path.project())
+ .setLocation(path.location())
+ .setName(com.google.cloud.pubsublite.SubscriptionName.of(group.groupId()))
+ .build();
+ subscriptions.add(subPath);
+ }
+ return subscriptions;
+ },
+ e -> {
+ throw new RuntimeException("Failed to list subscriptions for topic: " + topicName, e);
+ });
+ }
+
+ // Subscription Operations
+ // Subscriptions map to Kafka consumer groups
+
+ @Override
+ public ApiFuture<Subscription> createSubscription(
+ Subscription subscription, BacklogLocation startingOffset) {
+ // In Kafka, consumer groups are created implicitly when a consumer joins
+ // We just validate the topic exists and return the subscription
+ String topicName = extractTopicName(subscription.getTopic());
+
+ return toApiFuture(
+ kafkaAdmin.describeTopics(Collections.singleton(topicName)).allTopicNames(),
+ descriptions -> {
+ if (!descriptions.containsKey(topicName)) {
+ throw new CheckedApiException(
+ "Topic not found: " + topicName, StatusCode.Code.NOT_FOUND)
+ .underlying;
+ }
+ return subscription;
+ },
+ e -> {
+ if (e instanceof UnknownTopicOrPartitionException) {
+ throw new CheckedApiException(
+ "Topic not found: " + topicName, StatusCode.Code.NOT_FOUND)
+ .underlying;
+ }
+ throw new RuntimeException("Failed to create subscription", e);
+ });
+ }
+
+ @Override
+ public ApiFuture<Subscription> createSubscription(Subscription subscription, SeekTarget target) {
+ // Seek target is not directly supported in Kafka consumer group creation
+ // The seek would need to be done when the consumer connects
+ return createSubscription(subscription, BacklogLocation.END);
+ }
+
+ @Override
+ public ApiFuture<Subscription> getSubscription(SubscriptionPath path) {
+ String groupId = path.name().value();
+
+ return toApiFuture(
+ kafkaAdmin.describeConsumerGroups(Collections.singleton(groupId)).all(),
+ descriptions -> {
+ if (!descriptions.containsKey(groupId)) {
+ throw new CheckedApiException(
+ "Subscription (consumer group) not found: " + groupId,
+ StatusCode.Code.NOT_FOUND)
+ .underlying;
+ }
+ return Subscription.newBuilder().setName(path.toString()).build();
+ },
+ e -> {
+ throw new RuntimeException("Failed to get subscription: " + groupId, e);
+ });
+ }
+
+ @Override
+ public ApiFuture<List<Subscription>> listSubscriptions(LocationPath path) {
+ return toApiFuture(
+ kafkaAdmin.listConsumerGroups().all(),
+ groups -> {
+ List<Subscription> subscriptions = new ArrayList<>();
+ for (org.apache.kafka.clients.admin.ConsumerGroupListing group : groups) {
+ SubscriptionPath subPath =
+ SubscriptionPath.newBuilder()
+ .setProject(path.project())
+ .setLocation(path.location())
+ .setName(com.google.cloud.pubsublite.SubscriptionName.of(group.groupId()))
+ .build();
+ subscriptions.add(Subscription.newBuilder().setName(subPath.toString()).build());
+ }
+ return subscriptions;
+ },
+ e -> {
+ throw new RuntimeException("Failed to list subscriptions", e);
+ });
+ }
+
+ @Override
+ public ApiFuture<Subscription> updateSubscription(Subscription subscription, FieldMask mask) {
+ // Consumer group configuration updates are limited in Kafka
+ log.warning("Subscription updates are limited in Kafka backend");
+ return ApiFutures.immediateFuture(subscription);
+ }
+
+ @Override
+ public OperationFuture<SeekSubscriptionResponse, OperationMetadata> seekSubscription(
+ SubscriptionPath path, SeekTarget target) {
+ // Kafka consumer group offset seeking is done via consumer API, not admin API
+ // This would require resetting consumer group offsets
+ throw new UnsupportedOperationException(
+ "Seek subscription is not directly supported in Kafka backend. "
+ + "Use consumer API to seek to specific offsets.");
+ }
+
+ @Override
+ public ApiFuture<Void> deleteSubscription(SubscriptionPath path) {
+ String groupId = path.name().value();
+
+ return toApiFuture(
+ kafkaAdmin.deleteConsumerGroups(Collections.singleton(groupId)).all(),
+ v -> null,
+ e -> {
+ throw new RuntimeException(
+ "Failed to delete subscription (consumer group): " + groupId, e);
+ });
+ }
+
+ // Reservation Operations
+ // Reservations are PSL-specific and don't exist in Kafka - all are no-ops
+
+ @Override
+ public ApiFuture<Reservation> createReservation(Reservation reservation) {
+ log.info("Reservations are not supported in Kafka backend. Operation is a no-op.");
+ return ApiFutures.immediateFuture(reservation);
+ }
+
+ @Override
+ public ApiFuture<Reservation> getReservation(ReservationPath path) {
+ log.info("Reservations are not supported in Kafka backend. Returning empty reservation.");
+ return ApiFutures.immediateFuture(Reservation.newBuilder().setName(path.toString()).build());
+ }
+
+ @Override
+ public ApiFuture<List<Reservation>> listReservations(LocationPath path) {
+ log.info("Reservations are not supported in Kafka backend. Returning empty list.");
+ return ApiFutures.immediateFuture(Collections.emptyList());
+ }
+
+ @Override
+ public ApiFuture<Reservation> updateReservation(Reservation reservation, FieldMask mask) {
+ log.info("Reservations are not supported in Kafka backend. Operation is a no-op.");
+ return ApiFutures.immediateFuture(reservation);
+ }
+
+ @Override
+ public ApiFuture<Void> deleteReservation(ReservationPath path) {
+ log.info("Reservations are not supported in Kafka backend. Operation is a no-op.");
+ return ApiFutures.immediateFuture(null);
+ }
+
+ @Override
+ public ApiFuture<List<TopicPath>> listReservationTopics(ReservationPath path) {
+ log.info("Reservations are not supported in Kafka backend. Returning empty list.");
+ return ApiFutures.immediateFuture(Collections.emptyList());
+ }
+
+ // Lifecycle
+
+ @Override
+ public void close() {
+ shutdown();
+ }
+
+ @Override
+ public void shutdown() {
+ if (isShutdown.compareAndSet(false, true)) {
+ kafkaAdmin.close();
+ isTerminated.set(true);
+ }
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return isShutdown.get();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return isTerminated.get();
+ }
+
+ @Override
+ public void shutdownNow() {
+ shutdown();
+ }
+
+ @Override
+ public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
+ return isTerminated.get();
+ }
+
+ // Helper Methods
+
+ private String extractTopicName(String fullPath) {
+ int lastSlash = fullPath.lastIndexOf('/');
+ return lastSlash >= 0 ? fullPath.substring(lastSlash + 1) : fullPath;
+ }
+
+ private Topic buildTopic(TopicPath path, TopicDescription desc) {
+ return Topic.newBuilder()
+ .setName(path.toString())
+ .setPartitionConfig(
+ com.google.cloud.pubsublite.proto.Topic.PartitionConfig.newBuilder()
+ .setCount(desc.partitions().size())
+ .build())
+ .build();
+ }
+
+ private <I, O> ApiFuture<O> toApiFuture(
+ KafkaFuture<I> kafkaFuture,
+ java.util.function.Function<I, O> successMapper,
+ java.util.function.Function<Throwable, RuntimeException> errorMapper) {
+ return ApiFutures.transform(
+ ApiFutures.catching(
+ new KafkaFutureAdapter<>(kafkaFuture).toApiFuture(),
+ Throwable.class,
+ t -> {
+ throw errorMapper.apply(t);
+ },
+ MoreExecutors.directExecutor()),
+ successMapper::apply,
+ MoreExecutors.directExecutor());
+ }
+
+ /** Adapter to convert KafkaFuture to ApiFuture. */
+ private static class KafkaFutureAdapter<T> {
+ private final KafkaFuture<T> kafkaFuture;
+
+ KafkaFutureAdapter(KafkaFuture<T> kafkaFuture) {
+ this.kafkaFuture = kafkaFuture;
+ }
+
+ ApiFuture<T> toApiFuture() {
+ com.google.api.core.SettableApiFuture<T> apiFuture =
+ com.google.api.core.SettableApiFuture.create();
+
+ kafkaFuture.whenComplete(
+ (result, error) -> {
+ if (error != null) {
+ apiFuture.setException(error);
+ } else {
+ apiFuture.set(result);
+ }
+ });
+
+ return apiFuture;
+ }
+ }
+}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/KafkaCursorClient.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/KafkaCursorClient.java
new file mode 100644
index 000000000..f4dc2fc6b
--- /dev/null
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/KafkaCursorClient.java
@@ -0,0 +1,287 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.internal;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.cloud.pubsublite.CloudRegion;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.proto.Cursor;
+import com.google.cloud.pubsublite.proto.PartitionCursor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * A cursor client implementation for Kafka backend.
+ *
+ * <p>In Pub/Sub Lite, cursors are a first-class service with specific RPCs (commit, list,
+ * streaming commit). In Kafka, cursor management is handled via consumer group offsets.
+ *
+ * <p>This implementation provides helper functions that interact with Kafka's consumer group
+ * offset management:
+ *
+ * <ul>
+ *   <li>{@link #commitOffset}: save the position of the consumer for a partition
+ *   <li>{@link #readCommittedOffsets}: retrieve the saved position per partition
+ * </ul>
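+ *
+ * <p>A minimal usage sketch ({@code props} is a Kafka properties map containing
+ * {@code bootstrap.servers}; the subscription path and topic name are hypothetical):
+ *
+ * <pre>{@code
+ * KafkaCursorClient cursors = new KafkaCursorClient(CloudRegion.of("us-central1"), props);
+ * cursors.commitOffset(subscriptionPath, "my-topic", Partition.of(0), Offset.of(42)).get();
+ * List<PartitionCursor> committed =
+ *     cursors.readCommittedOffsets(subscriptionPath, "my-topic").get();
+ * }</pre>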
+ */
+public class KafkaCursorClient implements ApiBackgroundResource {
+ private static final Logger log = Logger.getLogger(KafkaCursorClient.class.getName());
+
+ private final CloudRegion region;
+ private final AdminClient kafkaAdmin;
+ private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+ private final AtomicBoolean isTerminated = new AtomicBoolean(false);
+
+ /**
+ * Creates a new KafkaCursorClient.
+ *
+ * @param region The cloud region for this client.
+ * @param kafkaProperties Kafka connection properties (must include bootstrap.servers).
+ */
+ public KafkaCursorClient(CloudRegion region, Map<String, Object> kafkaProperties) {
+ this.region = region;
+ Properties props = new Properties();
+ props.putAll(kafkaProperties);
+ this.kafkaAdmin = AdminClient.create(props);
+ }
+
+ /** The Google Cloud region this client operates on. */
+ public CloudRegion region() {
+ return region;
+ }
+
+ /**
+ * Commits an offset for a specific partition within a consumer group.
+ *
+ * <p>This maps to Kafka's consumer group offset commit functionality.
+ *
+ * @param subscriptionPath The subscription path (used to derive consumer group ID).
+ * @param topicName The Kafka topic name.
+ * @param partition The partition to commit offset for.
+ * @param offset The offset to commit (next message to be consumed).
+ * @return A future that completes when the offset is committed.
+ */
+ public ApiFuture<Void> commitOffset(
+ SubscriptionPath subscriptionPath, String topicName, Partition partition, Offset offset) {
+ String groupId = deriveGroupId(subscriptionPath);
+ TopicPartition tp = new TopicPartition(topicName, (int) partition.value());
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(tp, new OffsetAndMetadata(offset.value()));
+
+ try {
+ kafkaAdmin.alterConsumerGroupOffsets(groupId, offsets).all().get();
+ return ApiFutures.immediateFuture(null);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException("Interrupted while committing offset", StatusCode.Code.ABORTED)
+ .underlying);
+ } catch (ExecutionException e) {
+ log.log(Level.WARNING, "Failed to commit offset", e);
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException(
+ "Failed to commit offset: " + e.getCause().getMessage(), StatusCode.Code.INTERNAL)
+ .underlying);
+ }
+ }
+
+ /**
+ * Reads the committed offsets for all partitions of a subscription (consumer group).
+ *
+ * <p>This maps to Kafka's listConsumerGroupOffsets functionality.
+ *
+ * @param subscriptionPath The subscription path (used to derive consumer group ID).
+ * @param topicName The Kafka topic name.
+ * @return A future containing a list of partition cursors with their committed offsets.
+ */
+ public ApiFuture<List<PartitionCursor>> readCommittedOffsets(
+ SubscriptionPath subscriptionPath, String topicName) {
+ String groupId = deriveGroupId(subscriptionPath);
+
+ try {
+ ListConsumerGroupOffsetsResult result = kafkaAdmin.listConsumerGroupOffsets(groupId);
+ Map<TopicPartition, OffsetAndMetadata> offsets = result.partitionsToOffsetAndMetadata().get();
+
+ List<PartitionCursor> cursors = new ArrayList<>();
+ for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
+ TopicPartition tp = entry.getKey();
+ OffsetAndMetadata oam = entry.getValue();
+
+ // Filter to only the requested topic
+ if (tp.topic().equals(topicName) && oam != null) {
+ cursors.add(
+ PartitionCursor.newBuilder()
+ .setPartition(tp.partition())
+ .setCursor(Cursor.newBuilder().setOffset(oam.offset()).build())
+ .build());
+ }
+ }
+
+ return ApiFutures.immediateFuture(cursors);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException(
+ "Interrupted while reading committed offsets", StatusCode.Code.ABORTED)
+ .underlying);
+ } catch (ExecutionException e) {
+ log.log(Level.WARNING, "Failed to read committed offsets", e);
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException(
+ "Failed to read committed offsets: " + e.getCause().getMessage(),
+ StatusCode.Code.INTERNAL)
+ .underlying);
+ }
+ }
+
+ /**
+ * Gets the committed offset for a specific partition.
+ *
+ * @param subscriptionPath The subscription path (used to derive consumer group ID).
+ * @param topicName The Kafka topic name.
+ * @param partition The partition to get offset for.
+ * @return A future containing the cursor with the committed offset, or empty if not committed.
+ */
+ public ApiFuture<Cursor> getCommittedOffset(
+ SubscriptionPath subscriptionPath, String topicName, Partition partition) {
+ String groupId = deriveGroupId(subscriptionPath);
+ TopicPartition tp = new TopicPartition(topicName, (int) partition.value());
+
+ try {
+ ListConsumerGroupOffsetsResult result = kafkaAdmin.listConsumerGroupOffsets(groupId);
+ Map<TopicPartition, OffsetAndMetadata> offsets = result.partitionsToOffsetAndMetadata().get();
+
+ OffsetAndMetadata oam = offsets.get(tp);
+ if (oam != null) {
+ return ApiFutures.immediateFuture(Cursor.newBuilder().setOffset(oam.offset()).build());
+ } else {
+ // No committed offset, return offset 0
+ return ApiFutures.immediateFuture(Cursor.newBuilder().setOffset(0).build());
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException(
+ "Interrupted while getting committed offset", StatusCode.Code.ABORTED)
+ .underlying);
+ } catch (ExecutionException e) {
+ log.log(Level.WARNING, "Failed to get committed offset", e);
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException(
+ "Failed to get committed offset: " + e.getCause().getMessage(),
+ StatusCode.Code.INTERNAL)
+ .underlying);
+ }
+ }
+
+ /**
+ * Resets offsets for a consumer group to a specific position.
+ *
+ * @param subscriptionPath The subscription path (used to derive consumer group ID).
+ * @param topicName The Kafka topic name.
+ * @param partitionOffsets Map of partition to target offset.
+ * @return A future that completes when offsets are reset.
+ */
+ public ApiFuture<Void> resetOffsets(
+ SubscriptionPath subscriptionPath,
+ String topicName,
+ Map<Partition, Offset> partitionOffsets) {
+ String groupId = deriveGroupId(subscriptionPath);
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+
+ for (Map.Entry<Partition, Offset> entry : partitionOffsets.entrySet()) {
+ TopicPartition tp = new TopicPartition(topicName, (int) entry.getKey().value());
+ offsets.put(tp, new OffsetAndMetadata(entry.getValue().value()));
+ }
+
+ try {
+ kafkaAdmin.alterConsumerGroupOffsets(groupId, offsets).all().get();
+ return ApiFutures.immediateFuture(null);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException("Interrupted while resetting offsets", StatusCode.Code.ABORTED)
+ .underlying);
+ } catch (ExecutionException e) {
+ log.log(Level.WARNING, "Failed to reset offsets", e);
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException(
+ "Failed to reset offsets: " + e.getCause().getMessage(), StatusCode.Code.INTERNAL)
+ .underlying);
+ }
+ }
+
+ /**
+ * Derives a Kafka consumer group ID from a subscription path.
+ *
+ * <p>The group ID is derived by replacing slashes with dashes to create a valid Kafka group ID.
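+ *
+ * <p>For example, a hypothetical path {@code projects/123/locations/us-central1-a/subscriptions/my-sub}
+ * becomes {@code projects-123-locations-us-central1-a-subscriptions-my-sub}.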
+ */
+ private String deriveGroupId(SubscriptionPath subscriptionPath) {
+ return subscriptionPath.toString().replace('/', '-');
+ }
+
+ // Lifecycle
+
+ @Override
+ public void close() {
+ shutdown();
+ }
+
+ @Override
+ public void shutdown() {
+ if (isShutdown.compareAndSet(false, true)) {
+ kafkaAdmin.close();
+ isTerminated.set(true);
+ }
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return isShutdown.get();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return isTerminated.get();
+ }
+
+ @Override
+ public void shutdownNow() {
+ shutdown();
+ }
+
+ @Override
+ public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
+ return isTerminated.get();
+ }
+}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/KafkaTopicStatsClient.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/KafkaTopicStatsClient.java
new file mode 100644
index 000000000..5400f18f5
--- /dev/null
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/KafkaTopicStatsClient.java
@@ -0,0 +1,435 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.internal;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.rpc.StatusCode;
+import com.google.cloud.pubsublite.CloudRegion;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import com.google.cloud.pubsublite.proto.Cursor;
+import com.google.protobuf.Timestamp;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * A topic stats client implementation for Kafka backend.
+ *
+ *
+ * <p>Pub/Sub Lite has specific RPCs to query topic statistics. Kafka does not support these RPCs
+ * directly, so this implementation provides helper methods that calculate stats based on Kafka
+ * primitives:
+ *
+ * <ul>
+ *   <li>{@link #getEarliestOffset}: the oldest available message offset in the partition
+ *   <li>{@link #getLatestOffset}: the newest message offset in the partition (head cursor)
+ *   <li>{@link #computeBacklogBytes}: approximate backlog calculated by comparing the consumer
+ *       offset against the latest offset and estimating based on average message size
+ *   <li>{@link #computeMessageStats}: approximate message stats between two offsets
+ * </ul>
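+ *
+ * <p>A minimal usage sketch ({@code props} is a Kafka properties map containing
+ * {@code bootstrap.servers}; the topic name and subscription path are hypothetical):
+ *
+ * <pre>{@code
+ * KafkaTopicStatsClient stats = new KafkaTopicStatsClient(CloudRegion.of("us-central1"), props);
+ * Offset head = stats.getLatestOffset("my-topic", Partition.of(0)).get();
+ * BacklogInfo backlog =
+ *     stats.computeBacklogBytes("my-topic", Partition.of(0), subscriptionPath, 0).get();
+ * }</pre>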
+ */
+public class KafkaTopicStatsClient implements TopicStatsClient {
+ private static final Logger log = Logger.getLogger(KafkaTopicStatsClient.class.getName());
+
+ // Default average message size estimate (in bytes) when we can't calculate it
+ private static final long DEFAULT_AVG_MESSAGE_SIZE = 1024; // 1KB
+
+ private final CloudRegion region;
+ private final AdminClient kafkaAdmin;
+ private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+ private final AtomicBoolean isTerminated = new AtomicBoolean(false);
+
+ /**
+ * Creates a new KafkaTopicStatsClient.
+ *
+ * @param region The cloud region for this client.
+ * @param kafkaProperties Kafka connection properties (must include bootstrap.servers).
+ */
+ public KafkaTopicStatsClient(CloudRegion region, Map<String, Object> kafkaProperties) {
+ this.region = region;
+ Properties props = new Properties();
+ props.putAll(kafkaProperties);
+ this.kafkaAdmin = AdminClient.create(props);
+ }
+
+ @Override
+ public CloudRegion region() {
+ return region;
+ }
+
+ /**
+ * Gets the earliest (oldest) available offset for a partition.
+ *
+ * <p>This is the offset of the oldest message still available in the partition (messages before
+ * this have been deleted due to retention policies).
+ *
+ * @param topicName The Kafka topic name.
+ * @param partition The partition to query.
+ * @return A future containing the earliest offset.
+ */
+ public ApiFuture<Offset> getEarliestOffset(String topicName, Partition partition) {
+ TopicPartition tp = new TopicPartition(topicName, (int) partition.value());
+ Map<TopicPartition, OffsetSpec> request = new HashMap<>();
+ request.put(tp, OffsetSpec.earliest());
+
+ try {
+ ListOffsetsResult result = kafkaAdmin.listOffsets(request);
+ ListOffsetsResultInfo info = result.partitionResult(tp).get();
+ return ApiFutures.immediateFuture(Offset.of(info.offset()));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException(
+ "Interrupted while getting earliest offset", StatusCode.Code.ABORTED)
+ .underlying);
+ } catch (ExecutionException e) {
+ log.log(Level.WARNING, "Failed to get earliest offset", e);
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException(
+ "Failed to get earliest offset: " + e.getCause().getMessage(),
+ StatusCode.Code.INTERNAL)
+ .underlying);
+ }
+ }
+
+ /**
+ * Gets the latest (newest) offset for a partition.
+ *
+ * <p>This is the offset that will be assigned to the next message published to the partition.
+ *
+ * @param topicName The Kafka topic name.
+ * @param partition The partition to query.
+ * @return A future containing the latest offset.
+ */
+ public ApiFuture<Offset> getLatestOffset(String topicName, Partition partition) {
+ TopicPartition tp = new TopicPartition(topicName, (int) partition.value());
+ Map<TopicPartition, OffsetSpec> request = new HashMap<>();
+ request.put(tp, OffsetSpec.latest());
+
+ try {
+ ListOffsetsResult result = kafkaAdmin.listOffsets(request);
+ ListOffsetsResultInfo info = result.partitionResult(tp).get();
+ return ApiFutures.immediateFuture(Offset.of(info.offset()));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException(
+ "Interrupted while getting latest offset", StatusCode.Code.ABORTED)
+ .underlying);
+ } catch (ExecutionException e) {
+ log.log(Level.WARNING, "Failed to get latest offset", e);
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException(
+ "Failed to get latest offset: " + e.getCause().getMessage(),
+ StatusCode.Code.INTERNAL)
+ .underlying);
+ }
+ }
+
+ /**
+ * Gets the offset for a specific timestamp.
+ *
+ * <p>Returns the earliest offset whose timestamp is greater than or equal to the given timestamp.
+ *
+ * @param topicName The Kafka topic name.
+ * @param partition The partition to query.
+ * @param timestampMs The target timestamp in milliseconds since epoch.
+ * @return A future containing the offset, or empty if no message exists at or after the
+ * timestamp.
+ */
+ public ApiFuture<Optional<Offset>> getOffsetForTimestamp(
+ String topicName, Partition partition, long timestampMs) {
+ TopicPartition tp = new TopicPartition(topicName, (int) partition.value());
+ Map<TopicPartition, OffsetSpec> request = new HashMap<>();
+ request.put(tp, OffsetSpec.forTimestamp(timestampMs));
+
+ try {
+ ListOffsetsResult result = kafkaAdmin.listOffsets(request);
+ ListOffsetsResultInfo info = result.partitionResult(tp).get();
+
+ if (info.offset() >= 0) {
+ return ApiFutures.immediateFuture(Optional.of(Offset.of(info.offset())));
+ } else {
+ return ApiFutures.immediateFuture(Optional.empty());
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException(
+ "Interrupted while getting offset for timestamp", StatusCode.Code.ABORTED)
+ .underlying);
+ } catch (ExecutionException e) {
+ log.log(Level.WARNING, "Failed to get offset for timestamp", e);
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException(
+ "Failed to get offset for timestamp: " + e.getCause().getMessage(),
+ StatusCode.Code.INTERNAL)
+ .underlying);
+ }
+ }
+
+ /**
+ * Computes the approximate backlog in bytes for a consumer group on a partition.
+ *
+ * <p>The backlog is calculated by:
+ *
+ * <ol>
+ *   <li>Getting the consumer's committed offset
+ *   <li>Getting the latest offset (log end offset)
+ *   <li>Calculating the difference (number of unconsumed messages)
+ *   <li>Multiplying by the estimated average message size
+ * </ol>
+ *
+ * <p>Note: this is an approximation. For an exact value, Kafka would need to sum the actual
+ * sizes of all unconsumed messages, which is not efficiently supported.
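+ *
+ * <p>For example (hypothetical numbers): with a committed offset of 400, a log end offset of
+ * 1000, and the default 1 KiB average message size, the estimate is (1000 - 400) * 1024 =
+ * 614,400 bytes.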
+ *
+ * @param topicName The Kafka topic name.
+ * @param partition The partition to query.
+ * @param subscriptionPath The subscription path (used to derive consumer group ID).
+ * @param estimatedAvgMessageSize The estimated average message size in bytes (use 0 for default).
+ * @return A future containing the BacklogInfo with message count and byte estimate.
+ */
+ public ApiFuture<BacklogInfo> computeBacklogBytes(
+ String topicName,
+ Partition partition,
+ SubscriptionPath subscriptionPath,
+ long estimatedAvgMessageSize) {
+ String groupId = subscriptionPath.toString().replace('/', '-');
+ TopicPartition tp = new TopicPartition(topicName, (int) partition.value());
+
+ try {
+ // Get committed offset for the consumer group
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets =
+ kafkaAdmin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
+
+ // Get latest offset
+ Map<TopicPartition, OffsetSpec> latestRequest = new HashMap<>();
+ latestRequest.put(tp, OffsetSpec.latest());
+ ListOffsetsResultInfo latestInfo =
+ kafkaAdmin.listOffsets(latestRequest).partitionResult(tp).get();
+
+ long latestOffset = latestInfo.offset();
+ long committedOffset = 0;
+
+ OffsetAndMetadata oam = committedOffsets.get(tp);
+ if (oam != null) {
+ committedOffset = oam.offset();
+ } else {
+ // No committed offset - use earliest offset as the starting point
+ Map<TopicPartition, OffsetSpec> earliestRequest = new HashMap<>();
+ earliestRequest.put(tp, OffsetSpec.earliest());
+ ListOffsetsResultInfo earliestInfo =
+ kafkaAdmin.listOffsets(earliestRequest).partitionResult(tp).get();
+ committedOffset = earliestInfo.offset();
+ }
+
+ long messageCount = Math.max(0, latestOffset - committedOffset);
+ long avgSize =
+ estimatedAvgMessageSize > 0 ? estimatedAvgMessageSize : DEFAULT_AVG_MESSAGE_SIZE;
+ long estimatedBytes = messageCount * avgSize;
+
+ return ApiFutures.immediateFuture(new BacklogInfo(messageCount, estimatedBytes));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException("Interrupted while computing backlog", StatusCode.Code.ABORTED)
+ .underlying);
+ } catch (ExecutionException e) {
+ log.log(Level.WARNING, "Failed to compute backlog", e);
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException(
+ "Failed to compute backlog: " + e.getCause().getMessage(),
+ StatusCode.Code.INTERNAL)
+ .underlying);
+ }
+ }
+
+ /** Container for backlog information. */
+ public static class BacklogInfo {
+ private final long messageCount;
+ private final long estimatedBytes;
+
+ public BacklogInfo(long messageCount, long estimatedBytes) {
+ this.messageCount = messageCount;
+ this.estimatedBytes = estimatedBytes;
+ }
+
+ /** The number of unconsumed messages. */
+ public long getMessageCount() {
+ return messageCount;
+ }
+
+ /** The estimated size in bytes of unconsumed messages. */
+ public long getEstimatedBytes() {
+ return estimatedBytes;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "BacklogInfo{messages=%d, estimatedBytes=%d}", messageCount, estimatedBytes);
+ }
+ }
+
+ // TopicStatsClient Interface Implementation
+
+ @Override
+ public ApiFuture<ComputeMessageStatsResponse> computeMessageStats(
+ TopicPath path, Partition partition, Offset start, Offset end) {
+ // Kafka doesn't provide detailed message stats like PSL does.
+ // We can only provide an approximation based on offset difference.
+ long messageCount = Math.max(0, end.value() - start.value());
+
+ // Approximate byte count using default message size
+ long estimatedBytes = messageCount * DEFAULT_AVG_MESSAGE_SIZE;
+
+ // For minimum publish time, we'd need to fetch actual messages which is expensive.
+ // Return a response with what we can calculate.
+ return ApiFutures.immediateFuture(
+ ComputeMessageStatsResponse.newBuilder()
+ .setMessageCount(messageCount)
+ .setMessageBytes(estimatedBytes)
+ // We cannot efficiently compute min publish time without reading messages
+ .build());
+ }
+
+ @Override
+ public ApiFuture<Cursor> computeHeadCursor(TopicPath path, Partition partition) {
+ String topicName = path.name().value();
+ TopicPartition tp = new TopicPartition(topicName, (int) partition.value());
+ Map<TopicPartition, OffsetSpec> request = new HashMap<>();
+ request.put(tp, OffsetSpec.latest());
+
+ try {
+ ListOffsetsResult result = kafkaAdmin.listOffsets(request);
+ ListOffsetsResultInfo info = result.partitionResult(tp).get();
+ return ApiFutures.immediateFuture(Cursor.newBuilder().setOffset(info.offset()).build());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException(
+ "Interrupted while computing head cursor", StatusCode.Code.ABORTED)
+ .underlying);
+ } catch (ExecutionException e) {
+ log.log(Level.WARNING, "Failed to compute head cursor", e);
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException(
+ "Failed to compute head cursor: " + e.getCause().getMessage(),
+ StatusCode.Code.INTERNAL)
+ .underlying);
+ }
+ }
+
+ @Override
+ public ApiFuture<Optional<Cursor>> computeCursorForPublishTime(
+ TopicPath path, Partition partition, Timestamp publishTime) {
+ String topicName = path.name().value();
+ long timestampMs = publishTime.getSeconds() * 1000 + publishTime.getNanos() / 1_000_000;
+
+ TopicPartition tp = new TopicPartition(topicName, (int) partition.value());
+ Map<TopicPartition, OffsetSpec> request = new HashMap<>();
+ request.put(tp, OffsetSpec.forTimestamp(timestampMs));
+
+ try {
+ ListOffsetsResult result = kafkaAdmin.listOffsets(request);
+ ListOffsetsResultInfo info = result.partitionResult(tp).get();
+
+ if (info.offset() >= 0) {
+ return ApiFutures.immediateFuture(
+ Optional.of(Cursor.newBuilder().setOffset(info.offset()).build()));
+ } else {
+ return ApiFutures.immediateFuture(Optional.empty());
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException(
+ "Interrupted while computing cursor for publish time", StatusCode.Code.ABORTED)
+ .underlying);
+ } catch (ExecutionException e) {
+ log.log(Level.WARNING, "Failed to compute cursor for publish time", e);
+ return ApiFutures.immediateFailedFuture(
+ new CheckedApiException(
+ "Failed to compute cursor for publish time: " + e.getCause().getMessage(),
+ StatusCode.Code.INTERNAL)
+ .underlying);
+ }
+ }
+
+ @Override
+ public ApiFuture<Optional<Cursor>> computeCursorForEventTime(
+ TopicPath path, Partition partition, Timestamp eventTime) {
+ // Kafka doesn't have a concept of event time in the same way as PSL.
+ // We can only use the message timestamp, which corresponds to publish time.
+ // For event time queries, we fall back to publish time behavior.
+ log.warning(
+ "Kafka does not support event time queries. "
+ + "Falling back to publish time behavior for cursor computation.");
+ return computeCursorForPublishTime(path, partition, eventTime);
+ }
+
+ // Lifecycle
+
+ @Override
+ public void close() {
+ shutdown();
+ }
+
+ @Override
+ public void shutdown() {
+ if (isShutdown.compareAndSet(false, true)) {
+ kafkaAdmin.close();
+ isTerminated.set(true);
+ }
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return isShutdown.get();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return isTerminated.get();
+ }
+
+ @Override
+ public void shutdownNow() {
+ shutdown();
+ }
+
+ @Override
+ public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
+ return isTerminated.get();
+ }
+}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/TopicStatsClientSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/TopicStatsClientSettings.java
index adbc5aa96..c412d7eb7 100644
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/TopicStatsClientSettings.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/TopicStatsClientSettings.java
@@ -21,8 +21,10 @@
import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.CloudRegion;
+import com.google.cloud.pubsublite.cloudpubsub.MessagingBackend;
import com.google.cloud.pubsublite.v1.TopicStatsServiceClient;
import com.google.cloud.pubsublite.v1.TopicStatsServiceSettings;
+import java.util.Map;
import java.util.Optional;
@AutoValue
@@ -34,8 +36,15 @@ public abstract class TopicStatsClientSettings {
// Optional parameters.
abstract Optional<TopicStatsServiceClient> serviceClient();
+ /** The backend messaging system to use (e.g., PUBSUB_LITE or MANAGED_KAFKA). */
+ public abstract MessagingBackend messagingBackend();
+
+ /** Kafka-specific properties used when the MANAGED_KAFKA backend is selected. */
+ public abstract Optional