Skip to content

Commit 5b41105

Browse files
Fix timedelta serialization (#1198)
Co-authored-by: Hugh Wells <hugh.wells@wise.com>
1 parent 3228292 commit 5b41105

4 files changed

Lines changed: 69 additions & 2 deletions

File tree

singer-connectors/tap-mysql/tap_mysql/sync_strategies/binlog.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,10 @@ def row_to_singer_record(catalog_entry, version, db_column_map, row, time_extrac
224224
elif isinstance(val, datetime.timedelta):
225225
if property_format == 'time':
226226
# this should convert time column into 'HH:MM:SS' formatted string
227-
row_to_persist[column_name] = str(val)
227+
_total_seconds = int(val.total_seconds())
228+
_hours, _remainder = divmod(_total_seconds, 3600)
229+
_minutes, _seconds = divmod(_remainder, 60)
230+
row_to_persist[column_name] = f"{_hours:02}:{_minutes:02}:{_seconds:02}"
228231
else:
229232
timedelta_from_epoch = datetime.datetime.utcfromtimestamp(0) + val
230233
row_to_persist[column_name] = timedelta_from_epoch.isoformat() + '+00:00'

singer-connectors/tap-mysql/tap_mysql/sync_strategies/common.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,10 @@ def row_to_singer_record(catalog_entry, version, row, columns, time_extracted):
103103

104104
elif isinstance(elem, datetime.timedelta):
105105
if property_format == 'time':
106-
row_to_persist += (str(elem),) # this should convert time column into 'HH:MM:SS' formatted string
106+
_total_seconds = int(elem.total_seconds())
107+
_hours, _remainder = divmod(_total_seconds, 3600)
108+
_minutes, _seconds = divmod(_remainder, 60)
109+
row_to_persist += (f"{_hours:02}:{_minutes:02}:{_seconds:02}",) # this should convert time column into 'HH:MM:SS' formatted string
107110
else:
108111
epoch = datetime.datetime.utcfromtimestamp(0)
109112
timedelta_from_epoch = epoch + elem

singer-connectors/tap-mysql/tests/unit/sync_strategies/test_binlog.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2147,3 +2147,29 @@ def test_calculate_gtid_bookmark_for_mysql_no_gtid_found_expect_exception(self):
21472147

21482148
self.assertEqual("Couldn't find any gtid in state bookmarks to resume logical replication",
21492149
str(context.exception))
2150+
2151+
def test_row_to_singer_record(self):
2152+
catalog_entry = CatalogEntry(
2153+
stream='stream',
2154+
schema=Schema.from_dict({
2155+
'type': 'object',
2156+
'properties': {
2157+
'time': {
2158+
'type': 'string',
2159+
'format': 'time',
2160+
},
2161+
},
2162+
}),
2163+
)
2164+
message = binlog.row_to_singer_record(
2165+
catalog_entry,
2166+
version=1,
2167+
row={'time': datetime.timedelta(hours=8, minutes=30)},
2168+
db_column_map={},
2169+
time_extracted=datetime.datetime.now(datetime.timezone.utc),
2170+
)
2171+
2172+
assert message.stream == 'stream'
2173+
assert message.version == 1
2174+
assert message.record == {'time': '08:30:00'}
2175+
assert message.time_extracted is not None
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import datetime
2+
3+
from singer.catalog import CatalogEntry
4+
from singer.schema import Schema
5+
6+
from tap_mysql.sync_strategies import common
7+
8+
9+
class TestCommonSyncStrategyHelpers:
10+
11+
def test_row_to_singer_record(self):
12+
catalog_entry = CatalogEntry(
13+
stream='stream',
14+
schema=Schema.from_dict({
15+
'type': 'object',
16+
'properties': {
17+
'time': {
18+
'type': 'string',
19+
'format': 'time',
20+
},
21+
},
22+
}),
23+
)
24+
message = common.row_to_singer_record(
25+
catalog_entry,
26+
version=1,
27+
row=(datetime.timedelta(hours=8, minutes=30),),
28+
columns=['time'],
29+
time_extracted=datetime.datetime.now(datetime.timezone.utc),
30+
)
31+
32+
assert message.stream == 'stream'
33+
assert message.version == 1
34+
assert message.record == {'time': '08:30:00'}
35+
assert message.time_extracted is not None

0 commit comments

Comments
 (0)