Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion diworker/diworker/importers/aws.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python
import csv
import logging
import time
import math
import os
import re
Expand All @@ -18,6 +19,8 @@
import pyarrow.parquet as pq

LOG = logging.getLogger(__name__)
# File is large; silence pylint too-many-lines warning until refactored.
# pylint: disable=too-many-lines
CHUNK_SIZE = 200
IGNORE_EXPENSE_TYPES = ['Credit']
RI_PLATFORMS = [
Expand Down Expand Up @@ -436,8 +439,10 @@ def _convert_to_legacy_csv_columns(self, columns, dict_format=False):

def load_csv_report(self, report_path, account_id_ca_id_map,
billing_period, skipped_accounts):
parse_started_at = time.time()
date_start = opttime.utcnow()
with open(report_path, newline='') as csvfile:
imported_rows = 0
with open(report_path, newline='', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
reader.fieldnames = self._convert_to_legacy_csv_columns(
reader.fieldnames)
Expand Down Expand Up @@ -498,18 +503,26 @@ def load_csv_report(self, report_path, account_id_ca_id_map,
self._set_resource_id(row)
row['created_at'] = self.import_start_ts
chunk.append(row)
imported_rows += 1

if chunk:
self.update_raw_records(chunk)
LOG.info(
'Parse phase (CSV): %s raw rows, %s imported rows from %s in %.2fs',
record_number, imported_rows, report_path,
time.time() - parse_started_at
)
return billing_period, skipped_accounts

def load_parquet_report(self, report_path, account_id_ca_id_map,
billing_period, skipped_accounts):
parse_started_at = time.time()
date_start = opttime.utcnow()
dataframe = pq.read_pandas(report_path).to_pandas()
new_columns = self._convert_to_legacy_csv_columns(
dataframe.columns, dict_format=True)
dataframe.rename(columns=new_columns, inplace=True)
imported_rows = 0
for i in range(0, dataframe.shape[0], CHUNK_SIZE):
expense_chunk = self._extract_nested_objects(
dataframe.iloc[i:i + CHUNK_SIZE, :].to_dict(), parquet=True)
Expand Down Expand Up @@ -576,10 +589,15 @@ def load_parquet_report(self, report_path, account_id_ca_id_map,
self._set_resource_id(expense)
if expenses:
self.update_raw_records(expenses)
imported_rows += len(expenses)
now = opttime.utcnow()
if (now - date_start).total_seconds() > 60:
LOG.info('report %s: processed %s rows', report_path, i)
date_start = now
LOG.info(
'Parse phase (Parquet): %s imported rows from %s in %.2fs',
imported_rows, report_path, time.time() - parse_started_at
)
return billing_period, skipped_accounts

def collect_tags(self, expense):
Expand Down
12 changes: 12 additions & 0 deletions diworker/diworker/importers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def get_raw_upsert_filters(self, expense):
for f in self.get_unique_field_list()}

def update_raw_records(self, chunk):
phase_started_at = time.time()
update_fields = self.get_update_fields()
upsert_bulk = []
for e in chunk:
Expand All @@ -119,6 +120,10 @@ def update_raw_records(self, chunk):
))
r = retry_mongo_upsert(self.mongo_raw.bulk_write, upsert_bulk)
LOG.debug('updated: %s', r.bulk_api_result)
LOG.info(
'Write phase: upserted %s raw records in %.2fs',
len(chunk), time.time() - phase_started_at
)

@staticmethod
def _get_fake_cad_extras(expense):
Expand Down Expand Up @@ -647,6 +652,8 @@ def _get_legacy_key(self, old_key):
return

def _download_report_files(self, current_reports, last_import_modified_at):
download_started_at = time.time()
downloaded = 0
for date, reports in current_reports.items():
for report in reports:
if last_import_modified_at < report['LastModified']:
Expand All @@ -665,6 +672,11 @@ def _download_report_files(self, current_reports, last_import_modified_at):
self.cloud_adapter.download_report_file(report['Key'],
f_report)
self.report_files[date].append(target_path)
downloaded += 1
LOG.info(
'Download phase: downloaded %s report files in %.2fs',
downloaded, time.time() - download_started_at
)
return last_import_modified_at

@staticmethod
Expand Down
75 changes: 41 additions & 34 deletions diworker/diworker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,17 @@ def __init__(self, connection, rabbitmq_conn_str, diworker_settings,
)

def heartbeat(self, config_params):
config_cl = self.get_config_cl(config_params)
rest_cl = self.get_rest_cl(config_cl)
config_client = self.get_config_cl(config_params)
rest_cl = self.get_rest_cl(config_client)
while self.running:
with self.active_reports_lock:
report_import_ids = list(self.active_report_import_ids)
for report_import_id in report_import_ids:
try:
rest_cl.report_import_update(report_import_id, {})
except Exception as e:
LOG.warning("Heartbeat update failed for %s: %s", report_import_id, e)
except Exception as exc: # pylint: disable=broad-except
LOG.warning("Heartbeat update failed for %s: %s",
report_import_id, exc)
time.sleep(HEARTBEAT_INTERVAL)
rest_cl.close()

Expand All @@ -82,20 +83,20 @@ def get_config_cl(config_params):
return ConfigClient(**config_params)

@staticmethod
def get_rest_cl(config_cl):
url = config_cl.restapi_url()
secret = config_cl.cluster_secret()
def get_rest_cl(config_client):
url = config_client.restapi_url()
secret = config_client.cluster_secret()
return RestClient(url=url, verify=False, secret=secret)

@staticmethod
def get_mongo_cl(config_cl):
mongo_params = config_cl.mongo_params()
def get_mongo_cl(config_client):
mongo_params = config_client.mongo_params()
return MongoClient(mongo_params[0])

@staticmethod
def get_clickhouse_cl(config_cl):
def get_clickhouse_cl(config_client):
user, password, host, db_name, port, secure = (
config_cl.clickhouse_params())
config_client.clickhouse_params())
return clickhouse_connect.get_client(
host=host, password=password, database=db_name, user=user,
port=port, secure=secure)
Expand All @@ -110,13 +111,13 @@ def publish_activities_task(self, organization_id, object_id, object_type,
'meta': meta
}
queue_conn = QConnection(self.rabbitmq_conn_str)
task_exchange = Exchange(ACTIVITIES_EXCHANGE_NAME, type='topic')
activities_exchange = Exchange(ACTIVITIES_EXCHANGE_NAME, type='topic')
with producers[queue_conn].acquire(block=True) as producer:
producer.publish(
task,
serializer='json',
exchange=task_exchange,
declare=[task_exchange],
exchange=activities_exchange,
declare=[activities_exchange],
routing_key=routing_key,
retry=True
)
Expand All @@ -134,10 +135,11 @@ def get_consumers(self, Consumer, channel):
),
)]

def report_import(self, task, config_cl, rest_cl, mongo_cl, clickhouse_cl):
def report_import(self, task, config_client, rest_cl, mongo_cl,
clickhouse_cl):
report_import_id = task.get('report_import_id')
if not report_import_id:
raise Exception('invalid task received: {}'.format(task))
raise Exception(f'invalid task received: {task}')

with self.active_reports_lock:
self.active_report_import_ids.add(report_import_id)
Expand All @@ -148,7 +150,7 @@ def report_import(self, task, config_cl, rest_cl, mongo_cl, clickhouse_cl):
imports = list(filter(
lambda x: x['id'] != report_import_id, resp['report_imports']))
if imports:
reason = 'Import cancelled due another import: %s' % imports[0]['id']
reason = f"Import cancelled due another import: {imports[0]['id']}"
rest_cl.report_import_update(
report_import_id,
{'state': 'failed', 'state_reason': reason}
Expand All @@ -162,7 +164,7 @@ def report_import(self, task, config_cl, rest_cl, mongo_cl, clickhouse_cl):
importer_params = {
'cloud_account_id': cloud_acc_id,
'rest_cl': rest_cl,
'config_cl': config_cl,
'config_cl': config_client,
'mongo_raw': mongo_cl.restapi['raw_expenses'],
'mongo_resources': mongo_cl.restapi['resources'],
'clickhouse_cl': clickhouse_cl,
Expand All @@ -177,8 +179,10 @@ def report_import(self, task, config_cl, rest_cl, mongo_cl, clickhouse_cl):
organization_id = ca.get('organization_id')
_, org = rest_cl.organization_get(organization_id)
if org.get('disabled'):
reason = ('Import cancelled due to disabled '
'organization: %s') % report_import_id
reason = (
f"Import cancelled due to disabled "
f"organization: {report_import_id}"
)
rest_cl.report_import_update(
report_import_id,
{'state': 'failed', 'state_reason': reason}
Expand Down Expand Up @@ -206,7 +210,7 @@ def report_import(self, task, config_cl, rest_cl, mongo_cl, clickhouse_cl):
organization_id, organization_id, 'organization',
'report_import_passed',
'organization.report_import.passed')
except Exception as exc:
except Exception as exc: # pylint: disable=broad-except
if hasattr(exc, 'details'):
# pylint: disable=E1101
LOG.error('Mongo exception details: %s', exc.details)
Expand Down Expand Up @@ -244,14 +248,14 @@ def process_task(self, body, message):
self.executor.submit(self._process_task, body, message)

def _process_task(self, body, message):
config_cl = self.get_config_cl(self.config_cl_params)
rest_cl = self.get_rest_cl(config_cl)
mongo_cl = self.get_mongo_cl(config_cl)
clickhouse_cl = self.get_clickhouse_cl(config_cl)
config_client = self.get_config_cl(self.config_cl_params)
rest_cl = self.get_rest_cl(config_client)
mongo_cl = self.get_mongo_cl(config_client)
clickhouse_cl = self.get_clickhouse_cl(config_client)
try:
self.report_import(body, config_cl=config_cl, rest_cl=rest_cl,
self.report_import(body, config_client=config_client, rest_cl=rest_cl,
mongo_cl=mongo_cl, clickhouse_cl=clickhouse_cl)
except Exception as exc:
except Exception as exc: # pylint: disable=broad-except
LOG.exception('Data import failed: %s', str(exc))
finally:
mongo_cl.close()
Expand All @@ -273,17 +277,20 @@ def _process_task(self, body, message):
'host': os.environ.get('HX_ETCD_HOST'),
'port': int(os.environ.get('HX_ETCD_PORT'))
}
config_cl = ConfigClient(**config_cl_params)
config_cl.wait_configured()
migrator = Migrator(config_cl, 'restapi', 'diworker/diworker/migrations')
config_client = ConfigClient(**config_cl_params)
config_client.wait_configured()
migrator = Migrator(config_client, 'restapi', 'diworker/diworker/migrations')
# Use lock to avoid migration problems with several diworkers
# starting at the same time on cluster
with EtcdLock(config_cl, 'diworker_migrations'):
with EtcdLock(config_client, 'diworker_migrations'):
migrator.migrate()
LOG.info("starting worker")
conn_str = 'amqp://{user}:{pass}@{host}:{port}'.format(
**config_cl.read_branch('/rabbit'))
dw_settings = config_cl.diworker_settings()
rabbit_params = config_client.read_branch('/rabbit')
conn_str = (
f"amqp://{rabbit_params['user']}:{rabbit_params['pass']}"
f"@{rabbit_params['host']}:{rabbit_params['port']}"
)
dw_settings = config_client.diworker_settings()
with QConnection(conn_str) as conn:
try:
worker = DIWorker(conn, conn_str, dw_settings, config_cl_params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ def _is_flavor_usage(expense):
def upgrade(self):
cloud_accounts_ids = self.get_cloud_accounts_ids()
for i, cloud_account_id in enumerate(cloud_accounts_ids):
LOG.info('Started processing for cloud account: %s (%s/%s)' % (
cloud_account_id, i+1, len(cloud_accounts_ids)))
LOG.info('Started processing for cloud account: %s (%s/%s)',
cloud_account_id, i + 1, len(cloud_accounts_ids))
is_processed = self.mongo_temp_table.find({
'cloud_account_id': cloud_account_id})
if is_processed:
LOG.info('Cloud account %s already processed' % cloud_account_id)
LOG.info('Cloud account %s already processed',
cloud_account_id)
continue
update_bulk = []
raw_expenses = self.mongo_raw.find({
Expand All @@ -83,17 +84,18 @@ def upgrade(self):
try:
self.mongo_temp_table.drop()
except Exception as exc:
LOG.warning('Failed to drop temp table: %s' % str(exc))
LOG.warning('Failed to drop temp table: %s', exc)

def downgrade(self):
cloud_accounts_ids = self.get_cloud_accounts_ids()
for i, cloud_account_id in enumerate(cloud_accounts_ids):
LOG.info('Started processing for cloud account: %s (%s/%s)' % (
cloud_account_id, i+1, len(cloud_accounts_ids)))
LOG.info('Started processing for cloud account: %s (%s/%s)',
cloud_account_id, i + 1, len(cloud_accounts_ids))
is_processed = self.mongo_temp_table.find({
'cloud_account_id': cloud_account_id})
if is_processed:
LOG.info('Cloud account %s already processed' % cloud_account_id)
LOG.info('Cloud account %s already processed',
cloud_account_id)
continue
update_bulk = []
raw_expenses = self.mongo_raw.find({
Expand All @@ -116,4 +118,4 @@ def downgrade(self):
try:
self.mongo_temp_table.drop()
except Exception as exc:
LOG.warning('Failed to drop temp table: %s' % str(exc))
LOG.warning('Failed to drop temp table: %s', exc)
Loading