Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 42 additions & 23 deletions src/Transports/GCP/Wolverine.Pubsub/PubsubEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class PubsubEndpoint : Endpoint<IPubsubEnvelopeMapper, PubsubEnvelopeMapp

private bool _hasInitialized;
public PubsubClientOptions Client = new();
internal bool IsExistingSubscription = false;

protected override PubsubEnvelopeMapper buildMapper(IWolverineRuntime runtime)
{
Expand Down Expand Up @@ -49,9 +50,8 @@ public PubsubEndpoint(
Server.Topic.Name = new TopicName(transport.ProjectId, topicName);
Server.Subscription.Name = new SubscriptionName(
transport.ProjectId,
_transport.IdentifierPrefix.IsNotEmpty() &&
topicName.StartsWith($"{_transport.IdentifierPrefix}.")
? _transport.MaybeCorrectName(topicName.Substring(_transport.IdentifierPrefix.Length + 1))
_transport.IdentifierPrefix.IsNotEmpty() && !topicName.StartsWith($"{_transport.IdentifierPrefix}.")
? _transport.MaybeCorrectName(topicName)
: topicName
);
EndpointName = topicName;
Expand All @@ -69,6 +69,33 @@ public async ValueTask SetupAsync(ILogger logger)
throw new WolverinePubsubTransportNotConnectedException();
}

if (IsExistingSubscription)
{
if (!IsListener && !IsDeadLetter)
{
return;
}

if (_transport.SubscriberApiClient is null)
{
throw new WolverinePubsubTransportNotConnectedException();
}

try
{
await _transport.SubscriberApiClient.GetSubscriptionAsync(Server.Subscription.Name);
}
catch (Exception ex)
{
logger.LogError(ex, "{Uri}: Error trying to verify Google Cloud Platform Pub/Sub subscription \"{Subscription}\"",
Uri, Server.Subscription.Name);

throw;
}

return;
}

try
{
await _transport.PublisherApiClient.CreateTopicAsync(new Topic
Expand All @@ -77,16 +104,8 @@ await _transport.PublisherApiClient.CreateTopicAsync(new Topic
MessageRetentionDuration = Server.Topic.Options.MessageRetentionDuration
});
}
catch (RpcException ex)
catch (RpcException ex) when (ex.StatusCode == StatusCode.AlreadyExists)
{
if (ex.StatusCode != StatusCode.AlreadyExists)
{
logger.LogError(ex, "{Uri}: Error trying to initialize Google Cloud Platform Pub/Sub topic \"{Topic}\"",
Uri, Server.Topic.Name);

throw;
}

logger.LogInformation("{Uri}: Google Cloud Platform Pub/Sub topic \"{Topic}\" already exists", Uri,
Server.Topic.Name);
}
Expand Down Expand Up @@ -145,17 +164,8 @@ await _transport.PublisherApiClient.CreateTopicAsync(new Topic

await _transport.SubscriberApiClient.CreateSubscriptionAsync(request);
}
catch (RpcException ex)
catch (RpcException ex) when (ex.StatusCode == StatusCode.AlreadyExists)
{
if (ex.StatusCode != StatusCode.AlreadyExists)
{
logger.LogError(ex,
"{Uri}: Error trying to initialize Google Cloud Platform Pub/Sub subscription \"{Subscription}\" to topic \"{Topic}\"",
Uri, Server.Subscription.Name, Server.Topic.Name);

throw;
}

logger.LogInformation("{Uri}: Google Cloud Platform Pub/Sub subscription \"{Subscription}\" already exists",
Uri, Server.Subscription.Name);
}
Expand All @@ -181,7 +191,10 @@ _transport.SubscriberApiClient is null

try
{
await _transport.PublisherApiClient.GetTopicAsync(Server.Topic.Name);
if (!IsExistingSubscription)
{
await _transport.PublisherApiClient.GetTopicAsync(Server.Topic.Name);
}

if (IsListener || IsDeadLetter)
{
Expand Down Expand Up @@ -239,6 +252,8 @@ await _transport.SubscriberApiClient.AcknowledgeAsync(

public async ValueTask TeardownAsync(ILogger logger)
{
if (IsExistingSubscription) { return; }

if (_transport.SubscriberApiClient is not null && IsListener)
{
await _transport.SubscriberApiClient.DeleteSubscriptionAsync(Server.Subscription.Name);
Expand All @@ -252,6 +267,10 @@ public async ValueTask TeardownAsync(ILogger logger)

public override async ValueTask InitializeAsync(ILogger logger)
{
if (IsExistingSubscription)
{
_hasInitialized = true;
}
if (_hasInitialized)
{
return;
Expand Down
111 changes: 69 additions & 42 deletions src/Transports/GCP/Wolverine.Pubsub/PubsubTransportExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,38 @@ namespace Wolverine.Pubsub;

public static class PubsubTransportExtensions
{
/// <summary>
/// Quick access to the Google Cloud Platform Pub/Sub Transport within this application.
/// This is for advanced usage.
/// </summary>
/// <param name="endpoints"></param>
/// <returns></returns>
internal static PubsubTransport PubsubTransport(this WolverineOptions endpoints)
/// <summary>
/// Quick access to the Google Cloud Platform Pub/Sub Transport within this application.
/// This is for advanced usage.
/// </summary>
/// <param name="endpoints"></param>
/// <returns></returns>
internal static PubsubTransport PubsubTransport(this WolverineOptions endpoints)
{
var transports = endpoints.As<WolverineOptions>().Transports;

return transports.GetOrCreate<PubsubTransport>();
}

/// <summary>
/// Additive configuration to the Google Cloud Platform Pub/Sub integration for this Wolverine application.
/// </summary>
/// <param name="endpoints"></param>
/// <returns></returns>
public static PubsubConfiguration ConfigurePubsub(this WolverineOptions endpoints)
/// <summary>
/// Additive configuration to the Google Cloud Platform Pub/Sub integration for this Wolverine application.
/// </summary>
/// <param name="endpoints"></param>
/// <returns></returns>
public static PubsubConfiguration ConfigurePubsub(this WolverineOptions endpoints)
{
return new PubsubConfiguration(endpoints.PubsubTransport(), endpoints);
}

/// <summary>
/// Connect to Google Cloud Platform Pub/Sub with a project id.
/// </summary>
/// <param name="endpoints"></param>
/// <param name="projectId"></param>
/// <param name="configure"></param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public static PubsubConfiguration UsePubsub(this WolverineOptions endpoints, string projectId,
/// <summary>
/// Connect to Google Cloud Platform Pub/Sub with a project id.
/// </summary>
/// <param name="endpoints"></param>
/// <param name="projectId"></param>
/// <param name="configure"></param>
/// <returns></returns>
/// <exception cref="ArgumentNullException"></exception>
public static PubsubConfiguration UsePubsub(this WolverineOptions endpoints, string projectId,
Action<PubsubTransport>? configure = null)
{
var transport = endpoints.PubsubTransport();
Expand All @@ -48,16 +48,16 @@ public static PubsubConfiguration UsePubsub(this WolverineOptions endpoints, str
return new PubsubConfiguration(transport, endpoints);
}

/// <summary>
/// Listen for incoming messages at the designated Google Cloud Platform Pub/Sub topic by name.
/// </summary>
/// <param name="endpoints"></param>
/// <param name="topicName">The name of the Google Cloud Platform Pub/Sub topic</param>
/// <param name="configure">
/// Optional configuration for this Google Cloud Platform Pub/Sub endpoint.
/// </param>
/// <returns></returns>
public static PubsubTopicListenerConfiguration ListenToPubsubTopic(
/// <summary>
/// Listen for incoming messages at the designated Google Cloud Platform Pub/Sub topic by name.
/// </summary>
/// <param name="endpoints"></param>
/// <param name="topicName">The name of the Google Cloud Platform Pub/Sub topic</param>
/// <param name="configure">
/// Optional configuration for this Google Cloud Platform Pub/Sub endpoint.
/// </param>
/// <returns></returns>
public static PubsubTopicListenerConfiguration ListenToPubsubTopic(
this WolverineOptions endpoints,
string topicName,
Action<PubsubEndpoint>? configure = null
Expand All @@ -74,16 +74,43 @@ public static PubsubTopicListenerConfiguration ListenToPubsubTopic(
return new PubsubTopicListenerConfiguration(topic);
}

/// <summary>
/// Publish the designated messages to a Google Cloud Platform Pub/Sub topic.
/// </summary>
/// <param name="publishing"></param>
/// <param name="topicName"></param>
/// <param name="configure">
/// Optional configuration for this Google Cloud Platform Pub/Sub endpoint.
/// </param>
/// <returns></returns>
public static PubsubTopicSubscriberConfiguration ToPubsubTopic(
/// <summary>
/// Listen for incoming messages on an existing Google Cloud Platform Pub/Sub subscription.
/// Since an existing subscription is used, IdentifierPrefix is not applied to the subscription name.
/// </summary>
/// <param name="endpoints"></param>
/// <param name="subscriptionName">The name of the Google Cloud Platform Pub/Sub subscription</param>
/// <param name="configure">
/// Optional configuration for this Google Cloud Platform Pub/Sub endpoint.
/// </param>
/// <returns></returns>
public static PubsubTopicListenerConfiguration ListenToPubsubSubscription(
this WolverineOptions endpoints,
string subscriptionName,
Action<PubsubEndpoint>? configure = null
)
{
var transport = endpoints.PubsubTransport();
var topic = transport.Topics[subscriptionName];

topic.IsListener = true;
topic.IsExistingSubscription = true;

configure?.Invoke(topic);

return new PubsubTopicListenerConfiguration(topic);
}

/// <summary>
/// Publish the designated messages to a Google Cloud Platform Pub/Sub topic.
/// </summary>
/// <param name="publishing"></param>
/// <param name="topicName"></param>
/// <param name="configure">
/// Optional configuration for this Google Cloud Platform Pub/Sub endpoint.
/// </param>
/// <returns></returns>
public static PubsubTopicSubscriberConfiguration ToPubsubTopic(
this IPublishToExpression publishing,
string topicName,
Action<PubsubEndpoint>? configure = null
Expand Down
Loading