diff --git a/.github/workflows/actions.yml b/.github/workflows/actions.yml index 6d6db58..2bec06f 100644 --- a/.github/workflows/actions.yml +++ b/.github/workflows/actions.yml @@ -3,14 +3,15 @@ on: [push, pull_request] jobs: test-client: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest + timeout-minutes: 30 strategy: matrix: python: [3.8, 3.9, "3.10", 3.12] extras: ["test", "test,queuable,sentry"] steps: - name: Setup Python - uses: actions/setup-python@v2.2.2 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python }} - name: Check out repository code @@ -20,13 +21,14 @@ jobs: - name: Test working-directory: ./client run: | - pip install -e .[${{ matrix.extras }}] - py.test + pip --version + pip install --verbose .[${{ matrix.extras }}] + python -m pytest --import-mode=importlib ./datalake/tests/ test-docker: runs-on: ubuntu-latest steps: - name: Check out repository code - uses: actions/checkout@v2 + uses: actions/checkout@v4 with: fetch-depth: 0 - name: Test diff --git a/.gitignore b/.gitignore index 6f2d130..75bb75f 100644 --- a/.gitignore +++ b/.gitignore @@ -58,3 +58,6 @@ target/ # build artifacts version.txt + +# Claude Code +.claude/ diff --git a/Dockerfile b/Dockerfile index bc15457..9a7e43b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,29 +7,33 @@ ENV LC_ALL C.UTF-8 # TODO: keep requirements in one place RUN pip install \ - blinker>=1.4 \ - boto3>=1.1.3 \ - click>=5.1 \ - Flask>=0.10.1 \ - flask-swagger>=0.2.14 \ - memoized_property>=1.0.1 \ - python-dateutil>=2.4.2 \ - python-dotenv>=0.1.3 \ - pytz>=2015.4 \ - sentry-sdk[flask]>=0.19.5 \ - requests>=2.5 \ - simplejson>=3.3.1 \ - six>=1.10.0 \ + 'blinker>=1.4' \ + 'boto3==1.35.41' \ + 'botocore==1.35.64' \ + 'click>=5.1' \ + 'datalake<2' \ + 'Flask>=0.10.1' \ + 'flask-swagger>=0.2.14' \ + 'memoized_property>=1.0.1' \ + 'pyinotify>=0.9.4' \ + 'python-dateutil>=2.4.2' \ + 'python-dotenv>=0.1.3' \ + 'pytz>=2015.4' \ + 'raven>=5.0.0' \ + 'requests>=2.5' \ + 'sentry-sdk[flask]>=0.19.5' \ + 'simplejson>=3.3.1' \ + 'six>=1.10.0' \ # test requirements 'flake8>=2.5.0,<4.1' \ 'freezegun<1' \ - 'moto<3' \ + 'moto>=5,<6' \ 'pytest<8' \ + 'pytest-cov>=2.5.1,<4' \ + 'pytest-xdist' \ 'responses<0.22.0' \ - pyinotify>=0.9.4, \ - raven>=5.0.0 \ - 'tox>4,<5' \ - 'datalake<2' + 'tox>4,<5' + RUN mkdir -p /opt/ COPY . /opt/ diff --git a/Makefile b/Makefile index 9a7009f..974c37b 100644 --- a/Makefile +++ b/Makefile @@ -7,18 +7,27 @@ IMAGE="$(REPO)/$(REPO_PATH):$(VERSION)" docker: version docker build --build-arg VERSION=$(VERSION) -t $(IMAGE) . -.PHONY: devshell # Open a developer shell in the docker env -devshell: docker +.PHONY: dev # Open a developer shell in the docker env +dev: docker docker run --rm -it -v $$PWD:/opt --entrypoint /bin/bash $(IMAGE) test-client: docker - docker run --rm --entrypoint tox $(IMAGE) -c /opt/client/tox.ini + docker run --rm -t --entrypoint tox $(IMAGE) -c /opt/client/tox.ini test-ingester: docker - docker run --rm --entrypoint py.test $(IMAGE) ingester + docker run --rm -t --entrypoint pytest $(IMAGE) /opt/ingester -svvx test-api: docker - docker run --rm --entrypoint py.test $(IMAGE) api + docker run --rm -t --entrypoint pytest $(IMAGE) /opt/api -svvx + +testp-client: docker + docker run --rm -t --entrypoint /bin/bash $(IMAGE) -c "cd /opt/client && pytest -svvx -n auto" + +testp-ingester: docker + docker run --rm -t --entrypoint pytest $(IMAGE) /opt/ingester -svvx -n auto + +testp-api: docker + docker run --rm -t --entrypoint pytest $(IMAGE) /opt/api -svvx -n auto .PHONY: test # Run the tests test: @@ -27,6 +36,18 @@ test: $(MAKE) test-ingester $(MAKE) test-api +.PHONY: testp # Run all tests in parallel with pytest-xdist +testp: docker + docker run --rm -t --entrypoint /bin/bash $(IMAGE) -c "\ + set -e && \ + echo '==> Running client tests...' && \ + cd /opt/client && pytest -svvx -n auto && \ + echo '==> Running ingester tests...' && \ + cd /opt/ingester && pytest -svvx -n auto && \ + echo '==> Running API tests...' && \ + cd /opt/api && pytest -svvx -n auto && \ + echo '==> All tests passed!'" + .PHONY: push push: docker push $(IMAGE) diff --git a/api/datalake_api/v0.py b/api/datalake_api/v0.py index c1aad72..84ec951 100644 --- a/api/datalake_api/v0.py +++ b/api/datalake_api/v0.py @@ -18,6 +18,8 @@ from flask import current_app as app import os import simplejson as json +from datetime import datetime, timezone +import decimal from .querier import ArchiveQuerier, Cursor, InvalidCursor, \ DEFAULT_LOOKBACK_DAYS from .fetcher import ArchiveFileFetcher @@ -29,6 +31,38 @@ _archive_querier = None + +def unix_ms_to_utc_iso(unix_ms): + if unix_ms is None: + return unix_ms + unix_ms_to_iso = unix_ms + if isinstance(unix_ms_to_iso, decimal.Decimal): + unix_ms_to_iso = float(unix_ms_to_iso) + iso = datetime.fromtimestamp( + unix_ms_to_iso / 1000.0, tz=timezone.utc + ).isoformat(timespec='milliseconds').replace('+00:00', 'Z') + return iso + + +def add_utc_metadata(metadata): + """Add ISO-8601 UTC timestamp fields to metadata dict + + This function takes a metadata dict and adds start_iso and end_iso fields + based on existing start and end epoch timestamps + iso precision is set to milliseconds + Can be expanded to add any api-level metadata + """ + if not metadata: + return metadata + + start_iso = unix_ms_to_utc_iso(metadata['start']) + end_iso = unix_ms_to_utc_iso(metadata['end']) + + metadata['start_iso'] = start_iso + metadata['end_iso'] = end_iso + return metadata + + def _get_aws_kwargs(): kwargs = dict( region_name=app.config.get('AWS_REGION'), @@ -305,6 +339,14 @@ def files_get(): type: string description: 16-byte blake2 hash of the file content + start_iso: + type: string + description: the start time of the file in ISO + format UTC iso timezone + end_iso: + type: string + description: the end time of the file in ISO + format UTC iso timezone next: type: string @@ -349,7 +391,10 @@ def files_get(): where=params.get('where'), cursor=params.get('cursor')) - [r.update(http_url=_get_canonical_http_url(r)) for r in results] + for r in results: + r.update(http_url=_get_canonical_http_url(r)) + r['metadata'] = add_utc_metadata(r['metadata']) + response = { 'records': results, 'next': _get_next_url(flask.request, results), @@ -476,6 +521,7 @@ def file_get_metadata(file_id): id: DatalakeAPIError ''' f = _get_file(file_id) + f.metadata = add_utc_metadata(f.metadata) return Response(json.dumps(f.metadata), content_type='application/json') @@ -542,6 +588,7 @@ def latest_get(what, where): params = _validate_latest_params(params) f = _get_latest(what, where, params.get('lookback', DEFAULT_LOOKBACK_DAYS)) f.update(http_url=_get_canonical_http_url(f)) + f['metadata'] = add_utc_metadata(f['metadata']) return Response(json.dumps(f), content_type='application/json') diff --git a/api/setup.py b/api/setup.py index fa3e272..d45adde 100644 --- a/api/setup.py +++ b/api/setup.py @@ -30,7 +30,7 @@ def get_version_from_pyver(): if 'sdist' in sys.argv or 'bdist_wheel' in sys.argv: raise ImportError('You must install pyver to create a package') else: - return 'noversion' + return '0.0.0' version, version_info = pyver.get_version(pkg="datalake_api", public=True) return version diff --git a/api/tests/conftest.py b/api/tests/conftest.py index bf166fa..69fb58c 100644 --- a/api/tests/conftest.py +++ b/api/tests/conftest.py @@ -18,7 +18,7 @@ ClientError as BotoClientError, NoCredentialsError ) -from moto import mock_dynamodb2 +from moto import mock_aws from datalake_api import app as datalake_api from datalake.tests import * # noqa @@ -33,12 +33,15 @@ # This will cause moto to fail # But more critically, may impact production systems # So we test for real credentials and fail hard if they exist -sts = boto3.client('sts') -try: - sts.get_caller_identity() - pytest.exit("Real AWS credentials detected, aborting", 3) -except NoCredentialsError: - pass # no credentials are good +# Session fixture runs in pytest setup rather than at import time +@pytest.fixture(scope='session', autouse=True) +def verify_no_aws_credentials(): + sts = boto3.client('sts') + try: + sts.get_caller_identity() + pytest.exit("Real AWS credentials detected, aborting", 3) + except NoCredentialsError: + pass def get_client(): @@ -46,17 +49,11 @@ def get_client(): datalake_api.app.config.from_object(settings) datalake_api.app.config['TESTING'] = True + datalake_api.app.config['AWS_REGION'] = 'us-east-1' datalake_api.app.config['AWS_ACCESS_KEY_ID'] = 'abc' datalake_api.app.config['AWS_SECRET_ACCESS_KEY'] = '123' - # TODO: Sigh. The api caches the archive_fetcher and s3_bucket, which is - # the right thing. However, because moto<3 still uses httpretty, and - # because httpretty wreaks havoc on the python socket code, these cached - # parts end up in a bad state after their first use. The right thing to do - # here is to upgrade moto. But for that we will also have to move - # everything from boto to boto3. This is a near-term goal. But first lets - # get everything off of python2. - for a in ('archive_fetcher', 's3_bucket'): + for a in ('archive_fetcher', 's3_bucket', 'dynamodb'): try: delattr(datalake_api.app, a) except AttributeError: @@ -71,7 +68,7 @@ def client(): @pytest.fixture def dynamodb(request): - mock = mock_dynamodb2() + mock = mock_aws() mock.start() def tear_down(): @@ -79,7 +76,7 @@ def tear_down(): request.addfinalizer(tear_down) return boto3.resource('dynamodb', - region_name='us-west-2', + region_name='us-east-1', aws_secret_access_key='123', aws_access_key_id='abc') diff --git a/api/tests/test_metadata.py b/api/tests/test_metadata.py index 22f0047..7b25bec 100644 --- a/api/tests/test_metadata.py +++ b/api/tests/test_metadata.py @@ -11,6 +11,8 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations under # the License. +from datetime import datetime, timezone +from decimal import Decimal import pytest import simplejson as json @@ -32,7 +34,20 @@ def test_get_metadata(metadata_getter, s3_file_maker, random_metadata): res = metadata_getter('12345') assert res.status_code == 200 assert res.content_type == 'application/json' - assert json.loads(res.data) == random_metadata + res_data = json.loads(res.data) + for k, v in res_data.items(): + if k == 'start_iso' or k == 'end_iso': + k_epoch = k.replace('_iso','') + v_epoch = res_data[k_epoch] + if v is None: + assert v == v_epoch + + expected_v_iso = datetime.fromtimestamp( + v_epoch / 1000.0, tz=timezone.utc + ).isoformat(timespec='milliseconds').replace('+00:00', 'Z') + assert v == expected_v_iso + else: + assert v == random_metadata[k] def test_no_such_metadata(s3_bucket_maker, metadata_getter): diff --git a/client/datalake/tests/conftest.py b/client/datalake/tests/conftest.py index 99aac43..4cb8c2b 100644 --- a/client/datalake/tests/conftest.py +++ b/client/datalake/tests/conftest.py @@ -20,7 +20,7 @@ try: - from moto import mock_s3 + from moto import mock_aws import boto3 from six.moves.urllib.parse import urlparse import json @@ -125,24 +125,8 @@ def get_tmpfile(content): @pytest.fixture -def aws_connector(request): - - def create_connection(mocker, connector): - mock = mocker() - mock.start() - - def tear_down(): - mock.stop() - request.addfinalizer(tear_down) - - return connector() - - return create_connection - - -@pytest.fixture -def s3_connection(aws_connector): - with mock_s3(): +def s3_connection(): + with mock_aws(): yield boto3.resource('s3') diff --git a/client/pyproject.toml b/client/pyproject.toml index 6a147ef..95c45ec 100644 --- a/client/pyproject.toml +++ b/client/pyproject.toml @@ -21,7 +21,8 @@ classifiers = [ "Programming Language :: Python :: 3", ] dependencies = [ - 'boto3>=1.9.68', + 'boto3==1.35.41', + 'botocore==1.35.64', 'memoized_property>=1.0.1', 'pyblake2>=0.9.3; python_version<"3.6"', 'click>=4.1', @@ -37,7 +38,7 @@ dynamic = ["version"] test = [ 'pytest<8.0.0', 'pytest-cov>=2.5.1,<4', - 'moto[s3]>4,<5', + 'moto[s3]>5,<6', 'twine<4.0.0', 'pip>=20.0.0,<22.0.0', 'wheel<0.38.0', @@ -73,7 +74,7 @@ distance-dirty = "{base_version}+{distance}.{vcs}{rev}.dirty" # Example formatted version: 1.2.3+42.ge174a1f.dirty [tool.pytest.ini_options] -addopts = "--cov=planet.mc_client --cov-config .coveragerc" +addopts = "--cov=datalake --cov-config .coveragerc" markers = [ "slow: marks tests as slow (deselect with '-m \"not slow\"')" ] diff --git a/ingester/datalake_ingester/queue.py b/ingester/datalake_ingester/queue.py index aab0d02..8f413b6 100644 --- a/ingester/datalake_ingester/queue.py +++ b/ingester/datalake_ingester/queue.py @@ -13,7 +13,7 @@ # the License. from memoized_property import memoized_property -import boto.sqs +import boto3 import simplejson as json import logging import os @@ -39,16 +39,13 @@ def set_handler(self, h): self.handler = h @memoized_property - def _queue(self): - return self._connection.get_queue(self.queue_name) + def _connection(self): + region = os.environ.get('AWS_REGION', 'us-east-1') + return boto3.resource('sqs', region_name=region) @memoized_property - def _connection(self): - region = os.environ.get('AWS_REGION') - if region: - return boto.sqs.connect_to_region(region) - else: - return boto.connect_sqs() + def _queue(self): + return self._connection.get_queue_by_name(QueueName=self.queue_name) _LONG_POLL_TIMEOUT = 20 @@ -57,17 +54,19 @@ def drain(self, timeout=None): ''' long_poll_timeout = timeout or self._LONG_POLL_TIMEOUT while True: - raw_msg = self._queue.read(wait_time_seconds=long_poll_timeout) - if raw_msg is None: + messages = self._queue.receive_messages( + MaxNumberOfMessages=1, + WaitTimeSeconds=long_poll_timeout + ) + if not messages: if timeout: return else: continue - self._handle_raw_message(raw_msg) + self._handle_raw_message(messages[0]) def _handle_raw_message(self, raw_msg): - # eliminate newlines in raw message so it all logs to one line - raw = raw_msg.get_body().replace('\n', ' ') + raw = raw_msg.body.replace('\n', ' ') if not self.handler: self.logger.error('NO HANDLER CONFIGURED: %s', raw) return @@ -76,4 +75,4 @@ def _handle_raw_message(self, raw_msg): msg = json.loads(raw) self.handler(msg) - self._queue.delete_message(raw_msg) + raw_msg.delete() diff --git a/ingester/datalake_ingester/reporter.py b/ingester/datalake_ingester/reporter.py index 3517e0a..778b11c 100644 --- a/ingester/datalake_ingester/reporter.py +++ b/ingester/datalake_ingester/reporter.py @@ -1,4 +1,4 @@ -import boto.sns +import boto3 import simplejson as json import logging from memoized_property import memoized_property @@ -25,13 +25,10 @@ def _log_name(self): @memoized_property def _connection(self): - region = os.environ.get('AWS_REGION') - if region: - return boto.sns.connect_to_region(region) - else: - return boto.connect_sns() + region = os.environ.get('AWS_REGION', 'us-east-1') + return boto3.client('sns', region_name=region) def report(self, ingestion_report): message = json.dumps(ingestion_report) self.logger.info('REPORTING: %s', message) - self._connection.publish(topic=self.report_key, message=message) + self._connection.publish(TopicArn=self.report_key, Message=message) diff --git a/ingester/datalake_ingester/storage.py b/ingester/datalake_ingester/storage.py index 81838be..1c7a2da 100644 --- a/ingester/datalake_ingester/storage.py +++ b/ingester/datalake_ingester/storage.py @@ -14,9 +14,8 @@ from memoized_property import memoized_property -import boto.dynamodb2 -from boto.dynamodb2.table import Table -from boto.dynamodb2.exceptions import ConditionalCheckFailedException +import boto3 +from botocore.exceptions import ClientError import os from datalake.common.errors import InsufficientConfiguration import logging @@ -45,104 +44,94 @@ def _prepare_connection(self, connection): region = os.environ.get('AWS_REGION') if connection: self._connection = connection + self._client = connection.meta.client elif region: - self._connection = boto.dynamodb2.connect_to_region(region) + self._connection = boto3.resource('dynamodb', region_name=region) + self._client = self._connection.meta.client + else: msg = 'Please provide a connection or configure a region' raise InsufficientConfiguration(msg) @memoized_property def _table(self): - return Table(self.table_name, connection=self._connection) - + return self._connection.Table(self.table_name) + @memoized_property def _latest_table(self): - return Table(self.latest_table_name, connection=self._connection) + return self._connection.Table(self.latest_table_name) def store(self, record): try: - self._table.put_item(data=record) - except ConditionalCheckFailedException: - # Tolerate duplicate stores - pass + self._table.put_item(Item=record) + except ClientError as e: + if e.response['Error']['Code'] == 'ConditionalCheckFailedException': + pass + else: + raise if self.latest_table_name: self.store_latest(record) def update(self, record): - self._table.put_item(data=record, overwrite=True) + self._table.put_item(Item=record) def store_latest(self, record): """ - Record must utilize AttributeValue syntax - for the conditional put. + Store the latest record for a given what:where key with conditional put. """ - condition_expression = " attribute_not_exists(what_where_key) OR metadata.#metadata_start <= :new_start" + condition_expression = "attribute_not_exists(what_where_key) OR metadata.#metadata_start <= :new_start" + expression_attribute_values = { - ':new_start': {'N': str(record['metadata']['start'])} + ':new_start': record['metadata']['start'] } - # aliases for DynamoDB reserved names. expression_attribute_names = { '#metadata_start': "start" } + if record['metadata']['work_id'] is None: - work_id_value = {'NULL': True} + work_id_value = None else: - work_id_value = {'S': str(record['metadata']['work_id'])} + work_id_value = str(record['metadata']['work_id']) if record['metadata']['end'] is None: - end_time_value = {'NULL': True} + end_time_value = None else: - end_time_value = {'N': str(record['metadata']['end'])} + end_time_value = record['metadata']['end'] record = { - 'what_where_key': {"S": record['metadata']['what']+':'+record['metadata']['where']}, - 'time_index_key': {"S": record['time_index_key']}, - 'range_key': {"S": record['range_key']}, + 'what_where_key': record['metadata']['what']+':'+record['metadata']['where'], + 'time_index_key': record['time_index_key'], + 'range_key': record['range_key'], 'metadata': { - 'M': { - 'start': { - 'N': str(record['metadata']['start']) - }, - 'end': end_time_value, - 'id': { - 'S': str(record['metadata']['id']) - }, - 'path': { - 'S': str(record['metadata']['path']) - }, - 'hash': { - 'S': str(record['metadata']['hash']) - }, - 'version': { - 'N': str(record['metadata']['version']) - }, - 'what': { - 'S': str(record['metadata']['what']) - }, - 'where': { - 'S': str(record['metadata']['where']) - }, - 'work_id': work_id_value - } + 'start': record['metadata']['start'], + 'end': end_time_value, + 'id': str(record['metadata']['id']), + 'path': str(record['metadata']['path']), + 'hash': str(record['metadata']['hash']), + 'version': record['metadata']['version'], + 'what': str(record['metadata']['what']), + 'where': str(record['metadata']['where']), + 'work_id': work_id_value }, - 'url': {"S": record['url']}, - 'create_time': {'N': str(record['create_time'])} + 'url': record['url'], + 'create_time': record['create_time'] } self.logger.info(f"Attempting to store record: {record}") try: - self._connection.put_item( - table_name=self.latest_table_name, - item=record, - condition_expression=condition_expression, - expression_attribute_names=expression_attribute_names, - expression_attribute_values=expression_attribute_values, + self._latest_table.put_item( + Item=record, + ConditionExpression=condition_expression, + ExpressionAttributeNames=expression_attribute_names, + ExpressionAttributeValues=expression_attribute_values, ) self.logger.info("Record stored successfully.") - except ConditionalCheckFailedException: - self.logger.debug(f"Condition not met for record {record}," + except ClientError as e: + if e.response['Error']['Code'] == 'ConditionalCheckFailedException': + self.logger.debug(f"Condition not met for record {record}," "no operation was performed.") + else: + raise except Exception as e: self.logger.error(f"Error occurred while attempting {record}: {str(e)}") - diff --git a/ingester/setup.py b/ingester/setup.py index 792d5b4..2aff6f8 100644 --- a/ingester/setup.py +++ b/ingester/setup.py @@ -30,7 +30,7 @@ def get_version_from_pyver(): if 'sdist' in sys.argv or 'bdist_wheel' in sys.argv: raise ImportError('You must install pyver to create a package') else: - return 'noversion' + return '0.0.0' version, version_info = pyver.get_version(pkg="datalake_ingester", public=True) return version diff --git a/ingester/tests/conftest.py b/ingester/tests/conftest.py index 7c9fa9b..dd086d7 100644 --- a/ingester/tests/conftest.py +++ b/ingester/tests/conftest.py @@ -1,53 +1,53 @@ import pytest -from moto import ( - mock_sns_deprecated as mock_sns, - mock_sqs_deprecated as mock_sqs, - mock_dynamodb2_deprecated as mock_dynamodb2 -) +from moto import mock_aws import os import simplejson as json from glob import glob - -from boto.dynamodb2.table import Table -from boto.exception import JSONResponseError -from boto.dynamodb2.fields import HashKey, RangeKey -import boto.sns -import boto.sqs +import boto3 +from botocore.exceptions import ClientError from datalake.tests import * # noqa from datalake_ingester import SQSQueue - -@pytest.fixture -def dynamodb_connection(aws_connector): - return aws_connector(mock_dynamodb2, - lambda: boto.dynamodb2.connect_to_region('us-west-1')) - - def _delete_table_if_exists(conn, name): try: - table = Table(name, connection=conn) + table = conn.Table(name) table.delete() - except JSONResponseError as e: - if e.status == 400 and e.error_code == 'ResourceNotFoundException': - return - raise e + except ClientError as e: + if e.response['Error']['Code'] != 'ResourceNotFoundException': + raise + +@pytest.fixture +def dynamodb_connection(): + with mock_aws(): + yield boto3.resource('dynamodb', region_name='us-east-1') @pytest.fixture def dynamodb_table_maker(request, dynamodb_connection): - def table_maker(name, schema): + def table_maker(name, key_schema, attribute_definitions): _delete_table_if_exists(dynamodb_connection, name) - throughput = {'read': 5, 'write': 5} - table = Table.create(name, - schema=schema, - throughput=throughput, - connection=dynamodb_connection) + + table = dynamodb_connection.create_table( + TableName=name, + KeySchema=key_schema, + AttributeDefinitions=attribute_definitions, + ProvisionedThroughput={ + 'ReadCapacityUnits': 5, + 'WriteCapacityUnits': 5 + } + ) + + table.meta.client.get_waiter('table_exists').wait( + TableName=name, + WaiterConfig={'Delay': 1, 'MaxAttempts': 30} + ) def tear_down(): _delete_table_if_exists(dynamodb_connection, name) + request.addfinalizer(tear_down) return table @@ -58,44 +58,71 @@ def tear_down(): @pytest.fixture def dynamodb_users_table(dynamodb_table_maker): - schema = [HashKey('name'), RangeKey('last_name')] - return dynamodb_table_maker('users', schema) + key_schema = [ + {'AttributeName': 'name', 'KeyType': 'HASH'}, + {'AttributeName': 'last_name', 'KeyType': 'RANGE'} + ] + attribute_definitions = [ + {'AttributeName': 'name', 'AttributeType': 'S'}, + {'AttributeName': 'last_name', 'AttributeType': 'S'} + ] + return dynamodb_table_maker('users', key_schema, attribute_definitions) @pytest.fixture def dynamodb_records_table(dynamodb_table_maker): - schema = [HashKey('time_index_key'), RangeKey('range_key')] - return dynamodb_table_maker('records', schema) + key_schema = [ + {'AttributeName': 'time_index_key', 'KeyType': 'HASH'}, + {'AttributeName': 'range_key', 'KeyType': 'RANGE'} + ] + attribute_definitions = [ + {'AttributeName': 'time_index_key', 'AttributeType': 'S'}, + {'AttributeName': 'range_key', 'AttributeType': 'S'} + ] + return dynamodb_table_maker('records', key_schema, attribute_definitions) @pytest.fixture def dynamodb_latest_table(dynamodb_table_maker): - schema = [HashKey('what_where_key')] - return dynamodb_table_maker('latest', schema) + key_schema = [ + {'AttributeName': 'what_where_key', 'KeyType': 'HASH'} + ] + attribute_definitions = [ + {'AttributeName': 'what_where_key', 'AttributeType': 'S'} + ] + return dynamodb_table_maker('latest', key_schema, attribute_definitions) @pytest.fixture -def sns_connection(aws_connector): - return aws_connector(mock_sns, boto.connect_sns) +def sns_connection(): + with mock_aws(): + yield boto3.resource('sns', region_name='us-east-1') @pytest.fixture def sns_topic_arn(sns_connection): - topic = sns_connection.create_topic('foo') - return topic['CreateTopicResponse']['CreateTopicResult']['TopicArn'] + topic = sns_connection.create_topic(Name='foo') + return topic.arn @pytest.fixture -def sqs_connection(aws_connector): - return aws_connector(mock_sqs, boto.connect_sqs) +def sqs_connection(): + with mock_aws(): + yield boto3.resource('sqs', region_name='us-east-1') @pytest.fixture def bare_sqs_queue_maker(sqs_connection): def maker(queue_name): - return sqs_connection.get_queue(queue_name) or \ - sqs_connection.create_queue(queue_name) + try: + queue = sqs_connection.get_queue_by_name(QueueName=queue_name) + except ClientError as e: + if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue': + queue = sqs_connection.create_queue(QueueName=queue_name) + else: + raise + return queue return maker @@ -105,7 +132,7 @@ def sqs_queue_maker(bare_sqs_queue_maker): def maker(queue_name): q = bare_sqs_queue_maker(queue_name) - return SQSQueue(q.name) + return SQSQueue(queue_name) return maker @@ -125,8 +152,9 @@ def sqs_sender(bare_sqs_queue_maker): def sender(msg, queue_name='test-queue'): q = bare_sqs_queue_maker(queue_name) - msg = q.new_message(json.dumps(msg)) - q.write(msg) + q.send_message( + MessageBody=json.dumps(msg) + ) return sender diff --git a/ingester/tests/test_ingester.py b/ingester/tests/test_ingester.py index 3b7ccfe..d88b057 100644 --- a/ingester/tests/test_ingester.py +++ b/ingester/tests/test_ingester.py @@ -28,7 +28,7 @@ def test_ingest_random(storage, dynamodb_records_table, random_s3_file_maker): url, metadata = random_s3_file_maker() ingester = Ingester(storage) ingester.ingest(url) - records = [dict(r) for r in dynamodb_records_table.scan()] + records = [dict(r) for r in dynamodb_records_table.scan()['Items']] assert len(records) >= 1 for r in records: assert r['metadata'] == metadata @@ -38,10 +38,10 @@ def test_ingest_random_latest(storage, dynamodb_latest_table, random_s3_file_mak url, metadata = random_s3_file_maker() ingester = Ingester(storage) ingester.ingest(url) - records = [dict(r) for r in dynamodb_latest_table.scan()] + records = [dict(r) for r in dynamodb_latest_table.scan()['Items']] def convert_records(records): return {k: (decimal.Decimal(str(v)) if isinstance(v, (int, float)) else v) for k, v in records[0].items()} - + converted_records = convert_records(records) assert len(records) >= 1 for r in records: @@ -55,7 +55,7 @@ def test_ingest_no_end(storage, dynamodb_records_table, s3_file_from_metadata, s3_file_from_metadata(url, random_metadata) ingester = Ingester(storage) ingester.ingest(url) - records = [dict(r) for r in dynamodb_records_table.scan()] + records = [dict(r) for r in dynamodb_records_table.scan()['Items']] assert len(records) >= 1 # we expect a null end key to come back when the user leaves it out. @@ -97,7 +97,7 @@ def _sanitize(r): return r def comparator(expected_records): - records = [_sanitize(r) for r in dynamodb_records_table.scan()] + records = [_sanitize(r) for r in dynamodb_records_table.scan()['Items']] for r in expected_records: del(r['create_time']) assert dict_list_sorter(records) == dict_list_sorter(expected_records) @@ -113,8 +113,12 @@ class ReportListener(object): def __init__(self): self.messages = [] q = bare_sqs_queue_maker('reporter-queue') - self._queue = SQSQueue(q.name, self.handler) - sns_connection.subscribe_sqs_queue(sns_topic_arn, q) + self._queue = SQSQueue('reporter-queue', self.handler) + topic = sns_connection.Topic(sns_topic_arn) + topic.subscribe( + Protocol='sqs', + Endpoint=q.attributes['QueueArn'] + ) def handler(self, msg): self.messages.append(json.loads(msg['Message'])) diff --git a/ingester/tests/test_local_dynamodb.py b/ingester/tests/test_local_dynamodb.py index bf569f7..16b3179 100644 --- a/ingester/tests/test_local_dynamodb.py +++ b/ingester/tests/test_local_dynamodb.py @@ -19,7 +19,7 @@ def test_list_table(dynamodb_users_table, dynamodb_connection): - table_list = dynamodb_connection.list_tables() + table_list = dynamodb_connection.meta.client.list_tables() assert 'TableNames' in table_list table_list = table_list['TableNames'] assert len(table_list) == 1 diff --git a/ingester/tests/test_queue.py b/ingester/tests/test_queue.py index 6377a60..1545ed1 100644 --- a/ingester/tests/test_queue.py +++ b/ingester/tests/test_queue.py @@ -32,20 +32,21 @@ def __call__(self, msg): def test_sqs_queue_timeout(bare_sqs_queue, handler): - q = SQSQueue(bare_sqs_queue.name, handler) + queue_name = bare_sqs_queue.url.split('/')[-1] + q = SQSQueue(queue_name, handler) start = time.time() q.drain(timeout=1) duration = time.time() - start error = abs(duration - 1.0) - assert error < 0.1 + assert error < 0.12 assert handler.messages == [] def test_sqs_queue_drain(bare_sqs_queue, handler): - q = SQSQueue(bare_sqs_queue.name, handler) + queue_name = bare_sqs_queue.url.split('/')[-1] + q = SQSQueue(queue_name, handler) expected_msg = {'foo': 'bar'} - msg = bare_sqs_queue.new_message(json.dumps(expected_msg)) - bare_sqs_queue.write(msg) + bare_sqs_queue.send_message(MessageBody=json.dumps(expected_msg)) q.drain(timeout=1) assert handler.messages == [expected_msg] handler.messages = [] diff --git a/ingester/tests/test_reporter.py b/ingester/tests/test_reporter.py index 9b56a5e..96df35b 100644 --- a/ingester/tests/test_reporter.py +++ b/ingester/tests/test_reporter.py @@ -18,9 +18,14 @@ def test_snsreporter_sends(sns_connection, sns_topic_arn, bare_sqs_queue): - sns_connection.subscribe_sqs_queue(sns_topic_arn, bare_sqs_queue) + topic = sns_connection.Topic(sns_topic_arn) + topic.subscribe( + Protocol='sqs', + Endpoint=bare_sqs_queue.attributes['QueueArn'] + ) r = SNSReporter(sns_topic_arn) expected_msg = {'message': 'foo'} r.report(expected_msg) - msg = json.loads(bare_sqs_queue.read(1).get_body()) + messages = bare_sqs_queue.receive_messages(MaxNumberOfMessages=1) + msg = json.loads(messages[0].body) assert expected_msg == json.loads(msg['Message']) diff --git a/ingester/tests/test_storage.py b/ingester/tests/test_storage.py index 0aace6e..ed49eb7 100644 --- a/ingester/tests/test_storage.py +++ b/ingester/tests/test_storage.py @@ -6,7 +6,8 @@ def test_dynamodb_store(dynamodb_users_table, dynamodb_connection): storage = DynamoDBStorage('users', connection=dynamodb_connection) expected_user = {'name': 'John', 'last_name': 'Muir'} storage.store(expected_user) - user = dict(dynamodb_users_table.get_item(name='John', last_name='Muir')) + response = dynamodb_users_table.get_item(Key={'name': 'John', 'last_name': 'Muir'}) + user = dict(response['Item']) assert dict(user) == expected_user def test_store_duplicate(dynamodb_users_table, dynamodb_connection): @@ -14,7 +15,8 @@ def test_store_duplicate(dynamodb_users_table, dynamodb_connection): expected_user = {'name': 'Vanilla', 'last_name': 'Ice'} storage.store(expected_user) storage.store(expected_user) - user = dict(dynamodb_users_table.get_item(name='Vanilla', last_name='Ice')) + response = dynamodb_users_table.get_item(Key={'name': 'Vanilla', 'last_name': 'Ice'}) + user = dict(response['Item']) assert dict(user) == expected_user def test_insert_new_record(dynamodb_latest_table, dynamodb_connection): @@ -42,9 +44,10 @@ def test_insert_new_record(dynamodb_latest_table, dynamodb_connection): storage.store_latest(new_record) - stored_record = dynamodb_latest_table.get_item( - what_where_key=new_record['what_where_key'] + response = dynamodb_latest_table.get_item( + Key={'what_where_key': new_record['what_where_key']} ) + stored_record = response['Item'] assert stored_record['metadata']['start'] == new_record['metadata']['start'] @@ -82,7 +85,7 @@ def provide_test_records(): 'what': 'syslog', 'id': 'file2', 'hash': 'c5g3d8de24af342643d5b78a8f2b9b88' - + }, 'url': 's3://existingfile/url', 'create_time': 1314877177403 @@ -122,9 +125,10 @@ def test_store_conditional_put_latest_multiple_files(dynamodb_latest_table, dyna query_what_where = 'syslog:ground_server2' - records = [dict(i) for i in dynamodb_latest_table.scan()] + records = [dict(i) for i in dynamodb_latest_table.scan()['Items']] - res = dict(dynamodb_latest_table.get_item(what_where_key=query_what_where)) + response = dynamodb_latest_table.get_item(Key={'what_where_key': query_what_where}) + res = dict(response['Item']) assert res['metadata']['start'] == Decimal('1314877177413') assert len(records) == 2 assert file2 == res @@ -141,7 +145,8 @@ def test_store_conditional_put_newest_first(dynamodb_latest_table, dynamodb_conn query_what_where = 'syslog:ground_server2' - res = dict(dynamodb_latest_table.get_item(what_where_key=query_what_where)) + response = dynamodb_latest_table.get_item(Key={'what_where_key': query_what_where}) + res = dict(response['Item']) assert res['metadata']['id'] != file1['metadata']['id'] assert res['metadata']['id'] == file2['metadata']['id'] @@ -191,8 +196,9 @@ def test_verify_replace_record_same_start(dynamodb_latest_table, dynamodb_connec storage.store_latest(record1) storage.store_latest(record2) - stored_record = dynamodb_latest_table.get_item( - what_where_key="syslog:ground_server2" + response = dynamodb_latest_table.get_item( + Key={'what_where_key': "syslog:ground_server2"} ) + stored_record = response['Item'] assert stored_record['metadata']['start'] == record1['metadata']['start'] assert stored_record['metadata']['id'] == "abc123"