Skip to content

Commit 51bddda

Browse files
Merge pull request #1245 from ballerina-platform/cdc-islive
Open Introduce support for liveness check for MSSQL CDC listener
2 parents 39126c9 + 02e9f4e commit 51bddda

File tree

7 files changed

+220
-25
lines changed

7 files changed

+220
-25
lines changed

ballerina/Ballerina.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
org = "ballerinax"
33
name = "postgresql"
4-
version = "1.16.2"
4+
version = "1.16.3"
55
authors = ["Ballerina"]
66
keywords = ["client", "network", "SQL", "RDBMS", "Vendor/PostgreSQL", "Area/Database", "Type/Connector"]
77
repository = "https://github.com/ballerina-platform/module-ballerinax-postgresql"
@@ -15,8 +15,8 @@ graalvmCompatible = true
1515
[[platform.java21.dependency]]
1616
groupId = "io.ballerina.stdlib"
1717
artifactId = "postgresql-native"
18-
version = "1.16.2"
19-
path = "../native/build/libs/postgresql-native-1.16.2.jar"
18+
version = "1.16.3"
19+
path = "../native/build/libs/postgresql-native-1.16.3-SNAPSHOT.jar"
2020

2121
[[platform.java21.dependency]]
2222
groupId = "io.ballerina.stdlib"

ballerina/CompilerPlugin.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ id = "postgresql-compiler-plugin"
33
class = "io.ballerina.stdlib.postgresql.compiler.PostgreSQLCompilerPlugin"
44

55
[[dependency]]
6-
path = "../compiler-plugin/build/libs/postgresql-compiler-plugin-1.16.2.jar"
6+
path = "../compiler-plugin/build/libs/postgresql-compiler-plugin-1.16.3-SNAPSHOT.jar"

ballerina/Dependencies.toml

Lines changed: 84 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,18 @@
77
dependencies-toml-version = "2"
88
distribution-version = "2201.12.0"
99

10+
[[package]]
11+
org = "ballerina"
12+
name = "avro"
13+
version = "1.2.0"
14+
dependencies = [
15+
{org = "ballerina", name = "jballerina.java"}
16+
]
17+
1018
[[package]]
1119
org = "ballerina"
1220
name = "crypto"
13-
version = "2.9.0"
21+
version = "2.9.3"
1422
dependencies = [
1523
{org = "ballerina", name = "jballerina.java"},
1624
{org = "ballerina", name = "time"}
@@ -22,7 +30,7 @@ modules = [
2230
[[package]]
2331
org = "ballerina"
2432
name = "data.jsondata"
25-
version = "1.1.0"
33+
version = "1.1.3"
2634
dependencies = [
2735
{org = "ballerina", name = "jballerina.java"},
2836
{org = "ballerina", name = "lang.object"}
@@ -67,7 +75,6 @@ modules = [
6775
org = "ballerina"
6876
name = "lang.__internal"
6977
version = "0.0.0"
70-
scope = "testOnly"
7178
dependencies = [
7279
{org = "ballerina", name = "jballerina.java"},
7380
{org = "ballerina", name = "lang.object"}
@@ -92,6 +99,16 @@ dependencies = [
9299
{org = "ballerina", name = "jballerina.java"}
93100
]
94101

102+
[[package]]
103+
org = "ballerina"
104+
name = "lang.int"
105+
version = "0.0.0"
106+
dependencies = [
107+
{org = "ballerina", name = "jballerina.java"},
108+
{org = "ballerina", name = "lang.__internal"},
109+
{org = "ballerina", name = "lang.object"}
110+
]
111+
95112
[[package]]
96113
org = "ballerina"
97114
name = "lang.object"
@@ -139,18 +156,29 @@ dependencies = [
139156
{org = "ballerina", name = "jballerina.java"}
140157
]
141158

159+
[[package]]
160+
org = "ballerina"
161+
name = "log"
162+
version = "2.12.0"
163+
dependencies = [
164+
{org = "ballerina", name = "io"},
165+
{org = "ballerina", name = "jballerina.java"},
166+
{org = "ballerina", name = "lang.value"},
167+
{org = "ballerina", name = "observe"}
168+
]
169+
142170
[[package]]
143171
org = "ballerina"
144172
name = "observe"
145-
version = "1.5.0"
173+
version = "1.5.1"
146174
dependencies = [
147175
{org = "ballerina", name = "jballerina.java"}
148176
]
149177

150178
[[package]]
151179
org = "ballerina"
152180
name = "os"
153-
version = "1.10.0"
181+
version = "1.10.1"
154182
scope = "testOnly"
155183
dependencies = [
156184
{org = "ballerina", name = "io"},
@@ -188,14 +216,25 @@ modules = [
188216
[[package]]
189217
org = "ballerina"
190218
name = "time"
191-
version = "2.7.0"
219+
version = "2.8.0"
192220
dependencies = [
193221
{org = "ballerina", name = "jballerina.java"}
194222
]
195223
modules = [
196224
{org = "ballerina", packageName = "time", moduleName = "time"}
197225
]
198226

227+
[[package]]
228+
org = "ballerina"
229+
name = "uuid"
230+
version = "1.10.0"
231+
dependencies = [
232+
{org = "ballerina", name = "crypto"},
233+
{org = "ballerina", name = "jballerina.java"},
234+
{org = "ballerina", name = "lang.int"},
235+
{org = "ballerina", name = "time"}
236+
]
237+
199238
[[package]]
200239
org = "ballerinai"
201240
name = "observe"
@@ -208,21 +247,57 @@ dependencies = [
208247
[[package]]
209248
org = "ballerinax"
210249
name = "cdc"
211-
version = "1.0.3"
250+
version = "1.2.0"
212251
dependencies = [
213252
{org = "ballerina", name = "crypto"},
214253
{org = "ballerina", name = "data.jsondata"},
254+
{org = "ballerina", name = "io"},
215255
{org = "ballerina", name = "jballerina.java"},
216-
{org = "ballerinai", name = "observe"}
256+
{org = "ballerina", name = "log"},
257+
{org = "ballerinai", name = "observe"},
258+
{org = "ballerinax", name = "kafka"}
217259
]
218260
modules = [
219261
{org = "ballerinax", packageName = "cdc", moduleName = "cdc"}
220262
]
221263

264+
[[package]]
265+
org = "ballerinax"
266+
name = "confluent.cavroserdes"
267+
version = "1.0.2"
268+
dependencies = [
269+
{org = "ballerina", name = "avro"},
270+
{org = "ballerina", name = "jballerina.java"},
271+
{org = "ballerinai", name = "observe"},
272+
{org = "ballerinax", name = "confluent.cregistry"}
273+
]
274+
275+
[[package]]
276+
org = "ballerinax"
277+
name = "confluent.cregistry"
278+
version = "0.4.3"
279+
dependencies = [
280+
{org = "ballerina", name = "jballerina.java"}
281+
]
282+
283+
[[package]]
284+
org = "ballerinax"
285+
name = "kafka"
286+
version = "4.6.3"
287+
dependencies = [
288+
{org = "ballerina", name = "crypto"},
289+
{org = "ballerina", name = "jballerina.java"},
290+
{org = "ballerina", name = "time"},
291+
{org = "ballerina", name = "uuid"},
292+
{org = "ballerinai", name = "observe"},
293+
{org = "ballerinax", name = "confluent.cavroserdes"},
294+
{org = "ballerinax", name = "confluent.cregistry"}
295+
]
296+
222297
[[package]]
223298
org = "ballerinax"
224299
name = "postgresql"
225-
version = "1.16.2"
300+
version = "1.16.3"
226301
dependencies = [
227302
{org = "ballerina", name = "crypto"},
228303
{org = "ballerina", name = "file"},

ballerina/cdc_listener.bal

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,22 @@ import ballerinax/cdc;
1919
public isolated class CdcListener {
2020
*cdc:Listener;
2121

22-
private final map<string> & readonly config;
22+
private final map<anydata> & readonly config;
2323
private boolean isStarted = false;
2424
private boolean hasAttachedService = false;
2525

2626
# Initializes the Postgresql listener with the given configuration.
2727
#
2828
# + config - The configuration for the Postgresql connector
2929
public isolated function init(*PostgresListenerConfiguration config) {
30-
map<string> configMap = {};
30+
map<string> debeziumConfigs = {};
3131
cdc:populateDebeziumProperties({
3232
engineName: config.engineName,
3333
offsetStorage: config.offsetStorage,
3434
internalSchemaStorage: config.internalSchemaStorage,
3535
options: config.options
36-
}, configMap);
36+
}, debeziumConfigs
37+
);
3738
cdc:populateDatabaseConfigurations({
3839
connectorClass: config.database.connectorClass,
3940
hostname: config.database.hostname,
@@ -47,9 +48,13 @@ public isolated class CdcListener {
4748
excludedTables: config.database.excludedTables,
4849
includedColumns: config.database.includedColumns,
4950
excludedColumns: config.database.excludedColumns
50-
}, configMap);
51-
populatePostgresConfigurations(config.database, configMap);
52-
self.config = configMap.cloneReadOnly();
51+
}, debeziumConfigs);
52+
populatePostgresConfigurations(config.database, debeziumConfigs);
53+
map<anydata> listenerConfigs = {
54+
...debeziumConfigs
55+
};
56+
listenerConfigs["livenessInterval"] = config.livenessInterval;
57+
self.config = listenerConfigs.cloneReadOnly();
5358
}
5459

5560
# Attaches a CDC service to the Postgresql listener.
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Copyright (c) 2026, WSO2 LLC. (https://www.wso2.com).
2+
//
3+
// WSO2 LLC. licenses this file to you under the Apache License,
4+
// Version 2.0 (the "License"); you may not use this file except
5+
// in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing,
11+
// software distributed under the License is distributed on an
12+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
13+
// KIND, either express or implied. See the License for the
14+
// specific language governing permissions and limitations
15+
// under the License.
16+
17+
import ballerina/lang.runtime;
18+
import ballerina/test;
19+
import ballerinax/cdc;
20+
21+
@test:Config {
22+
groups: ["liveness"]
23+
}
24+
function testLivenessBeforeListenerStart() returns error? {
25+
CdcListener postgresqlListener = new ({
26+
database: {
27+
username: cdcUsername,
28+
password: cdcPassword,
29+
port: cdcPort,
30+
databaseName: cdcDatabase
31+
},
32+
options: {
33+
snapshotMode: cdc:NO_DATA
34+
}
35+
});
36+
check postgresqlListener.attach(testService);
37+
boolean liveness = check cdc:isLive(postgresqlListener);
38+
test:assertFalse(liveness, "Liveness check passes even before listener starts");
39+
}
40+
41+
@test:Config {
42+
groups: ["liveness"]
43+
}
44+
function testLivenessWithStartedListener() returns error? {
45+
CdcListener postgresqlListener = new ({
46+
database: {
47+
username: cdcUsername,
48+
password: cdcPassword,
49+
port: cdcPort,
50+
databaseName: cdcDatabase
51+
},
52+
options: {
53+
snapshotMode: cdc:NO_DATA
54+
}
55+
});
56+
check postgresqlListener.attach(testService);
57+
check postgresqlListener.'start();
58+
boolean liveness = check cdc:isLive(postgresqlListener);
59+
test:assertTrue(liveness, "Liveness fails for a started listener");
60+
check postgresqlListener.gracefulStop();
61+
}
62+
63+
@test:Config {
64+
groups: ["liveness"]
65+
}
66+
function testLivenessAfterListenerStop() returns error? {
67+
CdcListener postgresqlListener = new ({
68+
database: {
69+
username: cdcUsername,
70+
password: cdcPassword,
71+
port: cdcPort,
72+
databaseName: cdcDatabase
73+
},
74+
options: {
75+
snapshotMode: cdc:NO_DATA
76+
}
77+
});
78+
check postgresqlListener.attach(testService);
79+
check postgresqlListener.'start();
80+
check postgresqlListener.gracefulStop();
81+
boolean liveness = check cdc:isLive(postgresqlListener);
82+
test:assertFalse(liveness, "Liveness check passes after the listener has stopped");
83+
}
84+
85+
@test:Config {
86+
groups: ["liveness"]
87+
}
88+
function testLivenessWithoutReceivingEvents() returns error? {
89+
CdcListener postgresqlListener = new ({
90+
database: {
91+
username: cdcUsername,
92+
password: cdcPassword,
93+
port: cdcPort,
94+
databaseName: cdcDatabase
95+
},
96+
options: {
97+
snapshotMode: cdc:NO_DATA
98+
},
99+
livenessInterval: 5.0
100+
});
101+
check postgresqlListener.attach(testService);
102+
check postgresqlListener.'start();
103+
runtime:sleep(10);
104+
boolean liveness = check cdc:isLive(postgresqlListener);
105+
test:assertFalse(liveness, "Liveness check passes even after not receiving events within the liveness interval");
106+
check postgresqlListener.gracefulStop();
107+
}

build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,13 @@ subprojects {
9999
ballerinaStdLibs "io.ballerina.stdlib:observe-ballerina:${observeVersion}"
100100
ballerinaStdLibs "io.ballerina:observe-ballerina:${observeInternalVersion}"
101101
ballerinaStdLibs "io.ballerina.stdlib:postgresql.driver-ballerina:${stdlibPostgresqlDriverVersion}"
102+
ballerinaStdLibs "io.ballerina.lib:avro-ballerina:${stdlibAvroVersion}"
102103

103104
ballerinaStdLibs "io.ballerina.lib:cdc-ballerina:${stdlibCdcVersion}"
104105
ballerinaStdLibs "io.ballerina.lib:postgresql.cdc.driver-ballerina:${stdlibPostgresCdcDriverVersion}"
106+
ballerinaStdLibs "io.ballerina.stdlib:kafka-ballerina:${stdlibKafkaVersion}"
107+
ballerinaStdLibs "io.ballerina.lib:confluent.cavroserdes-ballerina:${stdlibConfluentAvroSerDesVersion}"
108+
ballerinaStdLibs "io.ballerina.lib:confluent.cregistry-ballerina:${stdlibConfluentSchemaRegistryVersion}"
105109
}
106110
}
107111

0 commit comments

Comments
 (0)