diff --git a/README.md b/README.md index e78188df5a..005b1db989 100644 --- a/README.md +++ b/README.md @@ -292,7 +292,7 @@ When using the HTTP protocol there are two independent compression layers: 1. **HTTP web compression** (whole request/response body). This uses HTTP headers (`Accept-Encoding` and `Content-Encoding`). In ClickHouse, response compression is controlled by the `enable_http_compression` setting (pass it via `Options.Settings` or DSN query params). In clickhouse-go this mode is used when `Compression.Method` is `gzip`, `deflate`, or `br`. -2. **ClickHouse native block compression over HTTP** (Native format blocks). This uses ClickHouse HTTP query parameters: `compress=1` (server compresses response blocks) and `decompress=1` (server expects a compressed request body). In clickhouse-go this mode is used when `Compression.Method` is `lz4` or `zstd`. +2. **ClickHouse native block compression over HTTP** (Native format blocks). This uses ClickHouse HTTP query parameters: `compress=1` (server compresses response blocks) and `decompress=1` (server expects a compressed request body), plus `network_compression_method` to select the block codec (`LZ4` or `ZSTD`). In clickhouse-go this mode is used when `Compression.Method` is `lz4` or `zstd`. Avoid enabling both at the same time unless you've measured it, as it can waste CPU by compressing already-compressed native blocks. diff --git a/conn_http.go b/conn_http.go index 992bfe8da5..0e6c437283 100644 --- a/conn_http.go +++ b/conn_http.go @@ -402,6 +402,22 @@ func createCompressionPool(compression *Compression) (Pool[HTTPReaderWriter], er return pool, nil } +func applyHTTPNativeCompressionSettings(settings Settings, method CompressionMethod, level int, requestCompressed bool) { + if method != CompressionLZ4 && method != CompressionZSTD { + return + } + + settings["compress"] = "1" + if requestCompressed { + settings["decompress"] = "1" + } + + settings["network_compression_method"] = strings.ToUpper(method.String()) + if method == CompressionZSTD && level > 0 { + settings["network_zstd_compression_level"] = level + } +} + func (h *httpConnect) writeData(block *proto.Block) error { // Saving offset of compressible data start := len(h.buffer.Buf) diff --git a/conn_http_batch.go b/conn_http_batch.go index 930b30b4c5..a3a5f5d731 100644 --- a/conn_http_batch.go +++ b/conn_http_batch.go @@ -240,8 +240,11 @@ func (b *httpBatch) Send() (err error) { case CompressionGZIP, CompressionDeflate, CompressionBrotli: headers["Content-Encoding"] = b.conn.compression.String() case CompressionZSTD, CompressionLZ4: - options.settings["decompress"] = "1" - options.settings["compress"] = "1" + compressionLevel := 0 + if b.conn.opt != nil && b.conn.opt.Compression != nil { + compressionLevel = b.conn.opt.Compression.Level + } + applyHTTPNativeCompressionSettings(options.settings, b.conn.compression, compressionLevel, true) } compressionWriter := b.conn.compressionPool.Get() diff --git a/conn_http_query.go b/conn_http_query.go index bc7e9897db..eb61f65f4c 100644 --- a/conn_http_query.go +++ b/conn_http_query.go @@ -41,7 +41,11 @@ func (h *httpConnect) query(ctx context.Context, release nativeTransportRelease, headers := make(map[string]string) switch h.compression { case CompressionZSTD, CompressionLZ4: - options.settings["compress"] = "1" + compressionLevel := 0 + if h.opt != nil && h.opt.Compression != nil { + compressionLevel = h.opt.Compression.Level + } + applyHTTPNativeCompressionSettings(options.settings, h.compression, compressionLevel, false) case CompressionGZIP, CompressionDeflate, CompressionBrotli: // request encoding headers["Accept-Encoding"] = h.compression.String() diff --git a/tests/issues/1772_test.go b/tests/issues/1772_test.go new file mode 100644 index 0000000000..63d99287b2 --- /dev/null +++ b/tests/issues/1772_test.go @@ -0,0 +1,59 @@ +package issues + +import ( + "context" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ClickHouse/clickhouse-go/v2" + clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests" +) + +func Test1772_HTTPNativeCompressionMethodSettings(t *testing.T) { + testCases := []struct { + name string + method clickhouse.CompressionMethod + level int + expectedMethod string + }{ + { + name: "zstd", + method: clickhouse.CompressionZSTD, + level: 9, + expectedMethod: "ZSTD", + }, + { + name: "lz4", + method: clickhouse.CompressionLZ4, + level: 3, + expectedMethod: "LZ4", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + conn, err := clickhouse_tests.GetConnectionHTTP("issues", t.Name(), nil, nil, &clickhouse.Compression{ + Method: tc.method, + Level: tc.level, + }) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, conn.Close()) + }) + + ctx := context.Background() + + var method string + require.NoError(t, conn.QueryRow(ctx, "SELECT getSetting('network_compression_method')").Scan(&method)) + require.Equal(t, tc.expectedMethod, strings.ToUpper(method)) + + if tc.method == clickhouse.CompressionZSTD { + var level int8 + require.NoError(t, conn.QueryRow(ctx, "SELECT getSetting('network_zstd_compression_level')").Scan(&level)) + require.Equal(t, int8(tc.level), level) + } + }) + } +}