diff --git a/openfoodfacts/redis.py b/openfoodfacts/redis.py index fe1d82cd..c76b9b37 100644 --- a/openfoodfacts/redis.py +++ b/openfoodfacts/redis.py @@ -1,13 +1,11 @@ import datetime -from abc import ABC, abstractmethod -from typing import Any, Iterator, Optional, Union, cast +import logging +from typing import Any, Iterator, cast from pydantic import BaseModel, Json from redis import Redis -from openfoodfacts.utils import get_logger - -logger = get_logger(__name__) +logger = logging.getLogger(__name__) def get_redis_client(**kwargs) -> Redis: @@ -17,14 +15,15 @@ def get_redis_client(**kwargs) -> Redis: ) -class RedisUpdate(BaseModel): +class ProductUpdateEvent(BaseModel): """A class representing a product update from a Redis Stream.""" - # The Redis ID of the update + # The Redis ID of the event id: str # The name of the Redis stream where the update was published + # This will always be "product_updates" stream: str - # The timestamp of the update + # The timestamp of the event timestamp: datetime.datetime # The code of the product code: str @@ -40,7 +39,7 @@ class RedisUpdate(BaseModel): product_type: str # A JSON object representing the differences between the old and new # product data - diffs: Optional[Json[Any]] = None + diffs: Json[Any] | None = None def is_image_upload(self) -> bool: """Returns True if the update is an image upload.""" @@ -94,106 +93,132 @@ def is_image_deletion(self) -> bool: ) +class OCRReadyEvent(BaseModel): + """A class representing an OCR ready event from a Redis Stream. + + This event is published when the OCR processing (done by Google Cloud + Vision) of an image is complete. + + The OCR result (JSON file) is available at the URL provided in the + `json_url` field. + """ + + # The Redis ID of the event + id: str + # The name of the Redis stream where the event was published + # This will always be "ocr_ready" + stream: str + # The timestamp of the event + timestamp: datetime.datetime + # The code of the product + code: str + # the type of the product (food, product, petfood, beauty) + product_type: str + # The ID of the image (ex: "1") + image_id: str + # The URL of the OCR result (JSON file) + json_url: str + + def get_processed_since( redis_client: Redis, - stream_name: str, - min_id: Union[str, datetime.datetime], + min_id: str | datetime.datetime, + product_updates_stream_name: str = "product_updates", + ocr_ready_stream_name: str = "ocr_ready", batch_size: int = 100, -) -> Iterator[RedisUpdate]: - """Fetches all the updates that have been published since the given - timestamp. +) -> Iterator[ProductUpdateEvent | OCRReadyEvent]: + """Fetches all events (product update or ocr ready events) that have been + published since the given timestamp. :param redis_client: the Redis client - :param stream_name: the name of the Redis stream to read from :param min_id: the minimum ID to start from, or a datetime object + :param product_updates_stream_name: the name of the Redis stream for + product updates, defaults to "product_updates" + :param ocr_ready_stream_name: the name of the Redis stream for OCR ready + events, defaults to "ocr_ready" :param batch_size: the size of the batch to fetch, defaults to 100 - :yield: a RedisUpdate instance for each update + :yield: a ProductUpdateEvent or OCRReadyEvent instance for each update """ if isinstance(min_id, datetime.datetime): min_id = f"{int(min_id.timestamp() * 1000)}-0" - while True: - logger.debug( - "Fetching batch from Redis, stream %s, min_id %s, count %d", - stream_name, - min_id, - batch_size, - ) - batch = redis_client.xrange(stream_name, min=min_id, count=batch_size) - if not batch: - # We reached the end of the stream - break - - batch = cast(list[tuple[str, dict]], batch) - # We update the min_id to the last ID of the batch - min_id = f"({batch[-1][0]}" - for timestamp_id, item in batch: - # Get the timestamp from the ID - timestamp = int(timestamp_id.split("-")[0]) - yield RedisUpdate( - id=timestamp_id, - timestamp=timestamp, # type: ignore - stream=stream_name, - code=item["code"], - flavor=item["flavor"], - user_id=item["user_id"], - action=item["action"], - comment=item["comment"], - product_type=item["product_type"], - diffs=item.get("diffs"), + for stream_name in ( + product_updates_stream_name, + ocr_ready_stream_name, + ): + while True: + logger.debug( + "Fetching batch from Redis, stream %s, min_id %s, count %d", + stream_name, + min_id, + batch_size, ) + batch = redis_client.xrange(stream_name, min=min_id, count=batch_size) + if not batch: + # We reached the end of the stream + break + batch = cast(list[tuple[str, dict]], batch) + # We update the min_id to the last ID of the batch + min_id = f"({batch[-1][0]}" + for timestamp_id, item in batch: + # Get the timestamp from the ID + timestamp = int(timestamp_id.split("-")[0]) -def get_new_updates( - redis_client: Redis, - stream_name: str, - min_id: Union[str, datetime.datetime, None] = "$", - batch_size: int = 100, -) -> Iterator[RedisUpdate]: - """Reads new updates from a Redis Stream, starting from the moment this - function is called. - - The function will block until new updates are available. - - :param redis_client: the Redis client - :param stream_name: the name of the Redis stream to read from - :param min_id: the minimum ID to start from, defaults to "$". - A datetime object can also be passed. - :param batch_size: the size of the batch to fetch, defaults to 100 - :yield: a RedisUpdate instance for each update - """ - yield from get_new_updates_multistream( - redis_client=redis_client, - stream_names=[stream_name], - min_id=min_id, - batch_size=batch_size, - ) + if stream_name == ocr_ready_stream_name: + yield OCRReadyEvent( + id=timestamp_id, + timestamp=timestamp, # type: ignore + stream=stream_name, + code=item["code"], + product_type=item["product_type"], + image_id=item["image_id"], + json_url=item["json_url"], + ) + else: + yield ProductUpdateEvent( + id=timestamp_id, + timestamp=timestamp, # type: ignore + stream=stream_name, + code=item["code"], + flavor=item["flavor"], + user_id=item["user_id"], + action=item["action"], + comment=item["comment"], + product_type=item["product_type"], + diffs=item.get("diffs"), + ) def get_new_updates_multistream( redis_client: Redis, - stream_names: list[str], - min_id: Union[str, datetime.datetime, None] = "$", + product_updates_stream_name: str = "product_updates", + ocr_ready_stream_name: str = "ocr_ready", + min_id: str | datetime.datetime | None = "$", batch_size: int = 100, -) -> Iterator[RedisUpdate]: +) -> Iterator[ProductUpdateEvent | OCRReadyEvent]: """Reads new updates from Redis Stream, starting from the moment this function is called. The function will block until new updates are available. :param redis_client: the Redis client. - :param stream_names: the names of the Redis streams to read from. + :param product_updates_stream_name: the name of the Redis stream for + product updates, defaults to "product_updates". + :param ocr_ready_stream_name: the name of the Redis stream for OCR ready + events, defaults to "ocr_ready". :param min_id: the minimum ID to start from, defaults to "$". :param batch_size: the size of the batch to fetch, defaults to 100. - :yield: a RedisUpdate instance for each update. + :yield: a ProductUpdateEvent or OCRReadyEvent instance for each update. """ if min_id is None: min_id = "$" elif isinstance(min_id, datetime.datetime): min_id = f"{int(min_id.timestamp() * 1000)}-0" + stream_names = [product_updates_stream_name, ocr_ready_stream_name] # We start from the last ID - min_ids: dict[Union[bytes, str, memoryview], Union[int, bytes, str, memoryview]] = { + min_ids: dict[bytes | str | memoryview, int | bytes | str | memoryview] = { stream_name: min_id for stream_name in stream_names } while True: @@ -212,40 +237,59 @@ def get_new_updates_multistream( for timestamp_id, item in batch: # Get the timestamp from the ID timestamp = int(timestamp_id.split("-")[0]) - yield RedisUpdate( - id=timestamp_id, - timestamp=timestamp, # type: ignore - stream=stream_name, - code=item["code"], - flavor=item["flavor"], - user_id=item["user_id"], - action=item["action"], - comment=item["comment"], - product_type=item["product_type"], - diffs=item.get("diffs"), - ) - - -class UpdateListener(ABC): - """A class representing a daemon that listens to updates from a Redis + + if stream_name == ocr_ready_stream_name: + yield OCRReadyEvent( + id=timestamp_id, + stream=stream_name, + timestamp=timestamp, # type: ignore + code=item["code"], + product_type=item["product_type"], + image_id=item["image_id"], + json_url=item["json_url"], + ) + else: + yield ProductUpdateEvent( + id=timestamp_id, + stream=stream_name, + timestamp=timestamp, # type: ignore + code=item["code"], + flavor=item["flavor"], + user_id=item["user_id"], + action=item["action"], + comment=item["comment"], + product_type=item["product_type"], + diffs=item.get("diffs"), + ) + + +class UpdateListener: + """A class representing a daemon that listens to events from a Redis stream and processes them. The class is meant to be subclassed to implement the processing logic. - Subclasses must implement the `process_redis_update` method. + Subclasses can implement the `process_redis_update` and + `process_ocr_ready` methods. """ def __init__( - self, redis_client: Redis, redis_stream_name: str, redis_latest_id_key: str + self, + redis_client: Redis, + redis_latest_id_key: str, + product_updates_stream_name: str = "product_updates", + ocr_ready_stream_name: str = "ocr_ready", ): self.redis_client = redis_client - self.redis_stream_name = redis_stream_name + self.product_updates_stream_name = product_updates_stream_name + self.ocr_ready_stream_name = ocr_ready_stream_name self.redis_latest_id_key = redis_latest_id_key def run(self): """Run the update import daemon. This daemon listens to the Redis stream containing information about - product updates, and triggers + product updates or OCR ready events, and processes them as they + arrive. """ logger.info("Starting update listener daemon") @@ -265,17 +309,21 @@ def run(self): else: logger.info("No latest ID found") - for redis_update in get_new_updates( - self.redis_client, stream_name=self.redis_stream_name, min_id=latest_id + for event in get_new_updates_multistream( + self.redis_client, + min_id=latest_id, ): try: - self.process_redis_update(redis_update) + if isinstance(event, OCRReadyEvent): + self.process_ocr_ready(event) + else: + self.process_redis_update(event) except Exception as e: logger.exception(e) - self.redis_client.set(self.redis_latest_id_key, redis_update.id) + self.redis_client.set(self.redis_latest_id_key, event.id) def process_updates_since( - self, since: datetime.datetime, to: Optional[datetime.datetime] = None + self, since: datetime.datetime, to: datetime.datetime | None = None ): """Process all the updates since the given timestamp. @@ -289,16 +337,23 @@ def process_updates_since( self.redis_client.ping() processed = 0 - for product_update in get_processed_since( - self.redis_client, stream_name=self.redis_stream_name, min_id=since + for event in get_processed_since( + self.redis_client, + min_id=since, ): - if to is not None and product_update.timestamp > to: + if to is not None and event.timestamp > to: break - self.process_redis_update(product_update) + if isinstance(event, OCRReadyEvent): + self.process_ocr_ready(event) + else: + self.process_redis_update(event) + processed += 1 - logger.info("Processed %d updates", processed) + logger.info("Processed %d events", processed) + + def process_redis_update(self, event: ProductUpdateEvent): + pass - @abstractmethod - def process_redis_update(self, redis_update: RedisUpdate): + def process_ocr_ready(self, event: OCRReadyEvent): pass diff --git a/poetry.lock b/poetry.lock index c930965e..a928c67d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. [[package]] name = "annotated-types" @@ -19,7 +19,7 @@ description = "Timeout context manager for asyncio programs" optional = true python-versions = ">=3.8" groups = ["main"] -markers = "python_full_version < \"3.11.3\"" +markers = "extra == \"redis\" and python_full_version < \"3.11.3\"" files = [ {file = "async_timeout-5.0.1-py3-none-any.whl", hash = "sha256:39e3809566ff85354557ec2398b55e096c8364bacac9405a7a1fa429e77fe76c"}, {file = "async_timeout-5.0.1.tar.gz", hash = "sha256:d9321a7a3d5a6a5e187e824d2fa0793ce379a202935782d555d6e9d2735677d3"}, @@ -469,7 +469,7 @@ description = "Backport of PEP 654 (exception groups)" optional = false python-versions = ">=3.7" groups = ["dev"] -markers = "python_version < \"3.11\"" +markers = "python_version == \"3.10\"" files = [ {file = "exceptiongroup-1.2.2-py3-none-any.whl", hash = "sha256:3111b9d131c238bec2f8f516e123e14ba243563fb135d3fe885990585aa7795b"}, {file = "exceptiongroup-1.2.2.tar.gz", hash = "sha256:47c2edf7c6738fafb49fd34290706d1a1a2f4d1c6df275526b62cbb4aa5393cc"}, @@ -502,6 +502,7 @@ description = "HTTP/2-based RPC framework" optional = true python-versions = ">=3.8" groups = ["main"] +markers = "extra == \"ml\"" files = [ {file = "grpcio-1.67.1-cp310-cp310-linux_armv7l.whl", hash = "sha256:8b0341d66a57f8a3119b77ab32207072be60c9bf79760fa609c5609f2deb1f3f"}, {file = "grpcio-1.67.1-cp310-cp310-macosx_12_0_universal2.whl", hash = "sha256:f5a27dddefe0e2357d3e617b9079b4bfdc91341a91565111a21ed6ebbc51b22d"}, @@ -698,6 +699,7 @@ description = "Fundamental package for array computing in Python" optional = true python-versions = ">=3.9" groups = ["main"] +markers = "extra == \"ml\"" files = [ {file = "numpy-2.0.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:51129a29dbe56f9ca83438b706e2e69a39892b5eda6cedcb6b0c9fdc9b0d3ece"}, {file = "numpy-2.0.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:f15975dfec0cf2239224d80e32c3170b1d168335eaedee69da84fbe9f1f9cd04"}, @@ -753,6 +755,7 @@ description = "Wrapper package for OpenCV python bindings." optional = true python-versions = ">=3.6" groups = ["main"] +markers = "extra == \"ml\"" files = [ {file = "opencv-python-headless-4.12.0.88.tar.gz", hash = "sha256:cfdc017ddf2e59b6c2f53bc12d74b6b0be7ded4ec59083ea70763921af2b6c09"}, {file = "opencv_python_headless-4.12.0.88-cp37-abi3-macosx_13_0_arm64.whl", hash = "sha256:1e58d664809b3350c1123484dd441e1667cd7bed3086db1b9ea1b6f6cb20b50e"}, @@ -777,6 +780,7 @@ files = [ {file = "packaging-24.2-py3-none-any.whl", hash = "sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759"}, {file = "packaging-24.2.tar.gz", hash = "sha256:c228a6dc5e932d346bc5739379109d49e8853dd8223571c7c5b55260edc0b97f"}, ] +markers = {main = "extra == \"ml\""} [[package]] name = "pathspec" @@ -797,6 +801,7 @@ description = "Python Imaging Library (Fork)" optional = true python-versions = ">=3.9" groups = ["main"] +markers = "extra == \"pillow\" or extra == \"ml\"" files = [ {file = "pillow-11.3.0-cp310-cp310-macosx_10_10_x86_64.whl", hash = "sha256:1b9c17fd4ace828b3003dfd1e30bff24863e0eb59b535e8f80194d9cc7ecf860"}, {file = "pillow-11.3.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:65dc69160114cdd0ca0f35cb434633c75e8e7fad4cf855177a05bf38678f73ad"}, @@ -955,6 +960,7 @@ description = "" optional = true python-versions = ">=3.8" groups = ["main"] +markers = "extra == \"ml\"" files = [ {file = "protobuf-5.29.5-cp310-abi3-win32.whl", hash = "sha256:3f1c6468a2cfd102ff4703976138844f78ebd1fb45f49011afc5139e9e283079"}, {file = "protobuf-5.29.5-cp310-abi3-win_amd64.whl", hash = "sha256:3f76e3a3675b4a4d867b52e4a5f5b78a2ef9565549d4037e06cf7b0942b1d3fc"}, @@ -1186,6 +1192,7 @@ description = "Python wrapper around rapidjson" optional = true python-versions = ">=3.6" groups = ["main"] +markers = "extra == \"ml\"" files = [ {file = "python_rapidjson-1.20-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:eeaa8487fdd8db409bd2e0c41c59cee3b9f1d08401fc75520f7d35c7a22d8789"}, {file = "python_rapidjson-1.20-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:425c2bb8e778a04497953482c251944b2736f61012d897f17b73da3eca060c27"}, @@ -1269,6 +1276,7 @@ description = "Python client for Redis database and key-value store" optional = true python-versions = ">=3.9" groups = ["main"] +markers = "extra == \"redis\"" files = [ {file = "redis-6.4.0-py3-none-any.whl", hash = "sha256:f0544fa9604264e9464cdf4814e7d4830f74b165d52f2a330a760a88dd248b7f"}, {file = "redis-6.4.0.tar.gz", hash = "sha256:b01bc7282b8444e28ec36b261df5375183bb47a07eb9c603f284e89cbc5ef010"}, @@ -1394,6 +1402,7 @@ description = "Python client library and utilities for communicating with Triton optional = true python-versions = "*" groups = ["main"] +markers = "extra == \"ml\"" files = [ {file = "tritonclient-2.59.0-py3-none-any.whl", hash = "sha256:dc06a52e21e0f072930d664a677db005851624f448d46b23fe7ec5dea9927ec7"}, {file = "tritonclient-2.59.0-py3-none-manylinux1_x86_64.whl", hash = "sha256:4fd772f08cefdac6c7e53129861ce58cc0be9a009651b44b8bc07f61351318c9"}, @@ -1540,5 +1549,5 @@ redis = ["redis"] [metadata] lock-version = "2.1" -python-versions = ">=3.9,<4.0" -content-hash = "66802aeb85d778da268bc3373d21fdecf801ab6d08434d45f487daa04fb60a2c" +python-versions = ">=3.10,<4.0" +content-hash = "99f3a2c94d2f4f99140b3533c52057af749343a4181b6aee74c3d201ace19dc5" diff --git a/pyproject.toml b/pyproject.toml index 4bf95633..d77cf740 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ authors = [ description = "Official Python SDK of Open Food Facts" readme = "README.md" license = "MIT" -requires-python = ">=3.9" +requires-python = ">=3.10" dependencies = [ "requests>=2.20.0", "pydantic>=2.0.0,<3.0.0", @@ -42,7 +42,7 @@ ensure_newline_before_comments = true line_length = 88 [tool.poetry.dependencies] -python = ">=3.9,<4.0" +python = ">=3.10,<4.0" redis = { version = "~6.4.0", optional = true, extras = ["hiredis"] } Pillow = { version = ">=9.3,<12", optional = true } tritonclient = {extras = ["grpc"], version = ">2.0.0,<3.0.0", optional = true} diff --git a/tests/unit/test_redis.py b/tests/unit/test_redis.py index 08a4174e..bb693eee 100644 --- a/tests/unit/test_redis.py +++ b/tests/unit/test_redis.py @@ -1,10 +1,15 @@ +import datetime import json from typing import Optional, cast import pytest from redis import Redis -from openfoodfacts.redis import RedisUpdate, get_new_updates, get_processed_since +from openfoodfacts.redis import ( + ProductUpdateEvent, + get_new_updates_multistream, + get_processed_since, +) class TestRedisUpdate: @@ -17,7 +22,7 @@ class TestRedisUpdate: ], ) def test_is_image_upload(self, diffs, expected): - update = RedisUpdate( + update = ProductUpdateEvent( id="1629878400000-0", stream="product_updates", timestamp=1629878400000, @@ -41,7 +46,7 @@ def test_is_image_upload(self, diffs, expected): ], ) def test_is_product_type_change(self, diffs, expected): - update = RedisUpdate( + update = ProductUpdateEvent( id="1629878400000-0", stream="product_updates", timestamp=1629878400000, @@ -70,7 +75,7 @@ def test_is_product_type_change(self, diffs, expected): ], ) def test_is_field_updated(self, diffs, field_name, expected): - update = RedisUpdate( + update = ProductUpdateEvent( id="1629878400000-0", stream="product_updates", timestamp=1629878400000, @@ -99,7 +104,7 @@ def test_is_field_updated(self, diffs, field_name, expected): ], ) def test_is_field_added(self, diffs, field_name, expected): - update = RedisUpdate( + update = ProductUpdateEvent( id="1629878400000-0", stream="product_updates", timestamp=1629878400000, @@ -134,7 +139,7 @@ def test_is_field_added(self, diffs, field_name, expected): ], ) def test_is_field_added_or_updated(self, diffs, field_name, expected): - update = RedisUpdate( + update = ProductUpdateEvent( id="1629878400000-0", stream="product_updates", timestamp=1629878400000, @@ -165,7 +170,7 @@ def test_is_field_added_or_updated(self, diffs, field_name, expected): ], ) def test_is_image_deletion(self, diffs, expected): - update = RedisUpdate( + update = ProductUpdateEvent( id="1629878400000-0", stream="product_updates", timestamp=1629878400000, @@ -188,7 +193,7 @@ def __init__(self, xrange_return_values: list): def xrange( self, name: str, min: str = "-", max: str = "+", count: Optional[int] = None ): - assert name == "product_updates" + assert name in ("product_updates", "ocr_ready") assert max == "+" assert count == 100 if self.call_count >= len(self.xrange_return_values): @@ -219,21 +224,20 @@ def test_get_processed_since(): results = list( get_processed_since( redis_client, - stream_name=stream_name, min_id=start_timestamp_ms, ) ) # Assertions assert len(results) == 2 - assert results[0] == RedisUpdate( + assert results[0] == ProductUpdateEvent( id="1629878400000-0", stream=stream_name, timestamp=1629878400000, code="2", **base_values, ) - assert results[1] == RedisUpdate( + assert results[1] == ProductUpdateEvent( id="1629878400001-0", stream=stream_name, timestamp=1629878400001, @@ -248,7 +252,7 @@ def __init__(self, xread_return_values: list): self.call_count = 0 def xread(self, streams: dict, block: int, count: Optional[int] = None): - assert set(streams.keys()) == {"product_updates"} + assert set(streams.keys()) == {"product_updates", "ocr_ready"} assert block == 0 assert count == 100 if self.call_count >= len(self.xread_return_values): @@ -257,51 +261,75 @@ def xread(self, streams: dict, block: int, count: Optional[int] = None): return self.xread_return_values[self.call_count - 1] -def test_get_new_updates(): - redis_stream_name = "product_updates" - base_values = { +def test_get_new_updates_multistream(): + product_updates_stream_name = "product_updates" + ocr_ready_stream_name = "ocr_ready" + base_values_product_updates = { "flavor": "off", "user_id": "user1", "action": "updated", "comment": "comment", "product_type": "beauty", } + ocr_ready_event = { + "product_type": "beauty", + "code": "3215495849204", + "image_id": "2", + "json_url": "https://images.openfoodfacts.org/images/products/321/549/584/9204/2.json", + } return_values = [ [ ( - redis_stream_name, - [("1629878400002-0", {"code": "4", **base_values})], - ) + product_updates_stream_name, + [("1629878400000-0", {"code": "4", **base_values_product_updates})], + ), + ], + [ + ( + ocr_ready_stream_name, + [("1629878400001-0", ocr_ready_event)], + ), ], [ ( - redis_stream_name, - [("1629878400000-0", {"code": "1", **base_values})], + product_updates_stream_name, + [("1629878400002-0", {"code": "1", **base_values_product_updates})], ) ], [ ( - redis_stream_name, - [("1629878400001-0", {"code": "2", **base_values})], + product_updates_stream_name, + [("1629878400003-0", {"code": "2", **base_values_product_updates})], ) ], [ ( - redis_stream_name, - [("1629878400003-0", {"code": "3", **base_values})], + product_updates_stream_name, + [("1629878400004-0", {"code": "3", **base_values_product_updates})], ) ], ] redis_client = cast(Redis, RedisXreadClient(return_values)) # Call the function and iterate over the results - updates_iter = get_new_updates(redis_client, stream_name=redis_stream_name) + updates_iter = get_new_updates_multistream(redis_client) - results = next(updates_iter) - assert results == RedisUpdate( - id="1629878400002-0", - stream=redis_stream_name, - timestamp=1629878400002, + product_update_result = next(updates_iter) + assert product_update_result == ProductUpdateEvent( + id="1629878400000-0", + stream=product_updates_stream_name, + timestamp=1629878400000, code="4", - **base_values, + **base_values_product_updates, + ) + + ocr_ready_result = next(updates_iter) + assert ocr_ready_result.id == "1629878400001-0" + assert ocr_ready_result.stream == ocr_ready_stream_name + assert ocr_ready_result.timestamp == datetime.datetime.fromtimestamp( + 1629878400.001, tz=datetime.timezone.utc ) + assert ocr_ready_result.code == ocr_ready_event["code"] + assert ocr_ready_result.product_type == ocr_ready_event["product_type"] + assert ocr_ready_result.image_id == ocr_ready_event["image_id"] + assert ocr_ready_result.json_url == ocr_ready_event["json_url"]