Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pubsub/pulsar/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type pulsarMetadata struct {
ReplicateSubscriptionState bool `mapstructure:"replicateSubscriptionState"`
SubscriptionMode string `mapstructure:"subscribeMode"`
Token string `mapstructure:"token"`
CompressionType string `mapstructure:"compressionType"`
CompressionLevel string `mapstructure:"compressionLevel"`
oauth2.ClientCredentialsMetadata `mapstructure:",squash"`
}

Expand Down
26 changes: 26 additions & 0 deletions pubsub/pulsar/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,32 @@ metadata:
type: number
default: '"131072" (128 KB)'
example: '"131072"'
- name: compressionType
type: string
description: |
Sets the compression type for messages sent by the producer.
Compression can help reduce message size and improve throughput for large messages.
default: '"none"'
example: '"lz4"'
allowedValues:
- none
- lz4
- zlib
- zstd
url:
title: "Pulsar Message Compression"
url: "https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#compression"
- name: compressionLevel
type: string
description: |
Sets the compression level when compressionType is enabled.
Higher compression levels provide better compression ratios but require more CPU resources.
default: '"default"'
example: '"faster"'
allowedValues:
- default
- faster
- better
- name: publicKey
type: string
description: |
Expand Down
72 changes: 72 additions & 0 deletions pubsub/pulsar/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,18 @@ const (

subscribeModeDurable = "durable"
subscribeModeNonDurable = "non_durable"

compressionTypeKey = "compressionType"
compressionLevelKey = "compressionLevel"

compressionTypeNone = "none"
compressionTypeLZ4 = "lz4"
compressionTypeZLib = "zlib"
compressionTypeZSTD = "zstd"

compressionLevelDefault = "default"
compressionLevelFaster = "faster"
compressionLevelBetter = "better"
)

type ProcessMode string
Expand Down Expand Up @@ -168,6 +180,16 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
return nil, errors.New("invalid subscription mode")
}

m.CompressionType, err = parseCompressionType(meta.Properties[compressionTypeKey])
if err != nil {
return nil, errors.New("invalid compression type. Accepted values are `none`, `lz4`, `zlib` and `zstd`")
}

m.CompressionLevel, err = parseCompressionLevel(meta.Properties[compressionLevelKey])
if err != nil {
return nil, errors.New("invalid compression level. Accepted values are `default`, `faster` and `better`")
}

for k, v := range meta.Properties {
switch {
case strings.HasSuffix(k, topicJSONSchemaIdentifier):
Expand Down Expand Up @@ -308,6 +330,8 @@ func (p *Pulsar) Publish(ctx context.Context, req *pubsub.PublishRequest) error
BatchingMaxPublishDelay: p.metadata.BatchingMaxPublishDelay,
BatchingMaxMessages: p.metadata.BatchingMaxMessages,
BatchingMaxSize: p.metadata.BatchingMaxSize,
CompressionType: getCompressionType(p.metadata.CompressionType),
CompressionLevel: getCompressionLevel(p.metadata.CompressionLevel),
}

if hasSchema {
Expand Down Expand Up @@ -497,6 +521,54 @@ func getSubscriptionMode(subsModeStr string) pulsar.SubscriptionMode {
}
}

func parseCompressionType(in string) (string, error) {
compType := strings.ToLower(in)
switch compType {
case compressionTypeNone, compressionTypeLZ4, compressionTypeZLib, compressionTypeZSTD:
return compType, nil
case "":
return compressionTypeNone, nil
default:
return "", fmt.Errorf("invalid compression type: %s", compType)
}
}

func getCompressionType(compTypeStr string) pulsar.CompressionType {
switch compTypeStr {
case compressionTypeLZ4:
return pulsar.LZ4
case compressionTypeZLib:
return pulsar.ZLib
case compressionTypeZSTD:
return pulsar.ZSTD
default:
return pulsar.NoCompression
}
}

func parseCompressionLevel(in string) (string, error) {
compLevel := strings.ToLower(in)
switch compLevel {
case compressionLevelDefault, compressionLevelFaster, compressionLevelBetter:
return compLevel, nil
case "":
return compressionLevelDefault, nil
default:
return "", fmt.Errorf("invalid compression level: %s", compLevel)
}
}

func getCompressionLevel(compLevelStr string) pulsar.CompressionLevel {
switch compLevelStr {
case compressionLevelFaster:
return pulsar.Faster
case compressionLevelBetter:
return pulsar.Better
default:
return pulsar.Default
}
}

func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
if p.closed.Load() {
return errors.New("component is closed")
Expand Down
220 changes: 220 additions & 0 deletions pubsub/pulsar/pulsar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,226 @@ func TestParsePulsarMetadataReplicateSubscriptionState(t *testing.T) {
}
}

func TestParsePulsarMetadataCompressionType(t *testing.T) {
tt := []struct {
name string
compressionType string
expected string
expectedPulsar pulsar.CompressionType
err bool
}{
{
name: "test valid compression type - none",
compressionType: "none",
expected: "none",
expectedPulsar: pulsar.NoCompression,
err: false,
},
{
name: "test valid compression type - lz4",
compressionType: "lz4",
expected: "lz4",
expectedPulsar: pulsar.LZ4,
err: false,
},
{
name: "test valid compression type - zlib",
compressionType: "zlib",
expected: "zlib",
expectedPulsar: pulsar.ZLib,
err: false,
},
{
name: "test valid compression type - zstd",
compressionType: "zstd",
expected: "zstd",
expectedPulsar: pulsar.ZSTD,
err: false,
},
{
name: "test valid compression type - empty defaults to none",
compressionType: "",
expected: "none",
expectedPulsar: pulsar.NoCompression,
err: false,
},
{
name: "test valid compression type - case insensitive",
compressionType: "LZ4",
expected: "lz4",
expectedPulsar: pulsar.LZ4,
err: false,
},
{
name: "test invalid compression type",
compressionType: "invalid",
err: true,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"host": "a",
"compressionType": tc.compressionType,
}
meta, err := parsePulsarMetadata(m)

if tc.err {
require.Error(t, err)
assert.Nil(t, meta)
return
}

require.NoError(t, err)
assert.Equal(t, tc.expected, meta.CompressionType)
assert.Equal(t, tc.expectedPulsar, getCompressionType(meta.CompressionType))
})
}
}

func TestParsePulsarMetadataCompressionLevel(t *testing.T) {
tt := []struct {
name string
compressionLevel string
expected string
expectedPulsar pulsar.CompressionLevel
err bool
}{
{
name: "test valid compression level - default",
compressionLevel: "default",
expected: "default",
expectedPulsar: pulsar.Default,
err: false,
},
{
name: "test valid compression level - faster",
compressionLevel: "faster",
expected: "faster",
expectedPulsar: pulsar.Faster,
err: false,
},
{
name: "test valid compression level - better",
compressionLevel: "better",
expected: "better",
expectedPulsar: pulsar.Better,
err: false,
},
{
name: "test valid compression level - empty defaults to default",
compressionLevel: "",
expected: "default",
expectedPulsar: pulsar.Default,
err: false,
},
{
name: "test valid compression level - case insensitive",
compressionLevel: "FASTER",
expected: "faster",
expectedPulsar: pulsar.Faster,
err: false,
},
{
name: "test invalid compression level",
compressionLevel: "invalid",
err: true,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"host": "a",
"compressionLevel": tc.compressionLevel,
}
meta, err := parsePulsarMetadata(m)

if tc.err {
require.Error(t, err)
assert.Nil(t, meta)
return
}

require.NoError(t, err)
assert.Equal(t, tc.expected, meta.CompressionLevel)
assert.Equal(t, tc.expectedPulsar, getCompressionLevel(meta.CompressionLevel))
})
}
}

func TestParsePulsarMetadataCompressionCombination(t *testing.T) {
tt := []struct {
name string
compressionType string
compressionLevel string
expectedType string
expectedLevel string
err bool
}{
{
name: "test default compression settings",
compressionType: "",
compressionLevel: "",
expectedType: "none",
expectedLevel: "default",
err: false,
},
{
name: "test lz4 with faster compression",
compressionType: "lz4",
compressionLevel: "faster",
expectedType: "lz4",
expectedLevel: "faster",
err: false,
},
{
name: "test zstd with better compression",
compressionType: "zstd",
compressionLevel: "better",
expectedType: "zstd",
expectedLevel: "better",
err: false,
},
{
name: "test invalid compression type",
compressionType: "invalid",
err: true,
},
{
name: "test invalid compression level",
compressionType: "lz4",
compressionLevel: "invalid",
err: true,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{
"host": "a",
"compressionType": tc.compressionType,
"compressionLevel": tc.compressionLevel,
}
meta, err := parsePulsarMetadata(m)

if tc.err {
require.Error(t, err)
assert.Nil(t, meta)
return
}

require.NoError(t, err)
assert.Equal(t, tc.expectedType, meta.CompressionType)
assert.Equal(t, tc.expectedLevel, meta.CompressionLevel)
})
}
}

func TestSanitiseURL(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading