|
13 | 13 | CONFIG = Config() |
14 | 14 |
|
15 | 15 |
|
16 | | -def get_extracted_files_for_source( |
17 | | - source: str, |
18 | | - bucket: str = CONFIG.TIMDEX_BUCKET, |
19 | | -) -> list[str]: |
20 | | - """List S3 URIs for extract files in TIMDEX S3 bucket for a given source.""" |
21 | | - s3_client = boto3.client("s3") |
22 | | - files = [] |
23 | | - |
24 | | - paginator = s3_client.get_paginator("list_objects_v2") |
25 | | - page_iterator = paginator.paginate(Bucket=bucket, Prefix=source) |
26 | | - |
27 | | - for page in page_iterator: |
28 | | - if "Contents" in page: |
29 | | - for obj in page["Contents"]: |
30 | | - if not obj["Key"].endswith("/"): # skip folders |
31 | | - s3_uri = f"s3://{bucket}/{obj['Key']}" |
32 | | - files.append(s3_uri) |
33 | | - |
34 | | - # filter where "extracted" in filename |
35 | | - return [file for file in files if "extracted" in file] |
| 16 | +def get_ordered_extracted_files_all_sources( |
| 17 | + sources: list[str] | None = None, |
| 18 | +) -> dict[str, list[str]]: |
| 19 | + """Get ordered extract files for all TIMDEX sources.""" |
| 20 | + if not sources: |
| 21 | + sources = CONFIG.active_timdex_sources |
| 22 | + return { |
| 23 | + source: get_ordered_extracted_files_since_last_full_run(source=source) |
| 24 | + for source in sources |
| 25 | + } |
36 | 26 |
|
37 | 27 |
|
38 | 28 | def get_ordered_extracted_files_since_last_full_run(source: str) -> list[str]: |
@@ -81,13 +71,23 @@ def _extract_date(filename: str) -> datetime.datetime: |
81 | 71 | return datetime.datetime.strptime(date_string, "%Y-%m-%d").astimezone(datetime.UTC) |
82 | 72 |
|
83 | 73 |
|
84 | | -def get_ordered_extracted_files_all_sources( |
85 | | - sources: list[str] | None = None, |
86 | | -) -> dict[str, list[str]]: |
87 | | - """Get ordered extract files for all TIMDEX sources.""" |
88 | | - if not sources: |
89 | | - sources = CONFIG.active_timdex_sources |
90 | | - return { |
91 | | - source: get_ordered_extracted_files_since_last_full_run(source=source) |
92 | | - for source in sources |
93 | | - } |
| 74 | +def get_extracted_files_for_source( |
| 75 | + source: str, |
| 76 | + bucket: str = CONFIG.TIMDEX_BUCKET, |
| 77 | +) -> list[str]: |
| 78 | + """List S3 URIs for extract files in TIMDEX S3 bucket for a given source.""" |
| 79 | + s3_client = boto3.client("s3") |
| 80 | + files = [] |
| 81 | + |
| 82 | + paginator = s3_client.get_paginator("list_objects_v2") |
| 83 | + page_iterator = paginator.paginate(Bucket=bucket, Prefix=source) |
| 84 | + |
| 85 | + for page in page_iterator: |
| 86 | + if "Contents" in page: |
| 87 | + for obj in page["Contents"]: |
| 88 | + if not obj["Key"].endswith("/"): # skip folders |
| 89 | + s3_uri = f"s3://{bucket}/{obj['Key']}" |
| 90 | + files.append(s3_uri) |
| 91 | + |
| 92 | + # filter where "extracted" in filename |
| 93 | + return [file for file in files if "extracted" in file] |
0 commit comments