Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,6 @@ jobs:
- name: Start Dockerized test infrastructure
if: |
contains(fromJson('[
"materialize-clickhouse",
"materialize-dynamodb",
"materialize-elasticsearch",
"materialize-kafka",
"source-db2-batch",
"source-dynamodb",
"source-kafka",
Expand Down
18 changes: 18 additions & 0 deletions materialize-clickhouse/driver_clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func TestPrereqs(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

cfg := testConfig()

Expand Down Expand Up @@ -216,6 +217,7 @@ func TestInstallFence(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
var ctx = t.Context()
Expand All @@ -233,6 +235,7 @@ func TestExecStatements(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
var ctx = t.Context()
Expand All @@ -249,6 +252,7 @@ func TestTruncateTable(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
var ctx = t.Context()
Expand Down Expand Up @@ -292,6 +296,7 @@ func TestOpenNativeConn(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
conn, err := clickhouse.Open(cfg.newClickhouseOptions())
Expand All @@ -305,6 +310,7 @@ func TestDestroy(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
var tr = &transactor{}
Expand All @@ -325,6 +331,7 @@ func TestAddBinding(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
var ctx = t.Context()
Expand Down Expand Up @@ -422,6 +429,7 @@ func TestStoreAndLoadDataPath(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
cfg.HardDelete = true
Expand Down Expand Up @@ -478,6 +486,7 @@ func TestPrepareNewTransactor(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
var ctx = t.Context()
Expand Down Expand Up @@ -526,6 +535,7 @@ func TestHardDeleteTombstone(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
cfg.HardDelete = true
Expand Down Expand Up @@ -745,6 +755,7 @@ func TestLoadNonExistentKey(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
var ctx = t.Context()
Expand All @@ -763,6 +774,7 @@ func TestLoadMultipleKeys(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
var ctx = t.Context()
Expand Down Expand Up @@ -805,6 +817,7 @@ func TestCompositeKeyStoreAndLoad(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
var ctx = t.Context()
Expand Down Expand Up @@ -851,6 +864,7 @@ func TestVersionDeduplication(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
var ctx = t.Context()
Expand Down Expand Up @@ -887,6 +901,7 @@ func TestStoreBatchMultipleRows(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
var ctx = t.Context()
Expand All @@ -913,6 +928,7 @@ func TestMultiBindingStoreAndLoad(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
var ctx = t.Context()
Expand Down Expand Up @@ -990,6 +1006,7 @@ func TestCompositeKeyTombstone(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
cfg.HardDelete = true
Expand Down Expand Up @@ -1020,6 +1037,7 @@ func TestMovePartitionMissingTarget(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
var ctx = t.Context()
Expand Down
11 changes: 11 additions & 0 deletions materialize-clickhouse/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ func TestIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

makeResourceFn := func(table string, delta bool) tableConfig {
return tableConfig{Table: table, Delta: delta}
Expand Down Expand Up @@ -125,6 +126,11 @@ func noFlowDocShape(tableName string) sql.TableShape {
// double-encoded as strings) when reconstructed by the
// queryLoadTableNoFlowDocument template.
func TestNoFlowDocumentObjectColumns(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
cfg.Advanced.NoFlowDocument = true
var ctx = t.Context()
Expand Down Expand Up @@ -176,6 +182,11 @@ func TestNoFlowDocumentObjectColumns(t *testing.T) {
// TestNoFlowDocumentNullValues verifies that nullable columns serialize as JSON
// null (not the string "null" or a missing key) when their value is NULL.
func TestNoFlowDocumentNullValues(t *testing.T) {
if testing.Short() {
t.Skip()
}
ensureDockerUp(t)

var cfg = testConfig()
cfg.Advanced.NoFlowDocument = true
var ctx = t.Context()
Expand Down
66 changes: 49 additions & 17 deletions materialize-clickhouse/main_test.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,63 @@
package main

import (
"context"
"os"
"os/exec"
"strings"
"sync"
"testing"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
)

func TestMain(m *testing.M) {
// Start ClickHouse via docker compose for non-short test runs.
if !isShortMode() {
if err := exec.Command("docker", "compose", "-f", "docker-compose.yaml", "up", "--wait").Run(); err != nil {
panic("starting docker compose: " + err.Error())
var (
dockerOnce sync.Once
dockerUp bool
)

// ensureDockerUp starts the ClickHouse docker-compose services and waits until
// the server accepts Go client connections. Tests that need a running database
// must call this after their testing.Short() check. The docker-compose
// healthcheck uses clickhouse-client, which can succeed before the native TCP
// endpoint is ready to handshake with ch-go, so we poll with the real client
// before returning.
func ensureDockerUp(t *testing.T) {
t.Helper()
dockerOnce.Do(func() {
out, err := exec.Command("docker", "compose", "-f", "docker-compose.yaml", "up", "--wait").CombinedOutput()
if err != nil {
t.Fatalf("docker compose up failed: %s\n%s", err, out)
}
defer exec.Command("docker", "compose", "-f", "docker-compose.yaml", "down", "-v").Run()
}

os.Exit(m.Run())
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// isShortMode checks os.Args for the -test.short flag since flag.Parse
// hasn't been called yet inside TestMain.
func isShortMode() bool {
for _, arg := range os.Args {
if strings.HasPrefix(arg, "-test.short") || arg == "-short" {
return true
var lastErr error
for ctx.Err() == nil {
conn, err := clickhouse.Open(testConfig().newClickhouseOptions())
if err == nil {
if err = conn.Ping(ctx); err == nil {
conn.Close()
dockerUp = true
return
}
conn.Close()
}
lastErr = err
time.Sleep(250 * time.Millisecond)
}
t.Fatalf("clickhouse not accepting connections: %s", lastErr)
})
if !dockerUp {
t.Fatal("docker compose setup previously failed")
}
}

func TestMain(m *testing.M) {
code := m.Run()
if dockerUp {
exec.Command("docker", "compose", "-f", "docker-compose.yaml", "down", "-v").Run()
}
return false
os.Exit(code)
}
14 changes: 11 additions & 3 deletions materialize-kafka/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ services:
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
healthcheck:
test: /bin/kafka-cluster cluster-id --bootstrap-server localhost:9092
interval: 1s
timeout: 1s
interval: 5s
timeout: 10s
retries: 60
start_period: 30s
ports:
- 9092:9092
networks:
Expand All @@ -34,13 +35,20 @@ services:
hostname: schema-registry
container_name: schema-registry
depends_on:
- db
db:
condition: service_healthy
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'db:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
healthcheck:
test: curl -fsS http://localhost:8081/subjects || exit 1
interval: 5s
timeout: 10s
retries: 30
start_period: 30s
networks:
- flow-test

Expand Down
27 changes: 27 additions & 0 deletions materialize-kafka/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use rdkafka::{
};
use serde_json::{json, Map};
use std::collections::HashMap;
use std::sync::Once;
use time::{format_description, OffsetDateTime};

const BOOTSTRAP_SERVERS: &str = "localhost:9092";
Expand Down Expand Up @@ -43,6 +44,7 @@ fn test_spec() {

#[tokio::test]
async fn test_materialization() {
ensure_services_up();
drop_topics().await;

for name in [
Expand All @@ -69,6 +71,31 @@ async fn test_materialization() {
insta::assert_snapshot!(snapshot_topics().await);
}

fn ensure_services_up() {
static ONCE: Once = Once::new();
ONCE.call_once(|| {
let _ = std::process::Command::new("docker")
.args(["network", "create", "flow-test"])
.output();

let status = std::process::Command::new("docker")
.args(["compose", "-f", "docker-compose.yaml", "up", "--wait"])
.status()
.expect("failed to invoke docker compose");
if !status.success() {
let logs = std::process::Command::new("docker")
.args(["compose", "-f", "docker-compose.yaml", "logs"])
.output();
if let Ok(logs) = logs {
eprintln!("--- docker compose logs ---");
eprintln!("{}", String::from_utf8_lossy(&logs.stdout));
eprintln!("{}", String::from_utf8_lossy(&logs.stderr));
}
panic!("docker compose up --wait failed");
}
});
}

async fn drop_topics() {
let admin: AdminClient<_> = ClientConfig::new()
.set("bootstrap.servers", BOOTSTRAP_SERVERS)
Expand Down
Loading