Skip to content

Commit bd87879

Browse files
Merge pull request #35 from gnanarepo/ujjawal_core
druva splits handling
2 parents b0066e2 + ece00be commit bd87879

File tree

15 files changed

+448
-60
lines changed

15 files changed

+448
-60
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,4 @@ USER appuser
6060

6161
EXPOSE 8000
6262

63-
CMD ["gunicorn", "aviso_core.wsgi:application", "--bind", "0.0.0.0:8000", "--workers", "1", "--timeout", "900"]
63+
CMD ["gunicorn", "aviso_core.wsgi:application", "--bind", "0.0.0.0:8000", "--workers", "3", "--timeout", "900"]

gbm_apis/api/data_load.py

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import json
12
import os
23

34
from aviso.settings import sec_context
45
from aviso.utils.dateUtils import TimeHorizon
6+
from aviso.utils import is_true
57

68
import logging
79
import collections
@@ -73,7 +75,9 @@ def __init__(
7375
period,
7476
run_type='chipotle',
7577
from_timestamp=0,
76-
changed_fields_only=False):
78+
changed_fields_only=False,
79+
return_oppids_only=False):
80+
7781
self.id_list = id_list
7882
self.from_timestamp = from_timestamp
7983
self.tenant_name = tenant_name
@@ -84,6 +88,7 @@ def __init__(
8488
self.changed_fields_only = changed_fields_only
8589
self.run_type = run_type
8690
self.period = period
91+
self.return_oppids_only = return_oppids_only
8792

8893
def get_basic_results(self):
8994
ds = Dataset.getByNameAndStage('OppDS', None)
@@ -113,7 +118,27 @@ def get_basic_results(self):
113118
deals = list(coll.find(criteria, {'_id': 0}))
114119

115120
logger.info(f"got result length of deals: {len(deals)}")
116-
121+
122+
if self.return_oppids_only:
123+
try:
124+
#opp_ids_records = sec_context.etl.uip('UIPIterator', dataset='OppDS', record_filter=criteria, fields_requested=['ID'])
125+
if deals:
126+
oppids_list = [deal['object']['extid'] for deal in deals]
127+
else:
128+
oppids_list = []
129+
logger.info("Number of oppids fetched: %s", len(oppids_list))
130+
131+
return {
132+
'oppids': oppids_list
133+
}
134+
except Exception as e:
135+
logger.error("Exception occurred while fetching opp_ids_records: %s", str(e))
136+
error_response = {
137+
"success": False,
138+
"error": str(e)
139+
}
140+
return error_response
141+
117142
final_deals = []
118143
from_timestamp_xl = (self.from_timestamp / 1000.0) / 86400 + 25569
119144
for deal in deals:
@@ -320,12 +345,23 @@ def revenue_schedule(self):
320345
expiration_date_fld = rev_schedule_config.get('expiration_date_fld', 'ExpirationDate')
321346
type_fld = rev_schedule_config.get('type_fld', 'Type')
322347
renewal_vals = rev_schedule_config.get('renewal_vals', ['Renewal'])
348+
349+
viewgen_config = oppds.models['common'].config.get('viewgen_config', {})
350+
splitted_fields = []
351+
if viewgen_config['split_config']:
352+
split_config = viewgen_config['split_config'][list(viewgen_config['split_config'])[0]]
353+
for _, dtls in split_config.items():
354+
if type(dtls) != list:
355+
dtls = [dtls]
356+
for element in dtls:
357+
splitted_fields += element['num_fields']
358+
323359
basic_results_dict_copy = deepcopy(basic_results_dict)
324360
basic_results_dict = {}
325361
for opp_id, res in basic_results_dict_copy.items():
326362
output_dict = generate_expiration_date_renewal_rec(rev_period, renewal_drilldown, close_date_fld,
327363
expiration_date_fld, type_fld, renewal_vals,
328-
opp_id, res)
364+
opp_id, res, splitted_fields)
329365
basic_results_dict.update(output_dict)
330366
else:
331367
basic_results_dict = generate_revenue_recs(rev_period, drilldown, close_date_fld, basic_results_dict)
@@ -414,6 +450,8 @@ def get(self, request, *args, **kwargs):
414450
changed_fields_param = request.GET.get('changed_fields_only', 'false')
415451
changed_fields_only = changed_fields_param.lower() == 'true'
416452

453+
return_oppids_only = is_true(request.GET.get('return_oppids_only', False))
454+
417455
if not all([tenant_name, stack]):
418456
return JsonResponse(
419457
{"error": "Missing required fields (tenant_name, stack)"},
@@ -433,12 +471,16 @@ def get(self, request, *args, **kwargs):
433471
period=period,
434472
run_type=run_type,
435473
from_timestamp=from_timestamp,
436-
changed_fields_only=changed_fields_only
474+
changed_fields_only=changed_fields_only,
475+
return_oppids_only=return_oppids_only
437476
)
438477

439478
# 5. Get Basic Results
440479
basic_results = loader.get_basic_results()
441480

481+
if return_oppids_only:
482+
return JsonResponse(basic_results, safe=False, status=200)
483+
442484
# 6. Initialize RevenueSchedule
443485
scheduler = RevenueSchedule(period=period, basic_results=basic_results)
444486
final_results = scheduler.revenue_schedule()

gbm_apis/api/drilldown_fields_v2.py

Lines changed: 105 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import itertools
2-
import json
2+
import json, os
33
import logging
44

55
from collections import defaultdict
@@ -17,6 +17,9 @@
1717
from django.utils.decorators import method_decorator
1818
from django.views.decorators.csrf import csrf_exempt
1919

20+
from aviso.framework.tenant_mongo_resolver import TenantMongoResolver
21+
from gbm_apis.deal_result.result_Utils import deals_results_by_period
22+
2023
logger = logging.getLogger("aviso-core.%s" % __name__)
2124

2225

@@ -177,9 +180,28 @@ def build_global_territory_owners(groups: Iterable[Iterable[str]], period: Optio
177180
logger.warning("sec_context.etl not available; returning empty drilldown set")
178181
return {}
179182

180-
records = sec_context.etl.uip(
181-
"UIPIterator", dataset="OppDS", record_filter=record_filter, fields_requested=list(required_fields)
182-
)
183+
# records = sec_context.etl.uip(
184+
# "UIPIterator", dataset="OppDS", record_filter=record_filter, fields_requested=list(required_fields),
185+
# skip_prepare=True
186+
# )
187+
188+
cname = os.environ.get("CNAME", "preprod")
189+
etl_db = TenantMongoResolver().ms_connection_mongo_client_db(tenant=sec_context.name, db_type='etl', cname=cname)
190+
191+
coll = etl_db[sec_context.name + '.OppDS._uip._data']
192+
193+
projection = {
194+
"object.terminal_date": 1
195+
}
196+
197+
for field in required_fields:
198+
projection[f"object.history.{field}"] = 1
199+
200+
records = coll.find(
201+
record_filter,
202+
projection=projection,
203+
no_cursor_timeout=True
204+
).batch_size(50000)
183205

184206
hierarchy = _fetch_raw_hierarchy()
185207
parents = {}
@@ -229,36 +251,79 @@ def build_global_territory_owners(groups: Iterable[Iterable[str]], period: Optio
229251
processed_count = 0
230252
skipped_count = 0
231253

254+
# for record in records:
255+
# #print("Processing record:", record)
256+
# # raise
257+
# feat_map = record['object']['history']
258+
# #print("Feature map keys:", feat_map.keys())
259+
# # raise
260+
# if territory_key not in feat_map:
261+
# skipped_count += 1
262+
# continue
263+
264+
# territory_data = feat_map[territory_key]
265+
# if not territory_data:
266+
# skipped_count += 1
267+
# continue
268+
269+
# #print("Territory data:", territory_data)
270+
# leaf_node = territory_data[-1][1]
271+
# if leaf_node not in global_children_set:
272+
# skipped_count += 1
273+
# continue
274+
275+
# processed_count += 1
276+
# for group_name, field_mappings in group_configs:
277+
# current_row = []
278+
# for fld_label, map_key in field_mappings:
279+
# if map_key in feat_map:
280+
# field_data = feat_map[map_key]
281+
# if field_data:
282+
# current_row.append([fld_label, field_data[-1][1]])
283+
# else:
284+
# current_row.append([fld_label, "N/A"])
285+
# else:
286+
# current_row.append([fld_label, "N/A"])
287+
# staging[group_name][leaf_node].add(tuple(tuple(pair) for pair in current_row))
288+
232289
for record in records:
233-
feat_map = record.featMap
234-
if territory_key not in feat_map:
290+
obj = record.get("object")
291+
if not obj:
292+
skipped_count += 1
293+
continue
294+
295+
history = obj.get("history")
296+
if not history:
235297
skipped_count += 1
236298
continue
237299

238-
territory_data = feat_map[territory_key]
300+
# Get territory data (single lookup only)
301+
territory_data = history.get(territory_key)
239302
if not territory_data:
240303
skipped_count += 1
241304
continue
242305

306+
# Get leaf node
243307
leaf_node = territory_data[-1][1]
308+
244309
if leaf_node not in global_children_set:
245310
skipped_count += 1
246311
continue
247312

248313
processed_count += 1
314+
315+
# Process groups
249316
for group_name, field_mappings in group_configs:
250-
current_row = []
317+
row = []
318+
251319
for fld_label, map_key in field_mappings:
252-
if map_key in feat_map:
253-
field_data = feat_map[map_key]
254-
if field_data:
255-
current_row.append([fld_label, field_data[-1][1]])
256-
else:
257-
current_row.append([fld_label, "N/A"])
258-
else:
259-
current_row.append([fld_label, "N/A"])
260-
staging[group_name][leaf_node].add(tuple(tuple(pair) for pair in current_row))
320+
field_data = history.get(map_key)
321+
value = field_data[-1][1] if field_data else "N/A"
322+
row.append((fld_label, value)) # use tuple directly
261323

324+
staging[group_name][leaf_node].add(tuple(row))
325+
326+
262327
logger.info(
263328
"build_global_territory_owners: processed %s records, skipped %s records",
264329
processed_count,
@@ -302,32 +367,32 @@ def drilldown_values_by_owner_v2(drilldown: str, periods: Iterable[str]):
302367
return output
303368

304369

305-
def deals_results_by_period(periods: Iterable[str]) -> Dict[str, Dict[str, Any]]:
306-
"""
307-
Lightweight helper to fetch deal results for the requested periods.
308-
"""
309-
results: Dict[str, Dict[str, Any]] = {}
310-
if sec_context and getattr(sec_context, "gbm", None):
311-
gbm_client = sec_context.gbm
312-
for period in periods:
313-
try:
314-
url = f"/gbm/deals_results?period={period}"
315-
results[period] = gbm_client.api(url, None) or {"results": {}}
316-
except Exception as exc:
317-
logger.exception("Failed to fetch deals results for %s", period, exc_info=exc)
318-
results[period] = {"results": {}}
319-
else:
320-
logger.warning("GBM client not configured; returning empty results for periods %s", list(periods))
321-
for period in periods:
322-
results[period] = {"results": {}}
323-
return results
324-
325-
326-
def drilldown_values_by_period(periods: Iterable[str], groups: Iterable[Iterable[str]]):
370+
# def deals_results_by_period(periods: Iterable[str]) -> Dict[str, Dict[str, Any]]:
371+
# """
372+
# Lightweight helper to fetch deal results for the requested periods.
373+
# """
374+
# results: Dict[str, Dict[str, Any]] = {}
375+
# if sec_context and getattr(sec_context, "gbm", None):
376+
# gbm_client = sec_context.gbm
377+
# for period in periods:
378+
# try:
379+
# url = f"/gbm/deals_results?period={period}"
380+
# results[period] = gbm_client.api(url, None) or {"results": {}}
381+
# except Exception as exc:
382+
# logger.exception("Failed to fetch deals results for %s", period, exc_info=exc)
383+
# results[period] = {"results": {}}
384+
# else:
385+
# logger.warning("GBM client not configured; returning empty results for periods %s", list(periods))
386+
# for period in periods:
387+
# results[period] = {"results": {}}
388+
# return results
389+
390+
391+
def drilldown_values_by_period(periods, groups,):
327392
"""
328393
Given a set of drilldown groups, return all possible combos of those field values.
329394
"""
330-
output: Dict[str, Dict[str, List[List[tuple]]]] = {}
395+
output = {}
331396
curr_period = current_period().mnemonic
332397

333398
for period in periods:

gbm_apis/data_load/splitter_service.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ def create_splits(self, record):
8383
output[field][split_id] = self.try_num_val(record[field], ratio)
8484
for field in prim_str_fields:
8585
output[field][split_id] = self.try_str_val(record[field], split_id)
86+
if field + '_ratios' not in output:
87+
output[field + '_ratios'] = {}
88+
output[field + '_ratios'][split_id] = ratio
8689
# for field in set(output) & self.ignore_fields:
8790
# output[field][split_id] = record[field]
8891

@@ -98,6 +101,9 @@ def create_splits(self, record):
98101
output[field][split_id] = self.try_num_val(record[field], ratio)
99102
for field in o_str_fields:
100103
output[field][split_id] = self.try_num_ratio(record[field], split_id)
104+
if field + '_ratios' not in output:
105+
output[field + '_ratios'] = {}
106+
output[field + '_ratios'][split_id] = ratio
101107
return output, split_ids
102108

103109
def try_num_val(self, val, ratio):

0 commit comments

Comments
 (0)