Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 162 additions & 107 deletions openfoodfacts/redis.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import datetime
from abc import ABC, abstractmethod
from typing import Any, Iterator, Optional, Union, cast
import logging
from typing import Any, Iterator, cast

from pydantic import BaseModel, Json
from redis import Redis

from openfoodfacts.utils import get_logger

logger = get_logger(__name__)
logger = logging.getLogger(__name__)


def get_redis_client(**kwargs) -> Redis:
Expand All @@ -17,14 +15,15 @@ def get_redis_client(**kwargs) -> Redis:
)


class RedisUpdate(BaseModel):
class ProductUpdateEvent(BaseModel):
"""A class representing a product update from a Redis Stream."""

# The Redis ID of the update
# The Redis ID of the event
id: str
# The name of the Redis stream where the update was published
# This will always be "product_updates"
stream: str
# The timestamp of the update
# The timestamp of the event
timestamp: datetime.datetime
# The code of the product
code: str
Expand All @@ -40,7 +39,7 @@ class RedisUpdate(BaseModel):
product_type: str
# A JSON object representing the differences between the old and new
# product data
diffs: Optional[Json[Any]] = None
diffs: Json[Any] | None = None

def is_image_upload(self) -> bool:
"""Returns True if the update is an image upload."""
Expand Down Expand Up @@ -94,106 +93,132 @@ def is_image_deletion(self) -> bool:
)


class OCRReadyEvent(BaseModel):
    """A class representing an OCR ready event from a Redis Stream.

    This event is published when the OCR processing (done by Google Cloud
    Vision) of an image is complete.

    The OCR result (JSON file) is available at the URL provided in the
    `json_url` field.
    """

    # The Redis ID of the event (stream entry ID, ex: "1690000000000-0")
    id: str
    # The name of the Redis stream where the event was published
    # This will always be "ocr_ready"
    stream: str
    # The timestamp of the event, derived from the Redis stream entry ID
    timestamp: datetime.datetime
    # The barcode of the product the image belongs to
    code: str
    # The type of the product (food, product, petfood, beauty)
    product_type: str
    # The ID of the image (ex: "1")
    image_id: str
    # The URL of the OCR result (JSON file)
    json_url: str


def get_processed_since(
    redis_client: Redis,
    min_id: str | datetime.datetime,
    product_updates_stream_name: str = "product_updates",
    ocr_ready_stream_name: str = "ocr_ready",
    batch_size: int = 100,
) -> Iterator[ProductUpdateEvent | OCRReadyEvent]:
    """Fetches all events (product update or ocr ready events) that have been
    published since the given timestamp.

    :param redis_client: the Redis client
    :param min_id: the minimum ID to start from, or a datetime object
    :param product_updates_stream_name: the name of the Redis stream for
        product updates, defaults to "product_updates"
    :param ocr_ready_stream_name: the name of the Redis stream for OCR ready
        events, defaults to "ocr_ready"
    :param batch_size: the size of the batch to fetch, defaults to 100
    :yield: a ProductUpdateEvent or OCRReadyEvent instance for each update
    """
    if isinstance(min_id, datetime.datetime):
        # Redis stream IDs are "<ms-timestamp>-<seq>"; start at sequence 0
        min_id = f"{int(min_id.timestamp() * 1000)}-0"

    for stream_name in (
        product_updates_stream_name,
        ocr_ready_stream_name,
    ):
        # Use a per-stream cursor: each stream must be scanned from the
        # caller-provided min_id. Sharing a single mutated cursor across
        # streams would make the second stream start from the first
        # stream's last entry, silently skipping events.
        stream_min_id: str = min_id
        while True:
            logger.debug(
                "Fetching batch from Redis, stream %s, min_id %s, count %d",
                stream_name,
                stream_min_id,
                batch_size,
            )
            batch = redis_client.xrange(
                stream_name, min=stream_min_id, count=batch_size
            )
            if not batch:
                # We reached the end of the stream
                break

            batch = cast(list[tuple[str, dict]], batch)
            # We update the cursor to an exclusive range starting after the
            # last ID of the batch ("(" prefix means exclusive in XRANGE)
            stream_min_id = f"({batch[-1][0]}"
            for timestamp_id, item in batch:
                # Get the timestamp (in milliseconds) from the entry ID
                timestamp = int(timestamp_id.split("-")[0])

                if stream_name == ocr_ready_stream_name:
                    yield OCRReadyEvent(
                        id=timestamp_id,
                        timestamp=timestamp,  # type: ignore
                        stream=stream_name,
                        code=item["code"],
                        product_type=item["product_type"],
                        image_id=item["image_id"],
                        json_url=item["json_url"],
                    )
                else:
                    yield ProductUpdateEvent(
                        id=timestamp_id,
                        timestamp=timestamp,  # type: ignore
                        stream=stream_name,
                        code=item["code"],
                        flavor=item["flavor"],
                        user_id=item["user_id"],
                        action=item["action"],
                        comment=item["comment"],
                        product_type=item["product_type"],
                        # "diffs" is optional in the stream payload
                        diffs=item.get("diffs"),
                    )


def get_new_updates_multistream(
redis_client: Redis,
stream_names: list[str],
min_id: Union[str, datetime.datetime, None] = "$",
product_updates_stream_name: str = "product_updates",
ocr_ready_stream_name: str = "ocr_ready",
min_id: str | datetime.datetime | None = "$",
batch_size: int = 100,
) -> Iterator[RedisUpdate]:
) -> Iterator[ProductUpdateEvent | OCRReadyEvent]:
"""Reads new updates from Redis Stream, starting from the moment this
function is called.

The function will block until new updates are available.

:param redis_client: the Redis client.
:param stream_names: the names of the Redis streams to read from.
:param product_updates_stream_name: the name of the Redis stream for
product updates, defaults to "product_updates".
:param ocr_ready_stream_name: the name of the Redis stream for OCR ready
events, defaults to "ocr_ready".
:param min_id: the minimum ID to start from, defaults to "$".
:param batch_size: the size of the batch to fetch, defaults to 100.
:yield: a RedisUpdate instance for each update.
:yield: a ProductUpdateEvent or OCRReadyEvent instance for each update.
"""
if min_id is None:
min_id = "$"
elif isinstance(min_id, datetime.datetime):
min_id = f"{int(min_id.timestamp() * 1000)}-0"

stream_names = [product_updates_stream_name, ocr_ready_stream_name]
# We start from the last ID
min_ids: dict[Union[bytes, str, memoryview], Union[int, bytes, str, memoryview]] = {
min_ids: dict[bytes | str | memoryview, int | bytes | str | memoryview] = {
stream_name: min_id for stream_name in stream_names
}
while True:
Expand All @@ -212,40 +237,59 @@ def get_new_updates_multistream(
for timestamp_id, item in batch:
# Get the timestamp from the ID
timestamp = int(timestamp_id.split("-")[0])
yield RedisUpdate(
id=timestamp_id,
timestamp=timestamp, # type: ignore
stream=stream_name,
code=item["code"],
flavor=item["flavor"],
user_id=item["user_id"],
action=item["action"],
comment=item["comment"],
product_type=item["product_type"],
diffs=item.get("diffs"),
)


class UpdateListener(ABC):
"""A class representing a daemon that listens to updates from a Redis

if stream_name == ocr_ready_stream_name:
yield OCRReadyEvent(
id=timestamp_id,
stream=stream_name,
timestamp=timestamp, # type: ignore
code=item["code"],
product_type=item["product_type"],
image_id=item["image_id"],
json_url=item["json_url"],
)
else:
yield ProductUpdateEvent(
id=timestamp_id,
stream=stream_name,
timestamp=timestamp, # type: ignore
code=item["code"],
flavor=item["flavor"],
user_id=item["user_id"],
action=item["action"],
comment=item["comment"],
product_type=item["product_type"],
diffs=item.get("diffs"),
)


class UpdateListener:
"""A class representing a daemon that listens to events from a Redis
stream and processes them.

The class is meant to be subclassed to implement the processing logic.
Subclasses must implement the `process_redis_update` method.
Subclasses can implement the `process_redis_update` and
`process_ocr_ready` methods.
"""

def __init__(
self, redis_client: Redis, redis_stream_name: str, redis_latest_id_key: str
self,
redis_client: Redis,
redis_latest_id_key: str,
product_updates_stream_name: str = "product_updates",
ocr_ready_stream_name: str = "ocr_ready",
):
self.redis_client = redis_client
self.redis_stream_name = redis_stream_name
self.product_updates_stream_name = product_updates_stream_name
self.ocr_ready_stream_name = ocr_ready_stream_name
self.redis_latest_id_key = redis_latest_id_key

def run(self):
"""Run the update import daemon.

This daemon listens to the Redis stream containing information about
product updates, and triggers
product updates or OCR ready events, and processes them as they
arrive.
"""
logger.info("Starting update listener daemon")

Expand All @@ -265,17 +309,21 @@ def run(self):
else:
logger.info("No latest ID found")

for redis_update in get_new_updates(
self.redis_client, stream_name=self.redis_stream_name, min_id=latest_id
for event in get_new_updates_multistream(
self.redis_client,
min_id=latest_id,
):
try:
self.process_redis_update(redis_update)
if isinstance(event, OCRReadyEvent):
self.process_ocr_ready(event)
else:
self.process_redis_update(event)
except Exception as e:
logger.exception(e)
self.redis_client.set(self.redis_latest_id_key, redis_update.id)
self.redis_client.set(self.redis_latest_id_key, event.id)

def process_updates_since(
self, since: datetime.datetime, to: Optional[datetime.datetime] = None
self, since: datetime.datetime, to: datetime.datetime | None = None
):
"""Process all the updates since the given timestamp.

Expand All @@ -289,16 +337,23 @@ def process_updates_since(
self.redis_client.ping()

processed = 0
for product_update in get_processed_since(
self.redis_client, stream_name=self.redis_stream_name, min_id=since
for event in get_processed_since(
self.redis_client,
min_id=since,
):
if to is not None and product_update.timestamp > to:
if to is not None and event.timestamp > to:
break
self.process_redis_update(product_update)
if isinstance(event, OCRReadyEvent):
self.process_ocr_ready(event)
else:
self.process_redis_update(event)

processed += 1

logger.info("Processed %d updates", processed)
logger.info("Processed %d events", processed)

def process_redis_update(self, event: ProductUpdateEvent):
pass

@abstractmethod
def process_redis_update(self, redis_update: RedisUpdate):
def process_ocr_ready(self, event: OCRReadyEvent):
pass
Loading
Loading