1212import com .linkedin .upgrade .DataHubUpgradeState ;
1313import io .datahubproject .metadata .context .OperationContext ;
1414import java .util .ArrayList ;
15+ import java .util .Collection ;
16+ import java .util .Collections ;
1517import java .util .HashMap ;
1618import java .util .List ;
1719import java .util .Map ;
1820import java .util .Set ;
1921import java .util .function .Function ;
2022import lombok .extern .slf4j .Slf4j ;
2123import org .apache .kafka .clients .admin .AdminClient ;
24+ import org .apache .kafka .clients .admin .AlterConfigOp ;
25+ import org .apache .kafka .clients .admin .Config ;
26+ import org .apache .kafka .clients .admin .ConfigEntry ;
2227import org .apache .kafka .clients .admin .CreatePartitionsResult ;
2328import org .apache .kafka .clients .admin .CreateTopicsResult ;
2429import org .apache .kafka .clients .admin .DescribeTopicsResult ;
2530import org .apache .kafka .clients .admin .ListTopicsResult ;
2631import org .apache .kafka .clients .admin .NewPartitions ;
2732import org .apache .kafka .clients .admin .NewTopic ;
2833import org .apache .kafka .clients .admin .TopicDescription ;
34+ import org .apache .kafka .common .config .ConfigResource ;
2935import org .springframework .boot .kafka .autoconfigure .KafkaProperties ;
3036import org .springframework .kafka .config .TopicBuilder ;
3137
@@ -109,6 +115,8 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
109115 // Collect topics to create and partitions to increase
110116 List <NewTopic > topicsToCreate = new ArrayList <>();
111117 Map <String , NewPartitions > partitionsToIncrease = new HashMap <>();
118+ Map <ConfigResource , Map <String , String >> declaredByTopic = new HashMap <>();
119+ boolean reconcileConfigs = kafkaConfiguration .getSetup ().isReconcileExistingTopicConfigs ();
112120 List <String > failedTopics = new ArrayList <>();
113121
114122 // Batch fetch partition counts for existing topics
@@ -166,6 +174,19 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
166174 "Checking kafka topic {}: Skipping partition count check (auto-increase disabled)" ,
167175 topicName );
168176 }
177+ // Reconcile configProperties on existing topic when explicitly enabled.
178+ // Declared properties are collected here and aligned against the broker
179+ // below via incrementalAlterConfigs (additive SET — only declared keys
180+ // are touched). Closes the gap where broker auto-created topics keep
181+ // broker-default retention (notably DataHubUpgradeHistory_v1 which
182+ // needs retention.ms=-1).
183+ if (reconcileConfigs
184+ && topicConfig .getConfigProperties () != null
185+ && !topicConfig .getConfigProperties ().isEmpty ()) {
186+ declaredByTopic .put (
187+ new ConfigResource (ConfigResource .Type .TOPIC , topicName ),
188+ topicConfig .getConfigProperties ());
189+ }
169190 continue ;
170191 }
171192
@@ -209,7 +230,14 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
209230 log .info ("Successfully increased partitions for {} topics" , partitionsToIncrease .size ());
210231 }
211232
212- if (topicsToCreate .isEmpty () && partitionsToIncrease .isEmpty ()) {
233+ // Reconcile configProperties on existing topics if enabled
234+ if (!declaredByTopic .isEmpty ()) {
235+ reconcileTopicConfigs (adminClient , declaredByTopic );
236+ }
237+
238+ if (topicsToCreate .isEmpty ()
239+ && partitionsToIncrease .isEmpty ()
240+ && declaredByTopic .isEmpty ()) {
213241 log .info (
214242 "All configured topics already exist with correct configuration - no changes needed" );
215243 }
@@ -234,6 +262,95 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
234262 };
235263 }
236264
265+ /**
266+ * Reconcile declared topic configProperties against the broker, applying only the keys whose
267+ * current value differs from what is declared in {@code application.yaml}.
268+ */
269+ private static void reconcileTopicConfigs (
270+ AdminClient adminClient , Map <ConfigResource , Map <String , String >> declaredByTopic )
271+ throws Exception {
272+ log .info (
273+ "Reconciling configProperties on {} existing topics: {}" ,
274+ declaredByTopic .size (),
275+ declaredByTopic .keySet ().stream ().map (ConfigResource ::name ).sorted ().toList ());
276+
277+ Map <ConfigResource , Map <String , String >> currentByTopic =
278+ fetchCurrentTopicConfigs (adminClient , declaredByTopic .keySet ());
279+
280+ Map <ConfigResource , Collection <AlterConfigOp >> toApply = new HashMap <>();
281+ int totalAdded = 0 ;
282+ int totalChanged = 0 ;
283+ for (Map .Entry <ConfigResource , Map <String , String >> entry : declaredByTopic .entrySet ()) {
284+ ConfigResource topic = entry .getKey ();
285+ TopicConfigDiff diff =
286+ diffConfigs (entry .getValue (), currentByTopic .getOrDefault (topic , Map .of ()));
287+ if (diff .ops ().isEmpty ()) {
288+ log .info (
289+ "Topic {}: declared properties already match broker - nothing to alter" , topic .name ());
290+ } else {
291+ log .info ("Topic {}: added={} changed={}" , topic .name (), diff .added (), diff .changed ());
292+ toApply .put (topic , diff .ops ());
293+ totalAdded += diff .added ().size ();
294+ totalChanged += diff .changed ().size ();
295+ }
296+ }
297+
298+ if (toApply .isEmpty ()) {
299+ log .info (
300+ "All declared properties already aligned with broker - no AlterConfigs request issued" );
301+ return ;
302+ }
303+ adminClient .incrementalAlterConfigs (toApply ).all ().get ();
304+ log .info (
305+ "Successfully applied configProperty changes on {} topics ({} added, {} changed)" ,
306+ toApply .size (),
307+ totalAdded ,
308+ totalChanged );
309+ }
310+
311+ /**
312+ * Read current broker-side topic configs for the given resources, as a flat key->value map per
313+ * topic.
314+ */
315+ private static Map <ConfigResource , Map <String , String >> fetchCurrentTopicConfigs (
316+ AdminClient adminClient , Collection <ConfigResource > resources ) throws Exception {
317+ Map <ConfigResource , Config > raw = adminClient .describeConfigs (resources ).all ().get ();
318+ Map <ConfigResource , Map <String , String >> out = new HashMap <>(raw .size ());
319+ raw .forEach (
320+ (res , cfg ) -> {
321+ Map <String , String > entries = new HashMap <>();
322+ cfg .entries ().forEach (ce -> entries .put (ce .name (), ce .value ()));
323+ out .put (res , entries );
324+ });
325+ return out ;
326+ }
327+
328+ /** Compute the diff between declared (yaml) and current (broker) topic config maps. */
329+ private static TopicConfigDiff diffConfigs (
330+ Map <String , String > declared , Map <String , String > current ) {
331+ List <String > added = new ArrayList <>();
332+ List <String > changed = new ArrayList <>();
333+ List <AlterConfigOp > ops = new ArrayList <>();
334+ declared .forEach (
335+ (key , desired ) -> {
336+ String live = current .get (key );
337+ if (live == null ) {
338+ added .add ("+ " + key + "=" + desired );
339+ } else if (live .equals (desired )) {
340+ return ;
341+ } else {
342+ changed .add ("~ " + key + ": " + live + " -> " + desired );
343+ }
344+ ops .add (new AlterConfigOp (new ConfigEntry (key , desired ), AlterConfigOp .OpType .SET ));
345+ });
346+ Collections .sort (added );
347+ Collections .sort (changed );
348+ return new TopicConfigDiff (added , changed , ops );
349+ }
350+
351+ private record TopicConfigDiff (
352+ List <String > added , List <String > changed , List <AlterConfigOp > ops ) {}
353+
237354 /** Get the set of existing topic names from Kafka */
238355 private Set <String > getExistingTopics (AdminClient adminClient ) throws Exception {
239356 try {
0 commit comments