Skip to content

Commit 2b2a85e

Browse files
authored
Fix discoverable metadata
1 parent 2915acf commit 2b2a85e

File tree

1 file changed

+32
-78
lines changed

1 file changed

+32
-78
lines changed

tap_frontapp/__init__.py

Lines changed: 32 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -15,132 +15,88 @@
1515

1616
LOGGER = singer.get_logger()
1717

18-
#def check_authorization(atx):
19-
# atx.client.get('/settings')
20-
21-
22-
# Some taps do discovery dynamically where the catalog is read in from a
23-
# call to the api but with the odd frontapp structure, we won't do that
24-
# here we never use atx in here since the schema is from file but we
25-
# would use it if we pulled schema from the API def discover(atx):
26-
2718

2819
def discover():
2920
catalog = Catalog([])
30-
21+
3122
# Build initial catalog from schema files
3223
for tap_stream_id in schemas.STATIC_SCHEMA_STREAM_IDS:
3324
LOGGER.info("tap stream id=%s", tap_stream_id)
3425
schema = Schema.from_dict(schemas.load_schema(tap_stream_id))
3526
metadata = []
36-
37-
# Stream-level metadata select the stream
27+
28+
# Add discoverable stream-level metadata
3829
metadata.append({
3930
"metadata": {
40-
"selected": True # Make sure to select every stream
31+
"inclusion": "available",
32+
"selected-by-default": True,
33+
"inclusion-reason": "automatic"
4134
},
4235
"breadcrumb": []
4336
})
44-
45-
# Field level metadata with inclusion type
37+
38+
# Add discoverable field-level metadata
4639
for field_name in schema.properties.keys():
47-
if field_name in schemas.PK_FIELDS[tap_stream_id]:
48-
inclusion = 'automatic'
49-
else:
50-
inclusion = 'available'
40+
inclusion = "automatic" if field_name in schemas.PK_FIELDS[tap_stream_id] else "available"
5141
metadata.append({
5242
"metadata": {
53-
"inclusion": inclusion
43+
"inclusion": inclusion,
44+
"selected-by-default": True,
45+
"inclusion-reason": "manual"
5446
},
55-
"breadcrumb": ['properties', field_name]
47+
"breadcrumb": ["properties", field_name]
5648
})
57-
49+
5850
catalog.streams.append(CatalogEntry(
5951
stream=tap_stream_id,
6052
tap_stream_id=tap_stream_id,
6153
key_properties=schemas.PK_FIELDS[tap_stream_id],
6254
schema=schema,
6355
metadata=metadata
6456
))
65-
66-
# Creating a dict to change before converting
57+
6758
catalog_dict = catalog.to_dict()
68-
69-
required_streams = [
70-
"accounts_table",
71-
"channels_table",
72-
"inboxes_table",
73-
"tags_table",
74-
"teammates_table",
75-
"teams_table"
76-
]
77-
78-
# We verify this to ensure all mandatory streams are available even if schema files are missing
79-
present_streams = {stream['tap_stream_id'] for stream in catalog_dict['streams']}
80-
81-
# Ensure all required streams are included even if schema is missing
82-
for stream_name in required_streams:
83-
if stream_name not in present_streams:
59+
present_streams = {stream["tap_stream_id"] for stream in catalog_dict["streams"]}
8460

61+
# Use METRIC_API_DESCRIPTION_KEY keys instead of hardcoded list
62+
for stream_name in streams.METRIC_API_DESCRIPTION_KEY.keys():
63+
if stream_name not in present_streams:
8564
LOGGER.info("Adding missing required stream: %s", stream_name)
86-
87-
# Create a minimal stream entry that will be visible in the output
88-
catalog_dict['streams'].append({
65+
catalog_dict["streams"].append({
8966
"stream": stream_name,
9067
"tap_stream_id": stream_name,
9168
"schema": {
92-
"type": ['null', 'object'],
69+
"type": ["null", "object"],
9370
"properties": {},
9471
"additionalProperties": False
9572
},
9673
"key_properties": [],
9774
"metadata": [
9875
{
9976
"metadata": {
100-
"selected": True
77+
"inclusion": "available",
78+
"selected-by-default": True,
79+
"inclusion-reason": "automatic"
10180
},
10281
"breadcrumb": []
10382
}
10483
]
10584
})
106-
107-
# This ensure singer tools recognize all streams in the catalog dictionary
108-
for stream in catalog_dict['streams']:
109-
has_selection = False
110-
for metadata_item in stream.get('metadata', []):
111-
if metadata_item.get('breadcrumb') == [] and metadata_item.get('metadata', {}).get('selected') is True:
112-
has_selection = True
113-
break
114-
115-
if not has_selection:
116-
LOGGER.info("Adding selection metadata to stream: %s", stream['tap_stream_id'])
117-
stream.setdefault('metadata', []).insert(0, {
118-
"metadata": {
119-
"selected": True
120-
},
121-
"breadcrumb": []
122-
})
123-
124-
# Convert back to a Catalog object
125-
modified_catalog = Catalog.from_dict(catalog_dict)
12685

127-
128-
return modified_catalog
86+
return Catalog.from_dict(catalog_dict)
12987

13088

13189
def get_abs_path(path: str):
132-
"""Returns absolute path for URL."""
90+
"""Returns absolute path for a given relative path."""
13391
return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)
13492

13593

136-
# this is already defined in schemas.py though w/o dependencies
13794
def load_schema(tap_stream_id):
138-
path = "schemas/{}.json".format(tap_stream_id)
95+
"""Loads schema from JSON file, resolving dependencies."""
96+
path = f"schemas/{tap_stream_id}.json"
13997
schema = utils.load_json(get_abs_path(path))
14098
dependencies = schema.pop("tap_schema_dependencies", [])
141-
refs = {}
142-
for sub_stream_id in dependencies:
143-
refs[sub_stream_id] = load_schema(sub_stream_id)
99+
refs = {sub_id: load_schema(sub_id) for sub_id in dependencies}
144100
if refs:
145101
singer.resolve_schema_references(schema, refs)
146102
return schema
@@ -149,7 +105,6 @@ def load_schema(tap_stream_id):
149105
def sync(atx):
150106
for tap_stream_id in schemas.STATIC_SCHEMA_STREAM_IDS:
151107
schemas.load_and_write_schema(tap_stream_id)
152-
153108
streams.sync_selected_streams(atx)
154109

155110

@@ -158,13 +113,12 @@ def main():
158113
args = utils.parse_args(REQUIRED_CONFIG_KEYS)
159114
atx = Context(args.config, args.state)
160115
if args.discover:
161-
# the schema is static from file so we don't need to pass in atx for connection info.
162116
catalog = discover()
163117
json.dump(catalog.to_dict(), sys.stdout)
164118
else:
165-
atx.catalog = Catalog.from_dict(args.properties) \
166-
if args.properties else discover()
119+
atx.catalog = Catalog.from_dict(args.properties) if args.properties else discover()
167120
sync(atx)
168121

122+
169123
if __name__ == "__main__":
170-
main()
124+
main()

0 commit comments

Comments
 (0)