File tree Expand file tree Collapse file tree 3 files changed +26
-2
lines changed Expand file tree Collapse file tree 3 files changed +26
-2
lines changed Original file line number Diff line number Diff line change 1616
1717unleash_client .initialize_client ()
1818logger .info (f"Unleash client initialized: { unleash_client .is_initialized } " )
19+
20+
21+ def is_feature_flag_enabled (org_id , flag_name , service_name ):
22+ context = {"userId" : org_id }
23+ is_enabled = unleash_client .is_enabled (flag_name , context )
24+
25+ logger .debug (
26+ f"{ service_name } - Feature flag { flag_name } is { 'enabled' if is_enabled else 'disabled' } "
27+ f"for org_id { org_id } "
28+ )
29+
30+ return is_enabled
Original file line number Diff line number Diff line change 1010 METRICS_PORT ,
1111 get_logger ,
1212 CACHE_KEYWORD_FOR_DELETED_SYSTEM ,
13- GROUP_ID
13+ GROUP_ID ,
14+ UNLEASH_ROS_V2_FLAG
1415)
1516from ros .processor .process_archive import get_performance_profile
1617from ros .processor .notification_event_producer import new_suggestion_event
2627from ros .processor .metrics import (processor_requests_success ,
2728 processor_requests_failures ,
2829 kafka_failures )
30+ from ros .lib .unleash import is_feature_flag_enabled
2931
3032LOG = get_logger (__name__ )
3133
@@ -70,6 +72,11 @@ def run(self):
7072 raise KafkaException (msg .error ())
7173 try :
7274 msg = json .loads (msg .value ().decode ("utf-8" ))
75+
76+ org_id = msg ["input" ]["platform_metadata" ].get ('org_id' )
77+ if is_feature_flag_enabled (org_id , UNLEASH_ROS_V2_FLAG , self .prefix ):
78+ continue
79+
7380 self .handle_msg (msg )
7481 except json .decoder .JSONDecodeError :
7582 kafka_failures .labels (reporter = self .reporter ).inc ()
Original file line number Diff line number Diff line change 1414 get_logger ,
1515 CACHE_TIMEOUT_FOR_DELETED_SYSTEM ,
1616 CACHE_KEYWORD_FOR_DELETED_SYSTEM ,
17- GROUP_ID
17+ GROUP_ID ,
18+ UNLEASH_ROS_V2_FLAG
1819)
1920from ros .lib .cw_logging import commence_cw_log_streaming , threadctx
2021from ros .processor .metrics import (processor_requests_success ,
2122 processor_requests_failures ,
2223 kafka_failures )
24+ from ros .lib .unleash import is_feature_flag_enabled
2325
2426LOG = get_logger (__name__ )
2527
@@ -82,6 +84,9 @@ def run(self):
8284 threadctx .account = account
8385 threadctx .org_id = org_id
8486
87+ if is_feature_flag_enabled (org_id , UNLEASH_ROS_V2_FLAG , self .prefix ):
88+ continue
89+
8590 if event_type in self .event_type_map .keys ():
8691 handler = self .event_type_map [event_type ]
8792 handler (msg )
You can’t perform that action at this time.
0 commit comments