41 changes: 41 additions & 0 deletions alembic/versions/0b1a8b3e1e0f_drop_result_collector_task.py
@@ -0,0 +1,41 @@
"""Drop result collector task

Revision ID: 0b1a8b3e1e0f
Revises: 7c643a1823db
Create Date: 2017-11-13 14:40:16.176999

"""

# revision identifiers, used by Alembic.
revision = '0b1a8b3e1e0f'
down_revision = '7c643a1823db'
branch_labels = None
depends_on = None

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('package_worker_results',
sa.Column('s3_version_id',
sa.String(length=255), nullable=True))
op.drop_column('package_worker_results', 'task_result')
op.add_column('worker_results',
sa.Column('s3_version_id',
sa.String(length=255), nullable=True))
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('worker_results', 's3_version_id')
op.add_column('package_worker_results',
sa.Column('task_result',
postgresql.JSONB(astext_type=sa.Text()),
autoincrement=False,
nullable=True))
op.drop_column('package_worker_results', 's3_version_id')
# ### end Alembic commands ###
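The migration swaps the inline task_result payload on package_worker_results for an s3_version_id pointer and adds the same pointer to worker_results. Note that the downgrade re-creates task_result as an empty JSONB column; the dropped data is not restored. A minimal sketch of driving this revision programmatically, assuming a standard alembic.ini next to the alembic/ directory (the path is an assumption):

# Sketch only: apply or revert this revision via the Alembic API.
from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")              # hypothetical config location
command.upgrade(cfg, "0b1a8b3e1e0f")     # add s3_version_id, drop task_result
# command.downgrade(cfg, "7c643a1823db") # revert; task_result comes back empty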
14 changes: 1 addition & 13 deletions f8a_worker/dispatcher/flows/bayesianFlow.yml
@@ -38,20 +38,8 @@
- from: 'bayesianAnalysisFlow'
to: 'FinalizeTask'
- from: 'FinalizeTask'
to: 'ResultCollector'
condition:
name: 'envEqual'
args:
env: 'BAYESIAN_SYNC_S3'
value: '1'
to: 'GraphImporterTask'
- from: 'FinalizeTaskError'
to: 'ResultCollector'
condition:
name: 'envEqual'
args:
env: 'BAYESIAN_SYNC_S3'
value: '1'
- from: 'ResultCollector'
to: 'GraphImporterTask'
failures:
- nodes:
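This hunk removes the ResultCollector hop: FinalizeTask and FinalizeTaskError now feed GraphImporterTask directly, and the BAYESIAN_SYNC_S3 gate disappears with the edges that carried it. For reference, a hedged sketch of what an envEqual-style condition predicate could look like; the real one is resolved from f8a_worker.dispatcher.predicates (declared in nodes.yml below) and its exact signature is an assumption:

# Sketch only: a Selinon-style predicate comparing an environment
# variable against an expected value, as the removed condition did.
import os

def envEqual(message, env, value):
    """Return True when environment variable env equals value."""
    return os.environ.get(env) == value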
2 changes: 0 additions & 2 deletions f8a_worker/dispatcher/flows/bayesianPackageFlow.yml
@@ -41,8 +41,6 @@
- from: 'bayesianPackageTaggingFlow'
to: 'PackageFinalizeTask'
- from: 'PackageFinalizeTask'
to: 'PackageResultCollector'
- from: 'PackageResultCollector'
to: 'PackageGraphImporterTask'
- from: 'PackageFinalizeTaskError'
to: 'PackageGraphImporterTask'
26 changes: 12 additions & 14 deletions f8a_worker/dispatcher/nodes.yml
@@ -177,31 +177,31 @@
import: 'f8a_worker.workers'
max_retry: 0
queue: '{DEPLOYMENT_PREFIX}_{WORKER_ADMINISTRATION_REGION}_GraphAggregatorTask_v0'
storage: 'BayesianPostgres'
storage: 'StackPostgres'
- name: 'stack_aggregator'
classname: 'StackAggregatorTask'
import: 'f8a_worker.workers'
max_retry: 0
queue: '{DEPLOYMENT_PREFIX}_{WORKER_ADMINISTRATION_REGION}_StackAggregatorTask_v0'
storage: 'BayesianPostgres'
storage: 'StackPostgres'
- name: 'stack_aggregator_v2'
classname: 'StackAggregatorV2Task'
import: 'f8a_worker.workers'
max_retry: 0
queue: '{DEPLOYMENT_PREFIX}_{WORKER_ADMINISTRATION_REGION}_StackAggregatorV2Task_v0'
storage: 'BayesianPostgres'
storage: 'StackPostgres'
- name: 'recommendation'
classname: 'RecommendationTask'
import: 'f8a_worker.workers'
max_retry: 0
queue: '{DEPLOYMENT_PREFIX}_{WORKER_ADMINISTRATION_REGION}_RecommendationTask_v0'
storage: 'BayesianPostgres'
storage: 'StackPostgres'
- name: 'recommendation_v2'
classname: 'RecommendationV2Task'
import: 'f8a_worker.workers'
max_retry: 0
queue: '{DEPLOYMENT_PREFIX}_{WORKER_ADMINISTRATION_REGION}_RecommendationV2Task_v0'
storage: 'BayesianPostgres'
storage: 'StackPostgres'
- name: 'InitAnalysisFlow'
import: 'f8a_worker.workers'
max_retry: 0
@@ -210,14 +210,6 @@
#selective_run_function:
# name: 'selective_run_function'
# import: 'f8a_worker.dispatcher.selective'
- name: 'ResultCollector'
import: 'f8a_worker.workers'
max_retry: 0
queue: '{DEPLOYMENT_PREFIX}_{WORKER_ADMINISTRATION_REGION}_ResultCollector_v0'
- name: 'PackageResultCollector'
import: 'f8a_worker.workers'
max_retry: 0
queue: '{DEPLOYMENT_PREFIX}_{WORKER_ADMINISTRATION_REGION}_PackageResultCollector_v0'
- name: 'BigQueryTask'
import: 'f8a_worker.workers'
max_retry: 0
@@ -355,6 +347,12 @@
configuration:
<<: *postgresConfiguration

- name: 'StackPostgres'
classname: 'StackPostgres'
import: 'f8a_worker.storages'
configuration:
<<: *postgresConfiguration

- name: 'AmazonS3'
import: 'f8a_worker.storages'
configuration: &configurationS3
@@ -503,6 +501,6 @@

global:
trace:
import: 'f8a_worker.dispatcher.trace'
function: 'trace_func'
import: 'f8a_worker.dispatcher.trace'
predicates_module: 'f8a_worker.dispatcher.predicates'
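Each storage name used above ('BayesianPostgres', 'StackPostgres', 'AmazonS3', ...) is resolved at runtime through Selinon's storage pool; the same API appears later in this diff for S3 ('S3PackageData'). A usage sketch, assuming StoragePool is imported from selinon as elsewhere in f8a_worker:

from selinon import StoragePool

# Resolve the storage adapter the stack/recommendation tasks now use.
stack_storage = StoragePool.get_connected_storage('StackPostgres')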
16 changes: 5 additions & 11 deletions f8a_worker/models.py
@@ -166,14 +166,6 @@ class Analysis(Base):

version = relationship(Version, back_populates='analyses', lazy='joined')

@property
def analyses(self):
s = Session.object_session(self)
if s:
worker_results = s.query(WorkerResult).filter(WorkerResult.analysis_id == self.id)
return {wr.worker: wr.task_result for wr in worker_results}
return {}

@property
def raw_analyses(self):
s = Session.object_session(self)
@@ -246,8 +238,10 @@ class WorkerResult(Base):
# the value of `analysis_id` should be `NULL`
external_request_id = Column(String(64), index=True)
analysis_id = Column(ForeignKey(Analysis.id), index=True)
# Used only in stack analysis now
task_result = Column(JSONB)
error = Column(Boolean, nullable=False, default=False)
s3_version_id = Column(String(255), index=False, nullable=True)

analysis = relationship(Analysis)

@@ -314,18 +308,18 @@ class PackageWorkerResult(Base):
worker = Column(String(255), index=True)
worker_id = Column(String(64), unique=True)
external_request_id = Column(String(64))
task_result = Column(JSONB)
error = Column(Boolean, nullable=False, default=False)
s3_version_id = Column(String(255), index=False, nullable=True)

package_analysis = relationship(PackageAnalysis)

@property
def ecosystem(self):
return self.diagnosis.package.ecosystem
return self.package_analysis.package.ecosystem

@property
def package(self):
return self.diagnosis.package
return self.package_analysis.package


class StackAnalysisRequest(Base):
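With task_result dropped from PackageWorkerResult and kept on WorkerResult only for stack analyses, a result row now points at a versioned S3 object through s3_version_id. Illustrative only: a plain boto3 versioned read; the bucket and key layout are assumptions, not f8a_worker's actual naming:

import json
import boto3

def fetch_task_result(bucket, key, s3_version_id):
    # Read the exact S3 object revision recorded in the database row.
    s3 = boto3.client('s3')
    obj = s3.get_object(Bucket=bucket, Key=key, VersionId=s3_version_id)
    return json.loads(obj['Body'].read())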
1 change: 1 addition & 0 deletions f8a_worker/storages/__init__.py
@@ -2,6 +2,7 @@

from .postgres import BayesianPostgres
from .package_postgres import PackagePostgres
from .stack_postgres import StackPostgres
from .s3 import AmazonS3
from .s3_artifacts import S3Artifacts
from .s3_temp_artifacts import S3TempArtifacts
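The new stack_postgres module itself is not part of this diff. Given that nodes.yml now points the stack aggregation and recommendation tasks at StackPostgres, and models.py notes task_result is "Used only in stack analysis now", a plausible minimal sketch follows; every detail of it is an assumption:

# Hypothetical sketch of f8a_worker/storages/stack_postgres.py, which this
# PR imports but does not include. Assumption: stack-analysis workers keep
# their result documents inline in the task_result JSONB column, so this
# adapter restores the pre-PR result-entry behaviour for those flows only.
from f8a_worker.models import WorkerResult
from f8a_worker.storages.postgres import BayesianPostgres


class StackPostgres(BayesianPostgres):
    """Keep stack-analysis task results inline in Postgres."""

    def _create_result_entry(self, node_args, flow_name, task_name, task_id,
                             result=None, error=False):
        return WorkerResult(
            worker=task_name,
            worker_id=task_id,
            analysis_id=(node_args.get('document_id')
                         if isinstance(node_args, dict) else None),
            task_result=result,  # inline JSONB, not an S3 pointer
            error=error,
            external_request_id=(node_args.get('external_request_id')
                                 if isinstance(node_args, dict) else None),
        )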
45 changes: 23 additions & 22 deletions f8a_worker/storages/package_postgres.py
@@ -26,16 +26,12 @@ def s3(self):
self._s3 = StoragePool.get_connected_storage('S3PackageData')
return self._s3

def _create_result_entry(self, node_args, flow_name, task_name, task_id, result, error=None):
if error is None and isinstance(result, dict):
error = result.get('status') == 'error'

def _create_result_entry(self, node_args, flow_name, task_name, task_id, error=False):
return PackageWorkerResult(
worker=task_name,
worker_id=task_id,
package_analysis_id=(node_args.get('document_id')
if isinstance(node_args, dict) else None),
task_result=result,
error=error,
external_request_id=(node_args.get('external_request_id')
if isinstance(node_args, dict) else None)
@@ -112,25 +108,26 @@ def get_finished_task_names(analysis_id):

return list(chain(*task_names))

def get_latest_task_result(self, ecosystem, package, task_name):
def get_latest_task_result(self, ecosystem, package, task_name, error=False):
"""Get latest task result based on task name

:param ecosystem: name of the ecosystem
:param package: name of the package
:param task_name: name of task for which the latest result should be obtained
:param error: if False, avoid returning entries that track errors
"""
# TODO: we should store date timestamps directly in PackageWorkerResult
if not self.is_connected():
self.connect()

try:
entry = PostgresBase.session.query(PackageWorkerResult.task_result).\
join(PackageAnalysis).\
join(Package).join(Ecosystem).\
entry = PostgresBase.session.query(PackageWorkerResult).\
join(PackageAnalysis).join(Package).join(Ecosystem).\
filter(PackageWorkerResult.worker == task_name).\
filter(Package.name == package).\
filter(Ecosystem.name == ecosystem).\
filter(PackageWorkerResult.error.is_(False)).\
filter(PackageWorkerResult.error.is_(error)).\
filter(PackageWorkerResult.s3_version_id.isnot(None)).\
order_by(PackageAnalysis.finished_at.desc()).first()
except SQLAlchemyError:
PostgresBase.session.rollback()
@@ -139,10 +136,7 @@ def get_latest_task_result(self, ecosystem, package, task_name):
if not entry:
return None

if not self.is_real_task_result(entry.task_result):
return self.s3.retrieve_task_result(ecosystem, package, task_name)

return entry.task_result
return self.s3.retrieve_task_result(ecosystem, package, task_name)

def get_task_result_by_analysis_id(self, ecosystem, package, task_name, analysis_id):
"""Get latest task result based analysis id.
@@ -157,7 +151,7 @@ def get_task_result_by_analysis_id(self, ecosystem, package, task_name, analysis_id):
self.connect()

try:
entry = PostgresBase.session.query(PackageWorkerResult.task_result).\
entry = PostgresBase.session.query(PackageWorkerResult).\
join(PackageAnalysis).\
join(Package).join(Ecosystem).\
filter(PackageWorkerResult.worker == task_name).\
@@ -172,12 +166,20 @@ def get_task_result_by_analysis_id(self, ecosystem, package, task_name, analysis_id):
if not entry:
return None

if not self.is_real_task_result(entry.task_result):
# This can be confusing: we do not directly retrieve the version
# that is referenced but rather replace it with the latest.
return self.s3.retrieve_task_result(ecosystem, package, task_name)
return self.s3.retrieve_task_result(
ecosystem,
package,
task_name,
version_id=entry.s3_version_id
)

return entry.task_result
def _retrieve_task_result(self, record):
return self.s3.retrieve_task_result(
record.ecosystem.name,
record.package.name,
record.worker,
record.s3_version_id
)

def get_latest_task_entry(self, ecosystem, package, task_name, error=False):
"""Get latest task result based on task name
@@ -186,7 +188,6 @@ def get_latest_task_entry(self, ecosystem, package, task_name, error=False):
:param package: name of the package
:param task_name: name of task for which the latest result should be obtained
:param error: if False, avoid returning entries that track errors
:param real: if False, do not check results that are stored on S3 but
rather return Postgres entry
"""
# TODO: we should store date timestamps directly in PackageWorkerResult
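Callers of the package-level storage now always receive the document body from S3, pinned to the recorded object version where one is available. A usage sketch; the storage entry name and task name here are hypothetical:

from selinon import StoragePool

storage = StoragePool.get_connected_storage('PackagePostgres')  # assumed name
# Newest successful result of a (hypothetical) task for one package.
result = storage.get_latest_task_result('npm', 'serve-static', 'keywords_tagging')
if result is None:
    print('no finished result with a recorded S3 version yet')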
34 changes: 21 additions & 13 deletions f8a_worker/storages/postgres.py
@@ -29,43 +29,48 @@ def s3(self):
self._s3 = StoragePool.get_connected_storage('S3Data')
return self._s3

def _create_result_entry(self, node_args, flow_name, task_name, task_id, result, error=None):
if error is None and isinstance(result, dict):
error = result.get('status') == 'error'

def _create_result_entry(self, node_args, flow_name, task_name, task_id, error=False):
return WorkerResult(
worker=task_name,
worker_id=task_id,
analysis_id=node_args.get('document_id') if isinstance(node_args, dict) else None,
task_result=result,
error=error,
external_request_id=(node_args.get('external_request_id')
if isinstance(node_args, dict) else None)
)

def get_latest_task_result(self, ecosystem, package, version, task_name):
def _retrieve_task_result(self, record):
return self.s3.retrieve_task_result(
record.ecosystem.name,
record.package.name,
record.version.identifier,
record.worker,
record.s3_version_id
)

def get_latest_task_result(self, ecosystem, package, version, task_name, error=False):
"""Get latest task result based on task name

:param ecosystem: name of the ecosystem
:param package: name of the package
:param version: package version
:param task_name: name of task for which the latest result should be obtained
:param error: if False, avoid returning entries that track errors
:param real: if False, do not check results that are stored on S3 but
rather return Postgres entry
"""
# TODO: we should store date timestamps directly in WorkerResult
if not self.is_connected():
self.connect()

try:
entry = PostgresBase.session.query(WorkerResult.task_result).\
entry = PostgresBase.session.query(WorkerResult).\
join(Analysis).join(Version).join(Package).join(Ecosystem).\
filter(WorkerResult.worker == task_name).\
filter(Package.name == package).\
filter(Version.identifier == version).\
filter(Ecosystem.name == ecosystem).\
filter(WorkerResult.error.is_(False)).\
filter(WorkerResult.error.is_(error)).\
filter(WorkerResult.s3_version_id.isnot(None)).\
order_by(Analysis.finished_at.desc()).first()
except SQLAlchemyError:
PostgresBase.session.rollback()
@@ -74,10 +79,13 @@ def get_latest_task_result(self, ecosystem, package, version, task_name):
if not entry:
return None

if not self.is_real_task_result(entry.task_result):
return self.s3.retrieve_task_result(ecosystem, package, version, task_name)

return entry.task_result
return self.s3.retrieve_task_result(
ecosystem,
package,
version,
task_name,
object_version_id=entry.s3_version_id
)

def get_latest_task_entry(self, ecosystem, package, version, task_name, error=False):
"""Get latest task result based on task name
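The component-level variant mirrors the package-level one but keys on ecosystem, package, and version. When only the bookkeeping fields are needed, get_latest_task_entry returns the WorkerResult row itself; the columns below all appear in the models.py hunk above, while the storage entry, task name, and coordinates are assumed:

from selinon import StoragePool

storage = StoragePool.get_connected_storage('BayesianPostgres')
entry = storage.get_latest_task_entry('npm', 'serve-static', '1.13.1', 'digests')
if entry is not None:
    # Metadata lives in Postgres; the document body itself stays in S3.
    print(entry.worker_id, entry.error, entry.s3_version_id)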