Skip to content

feat: Pub/sub#244

Draft
sigurdm wants to merge 5 commits intogoogleapis:mainfrom
sigurdm:pub_sub
Draft

feat: Pub/sub#244
sigurdm wants to merge 5 commits intogoogleapis:mainfrom
sigurdm:pub_sub

Conversation

@sigurdm
Copy link
Copy Markdown
Contributor

@sigurdm sigurdm commented Apr 20, 2026

Initial implementation. Only basic functionality implemented.
Streaming pull is implemented.

No built-in retry yet.

Tools to fetch and compile protos.

@sigurdm sigurdm changed the title Pub sub Pub/sub Apr 20, 2026
@sigurdm sigurdm changed the title Pub/sub feat: Pub/sub Apr 20, 2026
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new Google Cloud Pub/Sub client library, including core client functionality, topic and subscription management, and support for the Pub/Sub emulator. The review identified several critical issues: a race condition and token refresh problem in the authentication logic, a resource leak where the auth client is not closed, an overly restrictive project ID requirement for the emulator, an incorrect parsing logic for IPv6 emulator hosts, and code duplication in message mapping. I have recommended addressing these issues to improve the reliability and maintainability of the client.

Comment on lines +106 to +110
_authClient ??= await auth.clientViaApplicationDefaultCredentials(
scopes: ['https://www.googleapis.com/auth/pubsub'],
);
final token = _authClient!.credentials.accessToken.data;
return CallOptions(metadata: {'authorization': 'Bearer $token'});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There are several issues with the current authentication logic:

  1. Race Condition: Multiple concurrent RPCs could trigger multiple calls to clientViaApplicationDefaultCredentials before _authClient is set.
  2. Token Refresh: AutoRefreshingAuthClient only refreshes tokens when used via its HTTP interface. Since this code manually extracts the token string for gRPC, the token will eventually expire and never be refreshed. You should call refreshCredentials() to ensure a valid token.
  3. Resource Leak: The _authClient is never closed. It should be closed in the close() method of the PubSub class.

Comment thread pkgs/google_cloud_pubsub/lib/src/client.dart Outdated
Comment on lines +70 to +74
if (host.contains(':')) {
final parts = host.split(':');
cleanHost = parts[0];
port = int.parse(parts[1]);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current logic for splitting the host and port fails for IPv6 addresses (e.g., [::1]:8085). Using lastIndexOf(':') and checking for the closing bracket ] is a more robust approach for parsing the port.

Suggested change
if (host.contains(':')) {
final parts = host.split(':');
cleanHost = parts[0];
port = int.parse(parts[1]);
}
final lastColon = host.lastIndexOf(':');
if (lastColon != -1 && !host.endsWith(']')) {
cleanHost = host.substring(0, lastColon);
port = int.parse(host.substring(lastColon + 1));
}

Comment on lines +119 to +121
Future<void> close() async {
await _channel.shutdown();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The _authClient (an AutoRefreshingAuthClient) should be closed when the PubSub client is closed to release underlying resources.

Suggested change
Future<void> close() async {
await _channel.shutdown();
}
Future<void> close() async {
await _channel.shutdown();
await _authClient?.close();
}

Comment on lines +306 to +318
return response.receivedMessages
.map(
(m) => ReceivedMessage(
ackId: m.ackId,
message: Message(
data: m.message.data,
attributes: m.message.attributes,
messageId: m.message.messageId,
publishTime: m.message.publishTime.toDateTime(),
),
),
)
.toList();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for mapping a grpc.ReceivedMessage to the domain ReceivedMessage is duplicated here and in streamingPull. Consider extracting this into a private helper method for better maintainability.

sigurdm and others added 3 commits April 20, 2026 13:02
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant