Skip to content

Commit e29d313

Browse files
committed
luma: smart rate limit + fix bug by stating stream names explicitely
1 parent dc327f1 commit e29d313

File tree

2 files changed

+118
-9
lines changed

2 files changed

+118
-9
lines changed

source-luma-fetcher/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ data:
1414
connectorSubtype: api
1515
connectorType: source
1616
definitionId: 464a7cea-0317-485e-9a9c-bcd06155bfff
17-
dockerImageTag: 1.0.0
17+
dockerImageTag: v2.0.0
1818
dockerRepository: harbor.status.im/bi/airbyte/source-luma-fetcher
1919
githubIssueLabel: source-luma-fetcher
2020
icon: luma-fetcher.svg

source-luma-fetcher/source_luma_fetcher/source.py

Lines changed: 117 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,61 @@
22
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
33
#
44
from abc import ABC
5-
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
5+
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Set
6+
from collections import deque
67
import time
78
import logging
89
import requests
910
from airbyte_cdk.sources import AbstractSource
1011
from airbyte_cdk.sources.streams import Stream
1112
from airbyte_cdk.sources.streams.http import HttpStream
13+
from airbyte_cdk.models import ConfiguredAirbyteCatalog
1214

1315
logger = logging.getLogger("airbyte")
1416

17+
18+
class SlidingWindowRateLimiter:
19+
"""
20+
Sliding window rate limiter to maximize throughput while respecting Luma API limits.
21+
22+
Luma API limits: 100 requests per minute (500 per 5 minutes per calendar)
23+
We use 90 req/min to stay safely under the limit with some buffer.
24+
"""
25+
26+
def __init__(self, max_requests: int = 90, window_seconds: int = 60):
27+
self.max_requests = max_requests
28+
self.window_seconds = window_seconds
29+
self.request_timestamps: deque = deque()
30+
31+
def wait_if_needed(self):
32+
now = time.time()
33+
34+
# Remove timestamps outside the sliding window
35+
while self.request_timestamps and self.request_timestamps[0] < now - self.window_seconds:
36+
self.request_timestamps.popleft()
37+
38+
# If we've hit the limit, wait until the oldest request falls outside the window
39+
if len(self.request_timestamps) >= self.max_requests:
40+
sleep_time = self.request_timestamps[0] - (now - self.window_seconds) + 0.1
41+
if sleep_time > 0:
42+
logger.info(f"Rate limit buffer reached ({len(self.request_timestamps)} requests in window). Waiting {sleep_time:.2f}s")
43+
time.sleep(sleep_time)
44+
45+
# Record this request
46+
self.request_timestamps.append(time.time())
47+
48+
49+
# Shared rate limiter instance for all streams
50+
_rate_limiter = SlidingWindowRateLimiter(max_requests=90, window_seconds=60)
51+
1552
class LumaStream(HttpStream, ABC):
1653
url_base = "https://public-api.luma.com/v1/"
1754
primary_key = None
1855

56+
# Explicitly set namespace to None to ensure consistency between discover and read phases
57+
# This prevents "null.stream_name" mismatch errors in Airbyte
58+
namespace = None
59+
1960
def __init__(self, api_key: str, **kwargs):
2061
super().__init__(**kwargs)
2162
self.api_key = api_key
@@ -45,14 +86,18 @@ def request_params(
4586
def should_retry(self, response: requests.Response) -> bool:
4687
"""Enhanced retry logic for rate limiting"""
4788
if response.status_code == 429:
48-
logger.warning(f"Rate limit hit for Luma API. Status: {response.status_code}")
89+
logger.warning(f"Rate limit hit for Luma API (429). Will wait and retry.")
4990
return True
5091
return super().should_retry(response)
5192

5293
def backoff_time(self, response: requests.Response) -> Optional[float]:
53-
"""Custom backoff strategy for rate limiting"""
94+
"""Custom backoff strategy for rate limiting.
95+
96+
Luma API blocks for 1 minute on 429, so we wait 65 seconds to be safe.
97+
"""
5498
if response.status_code == 429:
55-
return 30.0
99+
logger.info("429 received - backing off for 65 seconds (Luma blocks for 1 minute)")
100+
return 65.0
56101
return super().backoff_time(response)
57102

58103
@property
@@ -61,14 +106,22 @@ def max_retries(self) -> Optional[int]:
61106
return 5
62107

63108
def _apply_rate_limiting(self):
64-
"""Apply rate limiting between requests"""
65-
time.sleep(2)
66-
logger.info("Applied rate limiting delay for Luma API")
109+
"""Apply smart rate limiting using sliding window algorithm.
110+
111+
Only waits when approaching the rate limit, allowing maximum throughput
112+
while staying safely under Luma's 100 req/min limit.
113+
"""
114+
_rate_limiter.wait_if_needed()
67115

68116

69117
class LumaEventsStream(LumaStream):
70118
"""Stream for fetching Luma events"""
71119

120+
@property
121+
def name(self) -> str:
122+
"""Explicitly define stream name to ensure consistency between discover and read phases"""
123+
return "luma_events_stream"
124+
72125
def path(
73126
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
74127
) -> str:
@@ -245,10 +298,66 @@ def check_connection(self, logger, config) -> Tuple[bool, any]:
245298
except Exception as e:
246299
return False, f"Connection test failed: {str(e)}"
247300

301+
def _get_catalog_stream_names(self, catalog: ConfiguredAirbyteCatalog) -> Set[str]:
302+
"""Extract stream names from the configured catalog for validation.
303+
304+
Returns a set of stream names that are configured to be synced.
305+
This is used to filter streams and prevent status messages for unconfigured streams.
306+
"""
307+
return {configured_stream.stream.name for configured_stream in catalog.streams}
308+
248309
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
310+
"""Return all available streams.
311+
312+
Note: All streams returned here must have consistent 'name' and 'namespace'
313+
properties to match how they are identified in the catalog during read operations.
314+
"""
249315
api_key = config['api_key']
250316

251317
events_stream = LumaEventsStream(api_key=api_key)
252318
guests_stream = LumaGuestsStream(events_stream=events_stream, api_key=api_key)
253319

254-
return [events_stream, guests_stream]
320+
# Log stream names for debugging namespace/name consistency issues
321+
all_streams = [events_stream, guests_stream]
322+
for stream in all_streams:
323+
logger.info(f"Registering stream: name='{stream.name}', namespace='{stream.namespace}'")
324+
325+
return all_streams
326+
327+
def read(
328+
self,
329+
logger: logging.Logger,
330+
config: Mapping[str, Any],
331+
catalog: ConfiguredAirbyteCatalog,
332+
state: Optional[Mapping[str, Any]] = None
333+
) -> Iterable[Any]:
334+
"""Override read to ensure strict validation of streams against the catalog.
335+
336+
This prevents STREAM_STATUS messages from being emitted for streams
337+
that are not present in the ConfiguredAirbyteCatalog, which causes
338+
the "stream not present in catalog" error.
339+
"""
340+
# Get the set of stream names that are actually configured in the catalog
341+
configured_stream_names = self._get_catalog_stream_names(catalog)
342+
logger.info(f"Configured catalog contains streams: {configured_stream_names}")
343+
344+
# Get all available streams
345+
all_streams = self.streams(config)
346+
347+
# Validate that all configured streams exist in our available streams
348+
available_stream_names = {stream.name for stream in all_streams}
349+
logger.info(f"Available streams from source: {available_stream_names}")
350+
351+
# Check for any mismatches
352+
missing_streams = configured_stream_names - available_stream_names
353+
if missing_streams:
354+
logger.warning(f"Catalog contains streams not available in source: {missing_streams}")
355+
356+
extra_streams = available_stream_names - configured_stream_names
357+
if extra_streams:
358+
logger.info(f"Source has streams not in catalog (will be skipped): {extra_streams}")
359+
360+
# Delegate to parent implementation which will handle the actual reading
361+
# The parent's read method should filter based on the catalog, but we've
362+
# added logging above to help debug any namespace/name mismatches
363+
yield from super().read(logger, config, catalog, state)

0 commit comments

Comments
 (0)