|
| 1 | +import singer |
1 | 2 | from singer.catalog import Catalog, CatalogEntry, Schema |
| 3 | +from tap_impact.client import ImpactForbiddenError |
2 | 4 | from tap_impact.schema import get_schemas |
3 | | -from tap_impact.streams import flatten_streams |
| 5 | +from tap_impact.streams import STREAMS, flatten_streams |
4 | 6 |
|
5 | | -def discover(config): |
| 7 | +LOGGER = singer.get_logger() |
| 8 | + |
| 9 | + |
def _get_child_to_parent_map():
    """Build a lookup from each child stream name to the parent declaring it.

    Walks every entry of STREAMS and reads its optional 'children'
    collection. Parents without a 'children' key contribute nothing. If the
    same child name is listed under several parents, the parent iterated
    last wins.
    """
    return {
        child: parent
        for parent, parent_cfg in STREAMS.items()
        for child in parent_cfg.get('children', {})
    }
| 17 | + |
| 18 | + |
def _check_stream_access(client, stream_name, path):
    """Probe a single stream endpoint and report whether it is readable.

    Sends a GET for ``path`` with ``PageSize=1`` through ``client.request``,
    tagging the call with ``stream_name`` as the endpoint.

    Returns True when the request completes, and False (after logging a
    warning) when it raises ImpactForbiddenError. Any other exception raised
    by the client propagates to the caller.
    """
    probe_params = {'PageSize': 1}
    try:
        client.request('GET', path=path, params=probe_params, endpoint=stream_name)
    except ImpactForbiddenError:
        LOGGER.warning(
            "Stream '%s' does not have read permission, excluding from catalog.",
            stream_name,
        )
        return False
    return True
| 30 | + |
| 31 | + |
| 32 | +def _prune_inaccessible_children(schemas, field_metadata, child_to_parent): |
| 33 | + """Remove child streams from the catalog whose parent stream was excluded.""" |
| 34 | + for child_name, parent_name in child_to_parent.items(): |
| 35 | + if child_name in schemas and parent_name not in schemas: |
| 36 | + LOGGER.warning( |
| 37 | + "Stream '%s' excluded from catalog because its parent stream '%s' is not accessible.", |
| 38 | + child_name, parent_name, |
| 39 | + ) |
| 40 | + schemas.pop(child_name) |
| 41 | + field_metadata.pop(child_name) |
| 42 | + |
| 43 | + |
def _apply_access_checks(client, schemas, field_metadata):
    """Probe parent streams for read access and drop the unreadable ones.

    Every stream in STREAMS that is also present in ``schemas`` is probed via
    ``_check_stream_access``. Streams that fail the probe are removed from
    ``schemas`` and ``field_metadata`` in place, and any child stream left
    without its parent is pruned as well.

    Args:
        client: object passed through to ``_check_stream_access`` for probing.
        schemas: dict keyed by stream name; mutated in place.
        field_metadata: dict keyed by stream name; mutated in place.

    Raises:
        ImpactForbiddenError: if at least one parent stream was probed and
            none of the probed parent streams is accessible.
    """
    # Only parents that are actually part of the catalog are probed, so the
    # "nothing accessible" decision below is made against this list rather
    # than against every entry in STREAMS.
    probed_streams = [name for name in STREAMS if name in schemas]
    inaccessible_streams = [
        name
        for name in probed_streams
        if not _check_stream_access(client, name, STREAMS[name]['path'])
    ]

    for name in inaccessible_streams:
        schemas.pop(name, None)
        field_metadata.pop(name, None)

    # Children are pruned even when every probe succeeded, in case a child
    # is present in the catalog without its parent.
    _prune_inaccessible_children(schemas, field_metadata, _get_child_to_parent_map())

    if not inaccessible_streams:
        return

    if len(inaccessible_streams) == len(probed_streams):
        raise ImpactForbiddenError(
            "HTTP-error-code: 403, Error: The account credentials supplied do not have 'read' access to any "
            "of the streams supported by the tap. Data collection cannot be initiated due to lack of permissions."
        )

    LOGGER.warning(
        "The account credentials supplied do not have 'read' access to the following stream(s): %s. "
        "These streams have been excluded from the catalog.",
        ", ".join(inaccessible_streams),
    )
| 76 | + |
| 77 | + |
| 78 | +def discover(client, config): |
| 79 | + """ |
| 80 | + Run the discovery mode, prepare the catalog and return it. |
| 81 | + Access to each parent stream is verified using the provided client and |
| 82 | + streams the credentials cannot read are excluded from the returned catalog. |
| 83 | + """ |
6 | 84 | model_id = config.get('model_id') |
7 | 85 | schemas, field_metadata = get_schemas() |
| 86 | + _apply_access_checks(client, schemas, field_metadata) |
8 | 87 | catalog = Catalog([]) |
9 | 88 |
|
10 | 89 | flat_streams = flatten_streams() |
|
0 commit comments