Skip to content

Commit 61a31a9

Browse files
committed
estuary-cdk: implement webhook server logic
Implements a `webhook` submodule that includes:

- A `start_webhook_server` function
- A `WebhookDocument` model that automatically enriches document metadata
- A `match` submodule with message matching logic

Signed-off-by: nicolaslazo <45973144+nicolaslazo@users.noreply.github.com>
1 parent 44f3261 commit 61a31a9

File tree

12 files changed

+1393
-30
lines changed

12 files changed

+1393
-30
lines changed

estuary-cdk/common.Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,6 @@ ENV DOCS_URL=${DOCS_URL}
5151
ENV ENCRYPTION_URL=${ENCRYPTION_URL}
5252
ENV CONNECTOR_NAME=${CONNECTOR_NAME}
5353

54+
EXPOSE 8080
55+
5456
CMD ["/bin/sh", "-c", "/opt/venv/bin/python -m $(echo \"$CONNECTOR_NAME\" | tr '-' '_')"]

estuary-cdk/estuary_cdk/capture/base_capture_connector.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
import json
44
import os
55
import sys
6+
from collections.abc import Awaitable
67
from datetime import UTC, datetime, timedelta
7-
from typing import Any, Awaitable, BinaryIO, Callable, Generic
8+
from typing import Any, BinaryIO, Callable, Generic
89

910
from pydantic import BaseModel
1011

@@ -36,6 +37,7 @@
3637
PERIODIC_RESTART_INTERVAL = 24 * 60 * 60 # 24 hours
3738
GRACEFUL_SHUTDOWN_TIMEOUT = 30 * 60 # 30 minutes
3839
TASK_CANCELLATION_TIMEOUT = 5 * 60 # 5 minutes
40+
WEBHOOK_SHUTDOWN_TIMEOUT = 60 # 1 minute
3941

4042

4143
class TerminateTaskGroup(Exception):
@@ -52,6 +54,7 @@ class BaseCaptureConnector(
5254
BaseConnector[Request[EndpointConfig, ResourceConfig, _ConnectorState]],
5355
HTTPMixin,
5456
Generic[EndpointConfig, ResourceConfig, _ConnectorState],
57+
metaclass=abc.ABCMeta,
5558
):
5659
output: BinaryIO = sys.stdout.buffer
5760

@@ -138,7 +141,7 @@ async def handle(
138141
opened, capture = await self.open(log, open)
139142
await self._emit(Response(opened=opened))
140143

141-
stopping = Task.Stopping(asyncio.Event())
144+
stopping = Task.Stopping()
142145

143146
async def periodic_stop() -> None:
144147
"""Trigger graceful shutdown after 24 hours of continuous operation."""
@@ -152,7 +155,7 @@ async def enforce_shutdown(tg: asyncio.TaskGroup) -> None:
152155
This handles shutdown triggered by any source: periodic_stop,
153156
stream exceptions, or any other code that sets stopping.event.
154157
"""
155-
await stopping.event.wait()
158+
_ = await stopping.event.wait()
156159

157160
log.debug("Waiting for graceful exit.")
158161

@@ -236,6 +239,20 @@ async def enforce_shutdown(tg: asyncio.TaskGroup) -> None:
236239
except* TerminateTaskGroup:
237240
pass # Expected when enforce_shutdown terminates the task group
238241

242+
# TaskGroup exited = all non-webhook tasks done.
243+
# Signal webhook server to drain and shut down.
244+
if stopping.webhook_task:
245+
stopping.webhook_event.set()
246+
try:
247+
await asyncio.wait_for(
248+
stopping.webhook_task, timeout=WEBHOOK_SHUTDOWN_TIMEOUT
249+
)
250+
except asyncio.TimeoutError:
251+
log.error(
252+
f"Webhook server did not shut down in {WEBHOOK_SHUTDOWN_TIMEOUT // 60} minutes. Forcing exit."
253+
)
254+
os._exit(1)
255+
239256
# Cancel the background tasks if capture exited gracefully
240257
# before the shutdown timeout.
241258
_ = periodic_stop_task.cancel()

estuary-cdk/estuary_cdk/capture/common.py

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,42 @@
11
import abc
22
import asyncio
33
import functools
4-
from enum import Enum, StrEnum
4+
import inspect
5+
from collections.abc import AsyncGenerator, Awaitable
56
from dataclasses import dataclass, field
67
from datetime import UTC, datetime, timedelta
7-
import inspect
8+
from enum import Enum, StrEnum
9+
from itertools import combinations
810
from logging import Logger
911
from typing import (
1012
Any,
11-
AsyncGenerator,
12-
Awaitable,
1313
Callable,
14-
cast,
1514
ClassVar,
1615
Generic,
17-
Iterable,
1816
Literal,
1917
TypeVar,
18+
cast,
2019
)
2120

2221
from pydantic import AwareDatetime, BaseModel, Field, NonNegativeInt
2322

2423
from ..cron import next_fire
25-
from ..utils import json_merge_patch
2624
from ..flow import (
2725
AccessToken,
26+
AuthorizationCodeFlowOAuth2Credentials,
2827
BaseOAuth2Credentials,
28+
BasicAuth,
2929
CaptureBinding,
3030
ClientCredentialsOAuth2Credentials,
31-
OAuth2TokenFlowSpec,
32-
AuthorizationCodeFlowOAuth2Credentials,
3331
LongLivedClientCredentialsOAuth2Credentials,
32+
OAuth2Spec,
33+
OAuth2TokenFlowSpec,
3434
ResourceOwnerPasswordOAuth2Credentials,
3535
RotatingOAuth2Credentials,
36-
OAuth2Spec,
3736
ValidationError,
38-
BasicAuth,
3937
)
4038
from ..pydantic_polyfill import GenericModel
39+
from ..utils import json_merge_patch
4140
from . import Task, request, response
4241

4342
LogCursor = tuple[str | int] | AwareDatetime | NonNegativeInt
@@ -113,10 +112,10 @@ def pop_cursor_marker(cursor: dict) -> dict:
113112
and "no pages remain" in a response context.
114113
"""
115114

116-
117115
class Triggers(Enum):
118116
BACKFILL = "BACKFILL"
119117

118+
120119
@dataclass
121120
class SourcedSchema:
122121
"""
@@ -279,7 +278,8 @@ class Snapshot(BaseModel, extra="forbid"):
279278
)
280279

281280
is_connector_initiated: bool = Field(
282-
default=False, description="Indicates if this backfill was initiated by the connector.",
281+
default=False,
282+
description="Indicates if this backfill was initiated by the connector.",
283283
)
284284

285285

@@ -530,11 +530,30 @@ def resolve_bindings(
530530
resources: list[Resource[Any, _BaseResourceConfig, Any]],
531531
resource_term="Resource",
532532
) -> list[tuple[_ResolvableBinding, Resource[Any, _BaseResourceConfig, Any]]]:
533+
533534
resolved: list[
534535
tuple[_ResolvableBinding, Resource[Any, _BaseResourceConfig, Any]]
535536
] = []
536537
errors: list[str] = []
537538

539+
# Check for routing conflicts between webhook match rules.
540+
from .webhook.resources import WebhookResourceConfig
541+
542+
webhook_match_rules = [
543+
resource.initial_config.match_rule
544+
for resource in resources
545+
if isinstance(resource.initial_config, WebhookResourceConfig)
546+
]
547+
errors.extend(
548+
filter(
549+
bool,
550+
(
551+
rule.list_compatibility_errors(other)
552+
for rule, other in combinations(webhook_match_rules, 2)
553+
),
554+
)
555+
)
556+
538557
for binding in bindings:
539558
path = binding.resourceConfig.path()
540559

@@ -698,11 +717,18 @@ async def _run(task: Task):
698717
if inspect.iscoroutine(result):
699718
await result
700719

701-
702720
if soonest_future_scheduled_initialization:
703721
# Gracefully exit to ensure relatively close adherence to any bindings'
704722
# re-initialization schedules.
705-
asyncio.create_task(scheduled_stop(task, soonest_future_scheduled_initialization))
723+
asyncio.create_task(
724+
scheduled_stop(task, soonest_future_scheduled_initialization)
725+
)
726+
727+
if task.stopping.webhook_task is not None:
728+
# The webhook server runs outside the TaskGroup so it can outlive
729+
# non-webhook tasks during two-phase shutdown. Block here so the
730+
# TaskGroup stays alive until graceful shutdown begins.
731+
await task.stopping.event.wait()
706732

707733
return (response.Opened(explicitAcknowledgements=False), _run)
708734

estuary-cdk/estuary_cdk/capture/task.py

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
1-
from dataclasses import dataclass
2-
import decimal
3-
from estuary_cdk.capture.connector_status import ConnectorStatus
4-
from pydantic import Field
5-
from typing import Generic, Awaitable, Any, BinaryIO, Callable
6-
import orjson
7-
import base64
8-
from logging import Logger
91
import asyncio
2+
import base64
3+
import decimal
104
import tempfile
115
import traceback
6+
from collections.abc import Awaitable
7+
from dataclasses import dataclass, field
8+
from logging import Logger
9+
from typing import Any, BinaryIO, Callable, Generic
10+
11+
import orjson
1212
import xxhash
13+
from pydantic import Field
14+
15+
from estuary_cdk.capture.connector_status import ConnectorStatus
1316

14-
from . import request, response
15-
from ._emit import emit_from_buffer
1617
from ..flow import (
1718
ConnectorSpec,
1819
ConnectorState,
@@ -21,6 +22,8 @@
2122
ResourceConfig,
2223
)
2324
from ..pydantic_polyfill import GenericModel
25+
from . import request, response
26+
from ._emit import emit_from_buffer
2427

2528

2629
class Request(GenericModel, Generic[EndpointConfig, ResourceConfig, ConnectorState]):
@@ -85,9 +88,15 @@ class Stopping:
8588
The Task's coroutine should monitor this event and exit when it's set AND
8689
it has no more immediate work to do (for example, no further documents are
8790
currently ready to be captured).
91+
92+
`webhook_event` is a separate event for the webhook server. It is set
93+
after all non-webhook tasks have completed, signaling the webhook server
94+
to reject new requests with 503 and clean up.
8895
"""
8996

90-
event: asyncio.Event
97+
event: asyncio.Event = field(default_factory=asyncio.Event)
98+
webhook_event: asyncio.Event = field(default_factory=asyncio.Event)
99+
webhook_task: asyncio.Task[None] | None = None
91100
first_error: Exception | None = None
92101
first_error_task: str | None = None
93102

estuary-cdk/estuary_cdk/capture/webhook/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)