
Commit 741b1a5

Authored by pjoshi30 (Preetam Joshi), with romain-intel, wangchy27, and saikonen
Added the ability to separate out reads and writes into their own connection pools. (Netflix#344)
* Changes for using a separate reader pool for Aurora-like use cases
* Avoid some expensive logging operations when not needed
* Refactoring execute_sql implementations and separating reader/writer endpoints choosing the right pool in execute_sql
* Adding documentation for using separate reader pools
* use [PREFIX]_READ_REPLICA_HOST as a feature gate instead of localhost
* In a previous commit, the detection of a failure became too aggressive. This remediates this by considering a run 'failed' if the hb hasn't been updated within heartbeat_cutoff time as opposed to the heartbeat_threshold time
* Patch pjoshi aurora (Netflix#395)
* Upgrade Github actions used in `dockerimage` action (Netflix#379)
* upgrade github actions used in dockerimage action
* remove setup-buildx-action and pin to hashes.
* change deprecated pkg_resources to importlib.metadata (Netflix#387)
* In a previous commit, the detection of a failure became too aggressive. (Netflix#386)
* In a previous commit, the detection of a failure became too aggressive. This remediates this by considering a run 'failed' if the hb hasn't been updated within heartbeat_cutoff time as opposed to the heartbeat_threshold time
* change run finished at query to heartbeat_cutoff from threshold
* clean up unused values from run query
---------
Co-authored-by: Sakari Ikonen <[email protected]>
* fix PATH_PREFIX handling in metadata service so it doesn't interfere with mfgui routes (Netflix#388)
* Configurable SSL Connection (Netflix#373)
* [TRIS-297] Configurable SSL Connection (#1)
* Configurable SSL connection
* Update services/utils/__init__.py
* no ssl unit testing (#3)
* ssl seperate test (Netflix#4)
* dsn generator sslmode none (Netflix#5)
* fix run_goose.py not working without SSL mode env variables. (Netflix#390)
* change run inactive cutoff default to 6 minutes. cleanup unused constant (Netflix#392)
* clarify comment on read replica hosts
* make USE_SEPARATE_READER_POOL a boolean
* remove unnecessary conditionals for pool choice in execute_sql
---------
Co-authored-by: Tom Furmston <[email protected]>
Co-authored-by: Romain <[email protected]>
Co-authored-by: Oleg Avdeev <[email protected]>
Co-authored-by: RikishK <[email protected]>
* fix broken connection string after conflict resolve
* make codestyles happy
* fix test cases
* cleanup
* merge run_goose.py from master
* revert unnecessary changes
---------
Co-authored-by: Preetam Joshi <[email protected]>
Co-authored-by: Romain Cledat <[email protected]>
Co-authored-by: Chaoying Wang <[email protected]>
Co-authored-by: Sakari Ikonen <[email protected]>
Co-authored-by: Tom Furmston <[email protected]>
Co-authored-by: Romain <[email protected]>
Co-authored-by: Oleg Avdeev <[email protected]>
Co-authored-by: RikishK <[email protected]>
Co-authored-by: Sakari Ikonen <[email protected]>
1 parent 65d41dd commit 741b1a5
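
At its core, the change routes read queries through a second aiopg connection pool when a reader endpoint is configured, and falls back to the single writer pool otherwise. Below is a minimal sketch of that pattern, condensed from the diff further down; the DSN strings and pool sizes are placeholders, not the service's configuration.

```python
import aiopg

# Condensed illustration of the dual-pool pattern introduced by this commit;
# not the exact service code. DSN strings and pool sizes are placeholders.
async def init_pools(writer_dsn: str, reader_dsn: str, use_separate_reader_pool: bool):
    # All writes (and reads, when no replica is configured) go through this pool.
    writer_pool = await aiopg.create_pool(writer_dsn, minsize=1, maxsize=10)
    if use_separate_reader_pool:
        # Reads get their own pool pointed at the read-replica endpoint.
        reader_pool = await aiopg.create_pool(reader_dsn, minsize=1, maxsize=10)
    else:
        # Without the gate, the "reader" pool is simply an alias for the writer pool.
        reader_pool = writer_pool
    return writer_pool, reader_pool
```

Read queries then use the reader pool while writes stay on the writer pool, which is what the `execute_sql` changes below select via `USE_SEPARATE_READER_POOL`.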

File tree

13 files changed: +140 -96 lines changed

services/data/postgres_async_db.py

+51 -21

@@ -7,13 +7,13 @@
 import math
 import re
 import time
-from services.utils import logging
+from services.utils import logging, DBType
 from typing import List, Tuple

 from .db_utils import DBResponse, DBPagination, aiopg_exception_handling, \
     get_db_ts_epoch_str, translate_run_key, translate_task_key, new_heartbeat_ts
 from .models import FlowRow, RunRow, StepRow, TaskRow, MetadataRow, ArtifactRow
-from services.utils import DBConfiguration
+from services.utils import DBConfiguration, USE_SEPARATE_READER_POOL

 from services.data.service_configs import max_connection_retires, \
     connection_retry_wait_time_seconds
@@ -49,6 +49,7 @@ class _AsyncPostgresDB(object):
     metadata_table_postgres = None

     pool = None
+    reader_pool = None
     db_conf: DBConfiguration = None

     def __init__(self, name='global'):
@@ -77,21 +78,42 @@ async def _init(self, db_conf: DBConfiguration, create_triggers=DB_TRIGGER_CREATE):
         for i in range(retries):
             try:
                 self.pool = await aiopg.create_pool(
-                    db_conf.dsn,
+                    db_conf.get_dsn(),
                     minsize=db_conf.pool_min,
                     maxsize=db_conf.pool_max,
                     timeout=db_conf.timeout,
                     pool_recycle=10 * db_conf.timeout,
                     echo=AIOPG_ECHO)

+                self.reader_pool = await aiopg.create_pool(
+                    db_conf.get_dsn(type=DBType.READER),
+                    minsize=db_conf.pool_min,
+                    maxsize=db_conf.pool_max,
+                    timeout=db_conf.timeout,
+                    pool_recycle=10 * db_conf.timeout,
+                    echo=AIOPG_ECHO) if USE_SEPARATE_READER_POOL else self.pool
+
                 for table in self.tables:
                     await table._init(create_triggers=create_triggers)

-                self.logger.info(
-                    "Connection established.\n"
-                    " Pool min: {pool_min} max: {pool_max}\n".format(
-                        pool_min=self.pool.minsize,
-                        pool_max=self.pool.maxsize))
+                if USE_SEPARATE_READER_POOL:
+                    self.logger.info(
+                        "Writer Connection established.\n"
+                        " Pool min: {pool_min} max: {pool_max}\n".format(
+                            pool_min=self.pool.minsize,
+                            pool_max=self.pool.maxsize))
+
+                    self.logger.info(
+                        "Reader Connection established.\n"
+                        " Pool min: {pool_min} max: {pool_max}\n".format(
+                            pool_min=self.reader_pool.minsize,
+                            pool_max=self.reader_pool.maxsize))
+                else:
+                    self.logger.info(
+                        "Connection established.\n"
+                        " Pool min: {pool_min} max: {pool_max}\n".format(
+                            pool_min=self.pool.minsize,
+                            pool_max=self.pool.maxsize))

                 break  # Break the retry loop
             except Exception as e:
@@ -211,15 +233,20 @@ async def find_records(self, conditions: List[str] = None, values=[], fetch_single

     async def execute_sql(self, select_sql: str, values=[], fetch_single=False,
                           expanded=False, limit: int = 0, offset: int = 0,
-                          cur: aiopg.Cursor = None) -> Tuple[DBResponse, DBPagination]:
+                          cur: aiopg.Cursor = None, serialize: bool = True) -> Tuple[DBResponse, DBPagination]:
         async def _execute_on_cursor(_cur):
             await _cur.execute(select_sql, values)

             rows = []
             records = await _cur.fetchall()
-            for record in records:
-                row = self._row_type(**record)  # pylint: disable=not-callable
-                rows.append(row.serialize(expanded))
+            if serialize:
+                for record in records:
+                    # pylint-initial-ignore: Lack of __init__ makes this too hard for pylint
+                    # pylint: disable=not-callable
+                    row = self._row_type(**record)
+                    rows.append(row.serialize(expanded))
+            else:
+                rows = records

             count = len(rows)

@@ -232,17 +259,20 @@ async def _execute_on_cursor(_cur):
                 page=math.floor(int(offset) / max(int(limit), 1)) + 1,
             )
             return body, pagination
-        if cur:
-            # if we are using the passed in cursor, we allow any errors to be managed by cursor owner
-            body, pagination = await _execute_on_cursor(cur)
-            return DBResponse(response_code=200, body=body), pagination
+
         try:
-            with (await self.db.pool.cursor(
-                cursor_factory=psycopg2.extras.DictCursor
-            )) as cur:
+            if cur:
+                # if we are using the passed in cursor, we allow any errors to be managed by cursor owner
                 body, pagination = await _execute_on_cursor(cur)
-                cur.close()  # unsure if needed, leaving in there for safety
-                return DBResponse(response_code=200, body=body), pagination
+                return DBResponse(response_code=200, body=body), pagination
+            else:
+                db_pool = self.db.reader_pool if USE_SEPARATE_READER_POOL else self.db.pool
+                with (await db_pool.cursor(
+                    cursor_factory=psycopg2.extras.DictCursor
+                )) as cur:
+                    body, pagination = await _execute_on_cursor(cur)
+                    cur.close()
+                    return DBResponse(response_code=200, body=body), pagination
         except IndexError as error:
             return aiopg_exception_handling(error), None
         except (Exception, psycopg2.DatabaseError) as error:

services/metadata_service/tests/integration_tests/utils.py

+7 -2

@@ -2,6 +2,8 @@
 from typing import Callable

 import pytest
+import psycopg2
+import psycopg2.extras
 from aiohttp import web
 from services.data.postgres_async_db import AsyncPostgresDB
 from services.utils.tests import get_test_dbconf
@@ -67,8 +69,11 @@ async def clean_db(db: AsyncPostgresDB):
         db.run_table_postgres,
         db.flow_table_postgres
     ]
-    for table in tables:
-        await table.execute_sql(select_sql="DELETE FROM {}".format(table.table_name))
+    with (await db.pool.cursor(
+        cursor_factory=psycopg2.extras.DictCursor
+    )) as cur:
+        for table in tables:
+            await table.execute_sql(select_sql="DELETE FROM {}".format(table.table_name), cur=cur)


 @pytest.fixture

services/migration_service/api/admin.py

+1 -1

@@ -69,7 +69,7 @@ async def upgrade(self, request):
                 description: could not upgrade
         """
         goose_version_cmd = make_goose_migration_template(
-            db_conf.connection_string_url,
+            db_conf.connection_string_url(),
            "up"
        )
        p = Popen(goose_version_cmd, shell=True,

services/migration_service/api/utils.py

+3 -3

@@ -25,7 +25,7 @@ def get_unapplied_migrations(current_version):
     @staticmethod
     async def get_goose_version():
         # if tables exist but goose doesn't find version table then
-        goose_version_cmd = make_goose_template(db_conf.connection_string_url, 'version')
+        goose_version_cmd = make_goose_template(db_conf.connection_string_url(), 'version')

         p = Popen(goose_version_cmd, stdout=PIPE, stderr=PIPE, shell=True,
                   close_fds=True)
@@ -55,7 +55,7 @@ async def get_latest_compatible_version():
             return version_dict[version]
         else:
             print("Running initial migration..", file=sys.stderr)
-            goose_version_cmd = make_goose_migration_template(db_conf.connection_string_url, 'up')
+            goose_version_cmd = make_goose_migration_template(db_conf.connection_string_url(), 'up')
             p = Popen(goose_version_cmd, shell=True,
                       close_fds=True)
             if p.wait() != 0:
@@ -65,7 +65,7 @@
     @staticmethod
     async def is_migration_in_progress():
         goose_version_cmd = make_goose_template(
-            db_conf.connection_string_url, "status"
+            db_conf.connection_string_url(), "status"
         )

         p = Popen(goose_version_cmd, stdout=PIPE, stderr=PIPE, shell=True,

services/migration_service/data/postgres_async_db.py

+1 -1

@@ -40,7 +40,7 @@ async def _init(self, db_conf: DBConfiguration):
         retries = 3
         for i in range(retries):
             try:
-                self.pool = await aiopg.create_pool(db_conf.dsn, timeout=db_conf.timeout)
+                self.pool = await aiopg.create_pool(db_conf.get_dsn(), timeout=db_conf.timeout)
             except Exception as e:
                 print("printing connection exception: " + str(e))
                 if retries - i < 1:

services/ui_backend_service/README.md

+2 -2

@@ -24,7 +24,7 @@ The UI service module is `services.ui_backend_service.ui_server`:
 > $ /opt/latest/bin/python3 -m services.ui_backend_service.ui_server
 > ```

-Below is an Docker run command for running UI Service exposed at port 8083:
+Below is a Docker run command for running UI Service exposed at port 8083:

 > ```sh
 > $ docker run \
@@ -60,7 +60,7 @@ The service depends on the following Environment Variables to be set:
 - `MF_METADATA_DB_PSWD` [defaults to postgres]
 - `MF_METADATA_DB_NAME` [defaults to postgres]

-Optionally you can also overrider the host and port the service runs on:
+Optionally you can also override the host and port the service runs on:

 - `MF_UI_METADATA_PORT` [defaults to 8083]
 - `MF_UI_METADATA_HOST` [defaults to 0.0.0.0]

services/ui_backend_service/data/db/postgres_async_db.py

+1

@@ -35,6 +35,7 @@ class AsyncPostgresDB(BaseAsyncPostgresDB):
     metadata_table_postgres = None

     pool = None
+    reader_pool = None
     db_conf: DBConfiguration = None

     def __init__(self, name='global'):

services/ui_backend_service/data/db/tables/base.py

+1 -50

@@ -269,56 +269,7 @@ async def benchmark_sql(
             self.db.logger.exception("Query Benchmarking failed")
             return None

-    async def execute_sql(
-        self,
-        select_sql: str,
-        values=[],
-        fetch_single=False,
-        expanded=False,
-        limit: int = 0,
-        offset: int = 0,
-        serialize: bool = True,
-    ) -> Tuple[DBResponse, DBPagination]:
-        try:
-            with (
-                await self.db.pool.cursor(cursor_factory=psycopg2.extras.DictCursor)
-            ) as cur:
-                await cur.execute(select_sql, values)
-
-                rows = []
-                records = await cur.fetchall()
-                if serialize:
-                    for record in records:
-                        # pylint-initial-ignore: Lack of __init__ makes this too hard for pylint
-                        # pylint: disable=not-callable
-                        row = self._row_type(**record)
-                        rows.append(row.serialize(expanded))
-                else:
-                    rows = records
-
-                count = len(rows)
-
-                # Will raise IndexError in case fetch_single=True and there's no results
-                body = rows[0] if fetch_single else rows
-
-                pagination = DBPagination(
-                    limit=limit,
-                    offset=offset,
-                    count=count,
-                    page=math.floor(int(offset) / max(int(limit), 1)) + 1,
-                )
-
-                cur.close()
-                return DBResponse(response_code=200, body=body), pagination
-        except IndexError as error:
-            return aiopg_exception_handling(error), None
-        except (Exception, psycopg2.DatabaseError) as error:
-            self.db.logger.exception("Exception occured")
-            return aiopg_exception_handling(error), None
-
-    async def get_tags(
-        self, conditions: List[str] = None, values=[], limit: int = 0, offset: int = 0
-    ):
+    async def get_tags(self, conditions: List[str] = None, values=[], limit: int = 0, offset: int = 0):
         sql_template = """
         SELECT DISTINCT tag
         FROM (

services/ui_backend_service/docs/environment.md

+25

@@ -186,3 +186,28 @@ The `MF_LOG_LOAD_POLICY` environment variable restricts the amount of log content
 ## Card content restriction

 The `MF_CARD_LOAD_POLICY` (default `full`) environment variable can be set to `blurb_only` to return a Python code snippet to access card using Metaflow client, instead of loading actual HTML card payload.
+
+
+## Scaling reads using read replicas
+
+Databases such as [Amazon Aurora](https://aws.amazon.com/rds/aurora/) provide
+[read replicas](https://aws.amazon.com/rds/features/read-replicas/) that make it easy to elastically scale beyond
+the capacity constraints of a single database instance for heavy read workloads. You can separate the reads
+and the writes of this application by setting the following two environment variables:
+
+>```
+> USE_SEPARATE_READER_POOL = 1
+> MF_METADATA_DB_READ_REPLICA_HOST = <READ_REPLICA_ENDPOINT>
+>```
+
+As the name suggests, the `USE_SEPARATE_READER_POOL` variable creates a separate read pool with the same
+min/max pool size as the writer pool. You must also set `MF_METADATA_DB_READ_REPLICA_HOST` to
+point to the read replica endpoint, which is typically a load balancer in front of all the database's read replicas.
+
+### Accounting for eventual consistency
+
+When a read replica is created, there is a lag between the time a transaction is committed to the writer instance and
+the time when the newly written data is available in the read replica. In Amazon Aurora, this [lag is usually much less
+than 100ms](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/Aurora.Replication.html) because the replicas
+share the same underlying storage layer as the writer instance, thereby avoiding the need to copy data into the replica
+nodes. This Metaflow UI service application is read heavy and hence is a great candidate for scaling reads using this model.
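
For illustration, here is a minimal sketch of how the reader endpoint documented above could feed into a libpq-style DSN. The helper and the defaults are assumptions for illustration only, not the service's actual code; only the `USE_SEPARATE_READER_POOL` and `MF_METADATA_DB_READ_REPLICA_HOST` variable names come from the documentation.

```python
import os

# Hypothetical helper, not the service's implementation. The MF_METADATA_DB_*
# defaults below are assumptions; USE_SEPARATE_READER_POOL and
# MF_METADATA_DB_READ_REPLICA_HOST mirror the variables documented above.
def build_dsn(read_replica: bool = False) -> str:
    host = os.environ.get("MF_METADATA_DB_HOST", "localhost")
    if read_replica and os.environ.get("USE_SEPARATE_READER_POOL", "0") == "1":
        # Point reads at the replica endpoint only when the gate is enabled.
        host = os.environ.get("MF_METADATA_DB_READ_REPLICA_HOST", host)
    return "dbname={db} user={user} host={host} port={port}".format(
        db=os.environ.get("MF_METADATA_DB_NAME", "postgres"),
        user=os.environ.get("MF_METADATA_DB_USER", "postgres"),
        host=host,
        port=os.environ.get("MF_METADATA_DB_PORT", "5432"),
    )
```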

services/ui_backend_service/tests/integration_tests/utils.py

+7 -2

@@ -1,6 +1,8 @@
 from aiohttp import web
 from pyee import AsyncIOEventEmitter
 import pytest
+import psycopg2
+import psycopg2.extras
 import os
 import json
 import datetime
@@ -95,8 +97,11 @@ async def clean_db(db: AsyncPostgresDB):
         db.run_table_postgres,
         db.flow_table_postgres
     ]
-    for table in tables:
-        await table.execute_sql(select_sql="DELETE FROM {}".format(table.table_name))
+    with (await db.pool.cursor(
+        cursor_factory=psycopg2.extras.DictCursor
+    )) as cur:
+        for table in tables:
+            await table.execute_sql(select_sql="DELETE FROM {}".format(table.table_name), cur=cur)


 @pytest.fixture
