Conversation

@rodrigonull (Member) commented Nov 26, 2025

Overview

This PR addresses RHINENG-22193, RHINENG-22194, and RHINENG-22297.

This PR implements a new Kafka consumer to ingest host application data from downstream services (Advisor, Vulnerability, Patch, Remediations, Compliance, Malware, and Image Builder) and adds comprehensive metrics for monitoring consumer data processing.

Note: IQE tests will be addressed in follow-up PRs.
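
For reference, the kind of payload and headers the new consumer ingests looks roughly like this (a purely illustrative Python sketch; field and header names mirror the HostAppOperationSchema and the tests discussed below, while the concrete values are placeholders):

```python
# Kafka headers: the consumer routes on "application" and logs "request_id".
example_headers = [
    ("application", b"advisor"),
    ("request_id", b"00000000-0000-0000-0000-000000000000"),
]

# Message value: an org-scoped envelope with per-host, per-application data.
example_message = {
    "org_id": "12345",
    "timestamp": "2025-11-26T13:10:00+00:00",
    "hosts": [
        {
            "id": "c0ffee00-0000-0000-0000-000000000001",
            "data": {"recommendations": 5, "incidents": 2},  # Advisor-shaped payload
        },
    ],
}
```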

PR Checklist

  • Keep PR title short, ideally under 72 characters
  • Descriptive comments provided in complex code blocks
  • Include raw query examples in the PR description, if adding/modifying SQL query
  • Tests: validate optimal/expected output
  • Tests: validate exceptions and failure scenarios
  • Tests: edge cases
  • Recovers or fails gracefully during potential resource outages (e.g. DB, Kafka)
  • Uses type hinting, if convenient
  • Documentation, if this PR changes the way other services interact with host inventory
  • Links to related PRs

Secure Coding Practices Documentation Reference

You can find documentation on this checklist here.

Secure Coding Checklist

  • Input Validation
  • Output Encoding
  • Authentication and Password Management
  • Session Management
  • Access Control
  • Cryptographic Practices
  • Error Handling and Logging
  • Data Protection
  • Communication Security
  • System Configuration
  • Database Security
  • File Management
  • Memory Management
  • General Coding Practices

Summary by Sourcery

Implement Kafka consumer and supporting infrastructure for ingesting host application data into inventory views

New Features:

  • Add HostAppMessageConsumer to process host application data from a dedicated Kafka topic
  • Introduce per-application schemas and repository upsert helper for storing host app data
  • Expose configuration for host app data Kafka topic and consumer group

Enhancements:

  • Generalize message parsing and consumer metrics to support multiple consumer types
  • Extend instrumentation with logging for successful host app data upserts

Tests:

  • Add comprehensive unit tests covering host app consumer validation, per-application processing, metrics, edge cases, and DB error handling

@rodrigonull requested a review from a team as a code owner on November 26, 2025 at 13:10
@sourcery-ai bot (Contributor) commented Nov 26, 2025

Reviewer's Guide

Implements a new Kafka consumer pipeline for host application data (Unified Inventory Views), including schemas, metrics, configuration wiring, DB upsert helper, and extensive tests, while generalizing message handling and parsing to support per-consumer metrics and headers.

Sequence diagram for HostAppMessageConsumer Kafka processing

```mermaid
sequenceDiagram
    participant KafkaBroker
    participant InventoryMQService
    participant HostAppMessageConsumer
    participant parse_operation_message
    participant HostAppOperationSchema
    participant host_app_repository
    participant DB
    participant Metrics

    KafkaBroker->>InventoryMQService: Kafka message on host_app_data_topic
    InventoryMQService->>HostAppMessageConsumer: msg.value(), msg.headers()

    activate HostAppMessageConsumer
    HostAppMessageConsumer->>HostAppMessageConsumer: _extract_headers(headers)
    HostAppMessageConsumer-->>InventoryMQService: ValidationException on missing or unknown application
    deactivate HostAppMessageConsumer

    Note over HostAppMessageConsumer,Metrics: For valid application header

    activate HostAppMessageConsumer
    HostAppMessageConsumer->>parse_operation_message: message, HostAppOperationSchema, host_app_message_parsing_time, host_app_data_parsing_failure
    activate parse_operation_message
    parse_operation_message->>HostAppOperationSchema: schema().load(parsed_message)
    HostAppOperationSchema-->>parse_operation_message: validated_msg
    parse_operation_message-->>HostAppMessageConsumer: validated_msg
    deactivate parse_operation_message

    HostAppMessageConsumer->>HostAppMessageConsumer: process_message(application, validated_msg)
    HostAppMessageConsumer->>HostAppMessageConsumer: _get_app_classes(application)
    HostAppMessageConsumer->>HostAppMessageConsumer: _validate_and_prepare_hosts(hosts, schema_class, application, org_id, timestamp)
    HostAppMessageConsumer-->>HostAppMessageConsumer: valid_hosts_list

    alt valid_hosts_list empty
        HostAppMessageConsumer-->>InventoryMQService: OperationResult with success_logger(log_host_app_data_upsert_via_mq)
    else valid_hosts_list not empty
        HostAppMessageConsumer->>host_app_repository: upsert_host_app_data(model_class, application, org_id, hosts_data)
        activate host_app_repository
        host_app_repository->>DB: INSERT ... ON CONFLICT DO UPDATE
        DB-->>host_app_repository: success
        host_app_repository-->>HostAppMessageConsumer: success_count
        deactivate host_app_repository

        HostAppMessageConsumer->>Metrics: host_app_data_processing_success.labels(application, org_id).inc(success_count)
        HostAppMessageConsumer-->>InventoryMQService: OperationResult with success_logger(log_host_app_data_upsert_via_mq)
    end
    deactivate HostAppMessageConsumer

    InventoryMQService->>InventoryMQService: append OperationResult to processed_rows
    InventoryMQService->>Metrics: success_metric.inc() or failure_metric.inc() based on outcome

    Note over InventoryMQService: After batch commit, HostAppMessageConsumer.post_process_rows() calls success_logger()
```

Class diagram for new HostAppMessageConsumer pipeline

```mermaid
classDiagram
    class HBIMessageConsumerBase {
        - notification_event_producer
        - processed_rows list~OperationResult~
        + success_metric
        + failure_metric
        + process_message()
        + handle_message(message, headers) OperationResult
        + post_process_rows() void
        + _process_batch() void
        + event_loop(interrupt) void
    }

    class WorkspaceMessageConsumer {
        + handle_message(message, headers) OperationResult
        + process_message()
        + post_process_rows() void
    }

    class HostMessageConsumer {
        + handle_message(message, headers) OperationResult
        + process_message()
        + post_process_rows() void
    }

    class HostAppMessageConsumer {
        + APPLICATION_MAP
        + handle_message(message, headers) OperationResult
        + process_message(application, validated_msg) OperationResult
        - _extract_headers(headers) tuple
        - _get_app_classes(application) tuple
        - _validate_and_prepare_hosts(hosts, schema_class, application, org_id, timestamp) list~dict~
        - _upsert_hosts(model_class, application, org_id, hosts_data) void
        + post_process_rows() void
    }

    class OperationResult {
        + host
        + platform_metadata
        + events
        + operation
        + notification_message
        + success_logger
    }

    class HostAppDataSchema {
        + id UUID
        + data Dict
    }

    class HostAppOperationSchema {
        + org_id Str
        + timestamp DateTime
        + hosts List~HostAppDataSchema~
    }

    class AdvisorDataSchema {
        + recommendations Int
        + incidents Int
    }

    class VulnerabilityDataSchema {
        + total_cves Int
        + critical_cves Int
        + high_severity_cves Int
        + cves_with_security_rules Int
        + cves_with_known_exploits Int
    }

    class PatchDataSchema {
        + installable_advisories Int
        + template Str
        + rhsm_locked_version Str
    }

    class RemediationsDataSchema {
        + remediations_plans Int
    }

    class ComplianceDataSchema {
        + policies Int
        + last_scan DateTime
    }

    class MalwareDataSchema {
        + last_status Str
        + last_matches Int
        + last_scan DateTime
    }

    class ImageBuilderDataSchema {
        + image_name Str
        + image_status Str
    }

    class host_app_repository {
        + upsert_host_app_data(model_class, application, org_id, hosts_data) Int
    }

    class metrics {
        + host_app_message_handler_time
        + host_app_data_processing_success
        + host_app_data_parsing_failure
        + host_app_data_validation_failure
        + host_app_data_processing_failure
        + ingress_message_handler_success
        + ingress_message_handler_failure
        + ingress_message_parsing_time
        + ingress_message_parsing_failure
    }

    class instrumentation {
        + log_host_app_data_upsert_via_mq(logger, application, org_id, host_ids) void
    }

    class parse_operation_message_fn {
        + parse_operation_message(message, schema, parsing_time_metric, parsing_failure_metric) dict
    }

    HBIMessageConsumerBase <|-- WorkspaceMessageConsumer
    HBIMessageConsumerBase <|-- HostMessageConsumer
    HBIMessageConsumerBase <|-- HostAppMessageConsumer

    HostAppOperationSchema --> HostAppDataSchema : hosts

    HostAppMessageConsumer --> HostAppOperationSchema : uses
    HostAppMessageConsumer --> parse_operation_message_fn : calls
    HostAppMessageConsumer --> host_app_repository : calls upsert_host_app_data
    HostAppMessageConsumer --> metrics : records
    HostAppMessageConsumer --> instrumentation : uses success_logger via OperationResult

    WorkspaceMessageConsumer --> metrics
    HostMessageConsumer --> metrics

    OperationResult <-- HBIMessageConsumerBase : processed_rows
    HostAppMessageConsumer --> OperationResult : returns

    AdvisorDataSchema <.. HostAppMessageConsumer : dynamic
    VulnerabilityDataSchema <.. HostAppMessageConsumer : dynamic
    PatchDataSchema <.. HostAppMessageConsumer : dynamic
    RemediationsDataSchema <.. HostAppMessageConsumer : dynamic
    ComplianceDataSchema <.. HostAppMessageConsumer : dynamic
    MalwareDataSchema <.. HostAppMessageConsumer : dynamic
    ImageBuilderDataSchema <.. HostAppMessageConsumer : dynamic
```
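
Rendered as marshmallow code, the envelope and one of the per-application schemas from the class diagram above would look roughly like the following sketch (the `required` flags are assumptions; the real definitions in app/models/schemas.py also carry type and length validation aligned to the host_app_events spec):

```python
from marshmallow import Schema, fields


class HostAppDataSchema(Schema):
    # One host entry inside the envelope: host id plus application-specific data.
    id = fields.UUID(required=True)
    data = fields.Dict(required=True)


class HostAppOperationSchema(Schema):
    # Org-scoped envelope carried on the host-app Kafka topic.
    org_id = fields.Str(required=True)
    timestamp = fields.DateTime(required=True)
    hosts = fields.List(fields.Nested(HostAppDataSchema), required=True)


class AdvisorDataSchema(Schema):
    # Per-application payload validated after dispatch on the "application" header.
    recommendations = fields.Int(required=True)
    incidents = fields.Int(required=True)
```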

File-Level Changes

Each change below is listed with its details and the affected files.
Generalize base MQ consumer and message parsing to support headers and per-consumer metrics.
  • Add HostAppDataSchema and HostAppOperationSchema for generic host app envelope validation in host_mq
  • Change HBIMessageConsumerBase.handle_message signature to accept message and optional headers, with overridable success/failure metric properties
  • Update HostMessageConsumer and WorkspaceMessageConsumer to match new handle_message signature and keep timing metrics
  • Refactor parse_operation_message to accept configurable timing and failure metrics and wrap parsing in the metric timer
  • Update main MQ batch loop to pass Kafka headers to handle_message and use the new success/failure metric properties instead of hard‑coded metrics
app/queue/host_mq.py
tests/helpers/mq_utils.py
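
A condensed sketch of the generalized parser described above, assuming the metric arguments follow the prometheus_client Summary/Counter interface (the real function in app/queue/host_mq.py may label its failure counter and handle errors differently):

```python
import json

from marshmallow import ValidationError


def parse_operation_message(message, schema, parsing_time_metric, parsing_failure_metric):
    """Parse and validate a raw Kafka payload, timing the work and counting failures."""
    with parsing_time_metric.time():  # Summary timer wraps the whole parse/validate step
        try:
            parsed = json.loads(message)
            return schema().load(parsed)  # marshmallow validation of the envelope
        except (json.JSONDecodeError, ValidationError):
            parsing_failure_metric.inc()
            raise
```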
Introduce HostAppMessageConsumer to ingest host application data from Kafka with per-application validation, upsert, and metrics.
  • Implement HostAppMessageConsumer with header extraction (application, request_id), schema-based validation of HostAppOperationSchema, and dispatch to application-specific data schemas
  • Wire HostAppMessageConsumer into inv_mq_service topic mapping for the new host app data topic
  • Implement per-application host data validation and preparation, handling partial failures at per-host level and building OperationResult with a success logger
  • Add post_process_rows override to call log_host_app_data_upsert_via_mq after commit
app/queue/host_mq.py
inv_mq_service.py
app/instrumentation.py
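
The header handling described above boils down to something like this standalone sketch (in the PR it is a private method on HostAppMessageConsumer that consults APPLICATION_MAP; ValidationException here stands in for the app's own validation error type):

```python
KNOWN_APPLICATIONS = {
    "advisor", "vulnerability", "patch", "remediations",
    "compliance", "malware", "image_builder",
}


class ValidationException(Exception):
    """Stand-in for the inventory code base's validation exception."""


def extract_headers(headers: list[tuple[str, bytes]] | None) -> tuple[str, str | None]:
    """Return (application, request_id) from Kafka headers, rejecting unknown applications."""
    header_map = {key: value.decode("utf-8") for key, value in (headers or [])}
    application = header_map.get("application")
    if application not in KNOWN_APPLICATIONS:
        raise ValidationException(f"Missing or unknown application header: {application!r}")
    return application, header_map.get("request_id")
```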
Add application-specific Marshmallow schemas and a repository helper to upsert host app data efficiently.
  • Define AdvisorDataSchema, VulnerabilityDataSchema, PatchDataSchema, RemediationsDataSchema, ComplianceDataSchema, MalwareDataSchema, and ImageBuilderDataSchema with type and length validation aligned to host_app_events spec
  • Create lib.host_app_repository.upsert_host_app_data using PostgreSQL INSERT .. ON CONFLICT DO UPDATE keyed on (org_id, host_id), updating non-key columns and logging success/failure
app/models/schemas.py
lib/host_app_repository.py
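
The repository helper is essentially a bulk PostgreSQL upsert via SQLAlchemy; a minimal sketch based on the diff excerpt quoted later in this review (the session is passed in here to keep the example self-contained, whereas the real helper uses the app's db.session and adds logging and metrics):

```python
from sqlalchemy.dialects.postgresql import insert


def upsert_host_app_data(session, model_class, application, org_id, hosts_data):
    """Bulk-upsert per-application host rows keyed on (org_id, host_id).

    application and org_id are kept for parity with the call site; the real
    helper uses them for logging.
    """
    if not hosts_data:  # early-return guard suggested in a review comment below
        return 0

    stmt = insert(model_class).values(hosts_data)
    # On conflict, overwrite every non-key column with the incoming value.
    update_dict = {
        col: stmt.excluded[col]
        for col in hosts_data[0].keys()
        if col not in ("org_id", "host_id")
    }
    stmt = stmt.on_conflict_do_update(index_elements=["org_id", "host_id"], set_=update_dict)
    session.execute(stmt)
    return len(hosts_data)
```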
Introduce host app–specific Prometheus metrics and logging hooks.
  • Add host_app_message_handler_time, host_app_data_processing_success, host_app_data_parsing_failure, host_app_data_validation_failure, and host_app_data_processing_failure counters/summaries
  • Expose a new log_host_app_data_upsert_via_mq instrumentation helper used by HostAppMessageConsumer post-processing
app/queue/metrics.py
app/instrumentation.py
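
With prometheus_client, the new metrics would be declared roughly as follows (the metric string names are placeholders; the label sets match how the tests and diagrams in this review use them, and the remaining counters follow the same pattern):

```python
from prometheus_client import Counter, Summary

host_app_message_handler_time = Summary(
    "inventory_host_app_message_handler_seconds",
    "Time spent handling a host-app Kafka message",
)
host_app_data_processing_success = Counter(
    "inventory_host_app_data_processing_success_total",
    "Host-app rows upserted successfully",
    ["application", "org_id"],
)
host_app_data_validation_failure = Counter(
    "inventory_host_app_data_validation_failure_total",
    "Host payloads rejected by per-application schema validation",
    ["application", "reason"],
)
```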
Wire configuration for the new host app Kafka topic and consumer group.
  • Add host_app_data_topic to both Clowder and non-Clowder Kafka configuration, with default topic name platform.inventory.host-apps
  • Add host_app_data_consumer_group configuration and log both topic and consumer group in log_configuration
app/config.py
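
On the non-Clowder side, the wiring amounts to a couple of environment lookups (sketch only; the KAFKA_HOST_APP_DATA_TOPIC variable name is an assumption, while KAFKA_HOST_APP_DATA_GROUP and the default topic name appear elsewhere in this review):

```python
import os

# Clowder deployments resolve the topic from the Clowder config instead of these defaults.
host_app_data_topic = os.environ.get("KAFKA_HOST_APP_DATA_TOPIC", "platform.inventory.host-apps")
host_app_data_consumer_group = os.environ.get("KAFKA_HOST_APP_DATA_GROUP", "inventory-views")
```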
Add comprehensive tests for HostAppMessageConsumer behavior, validation, metrics, and DB error paths.
  • Create tests exercising validation errors (missing/unknown application, malformed JSON, missing fields), per-application happy paths, batch processing, and edge cases like empty host lists and null values
  • Mock metrics and DB errors to verify correct metric increments and error propagation
  • Extend FakeMessage in mq_utils to support headers used by the updated consumer loop
tests/test_host_app_mq_service.py
tests/helpers/mq_utils.py
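
The FakeMessage extension presumably just mirrors the parts of the Kafka message interface the consumer loop touches (msg.value() and msg.headers()); a minimal sketch, with constructor and attribute names assumed:

```python
class FakeMessage:
    """Test double for a Kafka message: value plus optional headers."""

    def __init__(self, value, headers=None):
        self._value = value
        self._headers = headers  # list of (key, bytes) tuples, or None

    def value(self):
        return self._value

    def headers(self):
        return self._headers
```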

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.


@sourcery-ai bot (Contributor) left a comment


Hey there - I've reviewed your changes - here's some feedback:

  • In HostAppMessageConsumer, the handle_message and _extract_headers signatures expect headers: list[tuple[str, bytes]] but _process_batch may pass None; consider updating the type hints/defaults to accept None explicitly to better reflect actual usage and avoid confusion for future callers.
  • The APPLICATION_MAP in HostAppMessageConsumer stores model and schema names as strings and uses getattr at runtime; you could reference the model and schema classes directly to get better static checking and avoid potential runtime errors from typos or refactors.
  • You generalized parse_operation_message to accept metric parameters, but HostAppMessageConsumer still uses the default ingress metrics; if you intend to keep host-app parsing metrics fully separate, consider passing host_app-specific metrics into parse_operation_message to avoid double-counting or mixing metrics.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `HostAppMessageConsumer`, the `handle_message` and `_extract_headers` signatures expect `headers: list[tuple[str, bytes]]` but `_process_batch` may pass `None`; consider updating the type hints/defaults to accept `None` explicitly to better reflect actual usage and avoid confusion for future callers.
- The `APPLICATION_MAP` in `HostAppMessageConsumer` stores model and schema names as strings and uses `getattr` at runtime; you could reference the model and schema classes directly to get better static checking and avoid potential runtime errors from typos or refactors.
- You generalized `parse_operation_message` to accept metric parameters, but `HostAppMessageConsumer` still uses the default ingress metrics; if you intend to keep host-app parsing metrics fully separate, consider passing `host_app`-specific metrics into `parse_operation_message` to avoid double-counting or mixing metrics.

## Individual Comments

### Comment 1
<location> `app/queue/host_mq.py:575` </location>
<code_context>
+    }
+
+    @metrics.host_app_message_handler_time.time()
+    def handle_message(self, message: str | bytes, headers: list[tuple[str, bytes]]) -> OperationResult:
+        application, request_id = self._extract_headers(headers)
+
</code_context>

<issue_to_address>
**issue (bug_risk):** The HostApp consumer’s handle_message signature is stricter than the base class and doesn’t accept None headers.

`HBIMessageConsumerBase.handle_message` declares `headers: list[tuple[str, bytes]] | None = None`, and `_process_batch` may pass `msg.headers()`, which can be `None`. This override removes `| None` and the default, so it’s not type-compatible with the base and may break when `headers` is `None`. Please align the signature to `def handle_message(self, message: str | bytes, headers: list[tuple[str, bytes]] | None = None) -> OperationResult:` and handle a `None` value inside.
</issue_to_address>

### Comment 2
<location> `lib/host_app_repository.py:41-43` </location>
<code_context>
+        # Create INSERT statement with all host data
+        stmt = insert(model_class).values(hosts_data)
+
+        # Create update dictionary excluding primary key columns (org_id, host_id)
+        # On conflict, update all other columns with the new values
+        update_dict = {col: stmt.excluded[col] for col in hosts_data[0].keys() if col not in ["org_id", "host_id"]}
+
+        # Add ON CONFLICT clause to perform upsert
</code_context>

<issue_to_address>
**issue:** upsert_host_app_data assumes hosts_data is non-empty, which will raise IndexError if called with an empty list.

This helper does `hosts_data[0].keys()` without handling an empty list. Although `HostAppMessageConsumer` currently guarantees a non-empty input, this is a shared helper and may be called differently elsewhere. Consider returning early when `not hosts_data` (e.g., `return 0`) before building `update_dict` so it fails safely in all contexts.
</issue_to_address>

### Comment 3
<location> `lib/host_app_repository.py:37-46` </location>
<code_context>
                 else:
                     logger.debug("Message received")

                     try:
-                        self.processed_rows.append(self.handle_message(msg.value()))
+                        self.processed_rows.append(self.handle_message(msg.value(), headers=msg.headers()))
</code_context>

<issue_to_address>
**suggestion:** Errors are logged both in the repository helper and in the caller, which can lead to noisy, duplicated log entries.

Right now the repository helper both logs and re-raises, and `_upsert_hosts` logs and records metrics for the same exceptions. That leads to double-logging errors like `OperationalError`/`IntegrityError` with nearly identical context. Consider making only one layer responsible for logging/metrics and having the other just propagate the exception.
</issue_to_address>

### Comment 4
<location> `tests/test_host_app_mq_service.py:316-325` </location>
<code_context>
+class TestHostAppMessageConsumerMetrics:
</code_context>

<issue_to_address>
**suggestion (testing):** Metrics tests are good; consider adding one that asserts `post_process_rows` triggers the success logger for host-app batches.

One gap is testing `post_process_rows`, which should call `log_host_app_data_upsert_via_mq` after a batch commit. Consider adding a test that:

- Processes one or more valid host-app messages with `HostAppMessageConsumer`
- Calls `post_process_rows()` (with `log_host_app_data_upsert_via_mq` patched to avoid relying on logging internals)
- Asserts the success logger is called with the expected `application`, `org_id`, and host IDs

This will confirm that batching, `processed_rows`, and success logging are correctly wired end to end.

Suggested implementation:

```python
class TestHostAppMessageConsumerMetrics:
    """Test that metrics are properly recorded."""

    @patch("app.queue.host_mq.metrics")
    def test_metrics_on_success(self, mock_metrics, host_app_consumer, db_create_host):
        """Test that success metrics are incremented."""
        host = db_create_host()
        org_id = host.org_id
        host_id = str(host.id)

        advisor_data = {"recommendations": 5, "incidents": 2}

    @patch("app.queue.host_mq.log_host_app_data_upsert_via_mq")
    def test_post_process_rows_logs_success_for_host_app_batches(
        self, mock_log_upsert, host_app_consumer, db_create_host
    ):
        """post_process_rows should trigger the success logger with application, org_id and host IDs."""
        # Arrange: create a host and a valid host-app message
        host = db_create_host()
        org_id = host.org_id
        host_id = str(host.id)

        # This payload should be consistent with what HostAppMessageConsumer expects
        # for a successful host-app upsert. We only care that it is treated as a
        # successful message so that post_process_rows processes it.
        advisor_data = {"recommendations": 3, "incidents": 1}
        message = {
            "operation": "update",
            "host": {
                "id": host_id,
                "org_id": org_id,
            },
            "application": "advisor",
            "data": advisor_data,
        }

        headers = [
            ("application", b"advisor"),
            ("request_id", generate_uuid().encode("utf-8")),
        ]

        # Act: process the message and then trigger post_process_rows
        host_app_consumer.handle_message(json.dumps(message), headers=headers)
        host_app_consumer.post_process_rows()

        # Assert: log_host_app_data_upsert_via_mq is called with the expected
        # application, org_id, and host IDs. We support both positional and
        # keyword-style calls to avoid depending on the exact signature.
        mock_log_upsert.assert_called_once()
        call = mock_log_upsert.call_args
        args, kwargs = call

        # Extract application, org_id and host_ids, regardless of how they're passed
        application = kwargs.get("application") if "application" in kwargs else args[0]
        logged_org_id = kwargs.get("org_id") if "org_id" in kwargs else args[1]
        host_ids = kwargs.get("host_ids") if "host_ids" in kwargs else args[2]

        assert application == "advisor"
        assert logged_org_id == org_id
        # host_ids may be a list/tuple; normalize before asserting
        assert host_id in list(host_ids)

```

If `HostAppMessageConsumer` expects a different message shape (for example, wrapping host/app fields inside a `value` or `payload` key, or using a different `operation` or `application` name), adjust the `message` dict in the new test to match the existing tests in this file.

If `log_host_app_data_upsert_via_mq` has a different import path or argument ordering, update the `@patch("app.queue.host_mq.log_host_app_data_upsert_via_mq")` path and the extraction of `application`, `org_id` and `host_ids` from `call_args` accordingly.
</issue_to_address>

### Comment 5
<location> `app/queue/host_mq.py:563` </location>
<code_context>
+class HostAppMessageConsumer(HBIMessageConsumerBase):
+    """Consumer for host application data from downstream services (Advisor, Vulnerability, Patch, etc.)."""
+
+    # Mapping of application names to their corresponding model and schema classes
+    APPLICATION_MAP = {
+        "advisor": ("HostAppDataAdvisor", "AdvisorDataSchema"),
</code_context>

<issue_to_address>
**issue (complexity):** Consider mapping applications directly to their model and schema classes instead of class-name strings to simplify `HostAppMessageConsumer.APPLICATION_MAP` and `_get_app_classes`.

You can simplify the `HostAppMessageConsumer.APPLICATION_MAP` and `_get_app_classes` without losing any functionality, and it will make the code easier to read and safer (no string-based indirection).

Instead of storing class **names** and doing `getattr` at runtime:

```python
class HostAppMessageConsumer(HBIMessageConsumerBase):
    APPLICATION_MAP = {
        "advisor": ("HostAppDataAdvisor", "AdvisorDataSchema"),
        "vulnerability": ("HostAppDataVulnerability", "VulnerabilityDataSchema"),
        "patch": ("HostAppDataPatch", "PatchDataSchema"),
        "remediations": ("HostAppDataRemediations", "RemediationsDataSchema"),
        "compliance": ("HostAppDataCompliance", "ComplianceDataSchema"),
        "malware": ("HostAppDataMalware", "MalwareDataSchema"),
        "image_builder": ("HostAppDataImageBuilder", "ImageBuilderDataSchema"),
    }

    def _get_app_classes(self, application: str) -> tuple[type, type]:
        model_class_name, schema_class_name = self.APPLICATION_MAP[application]
        model_class = getattr(host_app_data, model_class_name)
        schema_class = getattr(model_schemas, schema_class_name)
        return model_class, schema_class
```

you can map directly to the classes:

```python
from app.models import host_app_data
from app.models import schemas as model_schemas

class HostAppMessageConsumer(HBIMessageConsumerBase):
    APPLICATION_MAP = {
        "advisor": (host_app_data.HostAppDataAdvisor, model_schemas.AdvisorDataSchema),
        "vulnerability": (host_app_data.HostAppDataVulnerability, model_schemas.VulnerabilityDataSchema),
        "patch": (host_app_data.HostAppDataPatch, model_schemas.PatchDataSchema),
        "remediations": (host_app_data.HostAppDataRemediations, model_schemas.RemediationsDataSchema),
        "compliance": (host_app_data.HostAppDataCompliance, model_schemas.ComplianceDataSchema),
        "malware": (host_app_data.HostAppDataMalware, model_schemas.MalwareDataSchema),
        "image_builder": (host_app_data.HostAppDataImageBuilder, model_schemas.ImageBuilderDataSchema),
    }

    def _get_app_classes(self, application: str) -> tuple[type, type]:
        return self.APPLICATION_MAP[application]
```

If import cycles are a concern, you can still avoid `getattr` by moving the imports inside `_get_app_classes` but returning real classes:

```python
def _get_app_classes(self, application: str) -> tuple[type, type]:
    from app.models import host_app_data
    from app.models import schemas as model_schemas

    application_map = {
        "advisor": (host_app_data.HostAppDataAdvisor, model_schemas.AdvisorDataSchema),
        # ...
    }
    return application_map[application]
```

Benefits:

* Eliminates dynamic `getattr` lookups and late failures due to typos.
* Makes the mapping self-documenting and friendlier to static analysis/IDE refactoring.
* Keeps all current functionality and the `APPLICATION_MAP` abstraction intact.
</issue_to_address>

### Comment 6
<location> `tests/test_host_app_mq_service.py:120` </location>
<code_context>
    def test_advisor_upsert_update_record(self, host_app_consumer, db_create_host):
        """Test updating existing Advisor data."""
        host = db_create_host()
        org_id = host.org_id
        host_id = str(host.id)

        # Insert initial data
        initial_data = {"recommendations": 5, "incidents": 2}
        message1 = create_host_app_message(org_id=org_id, host_id=host_id, data=initial_data)
        headers = [("application", b"advisor"), ("request_id", generate_uuid().encode("utf-8"))]
        host_app_consumer.handle_message(json.dumps(message1), headers=headers)

        # Update with new data
        updated_data = {"recommendations": 10, "incidents": 3}
        message2 = create_host_app_message(org_id=org_id, host_id=host_id, data=updated_data)
        headers2 = [("application", b"advisor"), ("request_id", generate_uuid().encode("utf-8"))]
        host_app_consumer.handle_message(json.dumps(message2), headers=headers2)

        # Verify the record was updated
        app_data = db.session.query(HostAppDataAdvisor).filter_by(org_id=org_id, host_id=host.id).first()

        assert app_data is not None
        assert app_data.recommendations == 10
        assert app_data.incidents == 3

</code_context>

<issue_to_address>
**issue (code-quality):** We've found these issues:

- Move assignment closer to its usage within a block ([`move-assign-in-block`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/move-assign-in-block/))
- Extract duplicate code into method ([`extract-duplicate-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-duplicate-method/))
</issue_to_address>

### Comment 7
<location> `tests/test_host_app_mq_service.py:575` </location>
<code_context>
    def test_multiple_hosts_partial_validation_failure(self, host_app_consumer, db_create_host):
        """Test that when one host has invalid data, others are still processed."""
        host1 = db_create_host()
        org_id = host1.org_id
        host2 = db_create_host(extra_data={"org_id": org_id})
        host3 = db_create_host(extra_data={"org_id": org_id})

        # Create message with 3 hosts: 1 invalid, 2 valid
        message = {
            "org_id": org_id,
            "timestamp": datetime.now(UTC).isoformat(),
            "hosts": [
                {"id": str(host1.id), "data": {"recommendations": 5, "incidents": 2}},  # Valid
                {"id": str(host2.id), "data": {"recommendations": "invalid", "incidents": 1}},  # Invalid
                {"id": str(host3.id), "data": {"recommendations": 10, "incidents": 3}},  # Valid
            ],
        }

        headers = [("application", b"advisor"), ("request_id", generate_uuid().encode("utf-8"))]
        with patch("app.queue.host_mq.metrics.host_app_data_validation_failure") as mock_metric:
            host_app_consumer.handle_message(json.dumps(message), headers=headers)

            # Should have incremented validation failure metric once (for host2)
            mock_metric.labels.assert_called_with(application="advisor", reason="invalid_host_data")
            mock_metric.labels.return_value.inc.assert_called_once()

        # host1 and host3 should be in database
        app_data1 = db.session.query(HostAppDataAdvisor).filter_by(org_id=org_id, host_id=host1.id).first()
        assert app_data1 is not None
        assert app_data1.recommendations == 5

        # host2 should not be in database
        app_data2 = db.session.query(HostAppDataAdvisor).filter_by(org_id=org_id, host_id=host2.id).first()
        assert app_data2 is None

        # host3 should be in database
        app_data3 = db.session.query(HostAppDataAdvisor).filter_by(org_id=org_id, host_id=host3.id).first()
        assert app_data3 is not None
        assert app_data3.recommendations == 10

</code_context>

<issue_to_address>
**issue (code-quality):** Extract duplicate code into method ([`extract-duplicate-method`](https://docs.sourcery.ai/Reference/Default-Rules/refactorings/extract-duplicate-method/))
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@rodrigonull force-pushed the RHINENG-22193 branch 4 times, most recently from 1b72556 to e2f0d10 on November 27, 2025 at 11:14
@kruai (Collaborator) left a comment


I had some suggestions; we also need a new deployment (and env vars) added to clowdapp.yml for this code to actually deploy.
In addition, the new tests seem like they should be refactored, since they all do very similar things. I've opened #3228 against this PR with a suggested refactor.

Comment on lines +635 to +642
```python
return OperationResult(
    None,
    None,
    None,
    None,
    None,
    partial(log_host_app_data_upsert_via_mq, logger, application, org_id, []),
)
```
@kruai (Collaborator):

Might be better to raise an exception here to indicate that no matching hosts were found (or that none of the data was valid). But I guess if the goal is simply to log that the batch was processed and no hosts were updated, this might be OK.

@rodrigonull (Member, author):

Yep, the goal is just to log the message.

app/config.py Outdated

```python
self.host_ingress_consumer_group = os.environ.get("KAFKA_HOST_INGRESS_GROUP", "inventory-mq")
self.inv_export_service_consumer_group = os.environ.get("KAFKA_EXPORT_SERVICE_GROUP", "inv-export-service")
self.host_app_data_consumer_group = os.environ.get("KAFKA_HOST_APP_DATA_GROUP", "inventory-views")
```
@kruai (Collaborator):

It doesn't look like you're using this value anywhere besides the below log. You'll need to set this on the consumer in order for the correct group to be used. You can either:

  • Create the consumer separately, like inv_export_service does
  • Reuse the KAFKA_HOST_INGRESS_GROUP var, and just set it to a different value in clowdapp.yml for this deployment

Assuming we plan to have a separate deployment for these host_app_data consumer pods, I'd go with the second option.
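
Concretely, the second option would keep the config line as-is and let the deployment override the variable; a sketch under that assumption:

```python
import os

# Reused by every consumer deployment; clowdapp.yml would set KAFKA_HOST_INGRESS_GROUP
# to a dedicated value (e.g. "inventory-views") for the host-app data consumer pods only.
host_ingress_consumer_group = os.environ.get("KAFKA_HOST_INGRESS_GROUP", "inventory-mq")
```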

@rodrigonull (Member, author):

Good catch. Let's use the second option.

Comment on lines 25 to 30
```python
@pytest.fixture
def host_app_consumer(flask_app, event_producer):
    """Fixture to create HostAppMessageConsumer for testing."""
    fake_consumer = MagicMock()
    fake_notification_event_producer = MagicMock()
    return HostAppMessageConsumer(fake_consumer, flask_app, event_producer, fake_notification_event_producer)
```
@kruai (Collaborator):

I recommend putting this in mq_fixtures.py with the other consumer mocks.

