|
17 | 17 | import logging |
18 | 18 | import os |
19 | 19 | from contextlib import asynccontextmanager |
20 | | -from contextvars import ContextVar |
21 | 20 | from typing import Literal |
22 | 21 |
|
23 | 22 | import asyncpg |
24 | 23 | import geojson |
25 | | -from asyncpg import Pool, Record |
| 24 | +from asyncpg import Record |
26 | 25 | from geojson import Feature, FeatureCollection, MultiPolygon |
27 | 26 |
|
28 | 27 | from ohsome_quality_api.config import get_config_value |
|
32 | 31 | WORKING_DIR = os.path.dirname(os.path.abspath(__file__)) |
33 | 32 |
|
34 | 33 |
|
35 | | -OQAPIDB_POOL: ContextVar[Pool] = ContextVar("OQAPIDB_POOL") |
36 | | -OHSOMEDB_POOL: ContextVar[Pool] = ContextVar("OHSOMEDB_POOL") |
37 | | - |
38 | | - |
39 | 34 | def log_query(record): |
40 | 35 | logger.debug("Query:\n" + record.query) |
41 | 36 | logger.debug("Args:\n" + str(record.args)) |
42 | 37 |
|
43 | 38 |
|
44 | | -@asynccontextmanager |
45 | | -async def create_pool(): |
46 | | - # DSN in libpq connection URI format |
47 | | - oqapidb_dsn = "postgres://{user}:{password}@{host}:{port}/{database}".format( |
48 | | - host=get_config_value("postgres_host"), |
49 | | - port=get_config_value("postgres_port"), |
50 | | - database=get_config_value("postgres_db"), |
51 | | - user=get_config_value("postgres_user"), |
52 | | - password=get_config_value("postgres_password"), |
53 | | - ) |
54 | | - ohsomedb_dsn = "postgres://{user}:{password}@{host}:{port}/{database}".format( |
55 | | - host=get_config_value("ohsomedb_host"), |
56 | | - port=get_config_value("ohsomedb_port"), |
57 | | - database=get_config_value("ohsomedb_db"), |
58 | | - user=get_config_value("ohsomedb_user"), |
59 | | - password=get_config_value("ohsomedb_password"), |
60 | | - ) |
61 | | - async with ( |
62 | | - asyncpg.create_pool(oqapidb_dsn) as oqapidb_pool, |
63 | | - asyncpg.create_pool(ohsomedb_dsn) as ohsomedb_pool, |
64 | | - ): |
65 | | - sql = 'set search_path to "global_2026-04-13",public' |
66 | | - await oqapidb_pool.execute(sql) |
67 | | - await ohsomedb_pool.execute(sql) |
68 | | - oqapidb_pool_token = OQAPIDB_POOL.set(oqapidb_pool) |
69 | | - ohsomedb_pool_token = OHSOMEDB_POOL.set(ohsomedb_pool) |
70 | | - try: |
71 | | - yield |
72 | | - finally: |
73 | | - OQAPIDB_POOL.reset(oqapidb_pool_token) |
74 | | - OHSOMEDB_POOL.reset(ohsomedb_pool_token) |
75 | | - |
76 | | - |
77 | 39 | @asynccontextmanager |
78 | 40 | async def get_connection(database: Literal["oqapidb", "ohsomedb"] = "oqapidb"): |
| 41 | + # DNS in libpq connection URI format |
79 | 42 | match database: |
80 | 43 | case "oqapidb": |
81 | | - pool = OQAPIDB_POOL.get() |
| 44 | + dns = "postgres://{user}:{password}@{host}:{port}/{database}".format( |
| 45 | + host=get_config_value("postgres_host"), |
| 46 | + port=get_config_value("postgres_port"), |
| 47 | + database=get_config_value("postgres_db"), |
| 48 | + user=get_config_value("postgres_user"), |
| 49 | + password=get_config_value("postgres_password"), |
| 50 | + ) |
82 | 51 | case "ohsomedb": |
83 | | - pool = OHSOMEDB_POOL.get() |
84 | | - async with pool.acquire() as conn: |
85 | | - try: |
86 | | - with conn.query_logger(log_query): |
87 | | - yield conn |
88 | | - finally: |
89 | | - await conn.close() |
| 52 | + dns = "postgres://{user}:{password}@{host}:{port}/{database}".format( |
| 53 | + host=get_config_value("ohsomedb_host"), |
| 54 | + port=get_config_value("ohsomedb_port"), |
| 55 | + database=get_config_value("ohsomedb_db"), |
| 56 | + user=get_config_value("ohsomedb_user"), |
| 57 | + password=get_config_value("ohsomedb_password"), |
| 58 | + ) |
| 59 | + case _: |
| 60 | + raise ValueError() |
| 61 | + conn = await asyncpg.connect(dns) |
| 62 | + await conn.execute('set search_path to "global_2026-04-13",public') |
| 63 | + try: |
| 64 | + with conn.query_logger(log_query): |
| 65 | + yield conn |
| 66 | + finally: |
| 67 | + await conn.close() |
90 | 68 |
|
91 | 69 |
|
92 | 70 | async def fetch( |
|
0 commit comments