Merged
1 change: 1 addition & 0 deletions Makefile
@@ -86,6 +86,7 @@ build-transform:
cp -R lambdas/transform/__init__.py $(BUILD_DIR)/transform/lambdas/transform/__init__.py
cp -R lambdas/shared $(BUILD_DIR)/transform/lambdas/shared
find $(BUILD_DIR)/transform -type d -name '__pycache__' -prune -exec rm -rf {} +
$(PY) -m pip install -r lambdas/transform/requirements-build.txt --target $(BUILD_DIR)/transform --upgrade
cd $(BUILD_DIR)/transform && zip -qr ../transform.zip .

build-ops-replay:
43 changes: 25 additions & 18 deletions README.md
@@ -11,11 +11,11 @@
> Start lightweight, switch on enterprise capabilities as needed: **S3 → Lambda → SQS → Lambda → S3 (Parquet)**, with optional orchestration, catalog, quality gates, and observability.

This v2.0 elevates the minimal v1 into a **production-ready, enterprise-style** framework:
- **Orchestration:** EventBridge → Step Functions → Glue Job (+ optional **Great Expectations** gate)
- **Catalog / Query:** Glue Data Catalog + Crawler + Athena tables for **silver/** Parquet
- **Replay / Recovery:** `replay` & `dlq-redrive` scripts for backfill and poison-message recovery
- **Idempotency:** DynamoDB TTL for object-level dedup, optional GSI for audit
- **CI/CD:** GitHub Actions pipelines (Lambda build+deploy, Terraform plan+apply) via **OIDC** (no long-lived keys)
- **Orchestration (optional):** EventBridge → Step Functions → Glue Job (+ optional **Great Expectations** gate)
- **Catalog / Query (optional):** Glue Data Catalog + Crawler + Athena tables for **silver/** Parquet
- **Replay / Recovery:** `replay` & `redrive` scripts for backfill and poison-message recovery
- **Idempotency:** Object-level dedup via DynamoDB TTL (Powertools Idempotency)
- **CI/CD:** GitHub Actions CI (pytest + terraform fmt) + manual Terraform plan/apply (supports keys/OIDC)

---

@@ -25,10 +25,10 @@ This v2.0 elevates the minimal v1 into a **production-ready, enterprise-style**

S3 (bronze/*.jsonl)
└─(ObjectCreated)
Lambda ingest (Powertools/DynamoDB idempotency)
└─ SQS (events) ──(event source mapping)──> Lambda transform (Parquet)
└─ DLQ (optional)
└─ S3 (silver/*.parquet) ──> Glue Catalog & Athena (optional)
Lambda ingest (Powertools logging/metrics + DynamoDB idempotency)
└─ SQS (events) ──(event source mapping)──> Lambda transform (Parquet)
└─ DLQ (optional)
└─ S3 (silver/*.parquet) ──> (optional) Glue Catalog & Athena

```
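The flow above is driven by the `(bucket, key, etag)` triples pulled out of each S3 `ObjectCreated` event. A minimal sketch of such a parser follows; the repo's actual helper is `parse_s3_event_records` in `lambdas/shared/utils` and may differ in details:

```python
from typing import Any, Dict, List, Tuple
from urllib.parse import unquote_plus


def parse_s3_event_records(event: Dict[str, Any]) -> List[Tuple[str, str, str]]:
    """Extract (bucket, key, etag) triples from an S3 ObjectCreated event."""
    objects: List[Tuple[str, str, str]] = []
    for rec in event.get("Records", []):
        s3 = rec.get("s3", {})
        bucket = s3.get("bucket", {}).get("name", "")
        # S3 delivers keys URL-encoded (spaces arrive as '+', etc.)
        key = unquote_plus(s3.get("object", {}).get("key", ""))
        etag = s3.get("object", {}).get("eTag", "")
        if bucket and key:
            objects.append((bucket, key, etag))
    return objects
```

The etag rides along so the idempotency key can distinguish re-uploads of the same key with new content.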

@@ -42,10 +42,10 @@ S3 (bronze/*.jsonl)
|---|---|---|
| Pipeline | S3→Lambda→SQS→Lambda→S3 | Same + Step Functions orchestration |
| Storage | JSONL → Parquet | Parquet + Glue tables for Athena |
| Idempotency | DDB basic | DDB TTL + optional GSI for audit |
| Replay / DLQ | Manual | `scripts/replay*.sh`, `scripts/dlq-redrive.sh` |
| Idempotency | DDB object-level lock | Powertools Idempotency (DynamoDB TTL) + replay/backfill |
| Replay / DLQ | Manual | `scripts/replay.sh`, `scripts/redrive.sh` |
| Observability | Logs only | (optional) CloudWatch Dashboards + Alarms |
| CI/CD | Manual apply | GitHub Actions (OIDC), terraform plan/apply |
| CI/CD | Manual apply | GitHub Actions CI + manual terraform plan/apply (keys/OIDC) |
| DQ | – | Glue Job + optional Great Expectations gate |

---
@@ -73,7 +73,7 @@ repo-root/
│ ├─ gen_fake_events.py
│ ├─ replay_from_s3.py # push S3 events straight to SQS (requires sqs:SendMessage)
│ ├─ replay.sh # S3 copy to a new bronze/ prefix (recommended; no SQS send permission needed)
│ ├─ dlq-redrive.sh # native SQS redrive
│ ├─ redrive.sh # native SQS redrive
│ └─ scaffold.sh # generate a dataset skeleton
├─ configs/ # per-dataset meta configuration
├─ dq/ # lightweight DQ rules (or mapped to GE)
@@ -184,18 +184,26 @@ make verify-idempotency

# End to End Validation
make verify-e2e
### 🖼️ README images not showing?

If a newly added image such as `![](demo/1.png)` does not render, it is usually one of the following:

- Wrong file path: GitHub paths are case-sensitive, and `demo/1.png` must actually exist (this repo's defaults are filenames like `demo/0-1.png` and `demo/dataset-scaffold.png`).
- The image is not tracked by git: it shows up locally, but without `git add` / `git commit` / `git push` it will never appear on GitHub.
- The filename contains spaces/parentheses/non-ASCII characters: the `![](<demo/your file (1).png>)` form is more reliable.

It is recommended to run the one-shot target first (it runs the multi-step CLI checks in sequence, generates test data, and waits for Silver output):

- `make verify-e2e`

![](demo/1.png)
![](demo/2.png)
![](demo/3.png)
![](<demo/0-1.png>)
![](<demo/0-2.png>)


🧪 Idempotency Model
• Scope: S3 object-level (bucket/key#etag)
• Store: DynamoDB with TTL (optional GSI for audit and fast lookups)
• Store: DynamoDB with TTL (Powertools Idempotency)
• SQS consumer: partial batch failure + DLQ redrive scripts

@@ -330,4 +338,3 @@ TF_AUTO_APPROVE=1 make tf-destroy
License

MIT

8 changes: 4 additions & 4 deletions infra/terraform/envs/dev/main.tf
@@ -108,10 +108,10 @@ module "ingest_lambda" {
timeout = 30
memory_size = 256
environment = {
QUEUE_URL = local.queue_url
IDEMPOTENCY_TABLE = module.idempotency_table.name
LOCK_SECONDS = "900"
LOG_LEVEL = "INFO"
QUEUE_URL = local.queue_url
IDEMPOTENCY_TABLE = module.idempotency_table.name
IDEMPOTENCY_TTL_SECONDS = tostring(30 * 24 * 60 * 60)
LOG_LEVEL = "INFO"
}
tags = local.tags
}
2 changes: 1 addition & 1 deletion infra/terraform/modules/iam/main.tf
@@ -76,7 +76,7 @@ data "aws_iam_policy_document" "ingest" {
}

statement {
actions = ["dynamodb:UpdateItem"]
actions = ["dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:UpdateItem", "dynamodb:DeleteItem"]
resources = [var.idempotency_table_arn]
}
}
184 changes: 98 additions & 86 deletions lambdas/ingest/app.py
@@ -5,21 +5,31 @@
- S3 ObjectCreated events for Bronze JSON/JSONL objects.

What it does:
- Uses DynamoDB as a lightweight lock/idempotency table keyed by `s3://bucket/key#etag`.
- Uses DynamoDB as an idempotency store keyed by the S3 object identity (`bucket/key + etag`).
- Reads the object, parses JSONL/JSON, normalizes records, and publishes to SQS in batches.
- Emits structured logs + embedded metrics via AWS Lambda Powertools.

Environment variables:
- `QUEUE_URL` (required): Destination SQS queue URL.
- `IDEMPOTENCY_TABLE` (required): DynamoDB table name for object locks.
- `LOCK_SECONDS` (optional): Lock TTL to avoid duplicate ingestion during retries.
- `IDEMPOTENCY_TTL_SECONDS` (optional): TTL for idempotency records (default 30 days).
- `LOCK_SECONDS` (optional, legacy): Backward-compatible alias for `IDEMPOTENCY_TTL_SECONDS`.
"""

from typing import Any, Dict, List, Optional

import boto3

from aws_lambda_powertools import Logger, Metrics
from aws_lambda_powertools.metrics import MetricUnit
from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent_function

from lambdas.shared.schemas import normalize_record
from lambdas.shared.utils import env, iter_json_records, json_dumps, log, parse_s3_event_records, utc_epoch
from lambdas.shared.utils import env, iter_json_records, json_dumps, parse_s3_event_records


logger = Logger(service="serverless-elt.ingest")
metrics = Metrics(namespace="ServerlessELT", service="ingest")


def _clients():
@@ -30,59 +40,6 @@ def _clients():
)


def _acquire_object_lock(ddb, table_name: str, pk: str, lock_seconds: int) -> bool:
now = utc_epoch()
exp = now + lock_seconds
try:
ddb.update_item(
TableName=table_name,
Key={"pk": {"S": pk}},
UpdateExpression="SET #s=:inflight, expires_at=:exp, updated_at=:now ADD attempts :one",
ConditionExpression=(
"attribute_not_exists(pk) "
"OR (#s <> :processed AND (attribute_not_exists(expires_at) OR expires_at < :now))"
),
ExpressionAttributeNames={"#s": "status"},
ExpressionAttributeValues={
":inflight": {"S": "INFLIGHT"},
":processed": {"S": "PROCESSED"},
":exp": {"N": str(exp)},
":now": {"N": str(now)},
":one": {"N": "1"},
},
)
return True
except ddb.exceptions.ConditionalCheckFailedException:
return False


def _mark_processed(ddb, table_name: str, pk: str) -> None:
now = utc_epoch()
ddb.update_item(
TableName=table_name,
Key={"pk": {"S": pk}},
UpdateExpression="SET #s=:processed, updated_at=:now REMOVE expires_at",
ExpressionAttributeNames={"#s": "status"},
ExpressionAttributeValues={":processed": {"S": "PROCESSED"}, ":now": {"N": str(now)}},
)


def _mark_error(ddb, table_name: str, pk: str, reason: str) -> None:
now = utc_epoch()
ddb.update_item(
TableName=table_name,
Key={"pk": {"S": pk}},
UpdateExpression="SET #s=:error, error_reason=:r, expires_at=:past, updated_at=:now",
ExpressionAttributeNames={"#s": "status"},
ExpressionAttributeValues={
":error": {"S": "ERROR"},
":r": {"S": reason[:500]},
":past": {"N": str(now - 1)},
":now": {"N": str(now)},
},
)


def _object_id(bucket: str, key: str, etag: str) -> str:
return f"s3://{bucket}/{key}#{etag}"

@@ -114,56 +71,111 @@ def _enqueue_records(sqs, queue_url: str, records: List[Dict[str, Any]]) -> int:
return sent


def _log(event: str, **fields: Any) -> None:
logger.info(event, extra=fields)


def _cached_response_hook(response: Any, data_record: Any) -> Any:
if isinstance(response, dict):
return {**response, "cached": True}
return {"cached": True, "result": response}


def _get_idempotent_processor(table_name: str, ttl_seconds: int, ddb_client: Any, lambda_context: Any):
config = IdempotencyConfig(
event_key_jmespath="pk",
expires_after_seconds=ttl_seconds,
response_hook=_cached_response_hook,
lambda_context=lambda_context,
)
persistence = DynamoDBPersistenceLayer(
table_name=table_name,
key_attr="pk",
expiry_attr="expires_at",
in_progress_expiry_attr="in_progress_expires_at",
status_attr="status",
data_attr="data",
validation_key_attr="validation",
boto3_client=ddb_client,
)

@idempotent_function(data_keyword_argument="item", persistence_store=persistence, config=config)
def _process_object(*, item: Dict[str, Any], s3: Any, sqs: Any, queue_url: str) -> Dict[str, Any]:
bucket = item["bucket"]
key = item["key"]
etag = item.get("etag", "")
object_id = item["pk"]

text = _read_s3_text(s3, bucket, key)
records: List[Dict[str, Any]] = []
dropped = 0
for line_no, obj in enumerate(iter_json_records(text), start=1):
try:
normalized = normalize_record(obj)
except Exception as e:
dropped += 1
_log("ingest_drop_bad_record", object_id=object_id, line_no=line_no, error=str(e))
continue
normalized["_source"] = {"bucket": bucket, "key": key, "etag": etag, "line_no": line_no}
records.append(normalized)

enq = _enqueue_records(sqs, queue_url, records)
_log("ingest_object_done", object_id=object_id, records=len(records), enqueued=enq, dropped=dropped)
return {"records": len(records), "enqueued": enq, "dropped": dropped}

return _process_object


@metrics.log_metrics(capture_cold_start_metric=True)
def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
queue_url = env("QUEUE_URL")
table_name = env("IDEMPOTENCY_TABLE")
lock_seconds = int(env("LOCK_SECONDS", "900"))
ttl_seconds = int(env("IDEMPOTENCY_TTL_SECONDS", env("LOCK_SECONDS", str(30 * 24 * 60 * 60))))

s3, sqs, ddb = _clients()
objects = parse_s3_event_records(event)
total_records = 0
total_enqueued = 0
skipped = 0
dropped = 0

log("ingest_start", objects=len(objects))
for bucket, key, etag in objects:
# Idempotency scope is the *S3 object version* (bucket/key + etag), not per record.
pk = _object_id(bucket, key, etag)
if not _acquire_object_lock(ddb, table_name, pk, lock_seconds=lock_seconds):
skipped += 1
log("ingest_skip_idempotent", pk=pk)
continue
metrics.add_metric(name="ObjectsReceived", unit=MetricUnit.Count, value=len(objects))
_log("ingest_start", objects=len(objects))

lambda_context = context if hasattr(context, "get_remaining_time_in_millis") else None
process_object = _get_idempotent_processor(table_name=table_name, ttl_seconds=ttl_seconds, ddb_client=ddb, lambda_context=lambda_context)
for bucket, key, etag in objects:
object_id = _object_id(bucket, key, etag)
item = {"pk": object_id, "bucket": bucket, "key": key, "etag": etag}
try:
# Read the source object, parse JSONL lines, and normalize into a canonical schema.
text = _read_s3_text(s3, bucket, key)
records: List[Dict[str, Any]] = []
for line_no, obj in enumerate(iter_json_records(text), start=1):
try:
normalized = normalize_record(obj)
except Exception as e:
log("ingest_drop_bad_record", pk=pk, line_no=line_no, error=str(e))
continue
normalized["_source"] = {"bucket": bucket, "key": key, "etag": etag, "line_no": line_no}
records.append(normalized)

total_records += len(records)
# Batch send to SQS (max 10 entries per batch).
enq = _enqueue_records(sqs, queue_url, records)
total_enqueued += enq
_mark_processed(ddb, table_name, pk)
log("ingest_object_done", pk=pk, records=len(records), enqueued=enq)
result = process_object(item=item, s3=s3, sqs=sqs, queue_url=queue_url)
except Exception as e:
# Mark error and re-raise so the invocation is considered failed (visible in logs/metrics).
_mark_error(ddb, table_name, pk, reason=str(e))
log("ingest_object_error", pk=pk, error=str(e))
_log("ingest_object_error", object_id=object_id, error=str(e))
raise

if isinstance(result, dict) and result.get("cached") is True:
skipped += 1
metrics.add_metric(name="ObjectsSkippedIdempotent", unit=MetricUnit.Count, value=1)
_log("ingest_skip_idempotent", object_id=object_id)
continue

total_records += int(result.get("records", 0))
total_enqueued += int(result.get("enqueued", 0))
dropped += int(result.get("dropped", 0))

metrics.add_metric(name="RecordsEnqueued", unit=MetricUnit.Count, value=total_enqueued)
metrics.add_metric(name="RecordsParsed", unit=MetricUnit.Count, value=total_records)
if dropped:
metrics.add_metric(name="RecordsDropped", unit=MetricUnit.Count, value=dropped)
if skipped:
metrics.add_metric(name="ObjectsSkippedIdempotent", unit=MetricUnit.Count, value=skipped)

return {
"objects": len(objects),
"records": total_records,
"enqueued": total_enqueued,
"skipped": skipped,
"dropped": dropped,
"request_id": getattr(context, "aws_request_id", None),
}

2 changes: 1 addition & 1 deletion lambdas/ingest/requirements.txt
@@ -1,2 +1,2 @@
boto3>=1.34.0

aws-lambda-powertools>=3.0.0,<4.0.0