Skip to content

Commit 09724fc

Browse files
chore: testing-5
1 parent e80ff57 commit 09724fc

3 files changed

Lines changed: 18 additions & 13 deletions

File tree

drivers/kafka/internal/kafka_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@ func TestKafkaIntegration(t *testing.T) {
1919
name: "JSON-Format",
2020
cfg: &testutils.IntegrationTest{
2121
TestConfig: testutils.GetTestConfig(string(constants.Kafka),"Json"),
22-
Namespace: "json-topic",
22+
Namespace: "topics",
2323
ExpectedData: ExpectedKafkaJSONData,
2424
ExpectedUpdatedData: ExpectedKafkaUpdatedJSONData,
2525
DestinationDataTypeSchema: KafkaToDestinationJSONSchema,
2626
UpdatedDestinationDataTypeSchema: EvolvedKafkaToDestinationJSONSchema,
2727
DefaultCDCColumnsSchema: ExpectedKafkaDefaultCDCColumnsSchema,
2828
ExecuteQuery: ExecuteQueryForJson,
29-
DestinationDB: "kafka_topics",
29+
DestinationDB: "kafka_json_topics",
3030
PartitionRegex: "/{int_value,identity}",
3131
FilterConfig: `{
3232
"logical_operator": "And",
@@ -49,14 +49,14 @@ func TestKafkaIntegration(t *testing.T) {
4949
name: "AVRO-Format",
5050
cfg: &testutils.IntegrationTest{
5151
TestConfig: testutils.GetTestConfig(string(constants.Kafka),"Avro"),
52-
Namespace: "Avro-topic",
52+
Namespace: "topics",
5353
ExpectedData: ExpectedKafkaAvroData,
5454
ExpectedUpdatedData: ExpectedKafkaUpdatedAvroData,
5555
DestinationDataTypeSchema: KafkaToDestinationAvroSchema,
5656
UpdatedDestinationDataTypeSchema: EvolvedKafkaToDestinationAvroSchema,
5757
DefaultCDCColumnsSchema: ExpectedKafkaDefaultCDCColumnsSchema,
5858
ExecuteQuery: ExecuteQueryForAvro,
59-
DestinationDB: "kafka_topics",
59+
DestinationDB: "kafka_avro_topics",
6060
PartitionRegex: "/{int_value,identity}",
6161
FilterConfig: `{
6262
"logical_operator": "And",

drivers/kafka/internal/kafka_test_utils.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ import (
2020

2121
var (
2222
// Message key and value for JSON
23-
key = []byte("test-key")
23+
Jsonkey = []byte("json-key")
24+
Avrokey = []byte("avro-key")
2425
value = []byte(`{"int_value": 100,"float_value": 99.99,"boolean_true": true,"boolean_false": false,"timestamp_value": "2026-03-22T14:30:00Z","string_value": "test_string"}`)
2526
evolved_value = []byte(`{"int_value": 100,"float_value": 99.99,"boolean_true": true,"boolean_false": false,"timestamp_value": "2026-03-22T14:30:00Z","string_value": "test_string", "id_int": 101}`)
2627
filtervalue1 = []byte(`{"int_value": 99,"float_value": 99.99}`)
@@ -128,23 +129,23 @@ func ExecuteQueryForJson(ctx context.Context, t *testing.T, streams []string, op
128129
case "add":
129130
for partition := 0; partition < partitionCount; partition++ {
130131
writeMessagesWithRetry(ctx, t, writer, kafka.Message{
131-
Key: key,
132+
Key: Jsonkey,
132133
Value: value,
133134
})
134135
}
135136
writeMessagesWithRetry(ctx, t, writer, kafka.Message{
136-
Key: key,
137+
Key: Jsonkey,
137138
Value: filtervalue1,
138139
})
139140
writeMessagesWithRetry(ctx, t, writer, kafka.Message{
140-
Key: key,
141+
Key: Jsonkey,
141142
Value: filtervalue2,
142143
})
143144
t.Logf("Added 7 messages to topic '%s' (one per partition and two for filters)", streams[0])
144145
case "evolve-schema":
145146
for partition := 0; partition < partitionCount; partition++ {
146147
writeMessagesWithRetry(ctx, t, writer, kafka.Message{
147-
Key: key,
148+
Key: Jsonkey,
148149
Value: evolved_value,
149150
Partition: partition,
150151
})
@@ -203,8 +204,8 @@ func ExecuteQueryForAvro(ctx context.Context, t *testing.T, streams []string, op
203204
confluentBaseMsg := encodeConfluentBinary(schemaID, binaryData)
204205
confluentFilterMsg := encodeConfluentBinary(schemaID, binaryDataFilter)
205206
err = writer.WriteMessages(ctx,
206-
kafka.Message{Topic: streams[0], Key: []byte("avro-key"), Value: confluentBaseMsg},
207-
kafka.Message{Topic: streams[0], Key: []byte("avro-key"), Value: confluentFilterMsg},
207+
kafka.Message{Key: Avrokey, Value: confluentBaseMsg},
208+
kafka.Message{Key: Avrokey, Value: confluentFilterMsg},
208209
)
209210
require.NoError(t, err)
210211
case "evolve-schema":
@@ -216,8 +217,7 @@ func ExecuteQueryForAvro(ctx context.Context, t *testing.T, streams []string, op
216217
require.NoError(t, err)
217218
confluentMsg := encodeConfluentBinary(schemaID, binaryData)
218219
err = writer.WriteMessages(ctx, kafka.Message{
219-
Topic: streams[0],
220-
Key: []byte("avro-key"),
220+
Key: Avrokey,
221221
Value: confluentMsg,
222222
})
223223
require.NoError(t, err)

utils/testutils/test_utils.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ type TestConfig struct {
7575
HostTestDataPath string
7676
HostCatalogPath string
7777
HostTestCatalogPath string
78+
DataFormat string
7879
}
7980

8081
// history stores the RPS values and the last updated time for a given mode.
@@ -184,6 +185,7 @@ func GetTestConfig(driver string,dataFormat string) *TestConfig {
184185
return &TestConfig{
185186
Driver: driver,
186187
HostRootPath: rootPath,
188+
DataFormat: dataFormat,
187189
HostTestDataPath: fmt.Sprintf(hostTestDataPath, driver, ""),
188190
HostTestCatalogPath: fmt.Sprintf(hostTestDataPath, driver, "test_streams.json"),
189191
HostCatalogPath: fmt.Sprintf(hostTestDataPath, driver, "streams.json"),
@@ -813,6 +815,9 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) {
813815
t.Logf("Root Project directory: %s", cfg.TestConfig.HostRootPath)
814816
t.Logf("Test data directory: %s", cfg.TestConfig.HostTestDataPath)
815817
currentTestTable := fmt.Sprintf("%s_test_table_olake", cfg.TestConfig.Driver)
818+
if cfg.TestConfig.DataFormat != "" {
819+
currentTestTable = fmt.Sprintf("%s_%s_test_table_olake", cfg.TestConfig.Driver, cfg.TestConfig.DataFormat)
820+
}
816821

817822
t.Run("Discover", func(t *testing.T) {
818823
req := testcontainers.ContainerRequest{

0 commit comments

Comments
 (0)