Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion rest_api/rest_api_server/controllers/report_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
ACTIVE_IMPORT_THRESHOLD = 1800 # 30 min
DEFAULT_NOT_PROCESSED_REPORT_THRESHOLD_SECONDS = 10800 # 3 hrs
DEFAULT_QUEUE_MESSAGE_EXPIRATION_SECONDS = 10800 # 3 hrs
IMPORT_TIMEOUT_REASON = 'Import timed out while waiting in queue'
LOG = logging.getLogger(__name__)


Expand All @@ -47,10 +48,42 @@ def create(self, cloud_account_id, import_file=None, recalculate=False, priority
report_import, 'recalculation_started')
return report_import

def _fail_timed_out_imports(self, cloud_account_id, settings, now=None):
if now is None:
now = opttime.utcnow().timestamp()
message_ttl = int(settings.get(
'message_expiration_secs',
DEFAULT_QUEUE_MESSAGE_EXPIRATION_SECONDS
))
if message_ttl <= 0:
return
timeout_ts = now - message_ttl
timed_out_imports = self.session.query(ReportImport).filter(
ReportImport.cloud_account_id == cloud_account_id,
ReportImport.deleted_at.is_(False),
or_(
and_(ReportImport.state == ImportStates.IN_PROGRESS,
ReportImport.updated_at < timeout_ts),
and_(ReportImport.state == ImportStates.SCHEDULED,
ReportImport.created_at < timeout_ts)
)
).all()
for report_import in timed_out_imports:
report_id = report_import.id
LOG.warning(
'Marking report import %s as failed due to queue timeout',
report_id
)
self.edit(report_id, state=ImportStates.FAILED.value,
state_reason=IMPORT_TIMEOUT_REASON)

def check_unprocessed_imports(self, cloud_account_id):
dt = opttime.utcnow().timestamp()
report_import_settings = self._config.report_imports_setting()
self._fail_timed_out_imports(cloud_account_id, report_import_settings,
now=dt)
scheduled_threshold = dt - int(
self._config.report_imports_setting().get(
report_import_settings.get(
'not_processed_threshold_secs',
DEFAULT_NOT_PROCESSED_REPORT_THRESHOLD_SECONDS
)
Expand Down
34 changes: 32 additions & 2 deletions rest_api/rest_api_server/tests/unittests/test_schedule_imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
from rest_api.rest_api_server.controllers.report_import import (
DEFAULT_QUEUE_MESSAGE_EXPIRATION_SECONDS,
DEFAULT_NOT_PROCESSED_REPORT_THRESHOLD_SECONDS,
IMPORT_TIMEOUT_REASON,
)
from rest_api.rest_api_server.models.db_factory import DBFactory, DBType
from rest_api.rest_api_server.models.db_base import BaseDB
from rest_api.rest_api_server.models.models import CloudAccount
from rest_api.rest_api_server.models.models import CloudAccount, ReportImport
from rest_api.rest_api_server.models.enums import (
CloudTypes
CloudTypes, ImportStates
)
from rest_api.rest_api_server.tests.unittests.test_api_base import TestApiBase
from rest_api.rest_api_server.utils import MAX_32_INT, encode_config
Expand Down Expand Up @@ -199,6 +200,35 @@ def test_schedule_org_account_type_without_org_id(self):
self.assertEqual(ret['error']['error_code'],
'OE0529')

def test_mark_stale_import_failed(self):
cloud_acc_id = self._create_cloud_acc_object(import_period=0)
db = DBFactory(DBType.Test, None).db
session = BaseDB.session(db.engine)()
outdated_ts = opttime.utcnow_timestamp() - (
DEFAULT_QUEUE_MESSAGE_EXPIRATION_SECONDS + 1)
stale_import = ReportImport(
cloud_account_id=cloud_acc_id,
state=ImportStates.IN_PROGRESS,
updated_at=outdated_ts
)
session.add(stale_import)
session.commit()
stale_import_id = stale_import.id
session.close()

code, ret = self.client.schedule_import(0)
self.assertEqual(code, 201)
self.assertEqual(len(ret['report_imports']), 1)
self.assertNotEqual(ret['report_imports'][0]['id'], stale_import_id)

session = BaseDB.session(db.engine)()
updated = session.query(ReportImport).filter(
ReportImport.id == stale_import_id
).one()
self.assertEqual(updated.state, ImportStates.FAILED)
self.assertEqual(updated.state_reason, IMPORT_TIMEOUT_REASON)
session.close()

def test_create_scheduled_duplicate(self):
code, org2 = self.client.organization_create({'name': 'org2'})
self.assertEqual(code, 201)
Expand Down