|
15 | 15 | import re |
16 | 16 | from collections import OrderedDict |
17 | 17 | from concurrent.futures import Future |
18 | | -from typing import Any, Dict, Iterable, List |
| 18 | +from typing import Any, Dict, Iterable, List, FrozenSet, Tuple |
19 | 19 |
|
20 | 20 | import agate |
21 | 21 | import dbt.exceptions |
22 | 22 | from dbt.adapters.base.impl import catch_as_completed |
23 | 23 | from dbt.adapters.sql import SQLAdapter |
24 | | -from dbt.clients import agate_helper |
25 | | -from dbt.clients.agate_helper import ColumnTypeBuilder, NullableAgateType, _NullMarker |
26 | | -from dbt.events import AdapterLogger |
27 | | -from dbt.utils import executor |
| 24 | +from dbt_common.clients import agate_helper |
| 25 | +from dbt_common.clients.agate_helper import ColumnTypeBuilder, NullableAgateType, _NullMarker |
| 26 | +from dbt.adapters.events.logging import AdapterLogger |
| 27 | +from dbt_common.utils import executor |
| 28 | +from dbt.adapters.contracts.relation import RelationConfig |
28 | 29 |
|
29 | 30 | import dbt.adapters.impala.cloudera_tracking as tracker |
30 | 31 | from dbt.adapters.impala import ImpalaConnectionManager |
@@ -275,23 +276,23 @@ def parse_columns_from_information(self, relation: ImpalaRelation) -> List[Impal |
275 | 276 |
|
276 | 277 | return columns |
277 | 278 |
|
278 | | - def get_catalog(self, manifest): |
279 | | - schema_map = self._get_catalog_schemas(manifest) |
| 279 | + def get_catalog( |
| 280 | + self, relation_configs: Iterable[RelationConfig], used_schemas: FrozenSet[Tuple[str, str]] |
| 281 | + ): |
| 282 | + schema_map = self._get_catalog_schemas(relation_configs) |
280 | 283 |
|
281 | 284 | with executor(self.config) as tpe: |
282 | 285 | futures: List[Future[agate.Table]] = [] |
283 | 286 | for info, schemas in schema_map.items(): |
284 | 287 | for schema in schemas: |
285 | 288 | futures.append( |
286 | | - tpe.submit_connected( |
287 | | - self, schema, self._get_one_catalog, info, [schema], manifest |
288 | | - ) |
| 289 | + tpe.submit_connected(self, schema, self._get_one_catalog, info, [schema]) |
289 | 290 | ) |
290 | 291 | catalogs, exceptions = catch_as_completed(futures) # call the default implementation |
291 | 292 |
|
292 | 293 | return catalogs, exceptions |
293 | 294 |
|
294 | | - def _get_one_catalog(self, information_schema, schemas, manifest) -> agate.Table: |
| 295 | + def _get_one_catalog(self, information_schema, schemas) -> agate.Table: |
295 | 296 | if len(schemas) != 1: |
296 | 297 | dbt.exceptions.raise_compiler_error( |
297 | 298 | f"Expected only one schema in ImpalaAdapter._get_one_catalog, found " f"{schemas}" |
|
0 commit comments