Description
Problem Description
Currently, when using the GCP Pub/Sub transport, if a subscription already exists in GCP, Kombu silently ignores the AlreadyExists exception and does not update the subscription configuration. This means that any changes to transport options (like ack_deadline_seconds, expiration_seconds, message_retention_duration, or subscription filters) require manual intervention in the GCP Console.
Current Behavior
When Celery workers start up with updated Pub/Sub configuration, the code does nothing if the subscription already exists:
# kombu/transport/gcpubsub.py, line 327
except AlreadyExists:
    pass  # Does nothing - no update logic
Expected Behavior
When a subscription already exists but the configuration has changed, Kombu should update the existing subscription with the new configuration parameters instead of silently ignoring the update.
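One way to decide whether an update is needed at all is to compare the desired settings with the settings the existing subscription reports, and keep only the field paths that differ. The sketch below is illustrative only: `changed_paths` is a hypothetical helper, not part of Kombu, and it works on plain dictionaries rather than real Pub/Sub subscription objects.

```python
def changed_paths(current, desired, paths):
    """Return the dotted field paths whose values differ between two configs."""
    def lookup(config, path):
        # Walk nested dicts along the dotted path; a missing key yields None.
        value = config
        for key in path.split('.'):
            if not isinstance(value, dict) or key not in value:
                return None
            value = value[key]
        return value

    return [p for p in paths if lookup(current, p) != lookup(desired, p)]


# Hypothetical values: ack_deadline_seconds changes from 240 to 300.
current = {'ack_deadline_seconds': 240, 'expiration_policy': {'ttl': 86400}}
desired = {'ack_deadline_seconds': 300, 'expiration_policy': {'ttl': 86400}}
paths = [
    'ack_deadline_seconds',
    'expiration_policy.ttl',
    'message_retention_duration',
]
print(changed_paths(current, desired, paths))  # ['ack_deadline_seconds']
```

An empty result would mean the existing subscription already matches and no update call is required.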
Use Case
As a developer, when I:
- Update Kombu transport options (e.g., change ack_deadline_seconds from 240 to 300)
- Restart my Celery workers
I expect the existing GCP Pub/Sub subscriptions to be updated with the new configuration automatically, without having to manually update them in the GCP Console.
Proposed Solution
Update the _create_subscription method to use the update_subscription API when catching AlreadyExists:
from google.cloud.pubsub_v1.types import Subscription, FieldMask

def _create_subscription(self, ... ):
    # ... existing code ...
    try:
        self.subscriber.create_subscription(request=subscription_config)
    except AlreadyExists:
        # The subscription already exists: push the current settings to it.
        logger.debug('subscription exists, updating: %s', subscription_path)
        subscription = Subscription(subscription_config)
        # Only the fields named in the mask are overwritten.
        update_mask = FieldMask(paths=[
            'ack_deadline_seconds',
            'expiration_policy.ttl',
            'message_retention_duration',
        ])
        if filter_args:
            # Caveat: Pub/Sub documents filters as immutable after creation,
            # so the API may reject an update that includes this path.
            update_mask.paths.append('filter')
        self.subscriber.update_subscription(
            request={
                'subscription': subscription,
                'update_mask': update_mask,
            }
        )
    return subscription_path
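The create-then-update fallback can be exercised without touching GCP by using a small test double. The following is a minimal sketch under stated assumptions: `AlreadyExists` here is a local stand-in for `google.api_core.exceptions.AlreadyExists`, while `FakeSubscriber` and `create_or_update` are hypothetical names invented for illustration. Requests are plain dictionaries, and only top-level mask paths are handled.

```python
class AlreadyExists(Exception):
    """Local stand-in for google.api_core.exceptions.AlreadyExists."""


class FakeSubscriber:
    """Hypothetical test double mimicking the two client calls used above."""

    def __init__(self, existing=()):
        self.subscriptions = {name: {} for name in existing}
        self.calls = []  # records which client methods were invoked

    def create_subscription(self, request):
        self.calls.append('create')
        if request['name'] in self.subscriptions:
            raise AlreadyExists(request['name'])
        self.subscriptions[request['name']] = dict(request)

    def update_subscription(self, request):
        self.calls.append('update')
        stored = self.subscriptions[request['subscription']['name']]
        # Copy only the masked (top-level) fields onto the stored config.
        for path in request['update_mask']['paths']:
            stored[path] = request['subscription'].get(path)


def create_or_update(subscriber, config, update_paths):
    """Create the subscription, or update the masked fields if it exists."""
    try:
        subscriber.create_subscription(request=config)
    except AlreadyExists:
        subscriber.update_subscription(request={
            'subscription': config,
            'update_mask': {'paths': list(update_paths)},
        })
    return config['name']


# Hypothetical subscription path and settings.
name = 'projects/p/subscriptions/s'
subscriber = FakeSubscriber(existing=[name])
config = {'name': name, 'ack_deadline_seconds': 300}
create_or_update(subscriber, config, ['ack_deadline_seconds'])
print(subscriber.calls)  # ['create', 'update']
```

A test along these lines could assert that a pre-existing subscription triggers exactly one update call, and that a new subscription triggers none.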