@@ -269,6 +269,34 @@ private <K, V> void processKeyValueAndVerifyTimestampedValue(final K key,
269269 "Could not get expected result in time." );
270270 }
271271
272+ private <K , V > void verifyLegacyTimestampedValue (final K key ,
273+ final V value ,
274+ final long timestamp )
275+ throws Exception {
276+
277+ TestUtils .waitForCondition (
278+ () -> {
279+ try {
280+ final ReadOnlyKeyValueStore <K , ValueAndTimestamp <V >> store =
281+ IntegrationTestUtils .getStore (STORE_NAME , kafkaStreams , QueryableStoreTypes .timestampedKeyValueStore ());
282+
283+ if (store == null ) {
284+ return false ;
285+ }
286+
287+ final ValueAndTimestamp <V > result = store .get (key );
288+ return result != null && result .value ().equals (value ) && result .timestamp () == timestamp ;
289+ } catch (final Exception swallow ) {
290+ swallow .printStackTrace ();
291+ System .err .println (swallow .getMessage ());
292+ return false ;
293+ }
294+ },
295+ 60_000L ,
296+ "Could not get expected result in time." );
297+ }
298+
299+
272300 private <K , V > void processKeyValueWithTimestampAndHeadersAndVerify (final K key ,
273301 final V value ,
274302 final long timestamp ,
@@ -653,6 +681,82 @@ public void process(final Record<String, String> record) {
653681 }
654682 }
655683
684+ @ Test
685+ public void shouldFailDowngradeFromTimestampedKeyValueStoreWithHeadersToTimestampedKeyValueStore () throws Exception {
686+ final Properties props = props ();
687+ setupAndPopulateKeyValueStoreWithHeaders (props );
688+ kafkaStreams = null ;
689+
690+ // Attempt to downgrade to non-headers key-value store
691+ final StreamsBuilder downgradedBuilder = new StreamsBuilder ();
692+ downgradedBuilder .addStateStore (
693+ Stores .timestampedKeyValueStoreBuilder (
694+ Stores .persistentTimestampedKeyValueStore (STORE_NAME ),
695+ Serdes .String (),
696+ Serdes .String ()))
697+ .stream (inputStream , Consumed .with (Serdes .String (), Serdes .String ()))
698+ .process (TimestampedKeyValueProcessor ::new , STORE_NAME );
699+
700+ kafkaStreams = new KafkaStreams (downgradedBuilder .build (), props );
701+
702+ boolean exceptionThrown = false ;
703+ try {
704+ kafkaStreams .start ();
705+ } catch (final Exception e ) {
706+ Throwable cause = e ;
707+ while (cause != null ) {
708+ if (cause instanceof ProcessorStateException &&
709+ cause .getMessage () != null &&
710+ cause .getMessage ().contains ("headers-aware" ) &&
711+ cause .getMessage ().contains ("Downgrade" )) {
712+ exceptionThrown = true ;
713+ break ;
714+ }
715+ cause = cause .getCause ();
716+ }
717+
718+ if (!exceptionThrown ) {
719+ throw new AssertionError ("Expected ProcessorStateException about downgrade not being supported, but got: " + e .getMessage (), e );
720+ }
721+ } finally {
722+ kafkaStreams .close (Duration .ofSeconds (30L ));
723+ }
724+
725+ if (!exceptionThrown ) {
726+ throw new AssertionError ("Expected ProcessorStateException to be thrown when attempting to downgrade from headers-aware to non-headers key-value store" );
727+ }
728+ }
729+
730+ @ Test
731+ public void shouldSuccessfullyDowngradeFromTimestampedKeyValueStoreWithHeadersToTimestampedKeyValueStoreAfterCleanup () throws Exception {
732+ final Properties props = props ();
733+ setupAndPopulateKeyValueStoreWithHeaders (props );
734+
735+ kafkaStreams .cleanUp (); // Delete local state
736+ kafkaStreams = null ;
737+
738+ final StreamsBuilder downgradedBuilder = new StreamsBuilder ();
739+ downgradedBuilder .addStateStore (
740+ Stores .timestampedKeyValueStoreBuilder (
741+ Stores .persistentTimestampedKeyValueStore (STORE_NAME ),
742+ Serdes .String (),
743+ Serdes .String ()))
744+ .stream (inputStream , Consumed .with (Serdes .String (), Serdes .String ()))
745+ .process (TimestampedKeyValueProcessor ::new , STORE_NAME );
746+
747+ kafkaStreams = new KafkaStreams (downgradedBuilder .build (), props );
748+ kafkaStreams .start ();
749+
750+ // verify legacy key, values
751+ verifyLegacyTimestampedValue ("key1" , "value1" , 11L );
752+ verifyLegacyTimestampedValue ("key2" , "value2" , 22L );
753+
754+ processKeyValueAndVerifyTimestampedValue ("key3" , "value3" , 333L );
755+ processKeyValueAndVerifyTimestampedValue ("key4" , "value4" , 444L );
756+
757+ kafkaStreams .close ();
758+ }
759+
656760 @ Test
657761 public void shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore () throws Exception {
658762 final Properties props = props ();
@@ -809,6 +913,28 @@ private long setupWindowStoreWithHeaders(final Properties props) {
809913 return CLUSTER .time .milliseconds ();
810914 }
811915
916+ private void setupAndPopulateKeyValueStoreWithHeaders (final Properties props ) throws Exception {
917+ final StreamsBuilder headersBuilder = new StreamsBuilder ();
918+ headersBuilder .addStateStore (
919+ Stores .timestampedKeyValueStoreBuilderWithHeaders (
920+ Stores .persistentTimestampedKeyValueStoreWithHeaders (STORE_NAME ),
921+ Serdes .String (),
922+ Serdes .String ()))
923+ .stream (inputStream , Consumed .with (Serdes .String (), Serdes .String ()))
924+ .process (TimestampedKeyValueWithHeadersProcessor ::new , STORE_NAME );
925+
926+ kafkaStreams = new KafkaStreams (headersBuilder .build (), props );
927+ kafkaStreams .start ();
928+
929+ final Headers headers = new RecordHeaders ();
930+ headers .add ("source" , "test" .getBytes ());
931+
932+ processKeyValueWithTimestampAndHeadersAndVerify ("key1" , "value1" , 11L , headers , headers );
933+ processKeyValueWithTimestampAndHeadersAndVerify ("key2" , "value2" , 22L , headers , headers );
934+
935+ kafkaStreams .close ();
936+ }
937+
812938 private void produceRecordWithHeaders (final String key , final String value , final long timestamp ) throws Exception {
813939 final Headers headers = new RecordHeaders ();
814940 headers .add ("source" , "test" .getBytes ());
0 commit comments