-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathStreamApplication.java
More file actions
120 lines (104 loc) · 4.99 KB
/
StreamApplication.java
File metadata and controls
120 lines (104 loc) · 4.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import java.util.Properties;
/**
 * Kafka Streams application that joins a per-classroom capacity table with a
 * per-classroom occupancy count and emits a record whenever a classroom is over
 * capacity, or has just returned to being within capacity.
 *
 * <p>Inputs (both consumed as String key / String value):
 * <ul>
 *   <li>student topic: values of the form {@code <a>,<b>}; the first field becomes the
 *       table key and the second the classroom the key is counted under (presumably
 *       {@code <student>,<classroom>}, judging by the variable names).</li>
 *   <li>classroom topic: values of the form {@code <classroom>,<capacity>} where
 *       capacity parses as a non-negative {@code long}.</li>
 * </ul>
 * Records that are null or do not match these shapes are dropped.
 *
 * <p>Output: records with a {@code null} key and value {@code <classroom>,<status>},
 * where status is produced by {@link #updateValue(String, String)}.
 */
public class StreamApplication {

    /** Number of positional command-line arguments read by {@link #main(String[])}. */
    private static final int EXPECTED_ARG_COUNT = 4;

    /**
     * Computes the next per-classroom state string from the latest join result.
     *
     * <p>The state has the form {@code <capacity>,<occupancy>:<status>} where status is:
     * <ul>
     *   <li>a copy of {@code newValue} when capacity is less than occupancy;</li>
     *   <li>{@code OK} when capacity is now at least occupancy and the previous state
     *       had capacity less than occupancy;</li>
     *   <li>empty otherwise.</li>
     * </ul>
     *
     * @param newValue latest join result, formatted {@code <capacity>,<occupancy>}
     * @param oldValue previous state string for the same key, or {@code null} if none
     * @return the new state string
     * @throws NumberFormatException if a capacity/occupancy field is not a valid long
     */
    private static String updateValue(String newValue, String oldValue) {
        String[] newList = newValue.split(",");
        long capacity = Long.parseLong(newList[0]);
        long occupancy = Long.parseLong(newList[1]);

        StringBuilder sb = new StringBuilder();
        sb.append(newValue);
        sb.append(":");
        if (capacity >= occupancy) {
            if (oldValue != null) {
                // The part before ':' of the previous state is the previous "<capacity>,<occupancy>".
                String[] oldList = oldValue.split(":")[0].split(",");
                if (Long.parseLong(oldList[0]) < Long.parseLong(oldList[1])) {
                    sb.append("OK");
                }
            }
        } else {
            sb.append(newValue);
        }
        return sb.toString();
    }

    /**
     * Returns whether {@code value} is exactly two non-empty comma-separated fields.
     *
     * @param value raw record value; may be {@code null}
     * @return {@code true} if the value is non-null and has the shape {@code <x>,<y>}
     */
    private static boolean hasTwoNonEmptyFields(String value) {
        // A null value would otherwise throw NullPointerException on split().
        if (value == null) {
            return false;
        }
        // Limit -1 keeps trailing empty fields so that "a," is seen as two fields and rejected.
        String[] tokens = value.split(",", -1);
        return tokens.length == 2 && !tokens[0].isEmpty() && !tokens[1].isEmpty();
    }

    /**
     * Returns whether {@code value} is a well-formed classroom record: two non-empty
     * fields whose second field parses as a non-negative {@code long}.
     *
     * @param value raw record value; may be {@code null}
     * @return {@code true} if the record should be kept
     */
    private static boolean isValidClassroomRecord(String value) {
        if (!hasTwoNonEmptyFields(value)) {
            return false;
        }
        try {
            return Long.parseLong(value.split(",", -1)[1]) >= 0;
        } catch (NumberFormatException e) {
            // Non-numeric capacity: treat the record as malformed and drop it.
            return false;
        }
    }

    /**
     * Builds and starts the topology.
     *
     * @param args {@code <bootstrapServers> <studentTopic> <classroomTopic> <outputTopic>};
     *             any further arguments are ignored
     * @throws Exception declared for compatibility with the original signature
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < EXPECTED_ARG_COUNT) {
            System.err.println(
                    "Usage: StreamApplication <bootstrapServers> <studentTopic> <classroomTopic> <outputTopic>");
            System.exit(1);
            return;
        }
        String bootstrapServers = args[0];
        String studentTopic = args[1];
        String classroomTopic = args[2];
        String outputTopic = args[3];

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "StreamApplication-yannick");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> studentStream = builder.stream(studentTopic);
        KStream<String, String> classroomStream = builder.stream(classroomTopic);

        // Table keyed by the first field of each student record, holding the most
        // recent second field seen for that key.
        KTable<String, String> studentTable = studentStream
                .filter((key, value) -> hasTwoNonEmptyFields(value))
                .map((key, value) -> {
                    String[] tokens = value.split(",", -1);
                    return KeyValue.pair(tokens[0], tokens[1]);
                })
                .groupByKey()
                .aggregate(
                        () -> null,
                        (key, newValue, aggValue) -> newValue
                );

        // Re-key by the table's value and count entries per key; the subtractor removes
        // an entry's contribution from its previous key when its value changes.
        KTable<String, Long> classroomOccupancy = studentTable
                .groupBy((key, value) -> KeyValue.pair(value, 1L), Serialized.with(Serdes.String(), Serdes.Long()))
                .aggregate(
                        () -> 0L,
                        (key, newValue, aggValue) -> aggValue + newValue,
                        (key, oldValue, aggValue) -> aggValue - oldValue,
                        Materialized.with(Serdes.String(), Serdes.Long())
                );

        // Table keyed by classroom holding the most recent valid capacity.
        KTable<String, Long> classroomCapacity = classroomStream
                .filter((key, value) -> isValidClassroomRecord(value))
                .map((key, value) -> {
                    // Same split mode as the validation above, so both see identical fields.
                    String[] tokens = value.split(",", -1);
                    return KeyValue.pair(tokens[0], Long.valueOf(tokens[1]));
                })
                .groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
                .aggregate(
                        () -> 0L,
                        (key, newValue, aggValue) -> newValue,
                        Materialized.with(Serdes.String(), Serdes.Long())
                );

        // Join capacity with occupancy as "<capacity>,<occupancy>" and fold each change
        // into the per-classroom state string maintained by updateValue.
        KTable<String, String> output = classroomCapacity.join(classroomOccupancy,
                (capacity, occupancy) -> capacity + "," + occupancy)
                .toStream().groupByKey()
                .aggregate(
                        () -> null,
                        (key, newValue, oldValue) -> updateValue(newValue, oldValue)
                );

        // String.split drops trailing empty strings, so a state with an empty status
        // ("<capacity>,<occupancy>:") splits into one part and is filtered out here.
        output.toStream()
                .filter((key, value) -> value.split(":").length == 2)
                .map((key, value) -> KeyValue.pair(null, key + "," + value.split(":")[1]))
                .to(outputTopic);

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        // Register the hook before starting so a shutdown signal arriving right after
        // start() still closes the streams instance.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.cleanUp();
        streams.start();
    }
}