Skip to content

Commit aab7eb0

Browse files
hash-databadalprasadsinghImDoubD-datazipsanjay7178vaibhav-datazip
authored
chore: releasing version 0.3.7 (#705)
Signed-off-by: badalprasadsingh <badal@datazip.io> Signed-off-by: Badal Prasad Singh <badal@datazip.io> Signed-off-by: Sai Sanjay <saisanjay7660@gmail.com> Co-authored-by: Badal Prasad Singh <badal@datazip.io> Co-authored-by: Duke <duke@datazip.io> Co-authored-by: Sai Sanjay <saisanjay7660@gmail.com> Co-authored-by: Vaibhav <vaibhav@datazip.io>
2 parents 9a47881 + 765752b commit aab7eb0

13 files changed

Lines changed: 257 additions & 31 deletions

File tree

build.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,4 +120,4 @@ if [ $# -gt 0 ]; then
120120
fi
121121
else
122122
fail "No arguments provided."
123-
fi
123+
fi

drivers/abstract/abstract.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,6 @@ func (a *AbstractDriver) ClearState(streams []types.StreamInterface) (*types.Sta
136136
for streamID := range dropStreams {
137137
a.state.Global.Streams.Remove(streamID)
138138
}
139-
// if all global streams are dropped, no point for global state itself, making it null
140-
if len(a.state.Global.Streams.Array()) == 0 {
141-
a.state.Global.State = nil
142-
}
143139
}
144140

145141
if len(a.state.Streams) > 0 {

drivers/kafka/README.md

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,30 @@ Add Kafka credentials in following format in `source.json` file. To check more a
2929
"max_threads": 3
3030
}
3131
```
32-
- There are 3 security protocols:<br>
32+
- There are 4 security protocols:<br>
3333
- `PLAINTEXT`
3434
- Kafka cluster without any authentication and encryption.
35+
- `SSL`
36+
- Kafka cluster with TLS encryption (no SASL authentication).
37+
- Supports external certificates for custom CA or mTLS.
38+
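- The `ssl` block itself is optional (useful for AWS MSK where broker certificates are managed). When it is provided, `server_ca` is required, and `client_cert` and `client_key` must be supplied together (mTLS).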
39+
```json
40+
"protocol": {
41+
"security_protocol": "SSL",
42+
"tls_skip_verify": false,
43+
"ssl": {
44+
"server_ca": "-----BEGIN CERTIFICATE-----\nMII...\n-----END CERTIFICATE-----",
45+
"client_cert": "-----BEGIN CERTIFICATE-----\nMII...\n-----END CERTIFICATE-----",
46+
"client_key": "-----BEGIN PRIVATE KEY-----\nMII...\n-----END PRIVATE KEY-----"
47+
}
48+
}
49+
```
50+
- For AWS MSK (no custom certificates needed):
51+
```json
52+
"protocol": {
53+
"security_protocol": "SSL"
54+
}
55+
```
3556
- `SASL_PLAINTEXT`
3657
- Kafka cluster with Simple Authentication and Security Layer, i.e. authentication but no encryption.
3758
- Requires SASL mechanism and SASL JAAS Configuration string. Supported are:
@@ -53,6 +74,7 @@ Add Kafka credentials in following format in `source.json` file. To check more a
5374
```
5475
- `SASL_SSL`
5576
- Kafka cluster with Simple Authentication and Security Layer i.e. authentication and encryption using Secure Sockets Layer.
77+
- Optionally supports external certificates for a custom CA or mTLS (client certificate and key).
5678
- Requires SASL mechanism and SASL JAAS Configuration string. Supported are:
5779
- PLAIN
5880
```json
@@ -70,6 +92,20 @@ Add Kafka credentials in following format in `source.json` file. To check more a
7092
"sasl_jaas_config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"TEST-PASS\" password=\"TEST-PASS\";"
7193
},
7294
```
95+
- With external certificates (TLS/SSL):
96+
```json
97+
"protocol": {
98+
"security_protocol": "SASL_SSL",
99+
"sasl_mechanism": "PLAIN",
100+
"sasl_jaas_config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"TEST-USER\" password=\"TEST-PASS\";",
101+
"tls_skip_verify": false,
102+
"ssl": {
103+
"server_ca": "-----BEGIN CERTIFICATE-----\nMII...\n-----END CERTIFICATE-----",
104+
"client_cert": "-----BEGIN CERTIFICATE-----\nMII...\n-----END CERTIFICATE-----",
105+
"client_key": "-----BEGIN PRIVATE KEY-----\nMII...\n-----END PRIVATE KEY-----"
106+
}
107+
},
108+
```
73109

74110
## Commands
75111
### Discover Command

drivers/kafka/go.mod

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ require (
1818
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
1919
github.com/Microsoft/go-winio v0.6.2 // indirect
2020
github.com/andybalholm/brotli v1.1.1 // indirect
21+
github.com/apache/arrow-go/v18 v18.2.0 // indirect
2122
github.com/apache/thrift v0.21.0 // indirect
2223
github.com/aws/aws-sdk-go v1.55.6 // indirect
2324
github.com/aws/aws-sdk-go-v2 v1.36.5 // indirect
@@ -58,18 +59,24 @@ require (
5859
github.com/go-viper/mapstructure/v2 v2.3.0 // indirect
5960
github.com/goccy/go-json v0.10.5 // indirect
6061
github.com/gogo/protobuf v1.3.2 // indirect
62+
github.com/golang/snappy v0.0.4 // indirect
63+
github.com/google/flatbuffers v25.2.10+incompatible // indirect
6164
github.com/google/uuid v1.6.0 // indirect
6265
github.com/hashicorp/errwrap v1.1.0 // indirect
6366
github.com/hashicorp/go-multierror v1.1.1 // indirect
6467
github.com/inconshreveable/mousetrap v1.1.0 // indirect
6568
github.com/jmespath/go-jmespath v0.4.0 // indirect
69+
github.com/klauspost/asmfmt v1.3.2 // indirect
6670
github.com/klauspost/compress v1.18.0 // indirect
71+
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
6772
github.com/leodido/go-urn v1.4.0 // indirect
6873
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
6974
github.com/magiconair/properties v1.8.10 // indirect
7075
github.com/mattn/go-colorable v0.1.14 // indirect
7176
github.com/mattn/go-isatty v0.0.20 // indirect
7277
github.com/mattn/go-runewidth v0.0.16 // indirect
78+
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
79+
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
7380
github.com/mitchellh/hashstructure v1.1.0 // indirect
7481
github.com/moby/docker-image-spec v1.3.1 // indirect
7582
github.com/moby/go-archive v0.1.0 // indirect
@@ -106,12 +113,14 @@ require (
106113
github.com/testcontainers/testcontainers-go v0.37.0 // indirect
107114
github.com/tklauser/go-sysconf v0.3.12 // indirect
108115
github.com/tklauser/numcpus v0.6.1 // indirect
116+
github.com/twmb/murmur3 v1.1.8 // indirect
109117
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
110118
github.com/xdg-go/scram v1.1.2 // indirect
111119
github.com/xdg-go/stringprep v1.0.4 // indirect
112120
github.com/xitongsys/parquet-go v1.6.2 // indirect
113121
github.com/xitongsys/parquet-go-source v0.0.0-20241021075129-b732d2ac9c9b // indirect
114122
github.com/yusufpapurcu/wmi v1.2.4 // indirect
123+
github.com/zeebo/xxh3 v1.0.2 // indirect
115124
go.mongodb.org/mongo-driver v1.17.3 // indirect
116125
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
117126
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
@@ -120,10 +129,14 @@ require (
120129
go.opentelemetry.io/otel/trace v1.38.0 // indirect
121130
go.uber.org/multierr v1.11.0 // indirect
122131
golang.org/x/crypto v0.40.0 // indirect
132+
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect
133+
golang.org/x/mod v0.25.0 // indirect
123134
golang.org/x/net v0.42.0 // indirect
124135
golang.org/x/sync v0.16.0 // indirect
125136
golang.org/x/sys v0.35.0 // indirect
126137
golang.org/x/text v0.27.0 // indirect
138+
golang.org/x/tools v0.34.0 // indirect
139+
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
127140
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
128141
google.golang.org/grpc v1.71.3 // indirect
129142
google.golang.org/protobuf v1.36.6 // indirect

drivers/kafka/internal/config.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ type Config struct {
1717
}
1818

1919
type ProtocolConfig struct {
20-
SecurityProtocol string `json:"security_protocol"`
21-
SASLMechanism string `json:"sasl_mechanism,omitempty"`
22-
SASLJAASConfig string `json:"sasl_jaas_config,omitempty"`
20+
SecurityProtocol string `json:"security_protocol"`
21+
SASLMechanism string `json:"sasl_mechanism,omitempty"`
22+
SASLJAASConfig string `json:"sasl_jaas_config,omitempty"`
23+
TLSSkipVerify bool `json:"tls_skip_verify,omitempty"`
24+
SSL *utils.SSLConfig `json:"ssl,omitempty"`
2325
}
2426

2527
func (c *Config) Validate() error {
@@ -28,7 +30,7 @@ func (c *Config) Validate() error {
2830
}
2931

3032
if c.Protocol.SecurityProtocol == "" {
31-
return fmt.Errorf("security_protocol must be either PLAINTEXT or SASL_PLAINTEXT or SASL_SSL")
33+
return fmt.Errorf("security_protocol must be one of: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL")
3234
}
3335

3436
if c.Protocol.SecurityProtocol == "SASL_PLAINTEXT" || c.Protocol.SecurityProtocol == "SASL_SSL" {
@@ -40,6 +42,20 @@ func (c *Config) Validate() error {
4042
}
4143
}
4244

45+
if c.Protocol.SecurityProtocol == "SSL" || c.Protocol.SecurityProtocol == "SASL_SSL" {
46+
if c.Protocol.SSL != nil {
47+
// Server CA is always required
48+
if c.Protocol.SSL.ServerCA == "" {
49+
return fmt.Errorf("server_ca must be provided")
50+
}
51+
52+
// Client Cert and Key are required together for mTLS
53+
if (c.Protocol.SSL.ClientCert != "") != (c.Protocol.SSL.ClientKey != "") {
54+
return fmt.Errorf("both client_cert and client_key must be provided together for mTLS")
55+
}
56+
}
57+
}
58+
4359
if c.MaxThreads <= 0 {
4460
c.MaxThreads = constants.DefaultThreadCount
4561
}

drivers/kafka/internal/kafka.go

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@ package driver
33
import (
44
"context"
55
"crypto/tls"
6+
"crypto/x509"
67
"fmt"
7-
kafkapkg "github.com/datazip-inc/olake/pkg/kafka"
88
"regexp"
99
"strings"
1010
"sync"
1111
"time"
1212

13+
kafkapkg "github.com/datazip-inc/olake/pkg/kafka"
14+
1315
"github.com/datazip-inc/olake/constants"
1416
"github.com/datazip-inc/olake/drivers/abstract"
1517
"github.com/datazip-inc/olake/types"
@@ -149,6 +151,15 @@ func (k *Kafka) createDialer() (*kafka.Dialer, error) {
149151
switch k.config.Protocol.SecurityProtocol {
150152
case "PLAINTEXT":
151153
// No additional configuration needed
154+
155+
case "SSL":
156+
// Pure TLS without SASL authentication
157+
tlsConfig, err := k.buildTLSConfig()
158+
if err != nil {
159+
return nil, err
160+
}
161+
dialer.TLS = tlsConfig
162+
152163
case "SASL_PLAINTEXT":
153164
switch k.config.Protocol.SASLMechanism {
154165
case "PLAIN":
@@ -164,10 +175,15 @@ func (k *Kafka) createDialer() (*kafka.Dialer, error) {
164175
default:
165176
return nil, fmt.Errorf("unsupported SASL mechanism: %s", k.config.Protocol.SASLMechanism)
166177
}
178+
167179
case "SASL_SSL":
168-
dialer.TLS = &tls.Config{
169-
MinVersion: tls.VersionTLS12,
180+
// TLS with SASL authentication
181+
tlsConfig, err := k.buildTLSConfig()
182+
if err != nil {
183+
return nil, err
170184
}
185+
dialer.TLS = tlsConfig
186+
171187
switch k.config.Protocol.SASLMechanism {
172188
case "PLAIN":
173189
dialer.SASLMechanism = plain.Mechanism{
@@ -182,6 +198,7 @@ func (k *Kafka) createDialer() (*kafka.Dialer, error) {
182198
default:
183199
return nil, fmt.Errorf("unsupported SASL mechanism: %s", k.config.Protocol.SASLMechanism)
184200
}
201+
185202
default:
186203
return nil, fmt.Errorf("unsupported security protocol: %s", k.config.Protocol.SecurityProtocol)
187204
}
@@ -202,6 +219,38 @@ func parseSASLPlain(jassConfig string) (string, string, error) {
202219
return matches[1], matches[2], nil
203220
}
204221

222+
// buildTLSConfig creates TLS configuration with optional external certificates
223+
func (k *Kafka) buildTLSConfig() (*tls.Config, error) {
224+
tlsConfig := &tls.Config{
225+
MinVersion: tls.VersionTLS12,
226+
}
227+
228+
// Apply SSL config if provided
229+
if k.config.Protocol.SSL != nil {
230+
tlsConfig.InsecureSkipVerify = k.config.Protocol.TLSSkipVerify
231+
232+
// Load CA certificate if provided
233+
if k.config.Protocol.SSL.ServerCA != "" {
234+
caCertPool := x509.NewCertPool()
235+
if !caCertPool.AppendCertsFromPEM([]byte(k.config.Protocol.SSL.ServerCA)) {
236+
return nil, fmt.Errorf("failed to parse CA certificate")
237+
}
238+
tlsConfig.RootCAs = caCertPool
239+
}
240+
241+
// Load client certificate and key for mTLS
242+
if k.config.Protocol.SSL.ClientCert != "" && k.config.Protocol.SSL.ClientKey != "" {
243+
cert, err := tls.X509KeyPair([]byte(k.config.Protocol.SSL.ClientCert), []byte(k.config.Protocol.SSL.ClientKey))
244+
if err != nil {
245+
return nil, fmt.Errorf("failed to load client certificate/key: %w", err)
246+
}
247+
tlsConfig.Certificates = []tls.Certificate{cert}
248+
}
249+
}
250+
251+
return tlsConfig, nil
252+
}
253+
205254
// checkPartitionCompletion checks if a partition is complete and handles loop termination
206255
func (k *Kafka) checkPartitionCompletion(ctx context.Context, readerID string, completedPartitions, observedPartitions map[types.PartitionKey]struct{}) (bool, error) {
207256
// cache observed partitions

drivers/kafka/resources/spec.json

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,87 @@
7575
"title": "SASL JAAS Config",
7676
"description": "JAAS configuration string (e.g., org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"pass\";)",
7777
"minLength": 1
78+
},
79+
"tls_skip_verify": {
80+
"type": "boolean",
81+
"title": "Skip TLS Verification",
82+
"description": "Skip TLS certificate verification (not recommended for production)",
83+
"default": false
84+
},
85+
"ssl": {
86+
"type": "object",
87+
"title": "SSL Configuration",
88+
"description": "SSL/TLS certificate configuration (optional for AWS MSK)",
89+
"properties": {
90+
"server_ca": {
91+
"type": "string",
92+
"title": "CA Certificate",
93+
"description": "CA certificate content in PEM format",
94+
"multiline": true
95+
},
96+
"client_cert": {
97+
"type": "string",
98+
"title": "Client Certificate",
99+
"description": "Client certificate content for mTLS authentication in PEM format",
100+
"multiline": true
101+
},
102+
"client_key": {
103+
"type": "string",
104+
"title": "Client Key",
105+
"description": "Client private key content for mTLS authentication in PEM format",
106+
"multiline": true
107+
}
108+
}
78109
}
79110
},
80111
"required": [
81112
"security_protocol",
82113
"sasl_mechanism",
83114
"sasl_jaas_config"
84115
]
116+
},
117+
{
118+
"title": "SSL",
119+
"type": "object",
120+
"properties": {
121+
"security_protocol": {
122+
"const": "SSL"
123+
},
124+
"tls_skip_verify": {
125+
"type": "boolean",
126+
"title": "Skip TLS Verification",
127+
"description": "Skip TLS certificate verification (not recommended for production)",
128+
"default": false
129+
},
130+
"ssl": {
131+
"type": "object",
132+
"title": "SSL Configuration",
133+
"description": "SSL/TLS certificate configuration (all fields optional for AWS MSK)",
134+
"properties": {
135+
"server_ca": {
136+
"type": "string",
137+
"title": "CA Certificate",
138+
"description": "CA certificate content in PEM format",
139+
"multiline": true
140+
},
141+
"client_cert": {
142+
"type": "string",
143+
"title": "Client Certificate",
144+
"description": "Client certificate content for mTLS authentication in PEM format",
145+
"multiline": true
146+
},
147+
"client_key": {
148+
"type": "string",
149+
"title": "Client Key",
150+
"description": "Client private key content for mTLS authentication in PEM format",
151+
"multiline": true
152+
}
153+
}
154+
}
155+
},
156+
"required": [
157+
"security_protocol"
158+
]
85159
}
86160
]
87161
},

drivers/mysql/internal/datatype_conversion.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,13 @@ var mysqlTypeToDataTypes = map[string]types.DataType{
2020
// Floating point types
2121
"float": types.Float32,
2222
"real": types.Float32,
23-
"decimal": types.Float32,
24-
"numeric": types.Float32,
2523
"double": types.Float64,
2624

25+
// Can handle up to 15 significant digits accurately (e.g., DECIMAL(15,2) or DECIMAL(15,7))
26+
// Values with 16 digits may have minor rounding. Beyond 16 (from 17) digits will have precision loss.
27+
"numeric": types.Float64,
28+
"decimal": types.Float64,
29+
2730
// String types
2831
"char": types.String,
2932
"varchar": types.String,

0 commit comments

Comments
 (0)