diff --git a/rxjava-top-tags-distributed/README.md b/rxjava-top-tags-distributed/README.md
new file mode 100644
index 0000000..19f1f5e
--- /dev/null
+++ b/rxjava-top-tags-distributed/README.md
@@ -0,0 +1,47 @@
+Distributed trending tags calculator
+====================================
+
+This example demonstrates how to create a distributed trending tags calculator as a multi-module project.
+
+The example consists of:
+
+- counter modules that count occurences of a tag over a sliding time window, distributing the count workload;
+- intermediate modules that create partial tag rankings, emitting them at regular intervals;
+- a final module that aggregates the partial rankings into a single final ranking, emitted at regular intervals.
+
+## Requirements
+
+In order to install the module run it in your Spring XD installation, you will need to have installed:
+
+* Spring XD version 1.1.x ([Instructions](http://docs.spring.io/spring-xd/docs/current/reference/html/#getting-started)). You'll need to build Spring XD with Java 8+ to use this sample (which uses lambda expressions).
+
+## Building
+
+mvn clean package
+
+## Installing
+
+Once Spring XD is started, run the following commands from the shell.
+
+````
+module upload --file [path-to]/spring-xd-samples/rxjava-top-tags-distributed/rxjava-top-tags-counter/target/rxjava-top-tags-counter-1.0.0-SNAPSHOT.jar --type processor --name top-tags-counter
+
+module upload --file [path-to]/spring-xd-samples/rxjava-top-tags-distributed/rxjava-top-tags-intermediate-ranker/target/rxjava-top-tags-intermediate-ranker-1.0.0-SNAPSHOT.jar --type processor --name top-tags-intermediate-ranker
+
+module upload --file [path-to]/spring-xd-samples/rxjava-top-tags-distributed/rxjava-top-tags-final-ranker/target/rxjava-top-tags-final-ranker-1.0.0-SNAPSHOT.jar --type processor --name top-tags-final-ranker
+
+````
+
+Create the stream
+
+````
+stream create twittertags --definition "twitterstream | transform --expression=#jsonPath(payload,'$.entities.hashtags[*].text') | splitter | top-tags-counter | top-tags-intermediate-ranker | top-tags-final-ranker | log" --deploy
+````
+
+Deploy the stream
+
+````
+stream deploy twittertags
+````
+
+
diff --git a/rxjava-top-tags-distributed/pom.xml b/rxjava-top-tags-distributed/pom.xml
new file mode 100644
index 0000000..63524dd
--- /dev/null
+++ b/rxjava-top-tags-distributed/pom.xml
@@ -0,0 +1,98 @@
+
+
+ 4.0.0
+
+ com.acme
+ rxjava-top-tags-distributed-parent
+ 1.0.0-SNAPSHOT
+
+ pom
+
+ org.springframework.xd
+ spring-xd-module-parent
+ 1.1.0.RELEASE
+
+
+
+ 1.1.0.RELEASE
+ 1.0.0
+
+
+
+ rxjava-top-tags-counter
+ rxjava-top-tags-intermediate-ranker
+ rxjava-top-tags-common
+ rxjava-top-tags-final-ranker
+
+
+
+
+
+ spring-milestones
+ Spring Milestones
+ http://repo.spring.io/libs-milestone
+
+ false
+
+
+
+ spring-releases
+ Spring Releases
+ http://repo.spring.io/release
+
+ false
+
+
+
+ spring-snapshots
+ Spring Snapshots
+ http://repo.spring.io/libs-snapshot
+
+ true
+
+
+
+
+
+
+
+ org.springframework.xd
+ spring-xd-rxjava
+ ${spring.xd.rxjava.version}
+
+
+ io.reactivex
+ rxjava
+ ${rxjava.version}
+
+
+ io.reactivex
+ rxjava-math
+ ${rxjava.version}
+
+
+
+ com.acme
+ rxjava-top-tags-common
+ ${project.version}
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.2
+
+ 1.8
+ 1.8
+
+
+
+
+
+
\ No newline at end of file
diff --git a/rxjava-top-tags-distributed/rxjava-top-tags-common/pom.xml b/rxjava-top-tags-distributed/rxjava-top-tags-common/pom.xml
new file mode 100644
index 0000000..e5b5ac5
--- /dev/null
+++ b/rxjava-top-tags-distributed/rxjava-top-tags-common/pom.xml
@@ -0,0 +1,15 @@
+
+
+
+ rxjava-top-tags-distributed-parent
+ com.acme
+ 1.0.0-SNAPSHOT
+
+ 4.0.0
+
+ rxjava-top-tags-common
+
+
+
\ No newline at end of file
diff --git a/rxjava-top-tags-distributed/rxjava-top-tags-common/src/main/java/com/acme/toptags/common/Keys.java b/rxjava-top-tags-distributed/rxjava-top-tags-common/src/main/java/com/acme/toptags/common/Keys.java
new file mode 100644
index 0000000..d811b88
--- /dev/null
+++ b/rxjava-top-tags-distributed/rxjava-top-tags-common/src/main/java/com/acme/toptags/common/Keys.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2015 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.acme.toptags.common;
+
+/**
+ * @author Marius Bogoevici
+ */
+public abstract class Keys {
+
+ public static final String TAG = "tag";
+
+ public static final String COUNT = "count";
+
+ public static final String RANKINGS = "rankings";
+
+ public static final String TOPTAGS = "topTags";
+}
diff --git a/rxjava-top-tags-distributed/rxjava-top-tags-common/src/main/java/com/acme/toptags/common/TupleComparatorByCount.java b/rxjava-top-tags-distributed/rxjava-top-tags-common/src/main/java/com/acme/toptags/common/TupleComparatorByCount.java
new file mode 100644
index 0000000..62b1657
--- /dev/null
+++ b/rxjava-top-tags-distributed/rxjava-top-tags-common/src/main/java/com/acme/toptags/common/TupleComparatorByCount.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2015 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.acme.toptags.common;
+
+import java.util.Comparator;
+
+import org.springframework.xd.tuple.Tuple;
+
+/**
+ * @author Marius Bogoevici
+ */
+public class TupleComparatorByCount implements Comparator {
+
+ public static final TupleComparatorByCount INSTANCE = new TupleComparatorByCount();
+
+ @Override
+ public int compare(Tuple o1, Tuple o2) {
+ return - Integer.valueOf(o1.getInt(Keys.COUNT)).compareTo(o2.getInt(Keys.COUNT));
+ }
+
+}
diff --git a/rxjava-top-tags-distributed/rxjava-top-tags-counter/pom.xml b/rxjava-top-tags-distributed/rxjava-top-tags-counter/pom.xml
new file mode 100644
index 0000000..9dcb652
--- /dev/null
+++ b/rxjava-top-tags-distributed/rxjava-top-tags-counter/pom.xml
@@ -0,0 +1,30 @@
+
+
+
+ com.acme
+ rxjava-top-tags-distributed-parent
+ 1.0.0-SNAPSHOT
+
+ 4.0.0
+
+ rxjava-top-tags-counter
+
+
+
+ org.springframework.xd
+ spring-xd-rxjava
+
+
+ io.reactivex
+ rxjava
+
+
+ com.acme
+ rxjava-top-tags-common
+
+
+
+
+
\ No newline at end of file
diff --git a/rxjava-top-tags-distributed/rxjava-top-tags-counter/src/main/java/com/acme/toptags/counter/RollingTagCounter.java b/rxjava-top-tags-distributed/rxjava-top-tags-counter/src/main/java/com/acme/toptags/counter/RollingTagCounter.java
new file mode 100644
index 0000000..494e984
--- /dev/null
+++ b/rxjava-top-tags-distributed/rxjava-top-tags-counter/src/main/java/com/acme/toptags/counter/RollingTagCounter.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2015 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.acme.toptags.counter;
+
+import static com.acme.toptags.common.Keys.COUNT;
+import static com.acme.toptags.common.Keys.TAG;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import rx.Observable;
+
+import org.springframework.xd.rxjava.Processor;
+import org.springframework.xd.tuple.Tuple;
+import org.springframework.xd.tuple.TupleBuilder;
+
+/**
+ * @author Marius Bogoevici
+ */
+public class RollingTagCounter implements Processor {
+
+ private final int timeWindow;
+
+ private final int timeShift;
+
+ public RollingTagCounter(int timeWindow, int timeShift) {
+ this.timeWindow = timeWindow;
+ this.timeShift = timeShift;
+ }
+
+
+ @Override
+ public Observable process(Observable observable) {
+ return observable.window(timeWindow, timeShift, SECONDS)
+ .flatMap(
+ // group by word
+ w -> w.groupBy(e -> e)
+ .flatMap(s -> Observable.zip(Observable.just(s.getKey()), s.count(), (a, b) -> TupleBuilder.tuple().of(TAG, a, COUNT, b)
+ )
+ )
+ );
+ }
+}
diff --git a/rxjava-top-tags-distributed/rxjava-top-tags-counter/src/main/java/com/acme/toptags/counter/RollingTagCounterOptions.java b/rxjava-top-tags-distributed/rxjava-top-tags-counter/src/main/java/com/acme/toptags/counter/RollingTagCounterOptions.java
new file mode 100644
index 0000000..2b9cd7d
--- /dev/null
+++ b/rxjava-top-tags-distributed/rxjava-top-tags-counter/src/main/java/com/acme/toptags/counter/RollingTagCounterOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2015 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.acme.toptags.counter;
+
+import org.springframework.xd.module.options.spi.ModuleOption;
+
+/**
+ * @author Marius Bogoevici
+ */
+public class RollingTagCounterOptions {
+
+ private int timeWindow = 9;
+
+ private int timeShift = 3;
+
+ public int getTimeWindow() {
+ return timeWindow;
+ }
+
+ @ModuleOption("The length in seconds of the time window over which tags are counted")
+ public void setTimeWindow(int timeWindow) {
+ this.timeWindow = timeWindow;
+ }
+
+ public int getTimeShift() {
+ return timeShift;
+ }
+
+ @ModuleOption("The frequency in seconds with which tag counts are emitted")
+ public void setTimeShift(int timeShift) {
+ this.timeShift = timeShift;
+ }
+}
diff --git a/rxjava-top-tags-distributed/rxjava-top-tags-counter/src/main/resources/config/spring-module.properties b/rxjava-top-tags-distributed/rxjava-top-tags-counter/src/main/resources/config/spring-module.properties
new file mode 100644
index 0000000..873527c
--- /dev/null
+++ b/rxjava-top-tags-distributed/rxjava-top-tags-counter/src/main/resources/config/spring-module.properties
@@ -0,0 +1 @@
+options_class = com.acme.toptags.counter.RollingTagCounterOptions
\ No newline at end of file
diff --git a/rxjava-top-tags-distributed/rxjava-top-tags-counter/src/main/resources/config/toptags-counter.xml b/rxjava-top-tags-distributed/rxjava-top-tags-counter/src/main/resources/config/toptags-counter.xml
new file mode 100644
index 0000000..55215c1
--- /dev/null
+++ b/rxjava-top-tags-distributed/rxjava-top-tags-counter/src/main/resources/config/toptags-counter.xml
@@ -0,0 +1,29 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/rxjava-top-tags-distributed/rxjava-top-tags-final-ranker/pom.xml b/rxjava-top-tags-distributed/rxjava-top-tags-final-ranker/pom.xml
new file mode 100644
index 0000000..881a541
--- /dev/null
+++ b/rxjava-top-tags-distributed/rxjava-top-tags-final-ranker/pom.xml
@@ -0,0 +1,32 @@
+
+
+
+ rxjava-top-tags-distributed-parent
+ com.acme
+ 1.0.0-SNAPSHOT
+
+ 4.0.0
+
+ rxjava-top-tags-final-ranker
+
+
+
+ org.springframework.xd
+ spring-xd-rxjava
+
+
+ io.reactivex
+ rxjava
+
+
+ io.reactivex
+ rxjava-math
+
+
+ com.acme
+ rxjava-top-tags-common
+
+
+
\ No newline at end of file
diff --git a/rxjava-top-tags-distributed/rxjava-top-tags-final-ranker/src/main/java/com/acme/toptags/finalranker/FinalRanker.java b/rxjava-top-tags-distributed/rxjava-top-tags-final-ranker/src/main/java/com/acme/toptags/finalranker/FinalRanker.java
new file mode 100644
index 0000000..f52c735
--- /dev/null
+++ b/rxjava-top-tags-distributed/rxjava-top-tags-final-ranker/src/main/java/com/acme/toptags/finalranker/FinalRanker.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2015 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.acme.toptags.finalranker;
+
+import static com.acme.toptags.common.Keys.COUNT;
+import static com.acme.toptags.common.Keys.RANKINGS;
+import static com.acme.toptags.common.Keys.TAG;
+import static com.acme.toptags.common.Keys.TOPTAGS;
+import static org.springframework.xd.tuple.TupleBuilder.tuple;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.acme.toptags.common.TupleComparatorByCount;
+import rx.Observable;
+
+import org.springframework.xd.rxjava.Processor;
+import org.springframework.xd.tuple.Tuple;
+import org.springframework.xd.tuple.TupleBuilder;
+
+/**
+ * @author Marius Bogoevici
+ */
+public class FinalRanker implements Processor {
+
+ private int timeWindow;
+
+ private int timeShift;
+
+ private int topN;
+
+ public FinalRanker(int timeWindow, int timeShift, int topN) {
+ this.timeWindow = timeWindow;
+ this.timeShift = timeShift;
+ this.topN = topN;
+ }
+
+ @Override
+ public Observable process(Observable observable) {
+ return observable.window(timeWindow, timeShift, TimeUnit.SECONDS)
+ .flatMap(w ->
+ // first, we merge all the intermediate rankings in a single stream
+ w.flatMap(t -> Observable.from(t.getValue(RANKINGS, Tuple[].class)))
+ // sort them
+ .groupBy(e -> e.getString(TAG))
+ // just take the last count emitted (which represents the latest update)
+ .flatMap(s -> Observable.zip(
+ Observable.just(s.getKey()),
+ s.last().map(t -> t.getInt(COUNT)),
+ (a, b) -> TupleBuilder.tuple().of(TAG, a, COUNT, b))
+ )
+ .toSortedList(TupleComparatorByCount.INSTANCE::compare)
+ // take topN
+ .map(l -> l.subList(0, Math.min(topN, l.size())))
+ .map(l -> tuple().of(TOPTAGS, asMap(l))));
+ }
+
+ private static Map asMap(List tuples) {
+ Map returnValue = new LinkedHashMap<>();
+ for (Tuple tuple : tuples) {
+ returnValue.put(tuple.getString(TAG), tuple.getInt(COUNT));
+ }
+ return returnValue;
+ }
+}
diff --git a/rxjava-top-tags-distributed/rxjava-top-tags-final-ranker/src/main/java/com/acme/toptags/finalranker/FinalRankerOptions.java b/rxjava-top-tags-distributed/rxjava-top-tags-final-ranker/src/main/java/com/acme/toptags/finalranker/FinalRankerOptions.java
new file mode 100644
index 0000000..60b9b1d
--- /dev/null
+++ b/rxjava-top-tags-distributed/rxjava-top-tags-final-ranker/src/main/java/com/acme/toptags/finalranker/FinalRankerOptions.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2015 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.acme.toptags.finalranker;
+
+import org.springframework.xd.module.options.spi.ModuleOption;
+
+/**
+ * @author Marius Bogoevici
+ */
+public class FinalRankerOptions {
+
+ private int timeWindow = 4;
+
+ private int timeShift = 2;
+
+ private int topN = 5;
+
+ public int getTimeWindow() {
+ return timeWindow;
+ }
+
+ @ModuleOption("The length in seconds of the time window over with which top N tags are calculated")
+ public void setTimeWindow(int timeWindow) {
+ this.timeWindow = timeWindow;
+ }
+
+ public int getTopN() {
+ return topN;
+ }
+
+ @ModuleOption("The number of ranked items")
+ public void setTopN(int topN) {
+ this.topN = topN;
+ }
+
+ public int getTimeShift() {
+ return timeShift;
+ }
+
+ @ModuleOption("The length in seconds of the time window over with which top N tags are emmitted")
+ public void setTimeShift(int timeShift) {
+ this.timeShift = timeShift;
+ }
+}
diff --git a/rxjava-top-tags-distributed/rxjava-top-tags-final-ranker/src/main/resources/config/spring-module.properties b/rxjava-top-tags-distributed/rxjava-top-tags-final-ranker/src/main/resources/config/spring-module.properties
new file mode 100644
index 0000000..ed2c0c2
--- /dev/null
+++ b/rxjava-top-tags-distributed/rxjava-top-tags-final-ranker/src/main/resources/config/spring-module.properties
@@ -0,0 +1 @@
+options_class = com.acme.toptags.finalranker.FinalRankerOptions
\ No newline at end of file
diff --git a/rxjava-top-tags-distributed/rxjava-top-tags-final-ranker/src/main/resources/config/toptags-final-ranker.xml b/rxjava-top-tags-distributed/rxjava-top-tags-final-ranker/src/main/resources/config/toptags-final-ranker.xml
new file mode 100644
index 0000000..991c046
--- /dev/null
+++ b/rxjava-top-tags-distributed/rxjava-top-tags-final-ranker/src/main/resources/config/toptags-final-ranker.xml
@@ -0,0 +1,30 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/rxjava-top-tags-distributed/rxjava-top-tags-intermediate-ranker/pom.xml b/rxjava-top-tags-distributed/rxjava-top-tags-intermediate-ranker/pom.xml
new file mode 100644
index 0000000..18bef9e
--- /dev/null
+++ b/rxjava-top-tags-distributed/rxjava-top-tags-intermediate-ranker/pom.xml
@@ -0,0 +1,32 @@
+
+
+
+ rxjava-top-tags-distributed-parent
+ com.acme
+ 1.0.0-SNAPSHOT
+
+ 4.0.0
+
+ rxjava-top-tags-intermediate-ranker
+
+
+
+ org.springframework.xd
+ spring-xd-rxjava
+
+
+ io.reactivex
+ rxjava
+
+
+ io.reactivex
+ rxjava-math
+
+
+ com.acme
+ rxjava-top-tags-common
+
+
+
\ No newline at end of file
diff --git a/rxjava-top-tags-distributed/rxjava-top-tags-intermediate-ranker/src/main/java/com/acme/toptags/intermediate/IntermediateRanker.java b/rxjava-top-tags-distributed/rxjava-top-tags-intermediate-ranker/src/main/java/com/acme/toptags/intermediate/IntermediateRanker.java
new file mode 100644
index 0000000..51c2d92
--- /dev/null
+++ b/rxjava-top-tags-distributed/rxjava-top-tags-intermediate-ranker/src/main/java/com/acme/toptags/intermediate/IntermediateRanker.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2015 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.acme.toptags.intermediate;
+
+import static com.acme.toptags.common.Keys.COUNT;
+import static com.acme.toptags.common.Keys.RANKINGS;
+import static com.acme.toptags.common.Keys.TAG;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static rx.math.operators.OperatorMinMax.max;
+
+import com.acme.toptags.common.Keys;
+import com.acme.toptags.common.TupleComparatorByCount;
+import rx.Observable;
+
+import org.springframework.xd.rxjava.Processor;
+import org.springframework.xd.tuple.Tuple;
+import org.springframework.xd.tuple.TupleBuilder;
+
+/**
+ * @author Marius Bogoevici
+ */
+public class IntermediateRanker implements Processor {
+
+ public static final TupleComparatorByCount COMPARATOR = new TupleComparatorByCount();
+
+ private final int timeWindow;
+
+ private final int timeShift;
+
+ private final int topN;
+
+ public IntermediateRanker(int timeWindow, int timeShift, int topN) {
+ this.timeWindow = timeWindow;
+ this.timeShift = timeShift;
+ this.topN = topN;
+ }
+
+ @Override
+ public Observable process(Observable observable) {
+
+ return observable
+ // collect data every timeWindow seconds
+ .window(timeWindow, timeShift, SECONDS)
+ .flatMap(w ->
+ // group data by word
+ w.groupBy(e -> e.getString(TAG))
+ // just take the last count emitted (which represents the latest update)
+ .flatMap(s -> Observable.zip(
+ Observable.just(s.getKey()),
+ s.last().map(t -> t.getInt(COUNT)),
+ (a, b) -> TupleBuilder.tuple().of(TAG, a, COUNT, b))
+ )
+ // rank words by count
+ .toSortedList(TupleComparatorByCount.INSTANCE::compare)
+ // merge word counts together
+ .map(l -> l.subList(0, Math.min(topN, l.size())))
+ .map(l -> TupleBuilder.tuple().of(RANKINGS, l.toArray(new Tuple[l.size()]))));
+ }
+
+}
diff --git a/rxjava-top-tags-distributed/rxjava-top-tags-intermediate-ranker/src/main/java/com/acme/toptags/intermediate/IntermediateRankerOptions.java b/rxjava-top-tags-distributed/rxjava-top-tags-intermediate-ranker/src/main/java/com/acme/toptags/intermediate/IntermediateRankerOptions.java
new file mode 100644
index 0000000..c1712f7
--- /dev/null
+++ b/rxjava-top-tags-distributed/rxjava-top-tags-intermediate-ranker/src/main/java/com/acme/toptags/intermediate/IntermediateRankerOptions.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2015 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.acme.toptags.intermediate;
+
+import org.springframework.xd.module.options.spi.ModuleOption;
+
+/**
+ * @author Marius Bogoevici
+ */
+public class IntermediateRankerOptions {
+
+ private int timeWindow = 6;
+
+ private int timeShift = 2;
+
+ private int topN = 5;
+
+ public int getTimeWindow() {
+ return timeWindow;
+ }
+
+ @ModuleOption("The length in seconds of the time window over with which top N tags are calculated")
+ public void setTimeWindow(int timeWindow) {
+ this.timeWindow = timeWindow;
+ }
+
+ public int getTopN() {
+ return topN;
+ }
+
+ @ModuleOption("The number of ranked items")
+ public void setTopN(int topN) {
+ this.topN = topN;
+ }
+
+ public int getTimeShift() {
+ return timeShift;
+ }
+
+ @ModuleOption("The length in seconds of the time window over with which top N tags are emmitted")
+ public void setTimeShift(int timeShift) {
+ this.timeShift = timeShift;
+ }
+}
diff --git a/rxjava-top-tags-distributed/rxjava-top-tags-intermediate-ranker/src/main/resources/config/spring-module.properties b/rxjava-top-tags-distributed/rxjava-top-tags-intermediate-ranker/src/main/resources/config/spring-module.properties
new file mode 100644
index 0000000..daf977d
--- /dev/null
+++ b/rxjava-top-tags-distributed/rxjava-top-tags-intermediate-ranker/src/main/resources/config/spring-module.properties
@@ -0,0 +1 @@
+options_class = com.acme.toptags.intermediate.IntermediateRankerOptions
\ No newline at end of file
diff --git a/rxjava-top-tags-distributed/rxjava-top-tags-intermediate-ranker/src/main/resources/config/toptags-intermediate-ranker.xml b/rxjava-top-tags-distributed/rxjava-top-tags-intermediate-ranker/src/main/resources/config/toptags-intermediate-ranker.xml
new file mode 100644
index 0000000..5894317
--- /dev/null
+++ b/rxjava-top-tags-distributed/rxjava-top-tags-intermediate-ranker/src/main/resources/config/toptags-intermediate-ranker.xml
@@ -0,0 +1,30 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+