1212import io .kestra .plugin .debezium .AbstractDebeziumTask ;
1313
1414import java .sql .SQLException ;
15+ import java .util .Arrays ;
1516import java .util .concurrent .Callable ;
1617
1718final class PostgresDebeziumTestHelper {
@@ -40,14 +41,14 @@ static void dropReplicationArtifacts(Callable<java.sql.Connection> connectionSup
4041 }
4142
4243 static void cleanupTaskState (RunContext runContext , AbstractDebeziumTask task ) throws Exception {
43- KVStore kvStore = runContext .namespaceKv (runContext .flowInfo ().namespace ());
44- String taskRunValue = runContext .storage ().getTaskStorageContext ()
44+ var kvStore = runContext .namespaceKv (runContext .flowInfo ().namespace ());
45+ var taskRunValue = runContext .storage ().getTaskStorageContext ()
4546 .map (StorageContext .Task ::getTaskRunValue )
4647 .orElse (null );
47- String stateName = runContext .render (task .getStateName ()).as (String .class ).orElse ("debezium-state" );
48+ var stateName = runContext .render (task .getStateName ()).as (String .class ).orElse ("debezium-state" );
4849
49- kvStore . delete ( computeKvStoreKey (runContext , stateName , "offsets.dat" , taskRunValue ));
50- kvStore . delete ( computeKvStoreKey (runContext , stateName , "dbhistory.dat" , taskRunValue ));
50+ deleteAllVersions ( kvStore , computeKvStoreKey (runContext , stateName , "offsets.dat" , taskRunValue ));
51+ deleteAllVersions ( kvStore , computeKvStoreKey (runContext , stateName , "dbhistory.dat" , taskRunValue ));
5152 }
5253
5354 static void cleanupFlowState (KVStoreService kvStoreService , String namespace , String flowId , String ... stateNames ) throws Exception {
@@ -58,14 +59,15 @@ static void cleanupFlowState(KVStoreService kvStoreService, String namespace, St
5859 return ;
5960 }
6061
61- String flowPrefix = Slugify .of (flowId ) + "_states_" ;
62+ var flowPrefix = Slugify .of (flowId ) + "_states_" ;
63+ var keysToDelete = kvStore .listAll ().stream ()
64+ .map (KVEntry ::key )
65+ .distinct ()
66+ .filter (key -> Arrays .stream (stateNames ).anyMatch (stateName -> key .startsWith (flowPrefix + stateName )))
67+ .toList ();
6268
63- for (KVEntry kvEntry : kvStore .listAll ()) {
64- for (String stateName : stateNames ) {
65- if (kvEntry .key ().startsWith (flowPrefix + stateName )) {
66- kvStore .delete (kvEntry .key ());
67- }
68- }
69+ for (var key : keysToDelete ) {
70+ deleteAllVersions (kvStore , key );
6971 }
7072 }
7173
@@ -84,4 +86,10 @@ private static String computeKvStoreKey(RunContext runContext, String stateName,
8486
8587 return prefix + separator + filename ;
8688 }
89+
90+ private static void deleteAllVersions (KVStore kvStore , String key ) throws Exception {
91+ while (kvStore .delete (key )) {
92+ // Delete all versions to prevent fallback to stale previous entries.
93+ }
94+ }
8795}
0 commit comments