diff --git a/.gitignore b/.gitignore index 14cb723..9aa5b22 100644 --- a/.gitignore +++ b/.gitignore @@ -90,3 +90,6 @@ ENV/ .ropeproject .pypirc + +# PyCharm +.idea/ diff --git a/setup.py b/setup.py index 4d658d3..4a881f7 100755 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ import subprocess setup(name="singer-python", - version='4.1.0', + version='5.0.3', description="Singer.io utility library", author="Stitch", classifiers=['Programming Language :: Python :: 3 :: Only'], diff --git a/singer/catalog.py b/singer/catalog.py index 9cbbe52..7807033 100644 --- a/singer/catalog.py +++ b/singer/catalog.py @@ -4,6 +4,7 @@ import sys from singer.schema import Schema +from jsonschema import Draft4Validator, FormatChecker # pylint: disable=too-many-instance-attributes class CatalogEntry(object): @@ -55,6 +56,8 @@ def to_dict(self): result['stream'] = self.stream if self.row_count is not None: result['row_count'] = self.row_count + if self.stream_alias is not None: + result['stream_alias'] = self.stream_alias if self.metadata is not None: result['metadata'] = self.metadata return result @@ -95,6 +98,7 @@ def from_dict(cls, data): entry.schema = Schema.from_dict(stream.get('schema')) entry.is_view = stream.get('is_view') entry.stream_alias = stream.get('stream_alias') + entry.metadata = stream.get('metadata') streams.append(entry) return Catalog(streams) @@ -109,3 +113,30 @@ def get_stream(self, tap_stream_id): if stream.tap_stream_id == tap_stream_id: return stream return None + + +CATALOG_SCHEMA = { + 'type': 'object', + 'required': ['streams'], + 'properties': { + 'streams' : { + 'type': 'array', + 'items': { + 'type': 'object', + 'required': ['stream', 'tap_stream_id', 'schema'], + 'properties': { + 'stream': {'type': 'string'}, + 'tap_stream_id': {'type': 'string'}, + 'schema': {'type': 'object'} + } + } + } + } +} + +CATALOG_VALIDATOR = Draft4Validator(CATALOG_SCHEMA, + format_checker=FormatChecker()) + +def write_catalog(streams): + CATALOG_VALIDATOR.validate(streams) + json.dump(streams, sys.stdout, indent=2) diff --git a/singer/messages.py b/singer/messages.py index fddd1de..330605f 100644 --- a/singer/messages.py +++ b/singer/messages.py @@ -1,6 +1,8 @@ import sys import simplejson as json - +import singer.utils as u +import dateutil +import pytz class Message(object): '''Base class for messages.''' @@ -37,10 +39,14 @@ class RecordMessage(Message): ''' - def __init__(self, stream, record, version=None): + def __init__(self, stream, record, version=None, time_extracted=None): self.stream = stream self.record = record self.version = version + self.time_extracted = time_extracted + if time_extracted and not time_extracted.tzinfo: + raise ValueError("'time_extracted' must be either None " + + "or an aware datetime (with a time zone)") def asdict(self): result = { @@ -50,6 +56,9 @@ def asdict(self): } if self.version is not None: result['version'] = self.version + if self.time_extracted: + as_utc = self.time_extracted.astimezone(pytz.utc) + result['time_extracted'] = as_utc.strftime(u.DATETIME_FMT) return result def __str__(self): @@ -76,18 +85,28 @@ class SchemaMessage(Message): >>> key_properties=['id']) ''' - def __init__(self, stream, schema, key_properties): + def __init__(self, stream, schema, key_properties, bookmark_properties=None): self.stream = stream self.schema = schema self.key_properties = key_properties + if isinstance(bookmark_properties, (str, bytes)): + bookmark_properties = [bookmark_properties] + if bookmark_properties and not isinstance(bookmark_properties, list): + raise Exception("bookmark_properties must be a string or list of strings") + + self.bookmark_properties = bookmark_properties + def asdict(self): - return { + result = { 'type': 'SCHEMA', 'stream': self.stream, 'schema': self.schema, 'key_properties': self.key_properties } + if self.bookmark_properties: + result['bookmark_properties'] = self.bookmark_properties + return result class StateMessage(Message): @@ -157,14 +176,20 @@ def parse_message(msg): msg_type = _required_key(obj, 'type') if msg_type == 'RECORD': + time_extracted = obj.get('time_extracted') + if time_extracted: + time_extracted = dateutil.parser.parse(time_extracted) return RecordMessage(stream=_required_key(obj, 'stream'), record=_required_key(obj, 'record'), - version=obj.get('version')) + version=obj.get('version'), + time_extracted=time_extracted) + elif msg_type == 'SCHEMA': return SchemaMessage(stream=_required_key(obj, 'stream'), schema=_required_key(obj, 'schema'), - key_properties=_required_key(obj, 'key_properties')) + key_properties=_required_key(obj, 'key_properties'), + bookmark_properties=obj.get('bookmark_properties')) elif msg_type == 'STATE': return StateMessage(value=_required_key(obj, 'value')) @@ -183,12 +208,14 @@ def write_message(message): sys.stdout.flush() -def write_record(stream_name, record, stream_alias=None): +def write_record(stream_name, record, stream_alias=None, time_extracted=None): """Write a single record for the given stream. >>> write_record("users", {"id": 2, "email": "mike@stitchdata.com"}) """ - write_message(RecordMessage(stream=(stream_alias or stream_name), record=record)) + write_message(RecordMessage(stream=(stream_alias or stream_name), + record=record, + time_extracted=time_extracted)) def write_records(stream_name, records): @@ -202,7 +229,7 @@ def write_records(stream_name, records): write_record(stream_name, record) -def write_schema(stream_name, schema, key_properties, stream_alias=None): +def write_schema(stream_name, schema, key_properties, bookmark_properties=None, stream_alias=None): """Write a schema message. >>> stream = 'test' @@ -214,11 +241,13 @@ def write_schema(stream_name, schema, key_properties, stream_alias=None): key_properties = [key_properties] if not isinstance(key_properties, list): raise Exception("key_properties must be a string or list of strings") + write_message( SchemaMessage( stream=(stream_alias or stream_name), schema=schema, - key_properties=key_properties)) + key_properties=key_properties, + bookmark_properties=bookmark_properties)) def write_state(value): diff --git a/singer/utils.py b/singer/utils.py index 5315484..048c138 100644 --- a/singer/utils.py +++ b/singer/utils.py @@ -11,7 +11,7 @@ from singer.catalog import Catalog DATETIME_PARSE = "%Y-%m-%dT%H:%M:%SZ" -DATETIME_FMT = "%04Y-%m-%dT%H:%M:%S.%fZ" +DATETIME_FMT = "%Y-%m-%dT%H:%M:%S.%fZ" def now(): return datetime.datetime.utcnow().replace(tzinfo=pytz.UTC) @@ -29,10 +29,10 @@ def strptime(dtime): except Exception: return datetime.datetime.strptime(dtime, DATETIME_PARSE) -def strftime(dtime): +def strftime(dtime, format_str=DATETIME_FMT): if dtime.utcoffset() != datetime.timedelta(0): raise Exception("datetime must be pegged at UTC tzoneinfo") - return dtime.strftime(DATETIME_FMT) + return dtime.strftime(format_str) def ratelimit(limit, every): def limitdecorator(func): diff --git a/tests/test_catalog.py b/tests/test_catalog.py index 61a3901..271f993 100644 --- a/tests/test_catalog.py +++ b/tests/test_catalog.py @@ -1,15 +1,15 @@ import unittest +import singer.catalog from singer.schema import Schema from singer.catalog import Catalog, CatalogEntry -class TestToDictAndFromDict(unittest.TestCase): - - dict_form = { +dict_form = { 'streams': [ { 'stream': 'users', 'tap_stream_id': 'prod_users', + 'stream_alias': 'users_alias', 'database_name': 'prod', 'table_name': 'users', 'schema': { @@ -20,6 +20,17 @@ class TestToDictAndFromDict(unittest.TestCase): 'name': {'type': 'string', 'selected': True} } }, + 'metadata': [ + { + 'metadata': { + 'metadata-key': 'metadata-value' + }, + 'breadcrumb': [ + 'properties', + 'name', + ], + }, + ], }, { 'stream': 'orders', @@ -38,10 +49,11 @@ class TestToDictAndFromDict(unittest.TestCase): ] } - obj_form = Catalog(streams=[ +obj_form = Catalog(streams=[ CatalogEntry( stream='users', tap_stream_id='prod_users', + stream_alias='users_alias', database='prod', table='users', schema=Schema( @@ -49,7 +61,16 @@ class TestToDictAndFromDict(unittest.TestCase): selected=True, properties={ 'id': Schema(type='integer', selected=True), - 'name': Schema(type='string', selected=True)})), + 'name': Schema(type='string', selected=True)}), + metadata=[{ + 'metadata': { + 'metadata-key': 'metadata-value' + }, + 'breadcrumb': [ + 'properties', + 'name', + ], + }]), CatalogEntry( stream='orders', tap_stream_id='prod_orders', @@ -62,11 +83,15 @@ class TestToDictAndFromDict(unittest.TestCase): 'id': Schema(type='integer', selected=True), 'amount': Schema(type='number', selected=True)}))]) + + + +class TestToDictAndFromDict(unittest.TestCase): def test_from_dict(self): - self.assertEqual(self.obj_form, Catalog.from_dict(self.dict_form)) + self.assertEqual(obj_form, Catalog.from_dict(dict_form)) def test_to_dict(self): - self.assertEqual(self.dict_form, self.obj_form.to_dict()) + self.assertEqual(dict_form, obj_form.to_dict()) class TestGetStream(unittest.TestCase): @@ -77,3 +102,7 @@ def test(self): CatalogEntry(tap_stream_id='c')]) entry = catalog.get_stream('b') self.assertEquals('b', entry.tap_stream_id) + +class TestWriteCatalog(unittest.TestCase): + def test(self): + singer.catalog.write_catalog(dict_form) diff --git a/tests/test_singer.py b/tests/test_singer.py index 252f294..f820b37 100644 --- a/tests/test_singer.py +++ b/tests/test_singer.py @@ -1,6 +1,7 @@ import singer import unittest - +import datetime +import dateutil class TestSinger(unittest.TestCase): def test_parse_message_record_good(self): @@ -17,6 +18,23 @@ def test_parse_message_record_with_version_good(self): message, singer.RecordMessage(record={'name': 'foo'}, stream='users', version=2)) + def test_parse_message_record_naive_extraction_time(self): + with self.assertRaisesRegex(ValueError, "must be either None or an aware datetime"): + message = singer.parse_message( + '{"type": "RECORD", "record": {"name": "foo"}, "stream": "users", "version": 2, "time_extracted": "1970-01-02T00:00:00"}') + + def test_parse_message_record_aware_extraction_time(self): + message = singer.parse_message( + '{"type": "RECORD", "record": {"name": "foo"}, "stream": "users", "version": 2, "time_extracted": "1970-01-02T00:00:00.000Z"}') + expected = singer.RecordMessage( + record={'name': 'foo'}, + stream='users', + version=2, + time_extracted=dateutil.parser.parse("1970-01-02T00:00:00.000Z")) + print(message) + print(expected) + self.assertEqual(message, expected) + def test_parse_message_record_missing_record(self): with self.assertRaises(Exception): singer.parse_message('{"type": "RECORD", "stream": "users"}') diff --git a/tests/test_utils.py b/tests/test_utils.py index cd7fc42..0b89448 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -6,5 +6,5 @@ class TestFormat(unittest.TestCase): def test_small_years(self): - self.assertEqual(u.strftime(dt(90, 1, 1, tzinfo=tz.utc)), + self.assertEqual(u.strftime(dt(90, 1, 1, tzinfo=tz.utc), '%04Y-%m-%dT%H:%M:%S.%fZ'), "0090-01-01T00:00:00.000000Z")