A Flink Connector (Source ONLY, Data Source API) with periodic events emitting.
- Testing with periodic emitting events
- When a time driven event source is needed
-
Create the Source
new PeriodicSource<>(SourceSupplier, initialDelayMillis, discoverPeriodMillis);
-
SourceSuppliershould return aCollectionofSourceSupplierOutputEvery
SourceSupplierOutputconsists of anid(for pretty logging when assigned),valueandPeriodicConfignew SourceSupplierOutput<>(String id, OUT value, PeriodicConfig config);
NOTE: DO NOT try to schedule tons of events, event flow should be redesigned properly in this case
PeriodicConfig:new PeriodicConfig(boolean useWallTime, long initialDelay, long period, TimeUnit unit);
- Wall time and fixed rate scheduling are supported,
useWallTime=trueindicates wall time scheduling. initialDelayrepresents the trigger delay in the period in wall time mode. With 10s period, 1s means at xx:01, xx:11, xx:21...periodrepresents the schedule interval in wall time mode, 10s means at xx:00, xx:10, xx:20...unitis the unit of initialDelay and period
- Wall time and fixed rate scheduling are supported,
-
SourceSupplieris invoked everydiscoverPeriodMillisms, with an initial delay ofinitialDelayMillisms after all subtasks are online.SourceSupplierruns on jobmanager.discoverPeriodMillisshould not be too small. Too frequent assigning may interrupt the scheduling.
-
Events returned by
SourceSupplierwill be assigned to subtasks in a round-robin fashion. -
Then every subtask emits the events based on the given
PeriodicConfig. -
If any Exception is thrown from
SourceSupplier, subtasks continue to emit events with previously successfully assigned events.
Check here for more
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(new PeriodicSource<>(
() -> {
List<SourceSupplierOutput<String>> a = new ArrayList<>();
a.add(new SourceSupplierOutput<>("1", "1s AAA",
new PeriodicConfig(true, 0, 1, TimeUnit.SECONDS)));
a.add(new SourceSupplierOutput<>("2", "5s BBB",
new PeriodicConfig(true, 0, 5, TimeUnit.SECONDS)));
a.add(new SourceSupplierOutput<>("3", "15s CCC",
new PeriodicConfig(true, 0, 15, TimeUnit.SECONDS)));
return a;
}, 0, 60000),
WatermarkStrategy.noWatermarks(),
"Periodic-Source",
TypeInformation.of(String.class))
.map(v -> Instant.now() + ": " + v)
.print();
env.execute();