Handling empty schema #88
Description
Current Issue
According to the Singer docs:
Schemas are required, but they can be defined in the broadest terms - a JSON Schema of '{}' validates all data points.
But right now, pipelinewise-target-snowflake breaks if it is fed SCHEMA messages that define an empty schema.
In my use case, the source was pipelinewise-tap-salesforce. When Salesforce defines a field with the anyType data type, as it does on History objects, the tap emits an empty schema for that field.
This is a sample stream of messages generated by the salesforce tap:
{"type": "STATE", "value": {"current_stream": "AccountHistory"}}
{"type": "SCHEMA", "stream": "AccountHistory", "schema": {"additionalProperties": false, "properties": {"AccountId": {"type": ["null", "string"]}, "CreatedById": {"type": ["null", "string"]}, "CreatedDate": {"anyOf": [{"format": "date-time", "type": "string"}, {"type": ["string", "null"]}]}, "Field": {"type": ["null", "string"]}, "Id": {"type": "string"}, "IsDeleted": {"type": ["null", "boolean"]}, "NewValue": {}, "OldValue": {}}, "type": "object"}, "key_properties": ["Id"], "bookmark_properties": ["CreatedDate"]}
{"type": "ACTIVATE_VERSION", "stream": "AccountHistory", "version": 1593166361466}
{"type": "RECORD", "stream": "AccountHistory", "record": {"AccountId": "WoYq3nK08PuOT5rYkR", "CreatedById": "4lkUFTtWU6ktX19cpe", "CreatedDate": "2016-12-07T21:28:07.000000Z", "Field": "accountUpdatedByLead", "Id": "wWZS7XwK5Bs5EFZwYC", "IsDeleted": false, "NewValue": null, "OldValue": null}, "version": 1593166361466, "time_extracted": "2020-06-26T10:12:41.467536Z"}
{"type": "STATE", "value": {"current_stream": "AccountHistory", "bookmarks": {"AccountHistory": {"version": 1593166361466, "CreatedDate": "2016-12-07T21:28:07.000000Z"}}}}
{"type": "RECORD", "stream": "AccountHistory", "record": {"AccountId": "KFO6SPVsoURdtmlxg6", "CreatedById": "pM1Ui74VCiijGHSXYs", "CreatedDate": "2016-12-08T02:42:23.000000Z", "Field": "Website", "Id": "dAyRQJr9RQ1YVIVh6g", "IsDeleted": false, "NewValue": "example.com", "OldValue": null}, "version": 1593166361466, "time_extracted": "2020-06-26T10:12:41.467536Z"}
{"type": "STATE", "value": {"current_stream": "AccountHistory", "bookmarks": {"AccountHistory": {"version": 1593166361466, "CreatedDate": "2016-12-08T02:42:23.000000Z"}}}}
{"type": "RECORD", "stream": "AccountHistory", "record": {"AccountId": "uIpoNCSWjcCebhMCG9", "CreatedById": "jidWq314JDa92a0Dnt", "CreatedDate": "2016-12-09T16:36:38.000000Z", "Field": "Some_Field__c", "Id": "6AoUgVJF8IhQTlIGMp", "IsDeleted": false, "NewValue": true, "OldValue": false}, "version": 1593166361466, "time_extracted": "2020-06-26T10:12:41.467536Z"}
{"type": "STATE", "value": {"current_stream": "AccountHistory", "bookmarks": {"AccountHistory": {"version": 1593166361466, "CreatedDate": "2016-12-09T16:36:38.000000Z"}}}}
{"type": "RECORD", "stream": "AccountHistory", "record": {"AccountId": "wfmR8DzpDpbiDeY2jj", "CreatedById": "XUGP2VNVDXHsRxpVbf", "CreatedDate": "2016-12-10T20:10:23.000000Z", "Field": "Another_Field__c", "Id": "aJgpPh9JkrRyIaMxu3", "IsDeleted": false, "NewValue": 120, "OldValue": 100}, "version": 1593166361466, "time_extracted": "2020-06-26T10:12:41.467536Z"}
{"type": "STATE", "value": {"current_stream": "AccountHistory", "bookmarks": {"AccountHistory": {"version": 1593166361466, "CreatedDate": "2016-12-10T20:10:23.000000Z"}}}}
The SCHEMA message contains two empty property schemas: "NewValue": {}, "OldValue": {}
When this sequence of messages is piped to pipelinewise-target-snowflake, a KeyError exception is raised:
Traceback (most recent call last):
  File "/home/vagrant/pipelinewise/.virtualenvs/target-snowflake/bin/target-snowflake", line 33, in <module>
    sys.exit(load_entry_point('pipelinewise-target-snowflake', 'console_scripts', 'target-snowflake')())
  File "/vagrant/pipelinewise-target-snowflake/target_snowflake/__init__.py", line 446, in main
    persist_lines(config, singer_messages, table_cache)
  File "/vagrant/pipelinewise-target-snowflake/target_snowflake/__init__.py", line 189, in persist_lines
    adjust_timestamps_in_record(o['record'], schemas[stream])
  File "/vagrant/pipelinewise-target-snowflake/target_snowflake/__init__.py", line 148, in adjust_timestamps_in_record
    if 'string' in schema['properties'][key]['type'] and \
KeyError: 'type'
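The failure can be reproduced in isolation: the traceback shows a direct `['type']` lookup on a property schema, and `{}` has no such key. The sketch below is not the target's code. `is_timestamp_property` and `unguarded_lookup_raises` are hypothetical helpers, and the set of timestamp formats is an assumption. It only illustrates how a guarded lookup would tolerate the empty schemas from the sample above.

```python
# Hypothetical helpers for illustration; not part of pipelinewise-target-snowflake.
TIMESTAMP_FORMATS = {"date-time", "time", "date"}  # assumed set of formats


def is_timestamp_property(prop_schema: dict) -> bool:
    """Return True if the property schema describes a timestamp-like string."""
    # Treat a plain schema as a one-element anyOf so both shapes share one loop.
    candidates = prop_schema.get("anyOf", [prop_schema])
    for candidate in candidates:
        # .get() avoids the KeyError when the schema is empty ({}).
        types = candidate.get("type", [])
        if isinstance(types, str):
            types = [types]
        if "string" in types and candidate.get("format") in TIMESTAMP_FORMATS:
            return True
    return False


def unguarded_lookup_raises(prop_schema: dict) -> bool:
    """Mimic the direct ['type'] access from the traceback and report a KeyError."""
    try:
        _ = "string" in prop_schema["type"]
    except KeyError:
        return True
    return False


# Property schemas copied from the sample SCHEMA message.
properties = {
    "CreatedDate": {
        "anyOf": [
            {"format": "date-time", "type": "string"},
            {"type": ["string", "null"]},
        ]
    },
    "Field": {"type": ["null", "string"]},
    "NewValue": {},
}

for name, prop in properties.items():
    print(name, is_timestamp_property(prop))
```

With a guard like this, NewValue and OldValue would simply be skipped by the timestamp adjustment instead of aborting the load.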
Proposed Solution
The proposed solution is to fix the bugs that cause the KeyError exceptions and, at the same time, to load schemaless fields into columns of the Snowflake VARIANT data type.
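As a rough sketch of the second half of the proposal, a type-mapping step could fall back to VARIANT whenever a property schema carries neither `type` nor `anyOf`. The function name `column_type` and the specific mapping below are assumptions made for illustration, not the target's actual mapping.

```python
# Hypothetical mapping for illustration; the real target's mapping may differ.
def column_type(prop_schema: dict) -> str:
    """Pick a Snowflake column type for a JSON Schema property definition."""
    # Schemaless field ({}): any value is valid, so store it as VARIANT.
    if "type" not in prop_schema and "anyOf" not in prop_schema:
        return "VARIANT"

    if "anyOf" in prop_schema:
        # Prefer a date-time alternative if one is offered, else fall back to text.
        for candidate in prop_schema["anyOf"]:
            if candidate.get("format") == "date-time":
                return "TIMESTAMP_NTZ"
        return "TEXT"

    types = prop_schema["type"]
    if isinstance(types, str):
        types = [types]

    if "object" in types or "array" in types:
        return "VARIANT"
    if "boolean" in types:
        return "BOOLEAN"
    if "integer" in types:
        return "NUMBER"
    if "number" in types:
        return "FLOAT"
    if "string" in types and prop_schema.get("format") == "date-time":
        return "TIMESTAMP_NTZ"
    return "TEXT"


# Property schemas taken from the sample SCHEMA message.
print(column_type({}))                                # NewValue / OldValue
print(column_type({"type": ["null", "boolean"]}))     # IsDeleted
print(column_type({"type": ["null", "string"]}))      # Field
```

VARIANT fits here because the sample records put strings, booleans, numbers, and nulls into NewValue and OldValue, and a single typed column could not hold all of them.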