1+ /*
2+ * Licensed to the Apache Software Foundation (ASF) under one or more
3+ * contributor license agreements. See the NOTICE file distributed with
4+ * this work for additional information regarding copyright ownership.
5+ * The ASF licenses this file to You under the Apache License, Version 2.0
6+ * (the "License"); you may not use this file except in compliance with
7+ * the License. You may obtain a copy of the License at
8+ *
9+ * http://www.apache.org/licenses/LICENSE-2.0
10+ *
11+ * Unless required by applicable law or agreed to in writing, software
12+ * distributed under the License is distributed on an "AS IS" BASIS,
13+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+ * See the License for the specific language governing permissions and
15+ * limitations under the License.
16+ */
17+ package org .apache .kafka .streams .integration ;
18+
19+ import org .apache .kafka .clients .consumer .ConsumerConfig ;
20+ import org .apache .kafka .common .header .Headers ;
21+ import org .apache .kafka .common .header .internals .RecordHeaders ;
22+ import org .apache .kafka .common .serialization .Serdes ;
23+ import org .apache .kafka .common .serialization .StringSerializer ;
24+ import org .apache .kafka .streams .KafkaStreams ;
25+ import org .apache .kafka .streams .KeyValue ;
26+ import org .apache .kafka .streams .StreamsBuilder ;
27+ import org .apache .kafka .streams .StreamsConfig ;
28+ import org .apache .kafka .streams .integration .utils .EmbeddedKafkaCluster ;
29+ import org .apache .kafka .streams .integration .utils .IntegrationTestUtils ;
30+ import org .apache .kafka .streams .kstream .Consumed ;
31+ import org .apache .kafka .streams .processor .api .Processor ;
32+ import org .apache .kafka .streams .processor .api .ProcessorContext ;
33+ import org .apache .kafka .streams .processor .api .Record ;
34+ import org .apache .kafka .streams .state .QueryableStoreTypes ;
35+ import org .apache .kafka .streams .state .ReadOnlyKeyValueStore ;
36+ import org .apache .kafka .streams .state .Stores ;
37+ import org .apache .kafka .streams .state .TimestampedKeyValueStore ;
38+ import org .apache .kafka .streams .state .TimestampedKeyValueStoreWithHeaders ;
39+ import org .apache .kafka .streams .state .ValueAndTimestamp ;
40+ import org .apache .kafka .streams .state .ValueTimestampHeaders ;
41+ import org .apache .kafka .test .TestUtils ;
42+
43+ import org .junit .jupiter .api .AfterAll ;
44+ import org .junit .jupiter .api .AfterEach ;
45+ import org .junit .jupiter .api .BeforeAll ;
46+ import org .junit .jupiter .api .BeforeEach ;
47+ import org .junit .jupiter .api .Tag ;
48+ import org .junit .jupiter .api .Test ;
49+ import org .junit .jupiter .api .TestInfo ;
50+
51+ import java .io .IOException ;
52+ import java .time .Duration ;
53+ import java .util .Properties ;
54+
55+ import static java .util .Collections .singletonList ;
56+ import static org .apache .kafka .streams .utils .TestUtils .safeUniqueTestName ;
57+
58+ @ Tag ("integration" )
59+ public class HeadersStoreUpgradeIntegrationTest {
60+ private static final String STORE_NAME = "store" ;
61+ private String inputStream ;
62+
63+ private KafkaStreams kafkaStreams ;
64+
65+ public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster (1 );
66+
67+ @ BeforeAll
68+ public static void startCluster () throws IOException {
69+ CLUSTER .start ();
70+ }
71+
72+ @ AfterAll
73+ public static void closeCluster () {
74+ CLUSTER .stop ();
75+ }
76+
77+ public String safeTestName ;
78+
79+ @ BeforeEach
80+ public void createTopics (final TestInfo testInfo ) throws Exception {
81+ safeTestName = safeUniqueTestName (testInfo );
82+ inputStream = "input-stream-" + safeTestName ;
83+ CLUSTER .createTopic (inputStream );
84+ }
85+
86+ private Properties props () {
87+ final Properties streamsConfiguration = new Properties ();
88+ streamsConfiguration .put (StreamsConfig .APPLICATION_ID_CONFIG , "app-" + safeTestName );
89+ streamsConfiguration .put (StreamsConfig .BOOTSTRAP_SERVERS_CONFIG , CLUSTER .bootstrapServers ());
90+ streamsConfiguration .put (StreamsConfig .STATESTORE_CACHE_MAX_BYTES_CONFIG , 0 );
91+ streamsConfiguration .put (StreamsConfig .STATE_DIR_CONFIG , TestUtils .tempDirectory ().getPath ());
92+ streamsConfiguration .put (StreamsConfig .DEFAULT_KEY_SERDE_CLASS_CONFIG , Serdes .String ().getClass ());
93+ streamsConfiguration .put (StreamsConfig .DEFAULT_VALUE_SERDE_CLASS_CONFIG , Serdes .String ().getClass ());
94+ streamsConfiguration .put (StreamsConfig .COMMIT_INTERVAL_MS_CONFIG , 1000L );
95+ streamsConfiguration .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "earliest" );
96+ return streamsConfiguration ;
97+ }
98+
99+ @ AfterEach
100+ public void shutdown () {
101+ if (kafkaStreams != null ) {
102+ kafkaStreams .close (Duration .ofSeconds (30L ));
103+ kafkaStreams .cleanUp ();
104+ }
105+ }
106+
107+ @ Test
108+ public void shouldMigrateInMemoryTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi () throws Exception {
109+ shouldMigrateTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi (false );
110+ }
111+
112+ @ Test
113+ public void shouldMigratePersistentTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi () throws Exception {
114+ shouldMigrateTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi (true );
115+ }
116+
117+ private void shouldMigrateTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi (final boolean persistentStore ) throws Exception {
118+ final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder ();
119+
120+ streamsBuilderForOldStore .addStateStore (
121+ Stores .timestampedKeyValueStoreBuilder (
122+ persistentStore ? Stores .persistentTimestampedKeyValueStore (STORE_NAME ) : Stores .inMemoryKeyValueStore (STORE_NAME ),
123+ Serdes .String (),
124+ Serdes .String ()))
125+ .stream (inputStream , Consumed .with (Serdes .String (), Serdes .String ()))
126+ .process (TimestampedKeyValueProcessor ::new , STORE_NAME );
127+
128+ final Properties props = props ();
129+ kafkaStreams = new KafkaStreams (streamsBuilderForOldStore .build (), props );
130+ kafkaStreams .start ();
131+
132+ processKeyValueAndVerifyTimestampedValue ("key1" , "value1" , 11L );
133+ processKeyValueAndVerifyTimestampedValue ("key2" , "value2" , 22L );
134+ processKeyValueAndVerifyTimestampedValue ("key3" , "value3" , 33L );
135+
136+ kafkaStreams .close ();
137+ kafkaStreams = null ;
138+
139+ final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder ();
140+
141+ streamsBuilderForNewStore .addStateStore (
142+ Stores .timestampedKeyValueStoreBuilderWithHeaders (
143+ persistentStore ? Stores .persistentTimestampedKeyValueStoreWithHeaders (STORE_NAME ) : Stores .inMemoryKeyValueStore (STORE_NAME ),
144+ Serdes .String (),
145+ Serdes .String ()))
146+ .stream (inputStream , Consumed .with (Serdes .String (), Serdes .String ()))
147+ .process (TimestampedKeyValueWithHeadersProcessor ::new , STORE_NAME );
148+
149+ kafkaStreams = new KafkaStreams (streamsBuilderForNewStore .build (), props );
150+ kafkaStreams .start ();
151+
152+ // Verify legacy data can be read with empty headers
153+ verifyLegacyValuesWithEmptyHeaders ("key1" , "value1" , 11L );
154+ verifyLegacyValuesWithEmptyHeaders ("key2" , "value2" , 22L );
155+ verifyLegacyValuesWithEmptyHeaders ("key3" , "value3" , 33L );
156+
157+ // Process new records with headers
158+ final Headers headers = new RecordHeaders ();
159+ headers .add ("source" , "test" .getBytes ());
160+
161+ processKeyValueWithTimestampAndHeadersAndVerify ("key3" , "value3" , 333L , headers , headers );
162+ processKeyValueWithTimestampAndHeadersAndVerify ("key4new" , "value4" , 444L , headers , headers );
163+
164+ kafkaStreams .close ();
165+ }
166+
167+ @ Test
168+ public void shouldProxyTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi () throws Exception {
169+ final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder ();
170+
171+ streamsBuilderForOldStore .addStateStore (
172+ Stores .timestampedKeyValueStoreBuilder (
173+ Stores .persistentTimestampedKeyValueStore (STORE_NAME ),
174+ Serdes .String (),
175+ Serdes .String ()))
176+ .stream (inputStream , Consumed .with (Serdes .String (), Serdes .String ()))
177+ .process (TimestampedKeyValueProcessor ::new , STORE_NAME );
178+
179+ final Properties props = props ();
180+ kafkaStreams = new KafkaStreams (streamsBuilderForOldStore .build (), props );
181+ kafkaStreams .start ();
182+
183+ processKeyValueAndVerifyTimestampedValue ("key1" , "value1" , 11L );
184+ processKeyValueAndVerifyTimestampedValue ("key2" , "value2" , 22L );
185+ processKeyValueAndVerifyTimestampedValue ("key3" , "value3" , 33L );
186+
187+ kafkaStreams .close ();
188+ kafkaStreams = null ;
189+
190+
191+
192+ final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder ();
193+
194+ streamsBuilderForNewStore .addStateStore (
195+ Stores .timestampedKeyValueStoreBuilderWithHeaders (
196+ Stores .persistentTimestampedKeyValueStore (STORE_NAME ),
197+ Serdes .String (),
198+ Serdes .String ()))
199+ .stream (inputStream , Consumed .with (Serdes .String (), Serdes .String ()))
200+ .process (TimestampedKeyValueWithHeadersProcessor ::new , STORE_NAME );
201+
202+ kafkaStreams = new KafkaStreams (streamsBuilderForNewStore .build (), props );
203+ kafkaStreams .start ();
204+
205+ // Verify legacy data can be read with empty headers
206+ verifyLegacyValuesWithEmptyHeaders ("key1" , "value1" , 11L );
207+ verifyLegacyValuesWithEmptyHeaders ("key2" , "value2" , 22L );
208+ verifyLegacyValuesWithEmptyHeaders ("key3" , "value3" , 33L );
209+
210+ // Process new records with headers
211+ final RecordHeaders headers = new RecordHeaders ();
212+ headers .add ("source" , "proxy-test" .getBytes ());
213+ final Headers expectedHeaders = new RecordHeaders ();
214+
215+ processKeyValueWithTimestampAndHeadersAndVerify ("key3" , "value3" , 333L , headers , expectedHeaders );
216+ processKeyValueWithTimestampAndHeadersAndVerify ("key4new" , "value4" , 444L , headers , expectedHeaders );
217+
218+ kafkaStreams .close ();
219+ }
220+
221+ private <K , V > void processKeyValueAndVerifyTimestampedValue (final K key ,
222+ final V value ,
223+ final long timestamp )
224+ throws Exception {
225+
226+ IntegrationTestUtils .produceKeyValuesSynchronouslyWithTimestamp (
227+ inputStream ,
228+ singletonList (KeyValue .pair (key , value )),
229+ TestUtils .producerConfig (CLUSTER .bootstrapServers (),
230+ StringSerializer .class ,
231+ StringSerializer .class ),
232+ timestamp ,
233+ false );
234+
235+ TestUtils .waitForCondition (
236+ () -> {
237+ try {
238+ final ReadOnlyKeyValueStore <K , ValueAndTimestamp <V >> store =
239+ IntegrationTestUtils .getStore (STORE_NAME , kafkaStreams , QueryableStoreTypes .timestampedKeyValueStore ());
240+
241+ if (store == null ) {
242+ return false ;
243+ }
244+
245+ final ValueAndTimestamp <V > result = store .get (key );
246+ return result != null && result .value ().equals (value ) && result .timestamp () == timestamp ;
247+ } catch (final Exception swallow ) {
248+ swallow .printStackTrace ();
249+ System .err .println (swallow .getMessage ());
250+ return false ;
251+ }
252+ },
253+ 60_000L ,
254+ "Could not get expected result in time." );
255+ }
256+
257+ private <K , V > void processKeyValueWithTimestampAndHeadersAndVerify (final K key ,
258+ final V value ,
259+ final long timestamp ,
260+ final Headers headers ,
261+ final Headers expectedHeaders )
262+ throws Exception {
263+
264+ IntegrationTestUtils .produceKeyValuesSynchronouslyWithTimestamp (
265+ inputStream ,
266+ singletonList (KeyValue .pair (key , value )),
267+ TestUtils .producerConfig (CLUSTER .bootstrapServers (),
268+ StringSerializer .class ,
269+ StringSerializer .class ),
270+ headers ,
271+ timestamp ,
272+ false );
273+
274+ TestUtils .waitForCondition (
275+ () -> {
276+ try {
277+ final ReadOnlyKeyValueStore <K , ValueTimestampHeaders <V >> store = IntegrationTestUtils
278+ .getStore (STORE_NAME , kafkaStreams , QueryableStoreTypes .keyValueStore ());
279+
280+ if (store == null )
281+ return false ;
282+
283+ final ValueTimestampHeaders <V > result = store .get (key );
284+ return result != null
285+ && result .value ().equals (value )
286+ && result .timestamp () == timestamp
287+ && result .headers ().equals (expectedHeaders );
288+ } catch (final Exception swallow ) {
289+ swallow .printStackTrace ();
290+ System .err .println (swallow .getMessage ());
291+ return false ;
292+ }
293+ },
294+ 60_000L ,
295+ "Could not get expected result in time." );
296+ }
297+
298+ private <K , V > void verifyLegacyValuesWithEmptyHeaders (final K key ,
299+ final V value ,
300+ final long timestamp ) throws Exception {
301+ TestUtils .waitForCondition (
302+ () -> {
303+ try {
304+ final ReadOnlyKeyValueStore <K , ValueTimestampHeaders <V >> store = IntegrationTestUtils
305+ .getStore (STORE_NAME , kafkaStreams , QueryableStoreTypes .keyValueStore ());
306+
307+ if (store == null )
308+ return false ;
309+
310+ final ValueTimestampHeaders <V > result = store .get (key );
311+ return result != null
312+ && result .value ().equals (value )
313+ && result .timestamp () == timestamp
314+ && result .headers ().toArray ().length == 0 ;
315+ } catch (final Exception swallow ) {
316+ swallow .printStackTrace ();
317+ System .err .println (swallow .getMessage ());
318+ return false ;
319+ }
320+ },
321+ 60_000L ,
322+ "Could not get expected result in time." );
323+ }
324+
325+ private static class TimestampedKeyValueProcessor implements Processor <String , String , Void , Void > {
326+ private TimestampedKeyValueStore <String , String > store ;
327+
328+ @ Override
329+ public void init (final ProcessorContext <Void , Void > context ) {
330+ store = context .getStateStore (STORE_NAME );
331+ }
332+
333+ @ Override
334+ public void process (final Record <String , String > record ) {
335+ store .put (record .key (), ValueAndTimestamp .make (record .value (), record .timestamp ()));
336+ }
337+ }
338+
339+ private static class TimestampedKeyValueWithHeadersProcessor implements Processor <String , String , Void , Void > {
340+ private TimestampedKeyValueStoreWithHeaders <String , String > store ;
341+
342+ @ Override
343+ public void init (final ProcessorContext <Void , Void > context ) {
344+ store = context .getStateStore (STORE_NAME );
345+ }
346+
347+ @ Override
348+ public void process (final Record <String , String > record ) {
349+ store .put (record .key (), ValueTimestampHeaders .make (record .value (), record .timestamp (), record .headers ()));
350+ }
351+ }
352+ }
0 commit comments