Skip to content

Commit fa9c380

Browse files
committed
[FLINK-32733] Add AlgoOperator for FPGrowth
1 parent ba327b0 commit fa9c380

12 files changed

Lines changed: 2703 additions & 1 deletion

File tree

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
---
2+
title: "Swing"
3+
type: docs
4+
aliases:
5+
6+
- /operators/recommendation/swing.html
7+
8+
---
9+
10+
<!--
11+
Licensed to the Apache Software Foundation (ASF) under one
12+
or more contributor license agreements. See the NOTICE file
13+
distributed with this work for additional information
14+
regarding copyright ownership. The ASF licenses this file
15+
to you under the Apache License, Version 2.0 (the
16+
"License"); you may not use this file except in compliance
17+
with the License. You may obtain a copy of the License at
18+
19+
http://www.apache.org/licenses/LICENSE-2.0
20+
21+
Unless required by applicable law or agreed to in writing,
22+
software distributed under the License is distributed on an
23+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
24+
KIND, either express or implied. See the License for the
25+
specific language governing permissions and limitations
26+
under the License.
27+
-->
28+
29+
## FPGrowth
30+
31+
An AlgoOperator which implements the FPGrowth algorithm.
32+
33+
FPGrowth is an algorithm for frequent pattern mining. FP growth algorithm represents the database in the form of a
34+
tree called a frequent pattern tree or FP tree.
35+
36+
Ignore NULL values and empty sequence in the feature column during <i>transform()</i>.
37+
38+
Use distinct elements from a sequence to mine frequent pattern.
39+
40+
See
41+
<a href="http://dx.doi.org/10.1145/335191.335372">
42+
Han et al., Mining frequent patterns without candidate generation</a>,
43+
<a href="https://doi.org/10.1145/1454008.1454027">
44+
Li et al., PFP Parallel FP-growth for query recommendation</a> and
45+
<a href="https://dl.acm.org/doi/abs/10.1145/1133905.1133907">
46+
Borgelt C. An Implementation of the FP-growth Algorithm</a> for more information.
47+
48+
### Input Columns
49+
50+
| Param name | Type | Default | Description |
51+
|:-----------|:-------|:----------|:-------------------------------------------|
52+
| itemsCol | String | `"items"` | Items sequence. (e.g. "item1,item2,item3") |
53+
54+
### Structure of Output Table
55+
56+
#### Frequent Pattern Table
57+
58+
| Name | Type | Description |
59+
|:--------------|:----------------|:---------------------------------------------------------|
60+
| items | String | Frequent pattern. |
61+
| support_count | Long | Number of occurrences of the frequent pattern. |
62+
| item_count | Long | Number of elements in the frequent pattern. |
63+
64+
#### Association Rule Table
65+
66+
| Name | Type | Description |
67+
|:----------|:-------|:-----------------------------------------------|
68+
| rule | String | Association rule. (e.g. "item1,item2=>item3") |
69+
| item_count | Double | Number of elements in the association rule. |
70+
| lift | Double | Lift. |
71+
| support_percent | Double | Support (frequency of the association rule). |
72+
| confidence_percent | Double | Confidence. |
73+
| transaction_count | Long | Number of occurrences of the association rule. |
74+
75+
### Parameters
76+
77+
Below are the parameters required by `FPGrowth`.
78+
79+
| Key | Default | Type | Required | Description |
80+
|:-----------------------|:----------|:--------|:---------|:--------------------------------------------------------------------------------------|
81+
| itemsCol | `"items"` | String | no | Item sequence column name. |
82+
| fieldDelimiter | `","` | String | no | Field delimiter of item sequence. |
83+
| minLift | `1.0` | Double | no | Minimal lift level for association rules. |
84+
| minConfidence | `0.6` | Double | no | Minimal confidence level for association rules. |
85+
| minSupport | `0.02` | Double | no | Minimal support percent, |
86+
| minSupportCount | `-1` | Double | no | Minimal support count. MIN_ITEM_COUNT has no effect when less than or equal to 0 |
87+
| maxPatternLength | `10` | Integer | no | Max frequent pattern length. |
88+
89+
### Examples
90+
91+
{{< tabs examples >}}
92+
93+
{{< tab "Java">}}
94+
95+
```java
96+
package org.apache.flink.ml.examples.recommendation;
97+
98+
import org.apache.flink.ml.recommendation.fpgrowth.FPGrowth;
99+
import org.apache.flink.streaming.api.datastream.DataStream;
100+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
101+
import org.apache.flink.table.api.Table;
102+
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
103+
import org.apache.flink.types.Row;
104+
import org.apache.flink.util.CloseableIterator;
105+
106+
/**
107+
* Simple program that creates a Swing instance and uses it to generate recommendations for items.
108+
*/
109+
public class FPGrowthExample {
110+
public static void main(String[] args) {
111+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
112+
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
113+
114+
// Generates input data.
115+
DataStream<Row> inputStream =
116+
env.fromElements(
117+
Row.of(""),
118+
Row.of("A,B,C,D"),
119+
Row.of("B,C,E"),
120+
Row.of("A,B,C,E"),
121+
Row.of("B,D,E"),
122+
Row.of("A,B,C,D,A"));
123+
124+
Table inputTable = tEnv.fromDataStream(inputStream).as("items");
125+
126+
// Creates a FPGrowth object and initializes its parameters.
127+
FPGrowth fpg = new FPGrowth().setMinSupportCount(3);
128+
129+
// Transforms the data.
130+
Table[] outputTable = fpg.transform(inputTable);
131+
132+
// Extracts and displays the frequent patterns.
133+
for (CloseableIterator<Row> it = outputTable[0].execute().collect(); it.hasNext(); ) {
134+
Row row = it.next();
135+
136+
String pattern = row.getFieldAs(0);
137+
Long support = row.getFieldAs(1);
138+
Long itemCount = row.getFieldAs(2);
139+
140+
System.out.printf("pattern: %d, support count: %d, item_count:%d\n",pattern, support, itemCount);
141+
}
142+
143+
// Extracts and displays the association rules.
144+
for (CloseableIterator<Row> it = outputTable[1].execute().collect(); it.hasNext(); ) {
145+
Row row = it.next();
146+
147+
String rule = row.getFieldAs(0);
148+
Double lift = row.getFieldAs(2);
149+
Double support = row.getFieldAs(3);
150+
Double confidence_percent = row.getFieldAs(4);
151+
152+
System.out.printf("rule: %d, list: %f, support:%f, confidence:%f\n",rule, lift, support, confidence_percent);
153+
}
154+
}
155+
}
156+
157+
158+
```
159+
160+
{{< /tab>}}
161+
162+
{{< tab "Python">}}
163+
164+
```python
165+
166+
# Simple program that creates a FPGrowth instance and gives recommendations for items.
167+
168+
from pyflink.common import Types
169+
from pyflink.datastream import StreamExecutionEnvironment
170+
from pyflink.table import StreamTableEnvironment
171+
172+
from pyflink.ml.recommendation.fpgrowth import FPGrowth
173+
174+
# Creates a new StreamExecutionEnvironment.
175+
env = StreamExecutionEnvironment.get_execution_environment()
176+
177+
# Creates a StreamTableEnvironment.
178+
t_env = StreamTableEnvironment.create(env)
179+
180+
# Generates input data.
181+
input_table = t_env.from_data_stream(
182+
env.from_collection([
183+
("A,B,C,D",),
184+
("B,C,E",),
185+
("A,B,C,E",),
186+
("B,D,E",),
187+
("A,B,C,D",)
188+
],
189+
type_info=Types.ROW_NAMED(
190+
['items'],
191+
[Types.STRING()])
192+
))
193+
194+
# Creates a fpgrowth object and initialize its parameters.
195+
fpg = FPGrowth().set_min_support(0.6)
196+
197+
# Transforms the data to fpgrowth algorithm result.
198+
output_table = fpg.transform(input_table)
199+
200+
# Extracts and display the results.
201+
pattern_result_names = output_table[0].get_schema().get_field_names()
202+
rule_result_names = output_table[1].get_schema().get_field_names()
203+
204+
patterns = t_env.to_data_stream(output_table[0]).execute_and_collect()
205+
rules = t_env.to_data_stream(output_table[1]).execute_and_collect()
206+
207+
print("|\t"+"\t|\t".join(pattern_result_names)+"\t|")
208+
for result in patterns:
209+
print(f'|\t{result[0]}\t|\t{result[1]}\t|\t{result[2]}\t|')
210+
print("|\t"+" | ".join(rule_result_names)+"\t|")
211+
for result in rules:
212+
print(f'|\t{result[0]}\t|\t{result[1]}\t|\t{result[2]}\t|\t{result[3]}'
213+
+ f'\t|\t{result[4]}\t|\t{result[5]}\t|')
214+
215+
```
216+
217+
{{< /tab>}}
218+
219+
{{< /tabs>}}

flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,59 @@ public static <IN, ACC, OUT> DataStream<OUT> aggregate(
244244
return aggregate(input, func, accType, outType);
245245
}
246246

247+
/**
248+
* Applies a {@link AggregateFunction} on a bounded keyed data stream. The output stream
249+
* contains one stream record for each key.
250+
*
251+
* @param input The input keyed data stream.
252+
* @param func The user defined aggredate function.
253+
* @param accType The type information of intermediate data.
254+
* @param outType The type information of the output.
255+
* @return The result data stream.
256+
* @param <K> The key type of input.
257+
* @param <IN> The class type of input.
258+
* @param <ACC> The type of intermediate data.
259+
* @param <OUT> The class type of output.
260+
*/
261+
public static <K, IN, ACC, OUT> DataStream<OUT> keyedAggregate(
262+
KeyedStream<IN, K> input,
263+
AggregateFunction<IN, ACC, OUT> func,
264+
TypeInformation<ACC> accType,
265+
TypeInformation<OUT> outType) {
266+
return input.transform(
267+
"Keyed GroupReduce",
268+
outType,
269+
new KeyedAggregateOperator<>(
270+
func, accType.createSerializer(input.getExecutionConfig())))
271+
.setParallelism(input.getParallelism());
272+
}
273+
274+
/**
275+
* Applies a {@link AggregateFunction} on a bounded keyed data stream. The output stream
276+
* contains one stream record for each key.
277+
*
278+
* @param input The input keyed data stream.
279+
* @param func The user defined aggredate function.
280+
* @param accTypeSerializer The type serializer of intermediate data.
281+
* @param outType The type information of the output.
282+
* @return The result data stream.
283+
* @param <K> The key type of input.
284+
* @param <IN> The class type of input.
285+
* @param <ACC> The type of intermediate data.
286+
* @param <OUT> The class type of output.
287+
*/
288+
public static <K, IN, ACC, OUT> DataStream<OUT> keyedAggregate(
289+
KeyedStream<IN, K> input,
290+
AggregateFunction<IN, ACC, OUT> func,
291+
TypeSerializer<ACC> accTypeSerializer,
292+
TypeInformation<OUT> outType) {
293+
return input.transform(
294+
"Keyed GroupReduce",
295+
outType,
296+
new KeyedAggregateOperator<>(func, accTypeSerializer))
297+
.setParallelism(input.getParallelism());
298+
}
299+
247300
/**
248301
* Aggregates the elements in each partition of the input bounded stream, and then merges the
249302
* partial results of all partitions. The output stream contains the aggregated result and its
@@ -562,6 +615,64 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
562615
}
563616
}
564617

618+
private static class KeyedAggregateOperator<IN, K, ACC, OUT>
619+
extends AbstractUdfStreamOperator<OUT, AggregateFunction<IN, ACC, OUT>>
620+
implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
621+
622+
AggregateFunction aggregator;
623+
624+
private static final String STATE_NAME = "_op_state";
625+
626+
private transient ValueState<ACC> values;
627+
628+
private final TypeSerializer<ACC> serializer;
629+
630+
private InternalTimerService<VoidNamespace> timerService;
631+
632+
public KeyedAggregateOperator(
633+
AggregateFunction<IN, ACC, OUT> aggregator, TypeSerializer<ACC> serializer) {
634+
super(aggregator);
635+
this.serializer = serializer;
636+
}
637+
638+
@Override
639+
public void open() throws Exception {
640+
super.open();
641+
ValueStateDescriptor<ACC> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer);
642+
values = getPartitionedState(stateId);
643+
timerService =
644+
getInternalTimerService("end-key-timers", new VoidNamespaceSerializer(), this);
645+
}
646+
647+
@Override
648+
public void processElement(StreamRecord<IN> element) throws Exception {
649+
IN value = element.getValue();
650+
ACC currentValue = values.value();
651+
652+
if (currentValue == null) {
653+
// Registers a timer for emitting the result at the end when this is the
654+
// first input for this key.
655+
timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, Long.MAX_VALUE);
656+
currentValue = userFunction.createAccumulator();
657+
}
658+
659+
currentValue = userFunction.add(value, currentValue);
660+
values.update(currentValue);
661+
}
662+
663+
@Override
664+
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
665+
ACC currentValue = values.value();
666+
if (currentValue != null) {
667+
output.collect(
668+
new StreamRecord<>(userFunction.getResult(currentValue), Long.MAX_VALUE));
669+
}
670+
}
671+
672+
@Override
673+
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {}
674+
}
675+
565676
/**
566677
* A stream operator to apply {@link ReduceFunction} on the input bounded keyed data stream.
567678
*

0 commit comments

Comments
 (0)