-
-
Notifications
You must be signed in to change notification settings - Fork 979
Description
I am using Celery with SQS as a broker and I am trying to renew my credentials "AWS_ACCESS_KEY_ID" and "AWS_SECRET_ACCESS_KEY", before they expire, the first time I run the task and the result is success, but after 15 minutes it expires although credentials have been renewed, the function to update credentials is as follows:
import os
import boto3
from celery import Celery
from kombu.utils.url import safequote
def update_aws_credentials():
role_info = {
'RoleArn': f"arn:aws:iam::{os.environ['AWS_ACCOUNT_NUMER']}:role/my_role_execution",
'RoleSessionName': 'roleExecution',
'DurationSeconds': 900
}
sts_client = boto3.client('sts', region_name='eu-central-1')
credentials = sts_client.assume_role(**role_info)
aws_access_key_id = credentials["Credentials"]['AccessKeyId']
aws_secret_access_key = credentials["Credentials"]['SecretAccessKey']
aws_session_token = credentials["Credentials"]["SessionToken"]
os.environ["AWS_ACCESS_KEY_ID"] = aws_access_key_id
os.environ["AWS_SECRET_ACCESS_KEY"] = aws_secret_access_key
os.environ["AWS_DEFAULT_REGION"] = 'eu-central-1'
os.environ["AWS_SESSION_TOKEN"] = aws_session_token
return aws_access_key_id, aws_secret_access_key
def get_celery(aws_access_key_id, aws_secret_access_key):
broker = f"sqs://{safequote(aws_access_key_id)}:{safequote(aws_secret_access_key)}@"
backend = 'redis://redis-service:6379/0'
celery = Celery(f"my_task", broker=broker, backend=backend)
celery.conf["broker_transport_options"] = {
'polling_interval': 30,
'region': 'eu-central-1',
'predefined_queues': {
f"my_queue": {
'url': f"https://sqs.eu-central-1.amazonaws.com/{os.environ['AWS_ACCOUNT_NUMER']}/my_queue"
}
}
}
celery.conf["task_default_queue"] = f"my_queue"
return celery
def refresh_sqs_credentials():
access, secret = update_aws_credentials()
return get_celery(access, secret)
Running refresh_sqs_credentials, new credentials are created:
celery = worker.refresh_sqs_credentials()
And then I run my task with celery:
task = celery.send_task('my_task.code_of_my_task', args=[content], task_id=task_id)
All tasks that I run before 15 minutes finish successfully, but after 15 minutes the error is the following:
[2021-12-14 14:08:15,637] ERROR in app: Exception on /tasks/run [POST]
Traceback (most recent call last):
File "/api/app.py", line 87, in post
task = celery.send_task('glgt_ap35080_dev_sqs_runalgo.allocation_alg_task', args=[content], task_id=task_id)
File "/usr/local/lib/python3.6/site-packages/celery/app/base.py", line 717, in send_task
amqp.send_task_message(P, name, message, **options)
File "/usr/local/lib/python3.6/site-packages/celery/app/amqp.py", line 547, in send_task_message
**properties
File "/usr/local/lib/python3.6/site-packages/kombu/messaging.py", line 178, in publish
exchange_name, declare,
File "/usr/local/lib/python3.6/site-packages/kombu/connection.py", line 525, in _ensured
return fun(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/kombu/messaging.py", line 200, in _publish
mandatory=mandatory, immediate=immediate,
File "/usr/local/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 605, in basic_publish
return self._put(routing_key, message, **kwargs)
File "/usr/local/lib/python3.6/site-packages/kombu/transport/SQS.py", line 294, in _put
c.send_message(**kwargs)
File "/usr/local/lib/python3.6/site-packages/botocore/client.py", line 337, in _api_call
File "/usr/local/lib/python3.6/site-packages/botocore/client.py", line 656, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ExpiredToken) when calling the SendMessage operation: The security token included in the request is expired
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1813, in full_dispatch_request
rv = self.dispatch_request()
File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1799, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/usr/local/lib/python3.6/site-packages/flask_restplus/api.py", line 325, in wrapper
resp = resource(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/flask/views.py", line 88, in view
return self.dispatch_request(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/flask_restplus/resource.py", line 44, in dispatch_request
resp = meth(*args, **kwargs)
File "/api/app.py", line 90, in post
abort(500)
File "/usr/local/lib/python3.6/site-packages/werkzeug/exceptions.py", line 774, in abort
return _aborter(status, *args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/werkzeug/exceptions.py", line 755, in __call__
raise self.mapping[code](*args, **kwargs)
werkzeug.exceptions.InternalServerError: 500 Internal Server Error: The server encountered an internal error and was unable to complete your request. Either the server is overloaded or there is an error in the application.
10.142.95.217 - - [14/Dec/2021 14:08:15] "POST /tasks/run HTTP/1.1" 500 -
I'm storing the credentials in environment variables, I don't understand why it expires after 15 minutes, can someone help me please?
The versions of the packages used are:
boto3==1.14.54
celery==5.0.0
kombu==5.0.2
pycurl==7.43.0.6
Thank you