Skip to content

Commit 72a110e

Browse files
committed
Tech Debt: Upgrade boto to boto3 for the api & ingester. This is required in order to use the newer moto version needed by the client.
1 parent 31f5104 commit 72a110e

15 files changed

Lines changed: 271 additions & 152 deletions

File tree

.github/workflows/actions.yml

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ on: [push, pull_request]
33

44
jobs:
55
test-client:
6-
runs-on: ubuntu-20.04
6+
runs-on: ubuntu-latest
7+
timeout-minutes: 30 # Abort a hung job after 30 minutes instead of waiting for the default job timeout
78
strategy:
89
matrix:
910
python: [3.8, 3.9, "3.10", 3.12]
1011
extras: ["test", "test,queuable,sentry"]
1112
steps:
1213
- name: Setup Python
13-
uses: actions/setup-python@v2.2.2
14+
uses: actions/setup-python@v5
1415
with:
1516
python-version: ${{ matrix.python }}
1617
- name: Check out repository code
@@ -20,13 +21,16 @@ jobs:
2021
- name: Test
2122
working-directory: ./client
2223
run: |
23-
pip install -e .[${{ matrix.extras }}]
24-
py.test
24+
pip --version
25+
pip install --verbose .[${{ matrix.extras }}]
26+
27+
# Run pytest with specific path and import mode
28+
python -m pytest --import-mode=importlib ./datalake/tests/
2529
test-docker:
2630
runs-on: ubuntu-latest
2731
steps:
2832
- name: Check out repository code
29-
uses: actions/checkout@v2
33+
uses: actions/checkout@v4
3034
with:
3135
fetch-depth: 0
3236
- name: Test

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,6 @@ target/
5858

5959
# build artifacts
6060
version.txt
61-
.claude/settings.local.json
61+
62+
# Claude Code
63+
.claude/

Dockerfile

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,34 @@ ENV LC_ALL C.UTF-8
77

88
# TODO: keep requirements in one place
99
RUN pip install \
10-
blinker>=1.4 \
11-
boto3>=1.1.3 \
12-
click>=5.1 \
13-
Flask>=0.10.1 \
14-
flask-swagger>=0.2.14 \
15-
memoized_property>=1.0.1 \
16-
python-dateutil>=2.4.2 \
17-
python-dotenv>=0.1.3 \
18-
pytz>=2015.4 \
19-
sentry-sdk[flask]>=0.19.5 \
20-
requests>=2.5 \
21-
simplejson>=3.3.1 \
22-
six>=1.10.0 \
23-
# test requirements
10+
'boto==2.49.0' \
11+
'boto3==1.35.41' \
12+
'botocore==1.35.64' \
13+
'datalake<2' \
2414
'flake8>=2.5.0,<4.1' \
2515
'freezegun<1' \
26-
'moto<3' \
16+
'moto>=5,<6' \
2717
'pytest<8' \
18+
'pytest-xdist' \
2819
'responses<0.22.0' \
29-
pyinotify>=0.9.4, \
30-
raven>=5.0.0 \
3120
'tox>4,<5' \
32-
'datalake<2'
21+
# test requirements
22+
'pytest-cov>=2.5.1,<4' \
23+
'blinker>=1.4' \
24+
'click>=5.1' \
25+
'flask-swagger>=0.2.14' \
26+
'Flask>=0.10.1' \
27+
'memoized_property>=1.0.1' \
28+
'pyinotify>=0.9.4' \
29+
python-dateutil>=2.4.2 \
30+
python-dotenv>=0.1.3 \
31+
pytz>=2015.4 \
32+
raven>=5.0.0 \
33+
requests>=2.5 \
34+
sentry-sdk[flask]>=0.19.5 \
35+
simplejson>=3.3.1 \
36+
six>=1.10.0
37+
3338

3439
RUN mkdir -p /opt/
3540
COPY . /opt/

api/tests/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
ClientError as BotoClientError,
1919
NoCredentialsError
2020
)
21-
from moto import mock_dynamodb2
21+
from moto import mock_aws
2222

2323
from datalake_api import app as datalake_api
2424
from datalake.tests import * # noqa
@@ -71,7 +71,7 @@ def client():
7171

7272
@pytest.fixture
7373
def dynamodb(request):
74-
mock = mock_dynamodb2()
74+
mock = mock_aws()
7575
mock.start()
7676

7777
def tear_down():

client/datalake/tests/conftest.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,10 @@
1919
import six
2020

2121

22-
try:
23-
from moto import mock_s3
24-
import boto3
25-
from six.moves.urllib.parse import urlparse
26-
import json
27-
except ImportError:
28-
# if developers use s3-test features without having installed s3 stuff,
29-
# things will fail. So it goes.
30-
pass
22+
from moto import mock_aws
23+
import boto3
24+
from six.moves.urllib.parse import urlparse
25+
import json
3126

3227

3328
@pytest.fixture
@@ -142,7 +137,7 @@ def tear_down():
142137

143138
@pytest.fixture
144139
def s3_connection(aws_connector):
145-
with mock_s3():
140+
with mock_aws():
146141
yield boto3.resource('s3')
147142

148143

client/pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ dynamic = ["version"]
3737
test = [
3838
'pytest<8.0.0',
3939
'pytest-cov>=2.5.1,<4',
40-
'moto[s3]>4,<5',
40+
'moto[s3]>5,<6',
4141
'twine<4.0.0',
4242
'pip>=20.0.0,<22.0.0',
4343
'wheel<0.38.0',
@@ -73,7 +73,7 @@ distance-dirty = "{base_version}+{distance}.{vcs}{rev}.dirty"
7373
# Example formatted version: 1.2.3+42.ge174a1f.dirty
7474

7575
[tool.pytest.ini_options]
76-
addopts = "--cov=planet.mc_client --cov-config .coveragerc"
76+
addopts = "--cov=datalake --cov-config .coveragerc"
7777
markers = [
7878
"slow: marks tests as slow (deselect with '-m \"not slow\"')"
7979
]

ingester/datalake_ingester/queue.py

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
# the License.
1414

1515
from memoized_property import memoized_property
16-
import boto.sqs
16+
import boto3
17+
from botocore.exceptions import ClientError
1718
import simplejson as json
1819
import logging
1920
import os
@@ -39,16 +40,16 @@ def set_handler(self, h):
3940
self.handler = h
4041

4142
@memoized_property
42-
def _queue(self):
43-
return self._connection.get_queue(self.queue_name)
43+
def _connection(self):
44+
# boto2 defaulted to us-east-1
45+
region = os.environ.get('AWS_REGION', 'us-east-1')
46+
return boto3.resource('sqs', region_name=region)
4447

4548
@memoized_property
46-
def _connection(self):
47-
region = os.environ.get('AWS_REGION')
48-
if region:
49-
return boto.sqs.connect_to_region(region)
50-
else:
51-
return boto.connect_sqs()
49+
def _queue(self):
50+
# Let boto3 raise ClientError naturally if queue doesn't exist
51+
# The error will include the queue name and a clear error code
52+
return self._connection.get_queue_by_name(QueueName=self.queue_name)
5253

5354
_LONG_POLL_TIMEOUT = 20
5455

@@ -57,23 +58,38 @@ def drain(self, timeout=None):
5758
'''
5859
long_poll_timeout = timeout or self._LONG_POLL_TIMEOUT
5960
while True:
60-
raw_msg = self._queue.read(wait_time_seconds=long_poll_timeout)
61-
if raw_msg is None:
61+
messages = self._queue.receive_messages(
62+
MaxNumberOfMessages=1,
63+
WaitTimeSeconds=long_poll_timeout
64+
)
65+
if not messages:
6266
if timeout:
6367
return
6468
else:
6569
continue
66-
self._handle_raw_message(raw_msg)
70+
self._handle_raw_message(messages[0])
6771

6872
def _handle_raw_message(self, raw_msg):
6973
# eliminate newlines in raw message so it all logs to one line
70-
raw = raw_msg.get_body().replace('\n', ' ')
74+
raw = raw_msg.body.replace('\n', ' ')
7175
if not self.handler:
7276
self.logger.error('NO HANDLER CONFIGURED: %s', raw)
7377
return
7478

7579
self.logger.info('RECEIVED: %s', raw)
76-
msg = json.loads(raw)
7780

81+
try:
82+
msg = json.loads(raw)
83+
except Exception as e:
84+
self.logger.warning('Failed to parse JSON (%s), using raw message: %s', str(e), raw)
85+
msg = raw
86+
87+
# Process the message with the handler
88+
# If this fails, the message stays in the queue and will be retried
89+
# after the visibility timeout expires
7890
self.handler(msg)
79-
self._queue.delete_message(raw_msg)
91+
92+
# Only delete the message after successful processing
93+
# This ensures message reliability - if processing fails above,
94+
# the message remains in the queue for retry
95+
raw_msg.delete()

ingester/datalake_ingester/reporter.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import boto.sns
1+
import boto3
22
import simplejson as json
33
import logging
44
from memoized_property import memoized_property
@@ -25,13 +25,11 @@ def _log_name(self):
2525

2626
@memoized_property
2727
def _connection(self):
28-
region = os.environ.get('AWS_REGION')
29-
if region:
30-
return boto.sns.connect_to_region(region)
31-
else:
32-
return boto.connect_sns()
28+
# boto2 defaulted to us-east-1
29+
region = os.environ.get('AWS_REGION', 'us-east-1')
30+
return boto3.client('sns', region_name=region)
3331

3432
def report(self, ingestion_report):
3533
message = json.dumps(ingestion_report)
3634
self.logger.info('REPORTING: %s', message)
37-
self._connection.publish(topic=self.report_key, message=message)
35+
self._connection.publish(TopicArn=self.report_key, Message=message)

ingester/datalake_ingester/storage.py

Lines changed: 44 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,8 @@
1414

1515

1616
from memoized_property import memoized_property
17-
import boto.dynamodb2
18-
from boto.dynamodb2.table import Table
19-
from boto.dynamodb2.exceptions import ConditionalCheckFailedException
17+
import boto3
18+
from botocore.exceptions import ClientError
2019
import os
2120
from datalake.common.errors import InsufficientConfiguration
2221
import logging
@@ -42,43 +41,55 @@ def from_config(cls):
4241

4342
def _prepare_connection(self, connection):
4443
self.logger.info("Preparing connection...")
45-
region = os.environ.get('AWS_REGION')
44+
# boto2 defaulted to us-east-1, maintaining consistency across all AWS services
45+
region = os.environ.get('AWS_REGION', 'us-east-1')
4646
if connection:
47+
# When connection is provided from outside, we need to ensure _client is set
4748
self._connection = connection
48-
elif region:
49-
self._connection = boto.dynamodb2.connect_to_region(region)
49+
50+
# Check if _connection has a client attribute (added in our tests)
51+
# or create a new client if it doesn't
52+
if hasattr(connection, 'client'):
53+
self._client = connection.client
54+
else:
55+
self._client = boto3.client('dynamodb', region_name=region)
5056
else:
51-
msg = 'Please provide a connection or configure a region'
52-
raise InsufficientConfiguration(msg)
57+
self._connection = boto3.resource('dynamodb', region_name=region)
58+
self._client = boto3.client('dynamodb', region_name=region)
5359

5460
@memoized_property
5561
def _table(self):
56-
return Table(self.table_name, connection=self._connection)
57-
62+
return self._connection.Table(self.table_name)
63+
5864
@memoized_property
5965
def _latest_table(self):
60-
return Table(self.latest_table_name, connection=self._connection)
66+
return self._connection.Table(self.latest_table_name)
6167

6268
def store(self, record):
6369
try:
64-
self._table.put_item(data=record)
65-
except ConditionalCheckFailedException:
66-
# Tolerate duplicate stores
67-
pass
70+
self._table.put_item(Item=record)
71+
except ClientError as e:
72+
if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
73+
# Tolerate duplicate stores
74+
pass
75+
else:
76+
raise
6877
if self.latest_table_name:
6978
self.store_latest(record)
7079

7180
def update(self, record):
72-
self._table.put_item(data=record, overwrite=True)
81+
self._table.put_item(Item=record)
7382

7483
def store_latest(self, record):
7584
"""
7685
Record must utilize AttributeValue syntax
7786
for the conditional put.
7887
"""
79-
condition_expression = " attribute_not_exists(what_where_key) OR metadata.#metadata_start <= :new_start"
88+
# Boto3 requires different parameter naming: condition_expression -> ConditionExpression
89+
condition_expression = "attribute_not_exists(what_where_key) OR metadata.#metadata_start <= :new_start"
90+
8091
expression_attribute_values = {
81-
':new_start': {'N': str(record['metadata']['start'])}
92+
':new_start': {'N': str(record['metadata']['start'])} # Must use typed dict here
8293
}
8394

8495
# aliases for DynamoDB reserved names.
@@ -132,17 +143,21 @@ def store_latest(self, record):
132143
}
133144
self.logger.info(f"Attempting to store record: {record}")
134145
try:
135-
self._connection.put_item(
136-
table_name=self.latest_table_name,
137-
item=record,
138-
condition_expression=condition_expression,
139-
expression_attribute_names=expression_attribute_names,
140-
expression_attribute_values=expression_attribute_values,
146+
self._client.put_item(
147+
TableName=self.latest_table_name,
148+
Item=record,
149+
ConditionExpression=condition_expression,
150+
ExpressionAttributeNames=expression_attribute_names,
151+
ExpressionAttributeValues=expression_attribute_values,
141152
)
142153
self.logger.info("Record stored successfully.")
143-
except ConditionalCheckFailedException:
144-
self.logger.debug(f"Condition not met for record {record},"
145-
"no operation was performed.")
146-
except Exception as e:
147-
self.logger.error(f"Error occurred while attempting {record}: {str(e)}")
154+
except ClientError as e:
155+
# Handle conditional check failures (expected when record is older than existing one)
156+
if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
157+
self.logger.debug(f"Condition not met for record {record}, "
158+
"no operation was performed.")
159+
else:
160+
# All other AWS/boto3 errors - log and re-raise
161+
self.logger.error(f"Error occurred while attempting {record}: {str(e)}")
162+
raise
148163

0 commit comments

Comments
 (0)