diff --git a/api/BUILD b/api/BUILD index 06b9121f64..be431502c7 100644 --- a/api/BUILD +++ b/api/BUILD @@ -73,6 +73,7 @@ proto_library( visibility = ["//visibility:public"], deps = [ "//contrib/envoy/extensions/compression/qatzip/compressor/v3alpha:pkg", + "//contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha:pkg", "//contrib/envoy/extensions/filters/http/dynamo/v3:pkg", "//contrib/envoy/extensions/filters/http/golang/v3alpha:pkg", "//contrib/envoy/extensions/filters/http/language/v3alpha:pkg", diff --git a/api/contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha/BUILD b/api/contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha/BUILD new file mode 100644 index 0000000000..1c1a6f6b44 --- /dev/null +++ b/api/contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha/BUILD @@ -0,0 +1,12 @@ +# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = [ + "//envoy/config/core/v3:pkg", + "@com_github_cncf_udpa//udpa/annotations:pkg", + ], +) diff --git a/api/contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha/qatzstd.proto b/api/contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha/qatzstd.proto new file mode 100644 index 0000000000..f58aec3d32 --- /dev/null +++ b/api/contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha/qatzstd.proto @@ -0,0 +1,72 @@ +syntax = "proto3"; + +package envoy.extensions.compression.qatzstd.compressor.v3alpha; + +import "envoy/config/core/v3/base.proto"; + +import "google/protobuf/wrappers.proto"; + +import "udpa/annotations/status.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.compression.qatzstd.compressor.v3alpha"; +option java_outer_classname = "QatzstdProto"; +option java_multiple_files = true; +option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/compression/qatzstd/compressor/v3alpha"; +option (udpa.annotations.file_status).package_version_status = ACTIVE; + +// [#protodoc-title: Qatzstd Compressor] +// [#extension: envoy.compression.qatzstd.compressor] + +// [#next-free-field: 8] +message Qatzstd { + // Reference to http://facebook.github.io/zstd/zstd_manual.html + enum Strategy { + DEFAULT = 0; + FAST = 1; + DFAST = 2; + GREEDY = 3; + LAZY = 4; + LAZY2 = 5; + BTLAZY2 = 6; + BTOPT = 7; + BTULTRA = 8; + BTULTRA2 = 9; + } + + // Set compression parameters according to pre-defined compression level table. + // Note that exact compression parameters are dynamically determined, + // depending on both compression level and source content size (when known). + // Value 0 means default, and default level is 3. + // Setting a level does not automatically set all other compression parameters + // to default. Setting this will however eventually dynamically impact the compression + // parameters which have not been manually set. The manually set + // ones will 'stick'. + google.protobuf.UInt32Value compression_level = 1; + + // A 32-bits checksum of content is written at end of frame. If not set, defaults to false. + bool enable_checksum = 2; + + // The higher the value of selected strategy, the more complex it is, + // resulting in stronger and slower compression. + // Special: value 0 means "use default strategy". + Strategy strategy = 3 [(validate.rules).enum = {defined_only: true}]; + + // A dictionary for compression. Zstd offers dictionary compression, which greatly improves + // efficiency on small files and messages. Each dictionary will be generated with a dictionary ID + // that can be used to search the same dictionary during decompression. + // Please refer to `zstd manual `_ + // to train a specific dictionary for compression. + config.core.v3.DataSource dictionary = 4; + + // Value for compressor's next output buffer. If not set, defaults to 4096. + google.protobuf.UInt32Value chunk_size = 5 [(validate.rules).uint32 = {lte: 65536 gte: 4096}]; + + + // Enable QAT to accelerate zstd compresstion or not. If not set, defaults to false. + bool enable_qat_zstd = 6; + + // Fall back to software for QAT-zstd when input size is less than this value. + // Valid only enable_qat_zstd is true. 0 means no fallback at all. If not set, defaults to 4000. + google.protobuf.UInt32Value qat_zstd_fallback_threshold = 7 [(validate.rules).uint32 = {lte: 65536 gte: 0}]; +} \ No newline at end of file diff --git a/api/versioning/BUILD b/api/versioning/BUILD index fb8dde8572..c1efcaebc3 100644 --- a/api/versioning/BUILD +++ b/api/versioning/BUILD @@ -10,6 +10,7 @@ proto_library( visibility = ["//visibility:public"], deps = [ "//contrib/envoy/extensions/compression/qatzip/compressor/v3alpha:pkg", + "//contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha:pkg", "//contrib/envoy/extensions/config/v3alpha:pkg", "//contrib/envoy/extensions/filters/http/dynamo/v3:pkg", "//contrib/envoy/extensions/filters/http/golang/v3alpha:pkg", diff --git a/bazel/foreign_cc/qatzstd.patch b/bazel/foreign_cc/qatzstd.patch new file mode 100644 index 0000000000..6cb5201b7a --- /dev/null +++ b/bazel/foreign_cc/qatzstd.patch @@ -0,0 +1,85 @@ +diff --git a/src/Makefile b/src/Makefile +index 1abf10d..a0c7e9a 100644 +--- a/src/Makefile ++++ b/src/Makefile +@@ -41,6 +41,7 @@ LIBDIR ?= $(INSTALLDIR)/lib + INCLUDEDIR ?= $(INSTALLDIR)/include + + CP ?= cp ++MKDIR ?= mkdir + + ENABLE_USDM_DRV ?= 0 + ifneq ($(ICP_ROOT), ) +@@ -55,10 +56,8 @@ ifneq ($(ICP_ROOT), ) + else + QATFLAGS = -DINTREE + LDFLAGS = -lqat +- ifneq ($(ENABLE_USDM_DRV), 0) +- QATFLAGS += -DENABLE_USDM_DRV +- LDFLAGS += -lusdm +- endif ++ QATFLAGS += -DENABLE_USDM_DRV ++ LDFLAGS += -lusdm + endif + + ifdef ZSTDLIB +@@ -69,8 +68,8 @@ CFLAGS += -Wall -Werror -Wextra -Wcast-align -Wshadow -Wstrict-aliasing=1 \ + -Wswitch-enum -Wdeclaration-after-statement -Wstrict-prototypes \ + -Wundef -Wpointer-arith -Wvla -Wformat=2 -Winit-self \ + -Wfloat-equal -Wwrite-strings -Wredundant-decls -Wc++-compat \ +- -pedantic -fstack-protector-strong -fPIE -fPIC \ +- -fno-delete-null-pointer-checks -fwrapv -fno-strict-overflow ++ -pedantic -fstack-protector-strong \ ++ -fno-delete-null-pointer-checks -fwrapv + + DEBUGLEVEL ?=0 + +@@ -81,27 +80,30 @@ else + QATFLAGS += -O3 + endif + ++$(info INSTALLDIR="$(INSTALLDIR)") ++$(info CPPFLAGS="$(CPPFLAGS)") ++ + qatseqprod.o: qatseqprod.c +- $(CC) -c $(CFLAGS) $(QATFLAGS) $(DEBUGFLAGS) $^ -o $@ ++ $(CC) -c $(CPPFLAGS) $(CFLAGS) $(QATFLAGS) $(DEBUGFLAGS) $^ -o $@ + + lib: qatseqprod.o + $(AR) rc libqatseqprod.a $^ +- $(CC) -shared $^ $(LDFLAGS) -o libqatseqprod.so ++ @echo qatseqprod library successfully build + + .PHONY: install + install: lib ++ $(MKDIR) -p $(LIBDIR) ++ $(MKDIR) -p $(INCLUDEDIR) + $(CP) libqatseqprod.a $(LIBDIR) +- $(CP) libqatseqprod.so $(LIBDIR) + $(CP) qatseqprod.h $(INCLUDEDIR) + @echo qatseqprod library successfully installed + + .PHONY: uninstall + uninstall: + $(RM) $(LIBDIR)/libqatseqprod.a +- $(RM) $(LIBDIR)/libqatseqprod.so + $(RM) $(INCLUDEDIR)/qatseqprod.h + @echo qatseqprod library successfully uninstalled + + clean: + $(RM) *.o +- $(RM) libqatseqprod.a libqatseqprod.so ++ $(RM) libqatseqprod.a +diff --git a/test/Makefile b/test/Makefile +index dff0c8e..4ba01b2 100644 +--- a/test/Makefile ++++ b/test/Makefile +@@ -34,7 +34,7 @@ + # ####################################################################### + LIB = ../src + +-LDFLAGS = $(LIB)/libqatseqprod.a -I$(LIB) ++LDFLAGS = $(LIB)/libqatseqprod.a -I$(LIB) -L$(LIB) -l:libqatseqprod.a -l:libqat.a -l:libusdm.a -l:libzstd.a -lpthread -lcrypto + + ifneq ($(ICP_ROOT), ) + LDFLAGS += -lqat_s -lusdm_drv_s -Wl,-rpath,$(ICP_ROOT)/build -L$(ICP_ROOT)/build diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index 32f9d067b0..ae9b5e1b52 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -280,6 +280,7 @@ def envoy_dependencies(skip_targets = []): _com_github_intel_ipp_crypto_crypto_mb() _com_github_intel_qatlib() _com_github_intel_qatzip() + _com_github_qat_zstd() _com_github_lz4_lz4() _com_github_jbeder_yaml_cpp() _com_github_libevent_libevent() @@ -526,6 +527,14 @@ def _com_github_intel_qatzip(): build_file_content = BUILD_ALL_CONTENT, ) +def _com_github_qat_zstd(): + external_http_archive( + name = "com_github_qat_zstd", + build_file_content = BUILD_ALL_CONTENT, + patch_args = ["-p1"], + patches = ["@envoy//bazel/foreign_cc:qatzstd.patch"], + ) + def _com_github_lz4_lz4(): external_http_archive( name = "com_github_lz4_lz4", diff --git a/bazel/repository_locations.bzl b/bazel/repository_locations.bzl index fb5875143c..2165a83a78 100644 --- a/bazel/repository_locations.bzl +++ b/bazel/repository_locations.bzl @@ -431,7 +431,7 @@ REPOSITORY_LOCATIONS_SPEC = dict( urls = ["https://github.com/intel/qatlib/archive/refs/tags/{version}.tar.gz"], use_category = ["dataplane_ext"], release_date = "2023-02-24", - extensions = ["envoy.tls.key_providers.qat","envoy.compression.qatzip.compresso"], + extensions = ["envoy.tls.key_providers.qat", "envoy.compression.qatzip.compressor","envoy.compression.qatzstd.compressor"], cpe = "N/A", license = "BSD-3-Clause", license_url = "https://github.com/intel/qatlib/blob/{version}/LICENSE", @@ -451,6 +451,21 @@ REPOSITORY_LOCATIONS_SPEC = dict( license = "BSD-3-Clause", license_url = "https://github.com/intel/QATzip/blob/{version}/LICENSE", ), + com_github_qat_zstd = dict( + project_name = "QAT-ZSTD-Plugin", + project_desc = "IntelĀ® QuickAssist Technology ZSTD Plugin (QAT ZSTD Plugin)", + project_url = "https://github.com/intel/QAT-ZSTD-Plugin/", + version = "0.1.0", + sha256 = "74c5bfbb3b0c6f1334e128ee0b43958d1d34751a4762e54e8f970c443e445f33", + strip_prefix = "QAT-ZSTD-Plugin-{version}", + urls = ["https://github.com/intel/QAT-ZSTD-Plugin/archive/refs/tags/v{version}.tar.gz"], + use_category = ["dataplane_ext"], + extensions = [ + "envoy.compression.qatzstd.compressor", + ], + release_date = "2023-09-08", + cpe = "N/A", + ), com_github_luajit_luajit = dict( project_name = "LuaJIT", project_desc = "Just-In-Time compiler for Lua", diff --git a/contrib/contrib_build_config.bzl b/contrib/contrib_build_config.bzl index d04de0d0e8..a9515036ca 100644 --- a/contrib/contrib_build_config.bzl +++ b/contrib/contrib_build_config.bzl @@ -5,84 +5,85 @@ CONTRIB_EXTENSIONS = { # "envoy.compression.qatzip.compressor": "//contrib/qat/compression/qatzip/compressor/source:config", + "envoy.compression.qatzstd.compressor": "//contrib/qat/compression/qatzstd/compressor/source:config", + + # # + # # HTTP filters + # # - # - # HTTP filters - # + # "envoy.filters.http.dynamo": "//contrib/dynamo/filters/http/source:config", + # "envoy.filters.http.golang": "//contrib/golang/filters/http/source:config", + # "envoy.filters.http.language": "//contrib/language/filters/http/source:config_lib", + # "envoy.filters.http.squash": "//contrib/squash/filters/http/source:config", + # "envoy.filters.http.sxg": "//contrib/sxg/filters/http/source:config", - "envoy.filters.http.dynamo": "//contrib/dynamo/filters/http/source:config", - "envoy.filters.http.golang": "//contrib/golang/filters/http/source:config", - "envoy.filters.http.language": "//contrib/language/filters/http/source:config_lib", - "envoy.filters.http.squash": "//contrib/squash/filters/http/source:config", - "envoy.filters.http.sxg": "//contrib/sxg/filters/http/source:config", + # # + # # Network filters + # # - # - # Network filters - # + # "envoy.filters.network.client_ssl_auth": "//contrib/client_ssl_auth/filters/network/source:config", + # "envoy.filters.network.kafka_broker": "//contrib/kafka/filters/network/source:kafka_broker_config_lib", + # "envoy.filters.network.kafka_mesh": "//contrib/kafka/filters/network/source/mesh:config_lib", + # "envoy.filters.network.mysql_proxy": "//contrib/mysql_proxy/filters/network/source:config", + # "envoy.filters.network.postgres_proxy": "//contrib/postgres_proxy/filters/network/source:config", + # "envoy.filters.network.rocketmq_proxy": "//contrib/rocketmq_proxy/filters/network/source:config", + # "envoy.filters.network.generic_proxy": "//contrib/generic_proxy/filters/network/source:config", + # "envoy.filters.network.golang": "//contrib/golang/filters/network/source:config", - "envoy.filters.network.client_ssl_auth": "//contrib/client_ssl_auth/filters/network/source:config", - "envoy.filters.network.kafka_broker": "//contrib/kafka/filters/network/source:kafka_broker_config_lib", - "envoy.filters.network.kafka_mesh": "//contrib/kafka/filters/network/source/mesh:config_lib", - "envoy.filters.network.mysql_proxy": "//contrib/mysql_proxy/filters/network/source:config", - "envoy.filters.network.postgres_proxy": "//contrib/postgres_proxy/filters/network/source:config", - "envoy.filters.network.rocketmq_proxy": "//contrib/rocketmq_proxy/filters/network/source:config", - "envoy.filters.network.generic_proxy": "//contrib/generic_proxy/filters/network/source:config", - "envoy.filters.network.golang": "//contrib/golang/filters/network/source:config", + # # + # # Sip proxy + # # - # - # Sip proxy - # + # "envoy.filters.network.sip_proxy": "//contrib/sip_proxy/filters/network/source:config", + # "envoy.filters.sip.router": "//contrib/sip_proxy/filters/network/source/router:config", - "envoy.filters.network.sip_proxy": "//contrib/sip_proxy/filters/network/source:config", - "envoy.filters.sip.router": "//contrib/sip_proxy/filters/network/source/router:config", + # # + # # Private key providers + # # - # - # Private key providers - # + # "envoy.tls.key_providers.cryptomb": "//contrib/cryptomb/private_key_providers/source:config", + # "envoy.tls.key_providers.qat": "//contrib/qat/private_key_providers/source:config", + # "envoy.tls.key_providers.sgx": "//contrib/sgx/private_key_providers/source:config", - "envoy.tls.key_providers.cryptomb": "//contrib/cryptomb/private_key_providers/source:config", - "envoy.tls.key_providers.qat": "//contrib/qat/private_key_providers/source:config", - "envoy.tls.key_providers.sgx": "//contrib/sgx/private_key_providers/source:config", + # # + # # Socket interface extensions + # # - # - # Socket interface extensions - # + # "envoy.bootstrap.vcl": "//contrib/vcl/source:config", - "envoy.bootstrap.vcl": "//contrib/vcl/source:config", + # # + # # Input matchers + # # - # - # Input matchers - # + # "envoy.matching.input_matchers.hyperscan": "//contrib/hyperscan/matching/input_matchers/source:config", - "envoy.matching.input_matchers.hyperscan": "//contrib/hyperscan/matching/input_matchers/source:config", + # # + # # Connection Balance extensions + # # - # - # Connection Balance extensions - # + # "envoy.network.connection_balance.dlb": "//contrib/network/connection_balance/dlb/source:connection_balancer", - "envoy.network.connection_balance.dlb": "//contrib/network/connection_balance/dlb/source:connection_balancer", + # # + # # Regex engines + # # - # - # Regex engines - # + # "envoy.regex_engines.hyperscan": "//contrib/hyperscan/regex_engines/source:config", - "envoy.regex_engines.hyperscan": "//contrib/hyperscan/regex_engines/source:config", + # # + # # Extensions for generic proxy + # # + # "envoy.filters.generic.router": "//contrib/generic_proxy/filters/network/source/router:config", + # "envoy.generic_proxy.codecs.dubbo": "//contrib/generic_proxy/filters/network/source/codecs/dubbo:config", - # - # Extensions for generic proxy - # - "envoy.filters.generic.router": "//contrib/generic_proxy/filters/network/source/router:config", - "envoy.generic_proxy.codecs.dubbo": "//contrib/generic_proxy/filters/network/source/codecs/dubbo:config", + # # + # # xDS delegates + # # - # - # xDS delegates - # - - "envoy.xds_delegates.kv_store": "//contrib/config/source:kv_store_xds_delegate", + # "envoy.xds_delegates.kv_store": "//contrib/config/source:kv_store_xds_delegate", - # - # cluster specifier plugin - # + # # + # # cluster specifier plugin + # # - "envoy.router.cluster_specifier_plugin.golang": "//contrib/golang/router/cluster_specifier/source:config", + # "envoy.router.cluster_specifier_plugin.golang": "//contrib/golang/router/cluster_specifier/source:config", } diff --git a/contrib/extensions_metadata.yaml b/contrib/extensions_metadata.yaml index 7def18a48f..848ce6afb9 100644 --- a/contrib/extensions_metadata.yaml +++ b/contrib/extensions_metadata.yaml @@ -13,6 +13,11 @@ envoy.compression.qatzip.compressor: - envoy.compression.compressor security_posture: robust_to_untrusted_downstream_and_upstream status: alpha +envoy.compression.qatzstd.compressor: + categories: + - envoy.compression.compressor + security_posture: robust_to_untrusted_downstream_and_upstream + status: alpha envoy.filters.http.squash: categories: - envoy.filters.http diff --git a/contrib/qat/compression/qatzstd/compressor/source/BUILD b/contrib/qat/compression/qatzstd/compressor/source/BUILD new file mode 100644 index 0000000000..2d849049de --- /dev/null +++ b/contrib/qat/compression/qatzstd/compressor/source/BUILD @@ -0,0 +1,61 @@ +load("@rules_foreign_cc//foreign_cc:defs.bzl", "make") +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_contrib_extension", + "envoy_cc_library", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +make( + name = "qat-zstd", + build_data = ["@com_github_qat_zstd//:all"], + includes = [], + lib_source = "@com_github_qat_zstd//:all", + out_static_libs = ["libqatseqprod.a"], + tags = ["skip_on_windows"], + target_compatible_with = [ + "@platforms//os:linux", + "@platforms//cpu:x86_64", + ], + targets = [ + "", + "install", + ], + deps = [ + "//contrib/qat:qatlib", + "//external:zstd", + ], +) + +envoy_cc_library( + name = "compressor_lib", + srcs = ["qatzstd_compressor_impl.cc"], + hdrs = ["qatzstd_compressor_impl.h"], + deps = [ + ":qat-zstd", + "//envoy/compression/compressor:compressor_interface", + "//source/common/buffer:buffer_lib", + "//source/common/compression/zstd/common:zstd_base_lib", + "//source/common/compression/zstd/common:zstd_dictionary_manager_lib", + "//source/common/compression/zstd/compressor:compressor_base", + ], +) + +envoy_cc_contrib_extension( + name = "config", + srcs = ["config.cc"], + hdrs = ["config.h"], + deps = [ + ":compressor_lib", + ":qat-zstd", + "//envoy/event:dispatcher_interface", + "//envoy/thread_local:thread_local_interface", + "//source/common/http:headers_lib", + "//source/extensions/compression/common/compressor:compressor_factory_base_lib", + "@envoy_api//contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha:pkg_cc_proto", + ], +) diff --git a/contrib/qat/compression/qatzstd/compressor/source/config.cc b/contrib/qat/compression/qatzstd/compressor/source/config.cc new file mode 100644 index 0000000000..75dfa15989 --- /dev/null +++ b/contrib/qat/compression/qatzstd/compressor/source/config.cc @@ -0,0 +1,91 @@ +#include "contrib/qat/compression/qatzstd/compressor/source/config.h" + +namespace Envoy { +namespace Extensions { +namespace Compression { +namespace Qatzstd { +namespace Compressor { + +QatzstdCompressorFactory::QatzstdCompressorFactory( + const envoy::extensions::compression::qatzstd::compressor::v3alpha::Qatzstd& qatzstd, + Event::Dispatcher& dispatcher, Api::Api& api, ThreadLocal::SlotAllocator& tls) + : compression_level_( + PROTOBUF_GET_WRAPPED_OR_DEFAULT(qatzstd, compression_level, ZSTD_CLEVEL_DEFAULT)), + enable_checksum_(qatzstd.enable_checksum()), strategy_(qatzstd.strategy()), + chunk_size_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(qatzstd, chunk_size, ZSTD_CStreamOutSize())), + enable_qat_zstd_(qatzstd.enable_qat_zstd()), + qat_zstd_fallback_threshold_(PROTOBUF_GET_WRAPPED_OR_DEFAULT( + qatzstd, qat_zstd_fallback_threshold, DefaultQatZstdFallbackThreshold)), + tls_slot_(nullptr) { + if (qatzstd.has_dictionary()) { + Protobuf::RepeatedPtrField dictionaries; + dictionaries.Add()->CopyFrom(qatzstd.dictionary()); + cdict_manager_ = std::make_unique( + dictionaries, dispatcher, api, tls, true, + [this](const void* dict_buffer, size_t dict_size) -> ZSTD_CDict* { + return ZSTD_createCDict(dict_buffer, dict_size, compression_level_); + }); + } + if (enable_qat_zstd_) { + tls_slot_ = ThreadLocal::TypedSlot::makeUnique(tls); + tls_slot_->set([](Event::Dispatcher&) { return std::make_shared(); }); + } +} + +QatzstdCompressorFactory::QatzstdThreadLocal::QatzstdThreadLocal() + : initialized_(false), sequenceProducerState_(nullptr) {} + +QatzstdCompressorFactory::QatzstdThreadLocal::~QatzstdThreadLocal() { + if (initialized_) { + /* Free sequence producer state */ + QZSTD_freeSeqProdState(sequenceProducerState_); + /* Stop QAT device, please call this function when + you won't use QAT anymore or before the process exits */ + QZSTD_stopQatDevice(); + } +} + +void* QatzstdCompressorFactory::QatzstdThreadLocal::GetQATSession() { + // The session must be initialized only once in every worker thread. + if (!initialized_) { + + int status = QZSTD_startQatDevice(); + RELEASE_ASSERT(status == QZSTD_OK, "failed to initialize hardware"); + sequenceProducerState_ = QZSTD_createSeqProdState(); + initialized_ = true; + } + + return sequenceProducerState_; +} + +Envoy::Compression::Compressor::CompressorPtr QatzstdCompressorFactory::createCompressor() { + if (enable_qat_zstd_) { + return std::make_unique( + compression_level_, enable_checksum_, strategy_, cdict_manager_, chunk_size_, + enable_qat_zstd_, qat_zstd_fallback_threshold_, tls_slot_->get()->GetQATSession()); + } else { + return std::make_unique(compression_level_, enable_checksum_, strategy_, + cdict_manager_, chunk_size_, enable_qat_zstd_, + qat_zstd_fallback_threshold_, nullptr); + } +} + +Envoy::Compression::Compressor::CompressorFactoryPtr +QatzstdCompressorLibraryFactory::createCompressorFactoryFromProtoTyped( + const envoy::extensions::compression::qatzstd::compressor::v3alpha::Qatzstd& proto_config, + Server::Configuration::FactoryContext& context) { + return std::make_unique(proto_config, context.mainThreadDispatcher(), + context.api(), context.threadLocal()); +} + +/** + * Static registration for the zstd compressor library. @see NamedCompressorLibraryConfigFactory. + */ +REGISTER_FACTORY(QatzstdCompressorLibraryFactory, + Envoy::Compression::Compressor::NamedCompressorLibraryConfigFactory); + +} // namespace Compressor +} // namespace Qatzstd +} // namespace Compression +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/qat/compression/qatzstd/compressor/source/config.h b/contrib/qat/compression/qatzstd/compressor/source/config.h new file mode 100644 index 0000000000..0342a2f2ac --- /dev/null +++ b/contrib/qat/compression/qatzstd/compressor/source/config.h @@ -0,0 +1,82 @@ +#pragma once + +#include "envoy/compression/compressor/factory.h" +#include "envoy/event/dispatcher.h" +#include "envoy/thread_local/thread_local.h" + +#include "source/common/http/headers.h" +#include "source/extensions/compression/common/compressor/factory_base.h" + +#include "contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha/qatzstd.pb.h" +#include "contrib/envoy/extensions/compression/qatzstd/compressor/v3alpha/qatzstd.pb.validate.h" +#include "contrib/qat/compression/qatzstd/compressor/source/qatzstd_compressor_impl.h" +#include "qatseqprod.h" + +namespace Envoy { +namespace Extensions { +namespace Compression { +namespace Qatzstd { +namespace Compressor { + +// Default threshold for qat_zstd fallback to software. +const uint32_t DefaultQatZstdFallbackThreshold = 4000; + +namespace { + +const std::string& qatzstdStatsPrefix() { CONSTRUCT_ON_FIRST_USE(std::string, "qatzstd."); } +const std::string& qatzstdExtensionName() { + CONSTRUCT_ON_FIRST_USE(std::string, "envoy.compression.qatzstd.compressor"); +} + +} // namespace + +class QatzstdCompressorFactory : public Envoy::Compression::Compressor::CompressorFactory { +public: + QatzstdCompressorFactory( + const envoy::extensions::compression::qatzstd::compressor::v3alpha::Qatzstd& qatzstd, + Event::Dispatcher& dispatcher, Api::Api& api, ThreadLocal::SlotAllocator& tls); + + // Envoy::Compression::Compressor::CompressorFactory + Envoy::Compression::Compressor::CompressorPtr createCompressor() override; + const std::string& statsPrefix() const override { return qatzstdStatsPrefix(); } + const std::string& contentEncoding() const override { + return Http::CustomHeaders::get().ContentEncodingValues.Zstd; + } + +private: + struct QatzstdThreadLocal : public ThreadLocal::ThreadLocalObject { + QatzstdThreadLocal(); + ~QatzstdThreadLocal() override; + void* GetQATSession(); + bool initialized_; + void* sequenceProducerState_; + }; + const uint32_t compression_level_; + const bool enable_checksum_; + const uint32_t strategy_; + const uint32_t chunk_size_; + ZstdCDictManagerPtr cdict_manager_{nullptr}; + const bool enable_qat_zstd_; + const uint32_t qat_zstd_fallback_threshold_; + ThreadLocal::TypedSlotPtr tls_slot_; +}; + +class QatzstdCompressorLibraryFactory + : public Compression::Common::Compressor::CompressorLibraryFactoryBase< + envoy::extensions::compression::qatzstd::compressor::v3alpha::Qatzstd> { +public: + QatzstdCompressorLibraryFactory() : CompressorLibraryFactoryBase(qatzstdExtensionName()) {} + +private: + Envoy::Compression::Compressor::CompressorFactoryPtr createCompressorFactoryFromProtoTyped( + const envoy::extensions::compression::qatzstd::compressor::v3alpha::Qatzstd& config, + Server::Configuration::FactoryContext& context) override; +}; + +DECLARE_FACTORY(QatzstdCompressorLibraryFactory); + +} // namespace Compressor +} // namespace Qatzstd +} // namespace Compression +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/qat/compression/qatzstd/compressor/source/qatzstd_compressor_impl.cc b/contrib/qat/compression/qatzstd/compressor/source/qatzstd_compressor_impl.cc new file mode 100644 index 0000000000..bbd13fda60 --- /dev/null +++ b/contrib/qat/compression/qatzstd/compressor/source/qatzstd_compressor_impl.cc @@ -0,0 +1,80 @@ +#include "contrib/qat/compression/qatzstd/compressor/source/qatzstd_compressor_impl.h" + +namespace Envoy { +namespace Extensions { +namespace Compression { +namespace Qatzstd { +namespace Compressor { + +QatzstdCompressorImpl::QatzstdCompressorImpl(uint32_t compression_level, bool enable_checksum, + uint32_t strategy, + const ZstdCDictManagerPtr& cdict_manager, + uint32_t chunk_size, bool enable_qat_zstd, + uint32_t qat_zstd_fallback_threshold, + void* sequenceProducerState) + : ZstdCompressorImplBase(compression_level, enable_checksum, strategy, cdict_manager, + chunk_size), + enable_qat_zstd_(enable_qat_zstd), qat_zstd_fallback_threshold_(qat_zstd_fallback_threshold), + sequenceProducerState_(sequenceProducerState), input_ptr_{std::make_unique( + chunk_size)}, + input_len_(0), chunk_size_(chunk_size) { + ENVOY_LOG(debug, + "zstd new ZstdCompressorImpl, compression_level: {}, strategy: {}, chunk_size: " + "{}, enable_qat_zstd: {}, qat_zstd_fallback_threshold: {}", + compression_level, strategy, chunk_size, enable_qat_zstd, qat_zstd_fallback_threshold); + if (enable_qat_zstd_) { + /* register qatSequenceProducer */ + ZSTD_registerSequenceProducer(cctx_.get(), sequenceProducerState_, qatSequenceProducer); + size_t result = ZSTD_CCtx_setParameter(cctx_.get(), ZSTD_c_enableSeqProducerFallback, 1); + RELEASE_ASSERT(!ZSTD_isError(result), ""); + } +} + +void QatzstdCompressorImpl::compressPreprocess(Buffer::Instance& buffer, + Envoy::Compression::Compressor::State state) { + ENVOY_LOG(debug, "zstd compress input size {}", buffer.length()); + if (enable_qat_zstd_ && state == Envoy::Compression::Compressor::State::Flush) { + // Fall back to software if input size less than threshold to achieve better performance. + if (buffer.length() < qat_zstd_fallback_threshold_) { + ENVOY_LOG(debug, "zstd compress fall back to software"); + ZSTD_registerSequenceProducer(cctx_.get(), nullptr, nullptr); + } + } +} + +void QatzstdCompressorImpl::setInput(const uint8_t* input, size_t size) { + input_.src = input; + input_.pos = 0; + input_.size = size; + input_len_ = 0; +} + +void QatzstdCompressorImpl::compressProcess(const Buffer::Instance& buffer, + const Buffer::RawSlice& input_slice, + Buffer::Instance& accumulation_buffer) { + if (input_slice.len_ == buffer.length()) { + setInput(static_cast(input_slice.mem_), input_slice.len_); + process(accumulation_buffer, ZSTD_e_continue); + } else { + if (input_len_ + input_slice.len_ > chunk_size_) { + setInput(input_ptr_.get(), input_len_); + process(accumulation_buffer, ZSTD_e_continue); + } + memcpy(input_ptr_.get() + input_len_, input_slice.mem_, + input_slice.len_); // NOLINT(safe-memcpy) + input_len_ += input_slice.len_; + } +} + +void QatzstdCompressorImpl::compressPostprocess(Buffer::Instance& accumulation_buffer) { + if (input_len_ > 0) { + setInput(input_ptr_.get(), input_len_); + process(accumulation_buffer, ZSTD_e_continue); + } +} + +} // namespace Compressor +} // namespace Qatzstd +} // namespace Compression +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/qat/compression/qatzstd/compressor/source/qatzstd_compressor_impl.h b/contrib/qat/compression/qatzstd/compressor/source/qatzstd_compressor_impl.h new file mode 100644 index 0000000000..fc198f73a9 --- /dev/null +++ b/contrib/qat/compression/qatzstd/compressor/source/qatzstd_compressor_impl.h @@ -0,0 +1,58 @@ +#pragma once + +#include "envoy/compression/compressor/compressor.h" +#include "envoy/server/factory_context.h" + +#include "source/common/common/logger.h" +#include "source/common/compression/zstd/common/base.h" +#include "source/common/compression/zstd/common/dictionary_manager.h" +#include "source/common/compression/zstd/compressor/zstd_compressor_impl_base.h" + +#include "qatseqprod.h" + +namespace Envoy { +namespace Extensions { +namespace Compression { +namespace Qatzstd { +namespace Compressor { + +using ZstdCDictManager = + Envoy::Compression::Zstd::Common::DictionaryManager; +using ZstdCDictManagerPtr = std::unique_ptr; + +/** + * Implementation of compressor's interface. + */ +class QatzstdCompressorImpl : public Envoy::Compression::Zstd::Compressor::ZstdCompressorImplBase, + public Logger::Loggable { +public: + QatzstdCompressorImpl(uint32_t compression_level, bool enable_checksum, uint32_t strategy, + const ZstdCDictManagerPtr& cdict_manager, uint32_t chunk_size, + bool enable_qat_zstd, uint32_t qat_zstd_fallback_threshold, + void* sequenceProducerState); + +private: + void compressPreprocess(Buffer::Instance& buffer, + Envoy::Compression::Compressor::State state) override; + + void compressProcess(const Buffer::Instance& buffer, const Buffer::RawSlice& input_slice, + Buffer::Instance& accumulation_buffer) override; + + void compressPostprocess(Buffer::Instance& accumulation_buffer) override; + + void setInput(const uint8_t* input, size_t size); + + bool enable_qat_zstd_; + const uint32_t qat_zstd_fallback_threshold_; + void* sequenceProducerState_; + std::unique_ptr input_ptr_; + uint64_t input_len_; + uint64_t chunk_size_; +}; + +} // namespace Compressor +} // namespace Qatzstd +} // namespace Compression +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/qat/compression/qatzstd/compressor/test/BUILD b/contrib/qat/compression/qatzstd/compressor/test/BUILD new file mode 100644 index 0000000000..5d14ef1bf0 --- /dev/null +++ b/contrib/qat/compression/qatzstd/compressor/test/BUILD @@ -0,0 +1,20 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_contrib_package", + "envoy_cc_test", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_test( + name = "compressor_test", + srcs = ["qatzstd_compressor_impl_test.cc"], + deps = [ + "//contrib/qat/compression/qatzstd/compressor/source:config", + "//source/extensions/compression/zstd/decompressor:decompressor_lib", + "//test/mocks/server:factory_context_mocks", + "//test/test_common:utility_lib", + ], +) diff --git a/contrib/qat/compression/qatzstd/compressor/test/qatzstd_compressor_impl_test.cc b/contrib/qat/compression/qatzstd/compressor/test/qatzstd_compressor_impl_test.cc new file mode 100644 index 0000000000..8f0faf70f1 --- /dev/null +++ b/contrib/qat/compression/qatzstd/compressor/test/qatzstd_compressor_impl_test.cc @@ -0,0 +1,159 @@ +#include "source/common/buffer/buffer_impl.h" +#include "source/common/stats/isolated_store_impl.h" +#include "contrib/qat/compression/qatzstd/compressor/source/config.h" +#include "source/extensions/compression/zstd/decompressor/zstd_decompressor_impl.h" + +#include "test/mocks/server/factory_context.h" +#include "test/test_common/utility.h" + +#include "gtest/gtest.h" +#include "qatseqprod.h" + +namespace Envoy { +namespace Extensions { +namespace Compression { +namespace Qatzstd { +namespace Compressor { +namespace { + +class QatzstdCompressorImplTest : public testing::Test { +protected: + void drainBuffer(Buffer::OwnedImpl& buffer) { + buffer.drain(buffer.length()); + ASSERT_EQ(0, buffer.length()); + } + + void verifyWithDecompressor(Envoy::Compression::Compressor::CompressorPtr compressor) { + Buffer::OwnedImpl buffer; + Buffer::OwnedImpl accumulation_buffer; + std::string original_text{}; + for (uint64_t i = 0; i < 10; i++) { + TestUtility::feedBufferWithRandomCharacters(buffer, default_input_size_ * i, i); + original_text.append(buffer.toString()); + ASSERT_EQ(default_input_size_ * i, buffer.length()); + compressor->compress(buffer, Envoy::Compression::Compressor::State::Flush); + accumulation_buffer.add(buffer); + drainBuffer(buffer); + } + + compressor->compress(buffer, Envoy::Compression::Compressor::State::Finish); + accumulation_buffer.add(buffer); + drainBuffer(buffer); + + Stats::IsolatedStoreImpl stats_store{}; + Zstd::Decompressor::ZstdDecompressorImpl decompressor{*stats_store.rootScope(), "test.", + default_ddict_manager_, 4096}; + + decompressor.decompress(accumulation_buffer, buffer); + std::string decompressed_text{buffer.toString()}; + + ASSERT_EQ(original_text.length(), decompressed_text.length()); + EXPECT_EQ(original_text, decompressed_text); + } + + Envoy::Compression::Compressor::CompressorFactoryPtr + createQatzstdCompressorFactoryFromConfig(const std::string& json) { + envoy::extensions::compression::qatzstd::compressor::v3alpha::Qatzstd qatzstd_config; + TestUtility::loadFromJson(json, qatzstd_config); + + return qatzstd_compressor_library_factory_.createCompressorFactoryFromProto(qatzstd_config, + context_); + } + + static constexpr uint32_t default_compression_level_{6}; + static constexpr uint32_t default_enable_checksum_{0}; + static constexpr uint32_t default_strategy_{0}; + uint32_t default_input_size_{796}; + uint32_t default_input_round_{10}; + ZstdCDictManagerPtr default_cdict_manager_{nullptr}; + Zstd::Decompressor::ZstdDDictManagerPtr default_ddict_manager_{nullptr}; + bool enable_qat_zstd{true}; + uint32_t qat_zstd_fallback_threshold{0}; + QatzstdCompressorLibraryFactory qatzstd_compressor_library_factory_; + NiceMock context_; +}; + +class QatzstdConfigTest + : public QatzstdCompressorImplTest, + public ::testing::WithParamInterface> {}; + +// These tests should pass even if required hardware or setup steps required for qatzstd are missing. +// Qatzstd uses a sofware fallback in this case. +INSTANTIATE_TEST_SUITE_P(QatzstdConfigTestInstantiation, QatzstdConfigTest, + // First tuple has all default values. + ::testing::Values(std::make_tuple(1, 4096, true, 4096), + std::make_tuple(2, 4096, true, 4096), + std::make_tuple(3, 65536, true, 4096), + std::make_tuple(4, 4096, true, 4096), + std::make_tuple(5, 8192, true, 1024), + std::make_tuple(6, 4096, false, 1024), + std::make_tuple(7, 4096, true, 1024), + std::make_tuple(8, 8192, true, 4096), + std::make_tuple(9, 8192, true, 1024), + std::make_tuple(10, 16384, true, 1024), + std::make_tuple(11, 8192, true, 8192), + std::make_tuple(12, 4096, true, 1024))); + +TEST_P(QatzstdConfigTest, LoadConfigAndVerifyWithDecompressor) { + std::tuple config_value_tuple = GetParam(); + std::string json{fmt::format(R"EOF({{ + "compression_level": {}, + "chunk_size": {}, + "enable_qat_zstd": {}, + "qat_zstd_fallback_threshold": {}, +}})EOF", + std::get<0>(config_value_tuple), std::get<1>(config_value_tuple), + std::get<2>(config_value_tuple), std::get<3>(config_value_tuple))}; + + Envoy::Compression::Compressor::CompressorFactoryPtr qatzstd_compressor_factory = + createQatzstdCompressorFactoryFromConfig(json); + + EXPECT_EQ("zstd", qatzstd_compressor_factory->contentEncoding()); + EXPECT_EQ("qatzstd.", qatzstd_compressor_factory->statsPrefix()); + + verifyWithDecompressor(qatzstd_compressor_factory->createCompressor()); +} + +TEST_F(QatzstdCompressorImplTest, IllegalConfig) { + envoy::extensions::compression::qatzstd::compressor::v3alpha::Qatzstd qatzstd; + Qatzstd::Compressor::QatzstdCompressorLibraryFactory lib_factory; + NiceMock mock_context; + std::string json; + + json = R"EOF({ + "compression_level": 7, + "enable_checksum": true, + "strategy":"default", + "chunk_size": 4096, + "dictionary": { + "inline_string": "" + }, + enable_qat_zstd: true, + qat_zstd_fallback_threshold: 1024, +})EOF"; + TestUtility::loadFromJson(json, qatzstd); + EXPECT_THROW_WITH_MESSAGE(lib_factory.createCompressorFactoryFromProto(qatzstd, mock_context), + EnvoyException, "DataSource cannot be empty"); + + json = R"EOF({ + "compression_level": 7, + "enable_checksum": true, + "strategy":"default", + "chunk_size": 4096, + "dictionary": { + "inline_string": "123321123" + }, + enable_qat_zstd: true, + qat_zstd_fallback_threshold: 1024, +})EOF"; + TestUtility::loadFromJson(json, qatzstd); + EXPECT_DEATH({ lib_factory.createCompressorFactoryFromProto(qatzstd, mock_context); }, + "assert failure: id != 0. Details: Illegal Zstd dictionary"); +} + +} // namespace +} // namespace Compressor +} // namespace Qatzstd +} // namespace Compression +} // namespace Extensions +} // namespace Envoy diff --git a/envoy-zstd.yaml b/envoy-zstd.yaml new file mode 100644 index 0000000000..ff65d46ac7 --- /dev/null +++ b/envoy-zstd.yaml @@ -0,0 +1,62 @@ +static_resources: + listeners: + - address: + socket_address: + address: 0.0.0.0 + port_value: 10000 + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: ingress_http + route_config: + name: local_route + virtual_hosts: + - name: backend + domains: + - "*" + routes: + - match: + prefix: "/" + route: + cluster: service + http_filters: + - name: envoy.filters.http.compressor + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.compressor.v3.Compressor + response_direction_config: + common_config: + min_content_length: 100 + content_type: + - application/octet-stream + disable_on_etag_header: false + compressor_library: + name: text_optimized + typed_config: + "@type": type.googleapis.com/envoy.extensions.compression.qatzstd.compressor.v3alpha.Qatzstd + compression_level: 10 + enable_qat_zstd: true + qat_zstd_fallback_threshold: 0 + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + clusters: + - name: service + connect_timeout: 0.25s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: service + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 1234 +admin: + address: + socket_address: + address: 0.0.0.0 + port_value: 9901 diff --git a/source/extensions/compression/zstd/common/BUILD b/source/common/compression/zstd/common/BUILD similarity index 91% rename from source/extensions/compression/zstd/common/BUILD rename to source/common/compression/zstd/common/BUILD index 2da0386499..8bc41364c5 100644 --- a/source/extensions/compression/zstd/common/BUILD +++ b/source/common/compression/zstd/common/BUILD @@ -1,12 +1,12 @@ load( "//bazel:envoy_build_system.bzl", "envoy_cc_library", - "envoy_extension_package", + "envoy_package", ) licenses(["notice"]) # Apache 2 -envoy_extension_package() +envoy_package() envoy_cc_library( name = "zstd_base_lib", diff --git a/source/extensions/compression/zstd/common/base.cc b/source/common/compression/zstd/common/base.cc similarity index 86% rename from source/extensions/compression/zstd/common/base.cc rename to source/common/compression/zstd/common/base.cc index 3682f36269..89dc2941cb 100644 --- a/source/extensions/compression/zstd/common/base.cc +++ b/source/common/compression/zstd/common/base.cc @@ -1,7 +1,6 @@ -#include "source/extensions/compression/zstd/common/base.h" +#include "source/common/compression/zstd/common/base.h" namespace Envoy { -namespace Extensions { namespace Compression { namespace Zstd { namespace Common { @@ -27,5 +26,4 @@ void Base::getOutput(Buffer::Instance& output_buffer) { } // namespace Common } // namespace Zstd } // namespace Compression -} // namespace Extensions } // namespace Envoy diff --git a/source/extensions/compression/zstd/common/base.h b/source/common/compression/zstd/common/base.h similarity index 92% rename from source/extensions/compression/zstd/common/base.h rename to source/common/compression/zstd/common/base.h index 6a3c626a37..8582fe3f86 100644 --- a/source/extensions/compression/zstd/common/base.h +++ b/source/common/compression/zstd/common/base.h @@ -7,7 +7,6 @@ #include "zstd.h" namespace Envoy { -namespace Extensions { namespace Compression { namespace Zstd { namespace Common { @@ -29,5 +28,4 @@ struct Base { } // namespace Common } // namespace Zstd } // namespace Compression -} // namespace Extensions } // namespace Envoy diff --git a/source/extensions/compression/zstd/common/dictionary_manager.h b/source/common/compression/zstd/common/dictionary_manager.h similarity index 98% rename from source/extensions/compression/zstd/common/dictionary_manager.h rename to source/common/compression/zstd/common/dictionary_manager.h index 06358b5948..1f71ea28ca 100644 --- a/source/extensions/compression/zstd/common/dictionary_manager.h +++ b/source/common/compression/zstd/common/dictionary_manager.h @@ -10,7 +10,6 @@ #include "zstd.h" namespace Envoy { -namespace Extensions { namespace Compression { namespace Zstd { namespace Common { @@ -117,5 +116,4 @@ template class } // namespace Common } // namespace Zstd } // namespace Compression -} // namespace Extensions } // namespace Envoy diff --git a/source/common/compression/zstd/compressor/BUILD b/source/common/compression/zstd/compressor/BUILD new file mode 100644 index 0000000000..75d11f2d5f --- /dev/null +++ b/source/common/compression/zstd/compressor/BUILD @@ -0,0 +1,21 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_package() + +envoy_cc_library( + name = "compressor_base", + srcs = ["zstd_compressor_impl_base.cc"], + hdrs = ["zstd_compressor_impl_base.h"], + deps = [ + "//envoy/compression/compressor:compressor_interface", + "//source/common/buffer:buffer_lib", + "//source/common/compression/zstd/common:zstd_base_lib", + "//source/common/compression/zstd/common:zstd_dictionary_manager_lib", + ], +) diff --git a/source/common/compression/zstd/compressor/zstd_compressor_impl_base.cc b/source/common/compression/zstd/compressor/zstd_compressor_impl_base.cc new file mode 100644 index 0000000000..af5e87f4fa --- /dev/null +++ b/source/common/compression/zstd/compressor/zstd_compressor_impl_base.cc @@ -0,0 +1,69 @@ +#include "source/common/compression/zstd/compressor/zstd_compressor_impl_base.h" + +#include "source/common/buffer/buffer_impl.h" + +namespace Envoy { +namespace Compression { +namespace Zstd { +namespace Compressor { + +ZstdCompressorImplBase::ZstdCompressorImplBase(uint32_t compression_level, bool enable_checksum, + uint32_t strategy, + const ZstdCDictManagerPtr& cdict_manager, + uint32_t chunk_size) + : Common::Base(chunk_size), cctx_(ZSTD_createCCtx(), &ZSTD_freeCCtx), + cdict_manager_(cdict_manager), compression_level_(compression_level) { + size_t result; + result = ZSTD_CCtx_setParameter(cctx_.get(), ZSTD_c_checksumFlag, enable_checksum); + RELEASE_ASSERT(!ZSTD_isError(result), ""); + + result = ZSTD_CCtx_setParameter(cctx_.get(), ZSTD_c_strategy, strategy); + RELEASE_ASSERT(!ZSTD_isError(result), ""); + + if (cdict_manager_) { + ZSTD_CDict* cdict = cdict_manager_->getFirstDictionary(); + result = ZSTD_CCtx_refCDict(cctx_.get(), cdict); + } else { + result = ZSTD_CCtx_setParameter(cctx_.get(), ZSTD_c_compressionLevel, compression_level_); + } + RELEASE_ASSERT(!ZSTD_isError(result), ""); +} + +void ZstdCompressorImplBase::compress(Buffer::Instance& buffer, + Envoy::Compression::Compressor::State state) { + compressPreprocess(buffer, state); + + Buffer::OwnedImpl accumulation_buffer; + for (const Buffer::RawSlice& input_slice : buffer.getRawSlices()) { + if (input_slice.len_ > 0) { + compressProcess(buffer, input_slice, accumulation_buffer); + buffer.drain(input_slice.len_); + } + } + + compressPostprocess(accumulation_buffer); + + ASSERT(buffer.length() == 0); + buffer.move(accumulation_buffer); + + if (state == Envoy::Compression::Compressor::State::Finish) { + process(buffer, ZSTD_e_end); + } +} + +void ZstdCompressorImplBase::process(Buffer::Instance& output_buffer, ZSTD_EndDirective mode) { + bool finished; + do { + const size_t remaining = ZSTD_compressStream2(cctx_.get(), &output_, &input_, mode); + getOutput(output_buffer); + // If we're on the last chunk we're finished when zstd returns 0, + // which means its consumed all the input AND finished the frame. + // Otherwise, we're finished when we've consumed all the input. + finished = (ZSTD_e_end == mode) ? (remaining == 0) : (input_.pos == input_.size); + } while (!finished); +} + +} // namespace Compressor +} // namespace Zstd +} // namespace Compression +} // namespace Envoy diff --git a/source/common/compression/zstd/compressor/zstd_compressor_impl_base.h b/source/common/compression/zstd/compressor/zstd_compressor_impl_base.h new file mode 100644 index 0000000000..5da65ac425 --- /dev/null +++ b/source/common/compression/zstd/compressor/zstd_compressor_impl_base.h @@ -0,0 +1,48 @@ +#pragma once + +#include "envoy/compression/compressor/compressor.h" + +#include "source/common/compression/zstd/common/base.h" +#include "source/common/compression/zstd/common/dictionary_manager.h" + +namespace Envoy { +namespace Compression { +namespace Zstd { +namespace Compressor { + +using ZstdCDictManager = + Common::DictionaryManager; +using ZstdCDictManagerPtr = std::unique_ptr; + +/** + * Implementation of compressor's interface. + */ +class ZstdCompressorImplBase : public Common::Base, + public Envoy::Compression::Compressor::Compressor, + NonCopyable { +public: + ZstdCompressorImplBase(uint32_t compression_level, bool enable_checksum, uint32_t strategy, + const ZstdCDictManagerPtr& cdict_manager, uint32_t chunk_size); + + // Compression::Compressor::Compressor + void compress(Buffer::Instance& buffer, Envoy::Compression::Compressor::State state) override; + + void process(Buffer::Instance& output_buffer, ZSTD_EndDirective mode); + + virtual void compressPreprocess(Buffer::Instance& buffer, + Envoy::Compression::Compressor::State state) PURE; + + virtual void compressProcess(const Buffer::Instance& buffer, const Buffer::RawSlice& input_slice, + Buffer::Instance& accumulation_buffer) PURE; + + virtual void compressPostprocess(Buffer::Instance& accumulation_buffer) PURE; + + std::unique_ptr cctx_; + const ZstdCDictManagerPtr& cdict_manager_; + const uint32_t compression_level_; +}; + +} // namespace Compressor +} // namespace Zstd +} // namespace Compression +} // namespace Envoy diff --git a/source/extensions/compression/zstd/compressor/BUILD b/source/extensions/compression/zstd/compressor/BUILD index e6167ff1a1..fe53d6a123 100644 --- a/source/extensions/compression/zstd/compressor/BUILD +++ b/source/extensions/compression/zstd/compressor/BUILD @@ -16,8 +16,9 @@ envoy_cc_library( deps = [ "//envoy/compression/compressor:compressor_interface", "//source/common/buffer:buffer_lib", - "//source/extensions/compression/zstd/common:zstd_base_lib", - "//source/extensions/compression/zstd/common:zstd_dictionary_manager_lib", + "//source/common/compression/zstd/common:zstd_base_lib", + "//source/common/compression/zstd/common:zstd_dictionary_manager_lib", + "//source/common/compression/zstd/compressor:compressor_base", ], ) diff --git a/source/extensions/compression/zstd/compressor/zstd_compressor_impl.cc b/source/extensions/compression/zstd/compressor/zstd_compressor_impl.cc index 89d3ed754e..fedfb736d9 100644 --- a/source/extensions/compression/zstd/compressor/zstd_compressor_impl.cc +++ b/source/extensions/compression/zstd/compressor/zstd_compressor_impl.cc @@ -1,64 +1,22 @@ #include "source/extensions/compression/zstd/compressor/zstd_compressor_impl.h" -#include "source/common/buffer/buffer_impl.h" - namespace Envoy { namespace Extensions { namespace Compression { namespace Zstd { namespace Compressor { -ZstdCompressorImpl::ZstdCompressorImpl(uint32_t compression_level, bool enable_checksum, - uint32_t strategy, const ZstdCDictManagerPtr& cdict_manager, - uint32_t chunk_size) - : Common::Base(chunk_size), cctx_(ZSTD_createCCtx(), &ZSTD_freeCCtx), - cdict_manager_(cdict_manager), compression_level_(compression_level) { - size_t result; - result = ZSTD_CCtx_setParameter(cctx_.get(), ZSTD_c_checksumFlag, enable_checksum); - RELEASE_ASSERT(!ZSTD_isError(result), ""); - - result = ZSTD_CCtx_setParameter(cctx_.get(), ZSTD_c_strategy, strategy); - RELEASE_ASSERT(!ZSTD_isError(result), ""); +void ZstdCompressorImpl::compressPreprocess(Buffer::Instance&, + Envoy::Compression::Compressor::State) {} - if (cdict_manager_) { - ZSTD_CDict* cdict = cdict_manager_->getFirstDictionary(); - result = ZSTD_CCtx_refCDict(cctx_.get(), cdict); - } else { - result = ZSTD_CCtx_setParameter(cctx_.get(), ZSTD_c_compressionLevel, compression_level_); - } - RELEASE_ASSERT(!ZSTD_isError(result), ""); +void ZstdCompressorImpl::compressProcess(const Buffer::Instance&, + const Buffer::RawSlice& input_slice, + Buffer::Instance& accumulation_buffer) { + setInput(input_slice); + process(accumulation_buffer, ZSTD_e_continue); } -void ZstdCompressorImpl::compress(Buffer::Instance& buffer, - Envoy::Compression::Compressor::State state) { - Buffer::OwnedImpl accumulation_buffer; - for (const Buffer::RawSlice& input_slice : buffer.getRawSlices()) { - if (input_slice.len_ > 0) { - setInput(input_slice); - process(accumulation_buffer, ZSTD_e_continue); - buffer.drain(input_slice.len_); - } - } - - ASSERT(buffer.length() == 0); - buffer.move(accumulation_buffer); - - if (state == Envoy::Compression::Compressor::State::Finish) { - process(buffer, ZSTD_e_end); - } -} - -void ZstdCompressorImpl::process(Buffer::Instance& output_buffer, ZSTD_EndDirective mode) { - bool finished; - do { - const size_t remaining = ZSTD_compressStream2(cctx_.get(), &output_, &input_, mode); - getOutput(output_buffer); - // If we're on the last chunk we're finished when zstd returns 0, - // which means its consumed all the input AND finished the frame. - // Otherwise, we're finished when we've consumed all the input. - finished = (ZSTD_e_end == mode) ? (remaining == 0) : (input_.pos == input_.size); - } while (!finished); -} +void ZstdCompressorImpl::compressPostprocess(Buffer::Instance&) {} } // namespace Compressor } // namespace Zstd diff --git a/source/extensions/compression/zstd/compressor/zstd_compressor_impl.h b/source/extensions/compression/zstd/compressor/zstd_compressor_impl.h index e0adaec7ca..dd5b9f6769 100644 --- a/source/extensions/compression/zstd/compressor/zstd_compressor_impl.h +++ b/source/extensions/compression/zstd/compressor/zstd_compressor_impl.h @@ -2,8 +2,9 @@ #include "envoy/compression/compressor/compressor.h" -#include "source/extensions/compression/zstd/common/base.h" -#include "source/extensions/compression/zstd/common/dictionary_manager.h" +#include "source/common/compression/zstd/common/base.h" +#include "source/common/compression/zstd/common/dictionary_manager.h" +#include "source/common/compression/zstd/compressor/zstd_compressor_impl_base.h" namespace Envoy { namespace Extensions { @@ -12,28 +13,28 @@ namespace Zstd { namespace Compressor { using ZstdCDictManager = - Common::DictionaryManager; + Envoy::Compression::Zstd::Common::DictionaryManager; using ZstdCDictManagerPtr = std::unique_ptr; /** * Implementation of compressor's interface. */ -class ZstdCompressorImpl : public Common::Base, - public Envoy::Compression::Compressor::Compressor, - NonCopyable { +class ZstdCompressorImpl : public Envoy::Compression::Zstd::Compressor::ZstdCompressorImplBase { public: ZstdCompressorImpl(uint32_t compression_level, bool enable_checksum, uint32_t strategy, - const ZstdCDictManagerPtr& cdict_manager, uint32_t chunk_size); - - // Compression::Compressor::Compressor - void compress(Buffer::Instance& buffer, Envoy::Compression::Compressor::State state) override; + const ZstdCDictManagerPtr& cdict_manager, uint32_t chunk_size) + : ZstdCompressorImplBase(compression_level, enable_checksum, strategy, cdict_manager, + chunk_size) {} private: - void process(Buffer::Instance& output_buffer, ZSTD_EndDirective mode); + void compressPreprocess(Buffer::Instance& buffer, + Envoy::Compression::Compressor::State state) override; + + void compressProcess(const Buffer::Instance& buffer, const Buffer::RawSlice& input_slice, + Buffer::Instance& accumulation_buffer) override; - std::unique_ptr cctx_; - const ZstdCDictManagerPtr& cdict_manager_; - const uint32_t compression_level_; + void compressPostprocess(Buffer::Instance& accumulation_buffer) override; }; } // namespace Compressor diff --git a/source/extensions/compression/zstd/decompressor/BUILD b/source/extensions/compression/zstd/decompressor/BUILD index 9fc4383b93..a0f5611649 100644 --- a/source/extensions/compression/zstd/decompressor/BUILD +++ b/source/extensions/compression/zstd/decompressor/BUILD @@ -18,8 +18,8 @@ envoy_cc_library( "//envoy/stats:stats_interface", "//envoy/stats:stats_macros", "//source/common/buffer:buffer_lib", - "//source/extensions/compression/zstd/common:zstd_base_lib", - "//source/extensions/compression/zstd/common:zstd_dictionary_manager_lib", + "//source/common/compression/zstd/common:zstd_base_lib", + "//source/common/compression/zstd/common:zstd_dictionary_manager_lib", ], ) diff --git a/source/extensions/compression/zstd/decompressor/zstd_decompressor_impl.cc b/source/extensions/compression/zstd/decompressor/zstd_decompressor_impl.cc index 7a165da197..e2a00232c1 100644 --- a/source/extensions/compression/zstd/decompressor/zstd_decompressor_impl.cc +++ b/source/extensions/compression/zstd/decompressor/zstd_decompressor_impl.cc @@ -21,7 +21,7 @@ constexpr uint64_t MaxInflateRatio = 100; ZstdDecompressorImpl::ZstdDecompressorImpl(Stats::Scope& scope, const std::string& stats_prefix, const ZstdDDictManagerPtr& ddict_manager, uint32_t chunk_size) - : Common::Base(chunk_size), dctx_(ZSTD_createDCtx(), &ZSTD_freeDCtx), + : Envoy::Compression::Zstd::Common::Base(chunk_size), dctx_(ZSTD_createDCtx(), &ZSTD_freeDCtx), ddict_manager_(ddict_manager), stats_(generateStats(stats_prefix, scope)) {} void ZstdDecompressorImpl::decompress(const Buffer::Instance& input_buffer, diff --git a/source/extensions/compression/zstd/decompressor/zstd_decompressor_impl.h b/source/extensions/compression/zstd/decompressor/zstd_decompressor_impl.h index 597a8c72ad..e738575f49 100644 --- a/source/extensions/compression/zstd/decompressor/zstd_decompressor_impl.h +++ b/source/extensions/compression/zstd/decompressor/zstd_decompressor_impl.h @@ -5,8 +5,8 @@ #include "envoy/stats/stats_macros.h" #include "source/common/common/logger.h" -#include "source/extensions/compression/zstd/common/base.h" -#include "source/extensions/compression/zstd/common/dictionary_manager.h" +#include "source/common/compression/zstd/common/base.h" +#include "source/common/compression/zstd/common/dictionary_manager.h" #include "zstd_errors.h" @@ -17,7 +17,8 @@ namespace Zstd { namespace Decompressor { using ZstdDDictManager = - Common::DictionaryManager; + Envoy::Compression::Zstd::Common::DictionaryManager; using ZstdDDictManagerPtr = std::unique_ptr; /** @@ -39,7 +40,7 @@ struct ZstdDecompressorStats { /** * Implementation of decompressor's interface. */ -class ZstdDecompressorImpl : public Common::Base, +class ZstdDecompressorImpl : public Envoy::Compression::Zstd::Common::Base, public Envoy::Compression::Decompressor::Decompressor, public Logger::Loggable, NonCopyable {