diff --git a/Makefile b/Makefile index b53b34d..e5c9ddf 100644 --- a/Makefile +++ b/Makefile @@ -86,6 +86,7 @@ build-transform: cp -R lambdas/transform/__init__.py $(BUILD_DIR)/transform/lambdas/transform/__init__.py cp -R lambdas/shared $(BUILD_DIR)/transform/lambdas/shared find $(BUILD_DIR)/transform -type d -name '__pycache__' -prune -exec rm -rf {} + + $(PY) -m pip install -r lambdas/transform/requirements-build.txt --target $(BUILD_DIR)/transform --upgrade --no-compile cd $(BUILD_DIR)/transform && zip -qr ../transform.zip . build-ops-replay: diff --git a/README.md b/README.md index e43d591..c2b7937 100644 --- a/README.md +++ b/README.md @@ -11,11 +11,11 @@ > 轻量起步,企业化能力随开随用:**S3 → Lambda → SQS → Lambda → S3(Parquet)**,可选编排、目录、质量门禁与可观测性。 This v2.0 elevates the minimal v1 into a **production-ready, enterprise-style** framework: -- **Orchestration:** EventBridge → Step Functions → Glue Job (+ optional **Great Expectations** gate) -- **Catalog / Query:** Glue Data Catalog + Crawler + Athena tables for **silver/** Parquet -- **Replay / Recovery:** `replay` & `dlq-redrive` scripts for backfill and poison-message recovery -- **Idempotency:** DynamoDB TTL for object-level dedup, optional GSI for audit -- **CI/CD:** GitHub Actions pipelines (Lambda build+deploy, Terraform plan+apply) via **OIDC** (no long-lived keys) +- **Orchestration (optional):** EventBridge → Step Functions → Glue Job (+ optional **Great Expectations** gate) +- **Catalog / Query (optional):** Glue Data Catalog + Crawler + Athena tables for **silver/** Parquet +- **Replay / Recovery:** `replay` & `redrive` scripts for backfill and poison-message recovery +- **Idempotency:** Object-level dedup via DynamoDB TTL (Powertools Idempotency) +- **CI/CD:** GitHub Actions CI (pytest + terraform fmt) + manual Terraform plan/apply (supports keys/OIDC) --- @@ -25,10 +25,10 @@ This v2.0 elevates the minimal v1 into a **production-ready, enterprise-style** S3 (bronze/*.jsonl) └─(ObjectCreated) - Lambda ingest 
(Powertools/DynamoDB idempotency) - └─ SQS (events) ──(event source mapping)──> Lambda transform (Parquet) - └─ DLQ (optional) - └─ S3 (silver/*.parquet) ──> Glue Catalog & Athena (optional) + Lambda ingest (Powertools logging/metrics + DynamoDB idempotency) + └─ SQS (events) ──(event source mapping)──> Lambda transform (Parquet) + └─ DLQ (optional) + └─ S3 (silver/*.parquet) ──> (optional) Glue Catalog & Athena ``` @@ -42,10 +42,10 @@ S3 (bronze/*.jsonl) |---|---|---| | Pipeline | S3→Lambda→SQS→Lambda→S3 | Same + Step Functions orchestration | | Storage | JSONL → Parquet | Parquet + Glue tables for Athena | -| Idempotency | DDB basic | DDB TTL + optional GSI for audit | -| Replay / DLQ | Manual | `scripts/replay*.sh`, `scripts/dlq-redrive.sh` | +| Idempotency | DDB object-level lock | Powertools Idempotency (DynamoDB TTL) + replay/backfill | +| Replay / DLQ | Manual | `scripts/replay.sh`, `scripts/redrive.sh` | | Observability | Logs only | (optional) CloudWatch Dashboards + Alarms | -| CI/CD | Manual apply | GitHub Actions (OIDC), terraform plan/apply | +| CI/CD | Manual apply | GitHub Actions CI + manual terraform plan/apply (keys/OIDC) | | DQ | – | Glue Job + optional Great Expectations gate | --- @@ -73,7 +73,7 @@ repo-root/ │ ├─ gen_fake_events.py │ ├─ replay_from_s3.py # S3→SQS 直推 (需要 sqs:SendMessage) │ ├─ replay.sh # S3 copy 到新的 bronze/ 前缀(推荐,无需 SQS 发信权限) -│ ├─ dlq-redrive.sh # SQS 原生 redrive +│ ├─ redrive.sh # SQS 原生 redrive │ └─ scaffold.sh # 生成 dataset 骨架 ├─ configs/ # 每个 dataset 的元配置 ├─ dq/ # 轻量 DQ 规则(或映射到 GE) @@ -184,10 +184,18 @@ make verify-idempotency # End to End Validation make verify-e2e +### 🖼️ README 图片显示不出来? 
+ +如果你新插入的图片比如 `![](demo/1.png)` 显示不出来,通常是下面原因之一: + +- 文件路径不对:GitHub 对路径大小写敏感;而且 `demo/1.png` 必须真的存在(本 repo 里默认是 `demo/0-1.png`、`demo/dataset-scaffold.png` 这类文件名)。 +- 图片还没被 git 跟踪:本地能看到但没 `git add` / `git commit` / `git push`,GitHub 上当然不会有。 +- 文件名有空格/括号/中文:用 `![](<demo/my image.png>)` 这种写法(路径放在尖括号里)更稳。 + +建议先跑一键版(会依次执行多步 CLI 验证 + 造数 + 等待 Silver): + +- `make verify-e2e` -![](demo/1.png) -![](demo/2.png) -![](demo/3.png) ![]() ![]() @@ -195,7 +203,7 @@ make verify-e2e 🧪 Idempotency Model • Scope: S3 object-level (bucket/key#etag) - • Store: DynamoDB with TTL(可选 GSI:审计与快速查询) + • Store: DynamoDB with TTL (Powertools Idempotency) • SQS consumer: partial batch failure + DLQ redrive 脚本 ⸻ @@ -330,4 +338,3 @@ TF_AUTO_APPROVE=1 make tf-destroy License MIT - diff --git a/infra/terraform/envs/dev/main.tf b/infra/terraform/envs/dev/main.tf index 5c492a7..34f7808 100644 --- a/infra/terraform/envs/dev/main.tf +++ b/infra/terraform/envs/dev/main.tf @@ -108,10 +108,10 @@ module "ingest_lambda" { timeout = 30 memory_size = 256 environment = { - QUEUE_URL = local.queue_url - IDEMPOTENCY_TABLE = module.idempotency_table.name - LOCK_SECONDS = "900" - LOG_LEVEL = "INFO" + QUEUE_URL = local.queue_url + IDEMPOTENCY_TABLE = module.idempotency_table.name + IDEMPOTENCY_TTL_SECONDS = tostring(30 * 24 * 60 * 60) + LOG_LEVEL = "INFO" } tags = local.tags } diff --git a/infra/terraform/modules/iam/main.tf b/infra/terraform/modules/iam/main.tf index d4259c3..3362a69 100644 --- a/infra/terraform/modules/iam/main.tf +++ b/infra/terraform/modules/iam/main.tf @@ -76,7 +76,7 @@ data "aws_iam_policy_document" "ingest" { } statement { - actions = ["dynamodb:UpdateItem"] + actions = ["dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem"] resources = [var.idempotency_table_arn] } } diff --git a/lambdas/ingest/app.py b/lambdas/ingest/app.py index 0361977..6316cd0 100644 --- a/lambdas/ingest/app.py +++ b/lambdas/ingest/app.py @@ -5,21 +5,31 @@ - S3 ObjectCreated events for Bronze JSON/JSONL objects. 
What it does: -- Uses DynamoDB as a lightweight lock/idempotency table keyed by `s3://bucket/key#etag`. +- Uses DynamoDB as an idempotency store keyed by the S3 object identity (`bucket/key + etag`). - Reads the object, parses JSONL/JSON, normalizes records, and publishes to SQS in batches. +- Emits structured logs + embedded metrics via AWS Lambda Powertools. Environment variables: - `QUEUE_URL` (required): Destination SQS queue URL. - `IDEMPOTENCY_TABLE` (required): DynamoDB table name for object locks. -- `LOCK_SECONDS` (optional): Lock TTL to avoid duplicate ingestion during retries. +- `IDEMPOTENCY_TTL_SECONDS` (optional): TTL for idempotency records (default 30 days). +- `LOCK_SECONDS` (optional, legacy): Backward-compatible alias for `IDEMPOTENCY_TTL_SECONDS`. """ from typing import Any, Dict, List, Optional import boto3 +from aws_lambda_powertools import Logger, Metrics +from aws_lambda_powertools.metrics import MetricUnit +from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function + from lambdas.shared.schemas import normalize_record -from lambdas.shared.utils import env, iter_json_records, json_dumps, log, parse_s3_event_records, utc_epoch +from lambdas.shared.utils import env, iter_json_records, json_dumps, parse_s3_event_records + + +logger = Logger(service="serverless-elt.ingest") +metrics = Metrics(namespace="ServerlessELT", service="ingest") def _clients(): @@ -30,59 +40,6 @@ def _clients(): ) -def _acquire_object_lock(ddb, table_name: str, pk: str, lock_seconds: int) -> bool: - now = utc_epoch() - exp = now + lock_seconds - try: - ddb.update_item( - TableName=table_name, - Key={"pk": {"S": pk}}, - UpdateExpression="SET #s=:inflight, expires_at=:exp, updated_at=:now ADD attempts :one", - ConditionExpression=( - "attribute_not_exists(pk) " - "OR (#s <> :processed AND (attribute_not_exists(expires_at) OR expires_at < :now))" - ), - ExpressionAttributeNames={"#s": "status"}, - 
ExpressionAttributeValues={ - ":inflight": {"S": "INFLIGHT"}, - ":processed": {"S": "PROCESSED"}, - ":exp": {"N": str(exp)}, - ":now": {"N": str(now)}, - ":one": {"N": "1"}, - }, - ) - return True - except ddb.exceptions.ConditionalCheckFailedException: - return False - - -def _mark_processed(ddb, table_name: str, pk: str) -> None: - now = utc_epoch() - ddb.update_item( - TableName=table_name, - Key={"pk": {"S": pk}}, - UpdateExpression="SET #s=:processed, updated_at=:now REMOVE expires_at", - ExpressionAttributeNames={"#s": "status"}, - ExpressionAttributeValues={":processed": {"S": "PROCESSED"}, ":now": {"N": str(now)}}, - ) - - -def _mark_error(ddb, table_name: str, pk: str, reason: str) -> None: - now = utc_epoch() - ddb.update_item( - TableName=table_name, - Key={"pk": {"S": pk}}, - UpdateExpression="SET #s=:error, error_reason=:r, expires_at=:past, updated_at=:now", - ExpressionAttributeNames={"#s": "status"}, - ExpressionAttributeValues={ - ":error": {"S": "ERROR"}, - ":r": {"S": reason[:500]}, - ":past": {"N": str(now - 1)}, - ":now": {"N": str(now)}, - }, - ) - - def _object_id(bucket: str, key: str, etag: str) -> str: return f"s3://{bucket}/{key}#{etag}" @@ -114,56 +71,111 @@ def _enqueue_records(sqs, queue_url: str, records: List[Dict[str, Any]]) -> int: return sent +def _log(event: str, **fields: Any) -> None: + logger.info(event, extra=fields) + + +def _cached_response_hook(response: Any, data_record: Any) -> Any: + if isinstance(response, dict): + return {**response, "cached": True} + return {"cached": True, "result": response} + + +def _get_idempotent_processor(table_name: str, ttl_seconds: int, ddb_client: Any, lambda_context: Any): + config = IdempotencyConfig( + event_key_jmespath="pk", + expires_after_seconds=ttl_seconds, + response_hook=_cached_response_hook, + lambda_context=lambda_context, + ) + persistence = DynamoDBPersistenceLayer( + table_name=table_name, + key_attr="pk", + expiry_attr="expires_at", + 
in_progress_expiry_attr="in_progress_expires_at", + status_attr="status", + data_attr="data", + validation_key_attr="validation", + boto3_client=ddb_client, + ) + + @idempotent_function(data_keyword_argument="item", persistence_store=persistence, config=config) + def _process_object(*, item: Dict[str, Any], s3: Any, sqs: Any, queue_url: str) -> Dict[str, Any]: + bucket = item["bucket"] + key = item["key"] + etag = item.get("etag", "") + object_id = item["pk"] + + text = _read_s3_text(s3, bucket, key) + records: List[Dict[str, Any]] = [] + dropped = 0 + for line_no, obj in enumerate(iter_json_records(text), start=1): + try: + normalized = normalize_record(obj) + except Exception as e: + dropped += 1 + _log("ingest_drop_bad_record", object_id=object_id, line_no=line_no, error=str(e)) + continue + normalized["_source"] = {"bucket": bucket, "key": key, "etag": etag, "line_no": line_no} + records.append(normalized) + + enq = _enqueue_records(sqs, queue_url, records) + _log("ingest_object_done", object_id=object_id, records=len(records), enqueued=enq, dropped=dropped) + return {"records": len(records), "enqueued": enq, "dropped": dropped} + + return _process_object + + +@metrics.log_metrics(capture_cold_start_metric=True) def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: queue_url = env("QUEUE_URL") table_name = env("IDEMPOTENCY_TABLE") - lock_seconds = int(env("LOCK_SECONDS", "900")) + ttl_seconds = int(env("IDEMPOTENCY_TTL_SECONDS", env("LOCK_SECONDS", str(30 * 24 * 60 * 60)))) s3, sqs, ddb = _clients() objects = parse_s3_event_records(event) total_records = 0 total_enqueued = 0 skipped = 0 + dropped = 0 - log("ingest_start", objects=len(objects)) - for bucket, key, etag in objects: - # Idempotency scope is the *S3 object version* (bucket/key + etag), not per record. 
- pk = _object_id(bucket, key, etag) - if not _acquire_object_lock(ddb, table_name, pk, lock_seconds=lock_seconds): - skipped += 1 - log("ingest_skip_idempotent", pk=pk) - continue + metrics.add_metric(name="ObjectsReceived", unit=MetricUnit.Count, value=len(objects)) + _log("ingest_start", objects=len(objects)) + lambda_context = context if hasattr(context, "get_remaining_time_in_millis") else None + process_object = _get_idempotent_processor(table_name=table_name, ttl_seconds=ttl_seconds, ddb_client=ddb, lambda_context=lambda_context) + for bucket, key, etag in objects: + object_id = _object_id(bucket, key, etag) + item = {"pk": object_id, "bucket": bucket, "key": key, "etag": etag} try: - # Read the source object, parse JSONL lines, and normalize into a canonical schema. - text = _read_s3_text(s3, bucket, key) - records: List[Dict[str, Any]] = [] - for line_no, obj in enumerate(iter_json_records(text), start=1): - try: - normalized = normalize_record(obj) - except Exception as e: - log("ingest_drop_bad_record", pk=pk, line_no=line_no, error=str(e)) - continue - normalized["_source"] = {"bucket": bucket, "key": key, "etag": etag, "line_no": line_no} - records.append(normalized) - - total_records += len(records) - # Batch send to SQS (max 10 entries per batch). - enq = _enqueue_records(sqs, queue_url, records) - total_enqueued += enq - _mark_processed(ddb, table_name, pk) - log("ingest_object_done", pk=pk, records=len(records), enqueued=enq) + result = process_object(item=item, s3=s3, sqs=sqs, queue_url=queue_url) except Exception as e: - # Mark error and re-raise so the invocation is considered failed (visible in logs/metrics). 
- _mark_error(ddb, table_name, pk, reason=str(e)) - log("ingest_object_error", pk=pk, error=str(e)) + _log("ingest_object_error", object_id=object_id, error=str(e)) raise + if isinstance(result, dict) and result.get("cached") is True: + skipped += 1 + # ObjectsSkippedIdempotent is emitted once, aggregated, after the loop (emitting it here too would double-count). + _log("ingest_skip_idempotent", object_id=object_id) + continue + + total_records += int(result.get("records", 0)) + total_enqueued += int(result.get("enqueued", 0)) + dropped += int(result.get("dropped", 0)) + + metrics.add_metric(name="RecordsEnqueued", unit=MetricUnit.Count, value=total_enqueued) + metrics.add_metric(name="RecordsParsed", unit=MetricUnit.Count, value=total_records) + if dropped: + metrics.add_metric(name="RecordsDropped", unit=MetricUnit.Count, value=dropped) + if skipped: + metrics.add_metric(name="ObjectsSkippedIdempotent", unit=MetricUnit.Count, value=skipped) + return { "objects": len(objects), "records": total_records, "enqueued": total_enqueued, "skipped": skipped, + "dropped": dropped, "request_id": getattr(context, "aws_request_id", None), } diff --git a/lambdas/ingest/requirements.txt b/lambdas/ingest/requirements.txt index 540978c..009f64a 100644 --- a/lambdas/ingest/requirements.txt +++ b/lambdas/ingest/requirements.txt @@ -1,2 +1,2 @@ boto3>=1.34.0 - +aws-lambda-powertools>=3.0.0,<4.0.0 diff --git a/lambdas/ingest/tests/test_ingest.py b/lambdas/ingest/tests/test_ingest.py index 438130e..185c21a 100644 --- a/lambdas/ingest/tests/test_ingest.py +++ b/lambdas/ingest/tests/test_ingest.py @@ -38,28 +38,8 @@ def test_ingest_enqueues_and_marks_processed(monkeypatch): ] } - # Acquire lock - ddb_stubber.add_response( - "update_item", - {}, - { - "TableName": "tbl", - "Key": {"pk": {"S": "s3://bronze-bucket/bronze/shipments/a.jsonl#etag1"}}, - "UpdateExpression": "SET #s=:inflight, expires_at=:exp, updated_at=:now ADD attempts :one", - "ConditionExpression": ( - "attribute_not_exists(pk) " - "OR (#s <> :processed 
AND (attribute_not_exists(expires_at) OR expires_at < :now))" - ), - "ExpressionAttributeNames": {"#s": "status"}, - "ExpressionAttributeValues": { - ":inflight": {"S": "INFLIGHT"}, - ":processed": {"S": "PROCESSED"}, - ":exp": {"N": ANY}, - ":now": {"N": ANY}, - ":one": {"N": "1"}, - }, - }, - ) + # Powertools idempotency: save_inprogress is a conditional PutItem (no need to validate params here). + ddb_stubber.add_response("put_item", {}, None) s3_stubber.add_response( "get_object", @@ -73,21 +53,22 @@ def test_ingest_enqueues_and_marks_processed(monkeypatch): {"QueueUrl": "https://sqs.example/123/q", "Entries": [{"Id": "0", "MessageBody": ANY}]}, ) - # Mark processed - ddb_stubber.add_response( - "update_item", - {}, - { - "TableName": "tbl", - "Key": {"pk": {"S": "s3://bronze-bucket/bronze/shipments/a.jsonl#etag1"}}, - "UpdateExpression": "SET #s=:processed, updated_at=:now REMOVE expires_at", - "ExpressionAttributeNames": {"#s": "status"}, - "ExpressionAttributeValues": {":processed": {"S": "PROCESSED"}, ":now": {"N": ANY}}, - }, - ) + # Powertools idempotency: save_success updates the record (no need to validate params). 
+ ddb_stubber.add_response("update_item", {}, None) with s3_stubber, sqs_stubber, ddb_stubber: - resp = ingest.handler(event, context=type("C", (), {"aws_request_id": "r1"})()) + resp = ingest.handler( + event, + context=type( + "C", + (), + { + "aws_request_id": "r1", + "function_name": "serverless-elt-ingest", + "get_remaining_time_in_millis": lambda self: 10000, + }, + )(), + ) assert resp["objects"] == 1 assert resp["records"] == 1 @@ -116,32 +97,34 @@ def test_ingest_skips_when_lock_not_acquired(monkeypatch): } ddb_stubber = Stubber(ddb) + # Simulate "already completed": + # - Conditional put fails (no old item returned) + # - Powertools idempotency falls back to GetItem and returns cached response ddb_stubber.add_client_error( - "update_item", + "put_item", service_error_code="ConditionalCheckFailedException", service_message="condition failed", http_status_code=400, - expected_params={ - "TableName": "tbl", - "Key": {"pk": {"S": "s3://bronze-bucket/bronze/shipments/a.jsonl#etag1"}}, - "UpdateExpression": "SET #s=:inflight, expires_at=:exp, updated_at=:now ADD attempts :one", - "ConditionExpression": ( - "attribute_not_exists(pk) " - "OR (#s <> :processed AND (attribute_not_exists(expires_at) OR expires_at < :now))" - ), - "ExpressionAttributeNames": {"#s": "status"}, - "ExpressionAttributeValues": { - ":inflight": {"S": "INFLIGHT"}, - ":processed": {"S": "PROCESSED"}, - ":exp": {"N": ANY}, - ":now": {"N": ANY}, - ":one": {"N": "1"}, - }, + expected_params=None, + ) + ddb_stubber.add_response( + "get_item", + { + "Item": { + "pk": {"S": "some-key"}, + "expires_at": {"N": "9999999999"}, + "status": {"S": "COMPLETED"}, + "data": {"S": "{\"records\":1,\"enqueued\":1,\"dropped\":0}"}, + } }, + None, ) with ddb_stubber: - resp = ingest.handler(event, context=None) + resp = ingest.handler( + event, + context=type("C", (), {"function_name": "serverless-elt-ingest", "get_remaining_time_in_millis": lambda self: 10000})(), + ) assert resp["skipped"] == 1 assert 
resp["records"] == 0 diff --git a/lambdas/transform/app.py b/lambdas/transform/app.py index 9fe5f29..33e4959 100644 --- a/lambdas/transform/app.py +++ b/lambdas/transform/app.py @@ -18,6 +18,7 @@ - `MAX_RECORDS_PER_FILE` (default: 5000) - `QUALITY_EVENTBRIDGE_ENABLED` (default: false) - `QUALITY_EVENTBUS_NAME` (default: "default"), `QUALITY_EVENT_SOURCE`, `QUALITY_EVENT_DETAIL_TYPE` +- Powertools: structured logs + embedded metrics (no extra CloudWatch permissions required) """ import io @@ -27,8 +28,15 @@ import boto3 +from aws_lambda_powertools import Logger, Metrics +from aws_lambda_powertools.metrics import MetricUnit + from lambdas.shared.schemas import normalize_record, partition_dt, to_pyarrow_schema -from lambdas.shared.utils import chunked, env, json_dumps, log, new_id +from lambdas.shared.utils import chunked, env, json_dumps, new_id + + +logger = Logger(service="serverless-elt.transform") +metrics = Metrics(namespace="ServerlessELT", service="transform") def _clients(): @@ -46,6 +54,11 @@ def _s3_put_parquet(s3, bucket: str, key: str, records: List[Dict[str, Any]], re s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue()) +def _log(event: str, **fields: Any) -> None: + logger.info(event, extra=fields) + + +@metrics.log_metrics(capture_cold_start_metric=True) def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: out_bucket = env("SILVER_BUCKET") base_prefix = env("SILVER_PREFIX", "silver") @@ -60,6 +73,8 @@ def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: records = event.get("Records", []) failures: List[Dict[str, str]] = [] + metrics.add_metric(name="MessagesReceived", unit=MetricUnit.Count, value=len(records)) + # Parse + normalize messages. Bad messages become partial failures (retries/DLQ). 
good: List[Tuple[str, Dict[str, Any], str]] = [] for r in records: @@ -70,7 +85,7 @@ def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: record_type = normalized["record_type"] good.append((msg_id, normalized, record_type)) except Exception as e: - log("transform_bad_message", message_id=msg_id, error=str(e)) + _log("transform_bad_message", message_id=msg_id, error=str(e)) if msg_id: failures.append({"itemIdentifier": msg_id}) @@ -91,9 +106,9 @@ def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: _s3_put_parquet(s3, out_bucket, key, only_records, record_type=record_type) written_files += 1 partitions_written[(record_type, dt)] = partitions_written.get((record_type, dt), 0) + 1 - log("transform_write_ok", record_type=record_type, dt=dt, key=key, count=len(only_records)) + _log("transform_write_ok", record_type=record_type, dt=dt, key=key, count=len(only_records)) except Exception as e: - log("transform_write_error", record_type=record_type, dt=dt, error=str(e)) + _log("transform_write_error", record_type=record_type, dt=dt, error=str(e)) failures.extend({"itemIdentifier": msg_id} for msg_id, _ in items_chunk if msg_id) # Optional: notify downstream orchestration that a partition is ready for quality validation. 
@@ -123,9 +138,13 @@ def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: for chunk in chunked(entries, 10): resp = events.put_events(Entries=chunk) failed += int(resp.get("FailedEntryCount", 0)) - log("quality_events_emitted", entries=len(entries), failed=failed) + _log("quality_events_emitted", entries=len(entries), failed=failed) except Exception as e: - log("quality_events_emit_error", error=str(e)) + _log("quality_events_emit_error", error=str(e)) + + metrics.add_metric(name="FilesWritten", unit=MetricUnit.Count, value=written_files) + if failures: + metrics.add_metric(name="MessagesFailed", unit=MetricUnit.Count, value=len(failures)) return {"batchItemFailures": failures} diff --git a/lambdas/transform/requirements-build.txt b/lambdas/transform/requirements-build.txt new file mode 100644 index 0000000..b93a398 --- /dev/null +++ b/lambdas/transform/requirements-build.txt @@ -0,0 +1 @@ +aws-lambda-powertools>=3.0.0,<4.0.0 diff --git a/lambdas/transform/tests/test_transform.py b/lambdas/transform/tests/test_transform.py index fb51590..be47e1c 100644 --- a/lambdas/transform/tests/test_transform.py +++ b/lambdas/transform/tests/test_transform.py @@ -17,5 +17,5 @@ def test_transform_returns_partial_failures_when_bad_json(monkeypatch): {"messageId": "m2", "body": "not-json"}, ] } - resp = transform.handler(event, context=type("C", (), {"aws_request_id": "r1"})()) + resp = transform.handler(event, context=type("C", (), {"aws_request_id": "r1", "function_name": "serverless-elt-transform"})()) assert {"itemIdentifier": "m2"} in resp["batchItemFailures"] diff --git a/requirements-dev.txt b/requirements-dev.txt index 9644c3a..6497901 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,3 +1,4 @@ boto3>=1.34.0 pytest>=8.0.0 pyyaml>=6.0.0 +aws-lambda-powertools>=3.0.0,<4.0.0