Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions ros/lib/unleash.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,15 @@

unleash_client.initialize_client()
logger.info(f"Unleash client initialized: {unleash_client.is_initialized}")


def is_feature_flag_enabled(org_id, flag_name, service_name):
context = {"userId": org_id}
is_enabled = unleash_client.is_enabled(flag_name, context)

logger.debug(
f"{service_name} - Feature flag {flag_name} is {'enabled' if is_enabled else 'disabled'} "
f"for org_id {org_id}"
)

return is_enabled
9 changes: 8 additions & 1 deletion ros/processor/insights_engine_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
METRICS_PORT,
get_logger,
CACHE_KEYWORD_FOR_DELETED_SYSTEM,
GROUP_ID
GROUP_ID,
UNLEASH_ROS_V2_FLAG
)
from ros.processor.process_archive import get_performance_profile
from ros.processor.notification_event_producer import new_suggestion_event
Expand All @@ -26,6 +27,7 @@
from ros.processor.metrics import (processor_requests_success,
processor_requests_failures,
kafka_failures)
from ros.lib.unleash import is_feature_flag_enabled

LOG = get_logger(__name__)

Expand Down Expand Up @@ -70,6 +72,11 @@ def run(self):
raise KafkaException(msg.error())
try:
msg = json.loads(msg.value().decode("utf-8"))

org_id = msg["input"]["platform_metadata"].get('org_id')
if is_feature_flag_enabled(org_id, UNLEASH_ROS_V2_FLAG, self.prefix):
continue

self.handle_msg(msg)
except json.decoder.JSONDecodeError:
kafka_failures.labels(reporter=self.reporter).inc()
Expand Down
7 changes: 6 additions & 1 deletion ros/processor/inventory_events_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
get_logger,
CACHE_TIMEOUT_FOR_DELETED_SYSTEM,
CACHE_KEYWORD_FOR_DELETED_SYSTEM,
GROUP_ID
GROUP_ID,
UNLEASH_ROS_V2_FLAG
)
from ros.lib.cw_logging import commence_cw_log_streaming, threadctx
from ros.processor.metrics import (processor_requests_success,
processor_requests_failures,
kafka_failures)
from ros.lib.unleash import is_feature_flag_enabled

LOG = get_logger(__name__)

Expand Down Expand Up @@ -82,6 +84,9 @@ def run(self):
threadctx.account = account
threadctx.org_id = org_id

if is_feature_flag_enabled(org_id, UNLEASH_ROS_V2_FLAG, self.prefix):
continue

if event_type in self.event_type_map.keys():
handler = self.event_type_map[event_type]
handler(msg)
Expand Down
Loading