1+ /*
2+ * Licensed to the Apache Software Foundation (ASF) under one or more
3+ * contributor license agreements. See the NOTICE file distributed with
4+ * this work for additional information regarding copyright ownership.
5+ * The ASF licenses this file to You under the Apache License, Version 2.0
6+ * (the "License"); you may not use this file except in compliance with
7+ * the License. You may obtain a copy of the License at
8+ *
9+ * http://www.apache.org/licenses/LICENSE-2.0
10+ *
11+ * Unless required by applicable law or agreed to in writing, software
12+ * distributed under the License is distributed on an "AS IS" BASIS,
13+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+ * See the License for the specific language governing permissions and
15+ * limitations under the License.
16+ */
17+ package org .apache .kafka .streams .state .internals ;
18+
19+ import org .apache .kafka .common .header .Headers ;
20+ import org .apache .kafka .common .header .internals .RecordHeaders ;
21+ import org .apache .kafka .common .serialization .Serde ;
22+ import org .apache .kafka .common .utils .Bytes ;
23+ import org .apache .kafka .common .utils .Time ;
24+ import org .apache .kafka .streams .errors .ProcessorStateException ;
25+ import org .apache .kafka .streams .processor .internals .SerdeGetter ;
26+ import org .apache .kafka .streams .query .Position ;
27+ import org .apache .kafka .streams .query .PositionBound ;
28+ import org .apache .kafka .streams .query .Query ;
29+ import org .apache .kafka .streams .query .QueryConfig ;
30+ import org .apache .kafka .streams .query .QueryResult ;
31+ import org .apache .kafka .streams .state .KeyValueStore ;
32+ import org .apache .kafka .streams .state .TimestampedKeyValueStoreWithHeaders ;
33+ import org .apache .kafka .streams .state .ValueTimestampHeaders ;
34+
35+ import java .util .Objects ;
36+
37+ import static org .apache .kafka .streams .processor .internals .metrics .StreamsMetricsImpl .maybeMeasureLatency ;
38+
39+
40+ /**
41+ * A Metered {@link TimestampedKeyValueStoreWithHeaders} wrapper that is used for recording operation metrics, and hence
42+ * its inner KeyValueStore implementation does not need to provide its own metrics collecting functionality.
43+ *
44+ * The inner {@link KeyValueStore} of this class is of type <Bytes, byte[]>,
45+ * hence we use {@link Serde}s to convert from <K, ValueTimestampHeaders<V>> to <Bytes, byte[]>.
46+ *
47+ * @param <K> key type
48+ * @param <V> value type (wrapped in {@link ValueTimestampHeaders})
49+ */
50+ public class MeteredTimestampedKeyValueStoreWithHeaders <K , V >
51+ extends MeteredKeyValueStore <K , ValueTimestampHeaders <V >>
52+ implements TimestampedKeyValueStoreWithHeaders <K , V > {
53+
54+ MeteredTimestampedKeyValueStoreWithHeaders (final KeyValueStore <Bytes , byte []> inner ,
55+ final String metricScope ,
56+ final Time time ,
57+ final Serde <K > keySerde ,
58+ final Serde <ValueTimestampHeaders <V >> valueSerde ) {
59+ super (inner , metricScope , time , keySerde , valueSerde );
60+ }
61+
62+ @ SuppressWarnings ("unchecked" )
63+ @ Override
64+ protected Serde <ValueTimestampHeaders <V >> prepareValueSerdeForStore (final Serde <ValueTimestampHeaders <V >> valueSerde ,
65+ final SerdeGetter getter ) {
66+ if (valueSerde == null ) {
67+ return new ValueTimestampHeadersSerde <>((Serde <V >) getter .valueSerde ());
68+ } else {
69+ return super .prepareValueSerdeForStore (valueSerde , getter );
70+ }
71+ }
72+
73+ @ Override
74+ public void put (final K key ,
75+ final ValueTimestampHeaders <V > value ) {
76+ Objects .requireNonNull (key , "key cannot be null" );
77+ try {
78+ final Headers headers = value != null ? value .headers () : new RecordHeaders ();
79+ maybeMeasureLatency (() -> wrapped ().put (keyBytes (key , headers ), serdes .rawValue (value , headers )), time , putSensor );
80+ maybeRecordE2ELatency ();
81+ } catch (final ProcessorStateException e ) {
82+ final String message = String .format (e .getMessage (), key , value );
83+ throw new ProcessorStateException (message , e );
84+ }
85+ }
86+
87+ @ Override
88+ public ValueTimestampHeaders <V > putIfAbsent (final K key ,
89+ final ValueTimestampHeaders <V > value ) {
90+ Objects .requireNonNull (key , "key cannot be null" );
91+ final Headers headers = value != null ? value .headers () : new RecordHeaders ();
92+ final ValueTimestampHeaders <V > currentValue = maybeMeasureLatency (
93+ () -> outerValue (wrapped ().putIfAbsent (keyBytes (key , headers ), serdes .rawValue (value , headers ))),
94+ time ,
95+ putIfAbsentSensor
96+ );
97+ maybeRecordE2ELatency ();
98+ return currentValue ;
99+ }
100+
101+ @ Override
102+ public <R > QueryResult <R > query (final Query <R > query ,
103+ final PositionBound positionBound ,
104+ final QueryConfig config ) {
105+ throw new UnsupportedOperationException ("Querying is not supported for " + getClass ().getSimpleName ());
106+ }
107+
108+ @ Override
109+ public Position getPosition () {
110+ throw new UnsupportedOperationException ("Position is not supported for " + getClass ().getSimpleName ());
111+ }
112+
113+ protected Bytes keyBytes (final K key , final Headers headers ) {
114+ return Bytes .wrap (serdes .rawKey (key , headers ));
115+ }
116+
117+ }
0 commit comments