|
1 | | -from os import environ as env |
| 1 | +import os |
2 | 2 | import re |
3 | | -import boto3, botocore |
| 3 | +import logging |
| 4 | +from typing import List |
| 5 | +import boto3 |
| 6 | +from botocore.exceptions import ClientError |
4 | 7 |
|
5 | | -def check_bucket(s3, bucket_name): |
| 8 | +# Configure logging |
| 9 | +logging.basicConfig(level=logging.INFO) |
| 10 | +logger = logging.getLogger(__name__) |
| 11 | + |
| 12 | + |
| 13 | +def get_environment_variable(name: str, required: bool = True) -> str: |
| 14 | + """ |
| 15 | + Retrieves an environment variable or raises an error if it's missing. |
| 16 | + """ |
| 17 | + value = os.getenv(name) |
| 18 | + if required and not value: |
| 19 | + raise ValueError(f"Missing required environment variable: {name}") |
| 20 | + return value |
| 21 | + |
| 22 | + |
| 23 | +def parse_bucket_list(bucket_list: str) -> List[str]: |
| 24 | + """ |
| 25 | + Parses a string of bucket names separated by commas, spaces, or semicolons into a list. |
| 26 | + """ |
| 27 | + return re.split(r'[ ,;]+', bucket_list.strip()) |
| 28 | + |
| 29 | + |
| 30 | +def check_bucket(s3_client, bucket_name: str) -> bool: |
| 31 | + """ |
| 32 | + Checks if a bucket exists and is accessible. |
| 33 | +
|
| 34 | + Returns True if the bucket exists or is private, False otherwise. |
| 35 | + """ |
6 | 36 | try: |
7 | | - s3.head_bucket(Bucket=bucket_name) |
8 | | - print(f"{bucket_name} available") |
| 37 | + s3_client.head_bucket(Bucket=bucket_name) |
| 38 | + logger.info(f"Bucket '{bucket_name}' is available.") |
9 | 39 | return True |
10 | | - except botocore.exceptions.ClientError as e: |
| 40 | + except ClientError as e: |
11 | 41 | error_code = int(e.response['Error']['Code']) |
12 | 42 | if error_code == 403: |
13 | | - print(f"{bucket_name} is Private. Access denied.") |
| 43 | + logger.warning(f"Bucket '{bucket_name}' is private. Access denied.") |
14 | 44 | return True |
15 | 45 | elif error_code == 404: |
16 | | - print(f"{bucket_name} does not exist") |
| 46 | + logger.info(f"Bucket '{bucket_name}' does not exist.") |
17 | 47 | return False |
| 48 | + else: |
| 49 | + logger.error(f"Error checking bucket '{bucket_name}': {e}") |
| 50 | + raise |
| 51 | + |
| 52 | + |
| 53 | +def create_bucket(s3_client, bucket_name: str): |
| 54 | + """ |
| 55 | + Creates a bucket if it does not exist. |
| 56 | + """ |
| 57 | + try: |
| 58 | + logger.info(f"Creating bucket '{bucket_name}'.") |
| 59 | + s3_client.create_bucket(Bucket=bucket_name) |
| 60 | + logger.info(f"Bucket '{bucket_name}' created successfully.") |
| 61 | + except ClientError as e: |
| 62 | + logger.error(f"Failed to create bucket '{bucket_name}': {e}") |
| 63 | + raise |
| 64 | + |
18 | 65 |
|
19 | 66 | def main(): |
20 | | - buckets = env['S3_BUCKET_LIST'] |
21 | | - buckets = re.split(r',| |;', buckets) |
22 | | - end_point = env['S3_BUCKET_ENDPOINT'] |
| 67 | + """ |
| 68 | + Main function to check and create S3 buckets as needed. |
| 69 | + """ |
| 70 | + bucket_list_str = get_environment_variable('S3_BUCKET_LIST') |
| 71 | + buckets = parse_bucket_list(bucket_list_str) |
| 72 | + endpoint = get_environment_variable('S3_BUCKET_ENDPOINT') |
23 | 73 |
|
24 | | - session = boto3.session.Session() |
| 74 | + aws_access_key_id = get_environment_variable('AWS_ACCESS_KEY_ID') |
| 75 | + aws_secret_access_key = get_environment_variable('AWS_SECRET_ACCESS_KEY') |
25 | 76 |
|
| 77 | + session = boto3.session.Session() |
26 | 78 | s3 = session.client( |
27 | 79 | service_name='s3', |
28 | | - aws_access_key_id=env['AWS_ACCESS_KEY_ID'], |
29 | | - aws_secret_access_key=env['AWS_SECRET_ACCESS_KEY'], |
30 | | - endpoint_url=end_point, |
| 80 | + aws_access_key_id=aws_access_key_id, |
| 81 | + aws_secret_access_key=aws_secret_access_key, |
| 82 | + endpoint_url=endpoint, |
31 | 83 | ) |
32 | 84 |
|
33 | | - for i in buckets: |
34 | | - if not check_bucket(s3, i): |
35 | | - print(f"Creating {i}") |
36 | | - s3.create_bucket(Bucket=i) |
| 85 | + for bucket_name in buckets: |
| 86 | + if not check_bucket(s3, bucket_name): |
| 87 | + create_bucket(s3, bucket_name) |
| 88 | + |
37 | 89 |
|
38 | | -if __name__=="__main__": |
39 | | - create_buckets = env['CREATE_DEFAULT_S3_BUCKETS'] |
40 | | - if create_buckets.lower() == 'true': |
41 | | - print("Creating default buckets") |
42 | | - main() |
| 90 | +if __name__ == "__main__": |
| 91 | + try: |
| 92 | + create_buckets = get_environment_variable('CREATE_DEFAULT_S3_BUCKETS', required=False) |
| 93 | + if create_buckets and create_buckets.lower() == 'true': |
| 94 | + logger.info("Starting bucket creation process.") |
| 95 | + main() |
| 96 | + else: |
| 97 | + logger.info("CREATE_DEFAULT_S3_BUCKETS is not set to 'true'. Skipping bucket creation.") |
| 98 | + except Exception as e: |
| 99 | + logger.error(f"An error occurred: {e}") |
| 100 | + exit(1) |
0 commit comments