12 changes: 12 additions & 0 deletions f8a_worker/dispatcher/nodes.yml
@@ -501,6 +501,18 @@
versioned: false
days_to_expire: 31

- name: 'GremlinHttp'
import: 'f8a_worker.storages'
configuration: &configurationGremlin
# Can be overwritten using env variables supplied on deployment
gremlin_http_host: bayesian-gremlin-http
gremlin_http_port: 8182

- name: 'PackageGremlinHttp'
import: 'f8a_worker.storages'
configuration:
<<: *configurationGremlin

global:
trace:
import: 'f8a_worker.dispatcher.trace'
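For readers unfamiliar with YAML anchors: the &configurationGremlin anchor and the <<: *configurationGremlin merge key give both storage nodes the same configuration, so PackageGremlinHttp effectively resolves to the following (a sketch of the expanded form, assuming no extra keys are merged in):

- name: 'PackageGremlinHttp'
  import: 'f8a_worker.storages'
  configuration:
    # Can be overwritten using env variables supplied on deployment
    gremlin_http_host: bayesian-gremlin-http
    gremlin_http_port: 8182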
4 changes: 3 additions & 1 deletion f8a_worker/storages/__init__.py
@@ -17,4 +17,6 @@
 from .s3_description_repository import S3RepositoryDescription
 from .s3_keywords_summary import S3KeywordsSummary
 from .s3_userintent import S3UserIntent
-from .s3_manual_tagging import S3ManualTagging
+from .gremlin import GremlinHttp
+from .gremlin import PackageGremlinHttp
+from .s3_manual_tagging import S3ManualTagging
26 changes: 26 additions & 0 deletions f8a_worker/storages/gremlin.py
@@ -0,0 +1,26 @@
#!/usr/bin/env python3

import os
import requests


class GremlinHttpBase(object):
def __init__(self, gremlin_http_host=None, gremlin_http_port=None):
self.gremlin_http_host = os.getenv('GREMLIN_HTTP_HOST', gremlin_http_host)
self.gremlin_http_port = os.getenv('GREMLIN_HTTP_PORT', gremlin_http_port)


class GremlinHttp(GremlinHttpBase):
def store_task_result(self, ecosystem, name, version, task_name, task_result):
# TODO: implement
print("#"*80)
print("Storing result for {ecosystem}/{name}/{version}, task {task_name}".format(**locals()))
print("#"*80)


class PackageGremlinHttp(GremlinHttpBase):
def store_task_result(self, ecosystem, name, task_name, task_result):
# TODO: implement
print("#"*80)
print("Storing result for {ecosystem}/{name}/{version}, task {task_name}".format(**locals()))
print("#"*80)
4 changes: 4 additions & 0 deletions f8a_worker/storages/postgres_base.py
@@ -106,6 +106,10 @@ def _create_result_entry(self, node_args, flow_name, task_name, task_id, result,
raise NotImplementedError()

def store(self, node_args, flow_name, task_name, task_id, result):
if not result:
# Do not store empty results
return

# Sanity checks
if not self.is_connected():
self.connect()
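
Note that the new guard skips any falsy result, not just None. A quick sketch (assuming a connected storage instance s and valid node_args):

# Neither call creates a result entry in the database:
s.store(node_args, 'flow', 'sometask', 'task-id-1', result=None)
s.store(node_args, 'flow', 'sometask', 'task-id-2', result={})  # empty dict is falsy too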
7 changes: 7 additions & 0 deletions f8a_worker/storages/s3.py
@@ -216,6 +216,13 @@ def retrieve_latest_version_id(self, object_key):

return self._s3.Object(self.bucket_name, object_key).version_id

def list_available_task_results(self, arguments):
"""Get names of all task results stored on S3."""
object_key = self._construct_base_file_name(arguments)
bucket = self._s3.Bucket(self.bucket_name)
objects = bucket.objects.filter(Prefix=object_key)
return list(x.key for x in objects if x.key.endswith('.json'))
Comment by @fridex (Contributor, Author), Oct 3, 2017:


@miteshvp This will probably list task results with the whole object key ("path"). If you don't want it that way, feel free to adjust it to return only the task name.

EDIT: it will probably be needed anyway, as this is handled by the adapter.
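
If bare task names end up preferable, a possible adjustment (a sketch only, assuming object keys have the layout '<base file name>/<task_name>.json'):

    def list_available_task_results(self, arguments):
        """Get names of all task results stored on S3, without the object key prefix."""
        object_key = self._construct_base_file_name(arguments)
        bucket = self._s3.Bucket(self.bucket_name)
        objects = bucket.objects.filter(Prefix=object_key)
        # e.g. '<base>/digests.json' -> 'digests'
        return [x.key[len(object_key):].lstrip('/')[:-len('.json')]
                for x in objects if x.key.endswith('.json')]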


@staticmethod
def is_enabled():
""":return: True if S3 sync is enabled, False otherwise."""
71 changes: 29 additions & 42 deletions f8a_worker/workers/graph_importer.py
@@ -1,55 +1,42 @@
 from f8a_worker.base import BaseTask
-import requests
-from os import environ
+from selinon import StoragePool
+from selinon import FatalTaskError


 class GraphImporterTask(BaseTask):
-    _SERVICE_HOST = environ.get("BAYESIAN_DATA_IMPORTER_SERVICE_HOST", "bayesian-data-importer")
-    _SERVICE_PORT = environ.get("BAYESIAN_DATA_IMPORTER_SERVICE_PORT", "9192")
-    _INGEST_SERVICE_ENDPOINT = "api/v1/ingest_to_graph"
-    _SELECTIVE_SERVICE_ENDPOINT = "api/v1/selective_ingest"
-    _INGEST_API_URL = "http://{host}:{port}/{endpoint}".format(host=_SERVICE_HOST,
-                                                               port=_SERVICE_PORT,
-                                                               endpoint=_INGEST_SERVICE_ENDPOINT)
-
-    _SELECTIVE_API_URL = "http://{host}:{port}/{endpoint}".format(
-        host=_SERVICE_HOST,
-        port=_SERVICE_PORT,
-        endpoint=_SELECTIVE_SERVICE_ENDPOINT)
+    """Sync data from S3 to graph database."""

     add_audit_info = False

     def execute(self, arguments):
         self._strict_assert(arguments.get('ecosystem'))
         self._strict_assert(arguments.get('name'))
         self._strict_assert(arguments.get('document_id'))

-        package_list = [
-            {
-                'ecosystem': arguments['ecosystem'],
-                'name': arguments['name'],
-                'version': arguments.get('version')
-            }
-        ]
-
-        # If we force graph sync, sync all task results, otherwise only
-        # finished in this analysis run
-        if not arguments.get('force_graph_sync'):
-            # Tasks that need sync to graph start lowercase.
-            param = {
-                'select_ingest': [task_name
-                                  for task_name in self.storage.get_finished_task_names(
-                                      arguments['document_id'])
-                                  if task_name[0].islower()],
-                'package_list': package_list
-            }
-            endpoint = self._SELECTIVE_API_URL
-        else:
-            param = package_list
-            endpoint = self._INGEST_API_URL
+        if self.task_name not in ('PackageGraphImporterTask', 'GraphImporterTask'):
+            raise FatalTaskError("Unknown task name - cannot distinguish package vs. version level data")

+        version_level = self.task_name == 'GraphImporterTask'

-        self.log.info("Invoke graph importer at url: '%s' for %s", endpoint, param)
-        response = requests.post(endpoint, json=param)
+        postgres = StoragePool.get_connected_storage('BayesianPostgres' if version_level else 'PackagePostgres')
+        s3 = StoragePool.get_connected_storage('S3Data' if version_level else 'S3PackageData')
+        gremlin = StoragePool.get_connected_storage('GremlinHttp' if version_level else 'PackageGremlinHttp')

-        if response.status_code != 200:
-            raise RuntimeError("Failed to invoke graph import at '%s' for %s" % (endpoint, param))
+        adapter_kwargs = {
+            'ecosystem': arguments['ecosystem'],
+            'name': arguments['name']
+        }
+        if version_level:
+            self._strict_assert(arguments.get('version'))
+            adapter_kwargs['version'] = arguments['version']

+        if arguments.get('force_graph_sync'):
+            tasks_to_sync = s3.list_available_task_results(arguments)
+            self.log.info("Force sync of all task results available on S3: %s", tasks_to_sync)
+        else:
+            tasks_to_sync = postgres.get_finished_task_names(arguments['document_id'])
+            self.log.info("Syncing results of tasks in the current run: %s", tasks_to_sync)

-        self.log.info("Graph import succeeded with response: %s", response.text)
+        for task_name in tasks_to_sync:
+            task_result = s3.retrieve_task_result(task_name=task_name, **adapter_kwargs)
+            gremlin.store_task_result(task_name=task_name, task_result=task_result, **adapter_kwargs)
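
The task_name check above implies a package-level counterpart registered as PackageGraphImporterTask. A minimal sketch of what that class could look like (an assumption; the actual node may be defined elsewhere, with Selinon deriving task_name from the node name):

class PackageGraphImporterTask(GraphImporterTask):
    """Package-level variant: same flow as above, but version_level is False,
    so results are synced via PackagePostgres, S3PackageData and
    PackageGremlinHttp, and adapter_kwargs carries no 'version'."""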