from __future__ import annotations

import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    NeonEnvBuilder,
    wait_for_commit_lsn,
    wait_for_last_flush_lsn,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn


@pytest.mark.timeout(600)
@pytest.mark.parametrize("size", [1024, 8192, 131072])
@pytest.mark.parametrize("fsync", [True, False], ids=["fsync", "nofsync"])
def test_ingest_logical_message(
    request: pytest.FixtureRequest,
    neon_env_builder: NeonEnvBuilder,
    zenbenchmark: NeonBenchmarker,
    fsync: bool,
    size: int,
):
    """
    Benchmarks ingestion of 10 GB of logical message WAL. These are essentially no-ops
    and don't incur any pageserver writes.
    """

    VOLUME = 10 * 1024**3
    count = VOLUME // size
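    # For example: 10,485,760 messages at size 1024, 1,310,720 at 8192, 81,920 at 131072.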

    neon_env_builder.safekeepers_enable_fsync = fsync

    env = neon_env_builder.init_start()
    endpoint = env.endpoints.create_start(
        "main",
        config_lines=[
            f"fsync = {fsync}",
            # Disable backpressure. We don't want to block on pageserver.
            "max_replication_apply_lag = 0",
            "max_replication_flush_lag = 0",
            "max_replication_write_lag = 0",
        ],
    )
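    # With the backpressure lags zeroed out above, the compute never throttles WAL writes
    # on pageserver progress, so the postgres_ingest timing below should reflect local WAL
    # write speed only.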
    client = env.pageserver.http_client()

    # Wait for the timeline to be propagated to the pageserver.
    wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)

    # Ingest data and measure durations.
    start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
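    # start_lsn and end_lsn (captured after the ingest) bracket the run; their difference
    # is the actual WAL volume, reported as wal_written below.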

    with endpoint.cursor() as cur:
        cur.execute("set statement_timeout = 0")
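        # The bulk emit below can presumably run for minutes, so make sure no statement
        # timeout cuts it short (the test itself is capped at 600s above).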

        # Postgres will return once the logical messages have been written to its local
        # WAL, without waiting for Safekeeper commit. We measure ingestion time separately
        # for Postgres, Safekeeper, and Pageserver to detect bottlenecks.
        log.info("Ingesting data")
        with zenbenchmark.record_duration("pageserver_ingest"):
            with zenbenchmark.record_duration("safekeeper_ingest"):
                with zenbenchmark.record_duration("postgres_ingest"):
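                    # pg_logical_emit_message(transactional := false, ...) writes each
                    # message straight to WAL, outside any transaction, with an empty
                    # prefix and a size-byte 'x' payload.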
                    cur.execute(f"""
                        select pg_logical_emit_message(false, '', repeat('x', {size}))
                        from generate_series(1, {count})
                    """)

                end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])

                # Wait for Safekeeper, while still inside the safekeeper_ingest timer.
                log.info("Waiting for Safekeeper to catch up")
                wait_for_commit_lsn(env, env.initial_tenant, env.initial_timeline, end_lsn)

            # Wait for Pageserver, while still inside the pageserver_ingest timer.
            log.info("Waiting for Pageserver to catch up")
            wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)

    # Now that all data is ingested, delete and recreate the tenant in the pageserver. This
    # will reingest all the WAL from the safekeeper without any other constraints. This
    # gives us a baseline of how fast the pageserver can ingest this WAL in isolation.
    status = env.storage_controller.inspect(tenant_shard_id=env.initial_tenant)
    assert status is not None

    client.tenant_delete(env.initial_tenant)
    env.pageserver.tenant_create(tenant_id=env.initial_tenant, generation=status[0])
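    # status[0] is the tenant's current attachment generation (per the inspect() result
    # above); recreating the tenant with it lets the pageserver re-attach cleanly.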

    with zenbenchmark.record_duration("pageserver_recover_ingest"):
        log.info("Recovering WAL into pageserver")
        client.timeline_create(env.pg_version, env.initial_tenant, env.initial_timeline)
        wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)

    # Emit metrics.
    wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
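    # Expect this to come out somewhat above the 10 GiB payload volume: every logical
    # message record also carries WAL header overhead, proportionally more at small sizes.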
    zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
    zenbenchmark.record("message_count", count, "messages", MetricReport.TEST_PARAM)

    props = {p["name"]: p["value"] for _, p in request.node.user_properties}
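    # zenbenchmark stores each recorded metric as a pytest user_property whose payload
    # is a dict with "name" and "value" keys; index the recorded durations by name.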
    for name in ("postgres", "safekeeper", "pageserver", "pageserver_recover"):
        throughput = int(wal_written_mb / props[f"{name}_ingest"])
        zenbenchmark.record(f"{name}_throughput", throughput, "MB/s", MetricReport.HIGHER_IS_BETTER)