Skip to content

Commit 65bdea5

Browse files
committed
feat(redis)!: support ocr_ready event type
1 parent 72af8dd commit 65bdea5

2 files changed

Lines changed: 220 additions & 137 deletions

File tree

openfoodfacts/redis.py

Lines changed: 162 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
import datetime
2-
from abc import ABC, abstractmethod
3-
from typing import Any, Iterator, Optional, Union, cast
2+
import logging
3+
from typing import Any, Iterator, cast
44

55
from pydantic import BaseModel, Json
66
from redis import Redis
77

8-
from openfoodfacts.utils import get_logger
9-
10-
logger = get_logger(__name__)
8+
logger = logging.getLogger(__name__)
119

1210

1311
def get_redis_client(**kwargs) -> Redis:
@@ -17,14 +15,15 @@ def get_redis_client(**kwargs) -> Redis:
1715
)
1816

1917

20-
class RedisUpdate(BaseModel):
18+
class ProductUpdateEvent(BaseModel):
2119
"""A class representing a product update from a Redis Stream."""
2220

23-
# The Redis ID of the update
21+
# The Redis ID of the event
2422
id: str
2523
# The name of the Redis stream where the update was published
24+
# This will always be "product_updates"
2625
stream: str
27-
# The timestamp of the update
26+
# The timestamp of the event
2827
timestamp: datetime.datetime
2928
# The code of the product
3029
code: str
@@ -40,7 +39,7 @@ class RedisUpdate(BaseModel):
4039
product_type: str
4140
# A JSON object representing the differences between the old and new
4241
# product data
43-
diffs: Optional[Json[Any]] = None
42+
diffs: Json[Any] | None = None
4443

4544
def is_image_upload(self) -> bool:
4645
"""Returns True if the update is an image upload."""
@@ -94,106 +93,132 @@ def is_image_deletion(self) -> bool:
9493
)
9594

9695

96+
class OCRReadyEvent(BaseModel):
97+
"""A class representing an OCR ready event from a Redis Stream.
98+
99+
This event is published when the OCR processing (done by Google Cloud
100+
Vision) of an image is complete.
101+
102+
The OCR result (JSON file) is available at the URL provided in the
103+
`json_url` field.
104+
"""
105+
106+
# The Redis ID of the event
107+
id: str
108+
# The name of the Redis stream where the event was published
109+
# This will always be "ocr_ready"
110+
stream: str
111+
# The timestamp of the event
112+
timestamp: datetime.datetime
113+
# The code of the product
114+
code: str
115+
# the type of the product (food, product, petfood, beauty)
116+
product_type: str
117+
# The ID of the image (ex: "1")
118+
image_id: str
119+
# The URL of the OCR result (JSON file)
120+
json_url: str
121+
122+
97123
def get_processed_since(
98124
redis_client: Redis,
99-
stream_name: str,
100-
min_id: Union[str, datetime.datetime],
125+
min_id: str | datetime.datetime,
126+
product_updates_stream_name: str = "product_updates",
127+
ocr_ready_stream_name: str = "ocr_ready",
101128
batch_size: int = 100,
102-
) -> Iterator[RedisUpdate]:
103-
"""Fetches all the updates that have been published since the given
104-
timestamp.
129+
) -> Iterator[ProductUpdateEvent | OCRReadyEvent]:
130+
"""Fetches all events (product update or ocr ready events) that have been
131+
published since the given timestamp.
105132
106133
:param redis_client: the Redis client
107-
:param stream_name: the name of the Redis stream to read from
108134
:param min_id: the minimum ID to start from, or a datetime object
135+
:param product_updates_stream_name: the name of the Redis stream for
136+
product updates, defaults to "product_updates"
137+
:param ocr_ready_stream_name: the name of the Redis stream for OCR ready
138+
events, defaults to "ocr_ready"
109139
:param batch_size: the size of the batch to fetch, defaults to 100
110-
:yield: a RedisUpdate instance for each update
140+
:yield: a ProductUpdateEvent or OCRReadyEvent instance for each update
111141
"""
112142
if isinstance(min_id, datetime.datetime):
113143
min_id = f"{int(min_id.timestamp() * 1000)}-0"
114144

115-
while True:
116-
logger.debug(
117-
"Fetching batch from Redis, stream %s, min_id %s, count %d",
118-
stream_name,
119-
min_id,
120-
batch_size,
121-
)
122-
batch = redis_client.xrange(stream_name, min=min_id, count=batch_size)
123-
if not batch:
124-
# We reached the end of the stream
125-
break
126-
127-
batch = cast(list[tuple[str, dict]], batch)
128-
# We update the min_id to the last ID of the batch
129-
min_id = f"({batch[-1][0]}"
130-
for timestamp_id, item in batch:
131-
# Get the timestamp from the ID
132-
timestamp = int(timestamp_id.split("-")[0])
133-
yield RedisUpdate(
134-
id=timestamp_id,
135-
timestamp=timestamp, # type: ignore
136-
stream=stream_name,
137-
code=item["code"],
138-
flavor=item["flavor"],
139-
user_id=item["user_id"],
140-
action=item["action"],
141-
comment=item["comment"],
142-
product_type=item["product_type"],
143-
diffs=item.get("diffs"),
145+
for stream_name in (
146+
product_updates_stream_name,
147+
ocr_ready_stream_name,
148+
):
149+
while True:
150+
logger.debug(
151+
"Fetching batch from Redis, stream %s, min_id %s, count %d",
152+
product_updates_stream_name,
153+
min_id,
154+
batch_size,
144155
)
156+
batch = redis_client.xrange(stream_name, min=min_id, count=batch_size)
157+
if not batch:
158+
# We reached the end of the stream
159+
break
145160

161+
batch = cast(list[tuple[str, dict]], batch)
162+
# We update the min_id to the last ID of the batch
163+
min_id = f"({batch[-1][0]}"
164+
for timestamp_id, item in batch:
165+
# Get the timestamp from the ID
166+
timestamp = int(timestamp_id.split("-")[0])
146167

147-
def get_new_updates(
148-
redis_client: Redis,
149-
stream_name: str,
150-
min_id: Union[str, datetime.datetime, None] = "$",
151-
batch_size: int = 100,
152-
) -> Iterator[RedisUpdate]:
153-
"""Reads new updates from a Redis Stream, starting from the moment this
154-
function is called.
155-
156-
The function will block until new updates are available.
157-
158-
:param redis_client: the Redis client
159-
:param stream_name: the name of the Redis stream to read from
160-
:param min_id: the minimum ID to start from, defaults to "$".
161-
A datetime object can also be passed.
162-
:param batch_size: the size of the batch to fetch, defaults to 100
163-
:yield: a RedisUpdate instance for each update
164-
"""
165-
yield from get_new_updates_multistream(
166-
redis_client=redis_client,
167-
stream_names=[stream_name],
168-
min_id=min_id,
169-
batch_size=batch_size,
170-
)
168+
if stream_name == ocr_ready_stream_name:
169+
yield OCRReadyEvent(
170+
id=timestamp_id,
171+
timestamp=timestamp, # type: ignore
172+
stream=stream_name,
173+
code=item["code"],
174+
product_type=item["product_type"],
175+
image_id=item["image_id"],
176+
json_url=item["json_url"],
177+
)
178+
else:
179+
yield ProductUpdateEvent(
180+
id=timestamp_id,
181+
timestamp=timestamp, # type: ignore
182+
stream=stream_name,
183+
code=item["code"],
184+
flavor=item["flavor"],
185+
user_id=item["user_id"],
186+
action=item["action"],
187+
comment=item["comment"],
188+
product_type=item["product_type"],
189+
diffs=item.get("diffs"),
190+
)
171191

172192

173193
def get_new_updates_multistream(
174194
redis_client: Redis,
175-
stream_names: list[str],
176-
min_id: Union[str, datetime.datetime, None] = "$",
195+
product_updates_stream_name: str = "product_updates",
196+
ocr_ready_stream_name: str = "ocr_ready",
197+
min_id: str | datetime.datetime | None = "$",
177198
batch_size: int = 100,
178-
) -> Iterator[RedisUpdate]:
199+
) -> Iterator[ProductUpdateEvent | OCRReadyEvent]:
179200
"""Reads new updates from Redis Stream, starting from the moment this
180201
function is called.
181202
182203
The function will block until new updates are available.
183204
184205
:param redis_client: the Redis client.
185-
:param stream_names: the names of the Redis streams to read from.
206+
:param product_updates_stream_name: the name of the Redis stream for
207+
product updates, defaults to "product_updates".
208+
:param ocr_ready_stream_name: the name of the Redis stream for OCR ready
209+
events, defaults to "ocr_ready".
186210
:param min_id: the minimum ID to start from, defaults to "$".
187211
:param batch_size: the size of the batch to fetch, defaults to 100.
188-
:yield: a RedisUpdate instance for each update.
212+
:yield: a ProductUpdateEvent or OCRReadyEvent instance for each update.
189213
"""
190214
if min_id is None:
191215
min_id = "$"
192216
elif isinstance(min_id, datetime.datetime):
193217
min_id = f"{int(min_id.timestamp() * 1000)}-0"
194218

219+
stream_names = [product_updates_stream_name, ocr_ready_stream_name]
195220
# We start from the last ID
196-
min_ids: dict[Union[bytes, str, memoryview], Union[int, bytes, str, memoryview]] = {
221+
min_ids: dict[bytes | str | memoryview, int | bytes | str | memoryview] = {
197222
stream_name: min_id for stream_name in stream_names
198223
}
199224
while True:
@@ -212,40 +237,59 @@ def get_new_updates_multistream(
212237
for timestamp_id, item in batch:
213238
# Get the timestamp from the ID
214239
timestamp = int(timestamp_id.split("-")[0])
215-
yield RedisUpdate(
216-
id=timestamp_id,
217-
timestamp=timestamp, # type: ignore
218-
stream=stream_name,
219-
code=item["code"],
220-
flavor=item["flavor"],
221-
user_id=item["user_id"],
222-
action=item["action"],
223-
comment=item["comment"],
224-
product_type=item["product_type"],
225-
diffs=item.get("diffs"),
226-
)
227-
228-
229-
class UpdateListener(ABC):
230-
"""A class representing a daemon that listens to updates from a Redis
240+
241+
if stream_name == ocr_ready_stream_name:
242+
yield OCRReadyEvent(
243+
id=timestamp_id,
244+
stream=stream_name,
245+
timestamp=timestamp, # type: ignore
246+
code=item["code"],
247+
product_type=item["product_type"],
248+
image_id=item["image_id"],
249+
json_url=item["json_url"],
250+
)
251+
else:
252+
yield ProductUpdateEvent(
253+
id=timestamp_id,
254+
stream=stream_name,
255+
timestamp=timestamp, # type: ignore
256+
code=item["code"],
257+
flavor=item["flavor"],
258+
user_id=item["user_id"],
259+
action=item["action"],
260+
comment=item["comment"],
261+
product_type=item["product_type"],
262+
diffs=item.get("diffs"),
263+
)
264+
265+
266+
class UpdateListener:
267+
"""A class representing a daemon that listens to events from a Redis
231268
stream and processes them.
232269
233270
The class is meant to be subclassed to implement the processing logic.
234-
Subclasses must implement the `process_redis_update` method.
271+
Subclasses can implement the `process_redis_update` and
272+
`process_ocr_ready` methods.
235273
"""
236274

237275
def __init__(
238-
self, redis_client: Redis, redis_stream_name: str, redis_latest_id_key: str
276+
self,
277+
redis_client: Redis,
278+
redis_latest_id_key: str,
279+
product_updates_stream_name: str = "product_updates",
280+
ocr_ready_stream_name: str = "ocr_ready",
239281
):
240282
self.redis_client = redis_client
241-
self.redis_stream_name = redis_stream_name
283+
self.product_updates_stream_name = product_updates_stream_name
284+
self.ocr_ready_stream_name = ocr_ready_stream_name
242285
self.redis_latest_id_key = redis_latest_id_key
243286

244287
def run(self):
245288
"""Run the update import daemon.
246289
247290
This daemon listens to the Redis stream containing information about
248-
product updates, and triggers
291+
product updates or OCR ready events, and processes them as they
292+
arrive.
249293
"""
250294
logger.info("Starting update listener daemon")
251295

@@ -265,17 +309,21 @@ def run(self):
265309
else:
266310
logger.info("No latest ID found")
267311

268-
for redis_update in get_new_updates(
269-
self.redis_client, stream_name=self.redis_stream_name, min_id=latest_id
312+
for event in get_new_updates_multistream(
313+
self.redis_client,
314+
min_id=latest_id,
270315
):
271316
try:
272-
self.process_redis_update(redis_update)
317+
if isinstance(event, OCRReadyEvent):
318+
self.process_ocr_ready(event)
319+
else:
320+
self.process_redis_update(event)
273321
except Exception as e:
274322
logger.exception(e)
275-
self.redis_client.set(self.redis_latest_id_key, redis_update.id)
323+
self.redis_client.set(self.redis_latest_id_key, event.id)
276324

277325
def process_updates_since(
278-
self, since: datetime.datetime, to: Optional[datetime.datetime] = None
326+
self, since: datetime.datetime, to: datetime.datetime | None = None
279327
):
280328
"""Process all the updates since the given timestamp.
281329
@@ -289,16 +337,23 @@ def process_updates_since(
289337
self.redis_client.ping()
290338

291339
processed = 0
292-
for product_update in get_processed_since(
293-
self.redis_client, stream_name=self.redis_stream_name, min_id=since
340+
for event in get_processed_since(
341+
self.redis_client,
342+
min_id=since,
294343
):
295-
if to is not None and product_update.timestamp > to:
344+
if to is not None and event.timestamp > to:
296345
break
297-
self.process_redis_update(product_update)
346+
if isinstance(event, OCRReadyEvent):
347+
self.process_ocr_ready(event)
348+
else:
349+
self.process_redis_update(event)
350+
298351
processed += 1
299352

300-
logger.info("Processed %d updates", processed)
353+
logger.info("Processed %d events", processed)
354+
355+
def process_redis_update(self, event: ProductUpdateEvent):
356+
pass
301357

302-
@abstractmethod
303-
def process_redis_update(self, redis_update: RedisUpdate):
358+
def process_ocr_ready(self, event: OCRReadyEvent):
304359
pass

0 commit comments

Comments
 (0)