diff --git a/ros/lib/unleash.py b/ros/lib/unleash.py index b8a5def5..ce2a53a5 100644 --- a/ros/lib/unleash.py +++ b/ros/lib/unleash.py @@ -16,3 +16,15 @@ unleash_client.initialize_client() logger.info(f"Unleash client initialized: {unleash_client.is_initialized}") + + +def is_feature_flag_enabled(org_id, flag_name, service_name): + context = {"userId": org_id} + is_enabled = unleash_client.is_enabled(flag_name, context) + + logger.debug( + f"{service_name} - Feature flag {flag_name} is {'enabled' if is_enabled else 'disabled'} " + f"for org_id {org_id}" + ) + + return is_enabled diff --git a/ros/processor/insights_engine_consumer.py b/ros/processor/insights_engine_consumer.py index fecb47bb..5c167bed 100644 --- a/ros/processor/insights_engine_consumer.py +++ b/ros/processor/insights_engine_consumer.py @@ -10,7 +10,8 @@ METRICS_PORT, get_logger, CACHE_KEYWORD_FOR_DELETED_SYSTEM, - GROUP_ID + GROUP_ID, + UNLEASH_ROS_V2_FLAG ) from ros.processor.process_archive import get_performance_profile from ros.processor.notification_event_producer import new_suggestion_event @@ -26,6 +27,7 @@ from ros.processor.metrics import (processor_requests_success, processor_requests_failures, kafka_failures) +from ros.lib.unleash import is_feature_flag_enabled LOG = get_logger(__name__) @@ -70,6 +72,11 @@ def run(self): raise KafkaException(msg.error()) try: msg = json.loads(msg.value().decode("utf-8")) + + org_id = msg["input"]["platform_metadata"].get('org_id') + if is_feature_flag_enabled(org_id, UNLEASH_ROS_V2_FLAG, self.prefix): + continue + self.handle_msg(msg) except json.decoder.JSONDecodeError: kafka_failures.labels(reporter=self.reporter).inc() diff --git a/ros/processor/inventory_events_consumer.py b/ros/processor/inventory_events_consumer.py index 87d8d153..9fb40602 100644 --- a/ros/processor/inventory_events_consumer.py +++ b/ros/processor/inventory_events_consumer.py @@ -14,12 +14,14 @@ get_logger, CACHE_TIMEOUT_FOR_DELETED_SYSTEM, CACHE_KEYWORD_FOR_DELETED_SYSTEM, - GROUP_ID + GROUP_ID, + UNLEASH_ROS_V2_FLAG ) from ros.lib.cw_logging import commence_cw_log_streaming, threadctx from ros.processor.metrics import (processor_requests_success, processor_requests_failures, kafka_failures) +from ros.lib.unleash import is_feature_flag_enabled LOG = get_logger(__name__) @@ -82,6 +84,9 @@ def run(self): threadctx.account = account threadctx.org_id = org_id + if is_feature_flag_enabled(org_id, UNLEASH_ROS_V2_FLAG, self.prefix): + continue + if event_type in self.event_type_map.keys(): handler = self.event_type_map[event_type] handler(msg)