|
| 1 | +# |
| 2 | +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. |
| 3 | +# |
| 4 | + |
| 5 | +from dataclasses import InitVar, dataclass |
| 6 | +from typing import Any, Mapping, Optional |
| 7 | + |
| 8 | +from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString |
| 9 | +from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader |
| 10 | + |
| 11 | + |
| 12 | +@dataclass |
| 13 | +class CustomFieldsSchemaLoader(SchemaLoader): |
| 14 | + config: Mapping[str, Any] |
| 15 | + default_schema_loader: Optional[SchemaLoader] = None |
| 16 | + fields_config_key: Optional[str] = None |
| 17 | + include_default_fields: Optional[bool] = False |
| 18 | + include_default_fields_config_key: Optional[bool] = False |
| 19 | + |
| 20 | + def get_json_schema(self) -> Mapping[str, Any]: |
| 21 | + """ |
| 22 | + Returns the JSON schema. |
| 23 | +
|
| 24 | + The final schema is constructed by first generating a schema for the fields |
| 25 | + in the config and, if default fields should be included, adding these to the |
| 26 | + schema. |
| 27 | + """ |
| 28 | + schema = self._get_json_schema_from_config() |
| 29 | + if self.default_schema_loader and self._include_default_fields: |
| 30 | + default_schema = self.default_schema_loader.get_json_schema() |
| 31 | + schema = self._union_schemas(default_schema, schema) |
| 32 | + return schema |
| 33 | + |
| 34 | + def _include_default_fields(self): |
| 35 | + if self.include_default_fields: |
| 36 | + return True |
| 37 | + else: |
| 38 | + return self.include_default_fields_config_key and self.config.get(self.include_default_fields_config_key, False) |
| 39 | + |
| 40 | + def _get_json_schema_from_config(self): |
| 41 | + if self.fields_config_key and self.config.get(self.fields_config_key, None): |
| 42 | + properties = { |
| 43 | + field.strip(): {"type": ["null", "string"]} |
| 44 | + for field in self.convert_custom_reports_fields_to_list(self.config.get(self.fields_config_key)) |
| 45 | + } |
| 46 | + else: |
| 47 | + properties = {} |
| 48 | + return { |
| 49 | + "$schema": "http://json-schema.org/draft-07/schema#", |
| 50 | + "type": "object", |
| 51 | + "properties": properties, |
| 52 | + } |
| 53 | + |
| 54 | + def convert_custom_reports_fields_to_list(self, custom_reports_fields: str) -> list: |
| 55 | + return custom_reports_fields.split(",") if custom_reports_fields else [] |
| 56 | + |
| 57 | + def _union_schemas(self, schema1, schema2): |
| 58 | + schema1["properties"] = {**schema1["properties"], **schema2["properties"]} |
| 59 | + return schema1 |
0 commit comments