Skip to content
This repository was archived by the owner on Apr 5, 2022. It is now read-only.

[Sprint 49] Reactive top tags #26

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions rxjava-top-tags-distributed/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
Distributed trending tags calculator
====================================

This example demonstrates how to create a distributed trending tags calculator as a multi-module project.

The example consists of:

- counter modules that count occurences of a tag over a sliding time window, distributing the count workload;
- intermediate modules that create partial tag rankings, emitting them at regular intervals;
- a final module that aggregates the partial rankings into a single final ranking, emitted at regular intervals.

## Requirements

In order to install the module run it in your Spring XD installation, you will need to have installed:

* Spring XD version 1.1.x ([Instructions](http://docs.spring.io/spring-xd/docs/current/reference/html/#getting-started)). You'll need to build Spring XD with Java 8+ to use this sample (which uses lambda expressions).

## Building

mvn clean package

## Installing

Once Spring XD is started, run the following commands from the shell.

````
module upload --file [path-to]/spring-xd-samples/rxjava-top-tags-distributed/rxjava-top-tags-counter/target/rxjava-top-tags-counter-1.0.0-SNAPSHOT.jar --type processor --name top-tags-counter

module upload --file [path-to]/spring-xd-samples/rxjava-top-tags-distributed/rxjava-top-tags-intermediate-ranker/target/rxjava-top-tags-intermediate-ranker-1.0.0-SNAPSHOT.jar --type processor --name top-tags-intermediate-ranker

module upload --file [path-to]/spring-xd-samples/rxjava-top-tags-distributed/rxjava-top-tags-final-ranker/target/rxjava-top-tags-final-ranker-1.0.0-SNAPSHOT.jar --type processor --name top-tags-final-ranker

````

Create the stream

````
stream create twittertags --definition "twitterstream | transform --expression=#jsonPath(payload,'$.entities.hashtags[*].text') | splitter | top-tags-counter | top-tags-intermediate-ranker | top-tags-final-ranker | log" --deploy
````

Deploy the stream

````
stream deploy twittertags
````


98 changes: 98 additions & 0 deletions rxjava-top-tags-distributed/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.acme</groupId>
<artifactId>rxjava-top-tags-distributed-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>

<packaging>pom</packaging>
<parent>
<groupId>org.springframework.xd</groupId>
<artifactId>spring-xd-module-parent</artifactId>
<version>1.1.0.RELEASE</version>
</parent>

<properties>
<spring.xd.rxjava.version>1.1.0.RELEASE</spring.xd.rxjava.version>
<rxjava.version>1.0.0</rxjava.version>
</properties>

<modules>
<module>rxjava-top-tags-counter</module>
<module>rxjava-top-tags-intermediate-ranker</module>
<module>rxjava-top-tags-common</module>
<module>rxjava-top-tags-final-ranker</module>
</modules>


<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>http://repo.spring.io/libs-milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>http://repo.spring.io/release</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>http://repo.spring.io/libs-snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.xd</groupId>
<artifactId>spring-xd-rxjava</artifactId>
<version>${spring.xd.rxjava.version}</version>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>${rxjava.version}</version>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-math</artifactId>
<version>${rxjava.version}</version>
</dependency>
<!-- Internal dependencies -->
<dependency>
<groupId>com.acme</groupId>
<artifactId>rxjava-top-tags-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

</project>
15 changes: 15 additions & 0 deletions rxjava-top-tags-distributed/rxjava-top-tags-common/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rxjava-top-tags-distributed-parent</artifactId>
<groupId>com.acme</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>rxjava-top-tags-common</artifactId>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.acme.toptags.common;

/**
* @author Marius Bogoevici
*/
public abstract class Keys {

public static final String TAG = "tag";

public static final String COUNT = "count";

public static final String RANKINGS = "rankings";

public static final String TOPTAGS = "topTags";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.acme.toptags.common;

import java.util.Comparator;

import org.springframework.xd.tuple.Tuple;

/**
* @author Marius Bogoevici
*/
public class TupleComparatorByCount implements Comparator<Tuple> {

public static final TupleComparatorByCount INSTANCE = new TupleComparatorByCount();

@Override
public int compare(Tuple o1, Tuple o2) {
return - Integer.valueOf(o1.getInt(Keys.COUNT)).compareTo(o2.getInt(Keys.COUNT));
}

}
30 changes: 30 additions & 0 deletions rxjava-top-tags-distributed/rxjava-top-tags-counter/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.acme</groupId>
<artifactId>rxjava-top-tags-distributed-parent</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>rxjava-top-tags-counter</artifactId>

<dependencies>
<dependency>
<groupId>org.springframework.xd</groupId>
<artifactId>spring-xd-rxjava</artifactId>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
</dependency>
<dependency>
<groupId>com.acme</groupId>
<artifactId>rxjava-top-tags-common</artifactId>
</dependency>
</dependencies>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.acme.toptags.counter;

import static com.acme.toptags.common.Keys.COUNT;
import static com.acme.toptags.common.Keys.TAG;
import static java.util.concurrent.TimeUnit.SECONDS;

import rx.Observable;

import org.springframework.xd.rxjava.Processor;
import org.springframework.xd.tuple.Tuple;
import org.springframework.xd.tuple.TupleBuilder;

/**
* @author Marius Bogoevici
*/
public class RollingTagCounter implements Processor<String, Tuple> {

private final int timeWindow;

private final int timeShift;

public RollingTagCounter(int timeWindow, int timeShift) {
this.timeWindow = timeWindow;
this.timeShift = timeShift;
}


@Override
public Observable<Tuple> process(Observable<String> observable) {
return observable.window(timeWindow, timeShift, SECONDS)
.flatMap(
// group by word
w -> w.groupBy(e -> e)
.flatMap(s -> Observable.zip(Observable.just(s.getKey()), s.count(), (a, b) -> TupleBuilder.tuple().of(TAG, a, COUNT, b)
)
)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.acme.toptags.counter;

import org.springframework.xd.module.options.spi.ModuleOption;

/**
* @author Marius Bogoevici
*/
public class RollingTagCounterOptions {

private int timeWindow = 9;

private int timeShift = 3;

public int getTimeWindow() {
return timeWindow;
}

@ModuleOption("The length in seconds of the time window over which tags are counted")
public void setTimeWindow(int timeWindow) {
this.timeWindow = timeWindow;
}

public int getTimeShift() {
return timeShift;
}

@ModuleOption("The frequency in seconds with which tag counts are emitted")
public void setTimeShift(int timeShift) {
this.timeShift = timeShift;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
options_class = com.acme.toptags.counter.RollingTagCounterOptions
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">


<bean id="processor" class="com.acme.toptags.counter.RollingTagCounter">
<constructor-arg index="0" value="${timeWindow}"/>
<constructor-arg index="1" value="${timeShift}"/>
</bean>


<!-- The rest is boilerplate that XD 1.1 RC1 will avoid you having to provide -->

<int:channel id="input"/>

<bean name="messageHandler" class="org.springframework.xd.rxjava.SubjectMessageHandler">
<constructor-arg ref="processor"/>
</bean>


<int:service-activator input-channel="input" ref="messageHandler"
output-channel="output"/>

<int:channel id="output"/>

</beans>
32 changes: 32 additions & 0 deletions rxjava-top-tags-distributed/rxjava-top-tags-final-ranker/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rxjava-top-tags-distributed-parent</artifactId>
<groupId>com.acme</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>rxjava-top-tags-final-ranker</artifactId>

<dependencies>
<dependency>
<groupId>org.springframework.xd</groupId>
<artifactId>spring-xd-rxjava</artifactId>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-math</artifactId>
</dependency>
<dependency>
<groupId>com.acme</groupId>
<artifactId>rxjava-top-tags-common</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.acme.toptags.finalranker;

import static com.acme.toptags.common.Keys.COUNT;
import static com.acme.toptags.common.Keys.RANKINGS;
import static com.acme.toptags.common.Keys.TAG;
import static com.acme.toptags.common.Keys.TOPTAGS;
import static org.springframework.xd.tuple.TupleBuilder.tuple;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.acme.toptags.common.TupleComparatorByCount;
import rx.Observable;

import org.springframework.xd.rxjava.Processor;
import org.springframework.xd.tuple.Tuple;
import org.springframework.xd.tuple.TupleBuilder;

/**
* @author Marius Bogoevici
*/
public class FinalRanker implements Processor<Tuple, Tuple> {

private int timeWindow;

private int timeShift;

private int topN;

public FinalRanker(int timeWindow, int timeShift, int topN) {
this.timeWindow = timeWindow;
this.timeShift = timeShift;
this.topN = topN;
}

@Override
public Observable<Tuple> process(Observable<Tuple> observable) {
return observable.window(timeWindow, timeShift, TimeUnit.SECONDS)
.flatMap(w ->
// first, we merge all the intermediate rankings in a single stream
w.flatMap(t -> Observable.from(t.getValue(RANKINGS, Tuple[].class)))
// sort them
.groupBy(e -> e.getString(TAG))
// just take the last count emitted (which represents the latest update)
.flatMap(s -> Observable.zip(
Observable.just(s.getKey()),
s.last().map(t -> t.getInt(COUNT)),
(a, b) -> TupleBuilder.tuple().of(TAG, a, COUNT, b))
)
.toSortedList(TupleComparatorByCount.INSTANCE::compare)
// take topN
.map(l -> l.subList(0, Math.min(topN, l.size())))
.map(l -> tuple().of(TOPTAGS, asMap(l))));
}

private static Map<String, Integer> asMap(List<Tuple> tuples) {
Map<String, Integer> returnValue = new LinkedHashMap<>();
for (Tuple tuple : tuples) {
returnValue.put(tuple.getString(TAG), tuple.getInt(COUNT));
}
return returnValue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.acme.toptags.finalranker;

import org.springframework.xd.module.options.spi.ModuleOption;

/**
* @author Marius Bogoevici
*/
public class FinalRankerOptions {

private int timeWindow = 4;

private int timeShift = 2;

private int topN = 5;

public int getTimeWindow() {
return timeWindow;
}

@ModuleOption("The length in seconds of the time window over with which top N tags are calculated")
public void setTimeWindow(int timeWindow) {
this.timeWindow = timeWindow;
}

public int getTopN() {
return topN;
}

@ModuleOption("The number of ranked items")
public void setTopN(int topN) {
this.topN = topN;
}

public int getTimeShift() {
return timeShift;
}

@ModuleOption("The length in seconds of the time window over with which top N tags are emmitted")
public void setTimeShift(int timeShift) {
this.timeShift = timeShift;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
options_class = com.acme.toptags.finalranker.FinalRankerOptions
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">


<bean id="processor" class="com.acme.toptags.finalranker.FinalRanker">
<constructor-arg index="0" value="${timeWindow}"/>
<constructor-arg index="1" value="${timeShift}"/>
<constructor-arg index="2" value="${topN}"/>
</bean>


<!-- The rest is boilerplate that XD 1.1 RC1 will avoid you having to provide -->

<int:channel id="input"/>

<bean name="messageHandler" class="org.springframework.xd.rxjava.SubjectMessageHandler">
<constructor-arg ref="processor"/>
</bean>


<int:service-activator input-channel="input" ref="messageHandler"
output-channel="output"/>

<int:channel id="output"/>

</beans>
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rxjava-top-tags-distributed-parent</artifactId>
<groupId>com.acme</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>rxjava-top-tags-intermediate-ranker</artifactId>

<dependencies>
<dependency>
<groupId>org.springframework.xd</groupId>
<artifactId>spring-xd-rxjava</artifactId>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-math</artifactId>
</dependency>
<dependency>
<groupId>com.acme</groupId>
<artifactId>rxjava-top-tags-common</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.acme.toptags.intermediate;

import static com.acme.toptags.common.Keys.COUNT;
import static com.acme.toptags.common.Keys.RANKINGS;
import static com.acme.toptags.common.Keys.TAG;
import static java.util.concurrent.TimeUnit.SECONDS;
import static rx.math.operators.OperatorMinMax.max;

import com.acme.toptags.common.Keys;
import com.acme.toptags.common.TupleComparatorByCount;
import rx.Observable;

import org.springframework.xd.rxjava.Processor;
import org.springframework.xd.tuple.Tuple;
import org.springframework.xd.tuple.TupleBuilder;

/**
* @author Marius Bogoevici
*/
public class IntermediateRanker implements Processor<Tuple, Tuple> {

public static final TupleComparatorByCount COMPARATOR = new TupleComparatorByCount();

private final int timeWindow;

private final int timeShift;

private final int topN;

public IntermediateRanker(int timeWindow, int timeShift, int topN) {
this.timeWindow = timeWindow;
this.timeShift = timeShift;
this.topN = topN;
}

@Override
public Observable<Tuple> process(Observable<Tuple> observable) {

return observable
// collect data every timeWindow seconds
.window(timeWindow, timeShift, SECONDS)
.flatMap(w ->
// group data by word
w.groupBy(e -> e.getString(TAG))
// just take the last count emitted (which represents the latest update)
.flatMap(s -> Observable.zip(
Observable.just(s.getKey()),
s.last().map(t -> t.getInt(COUNT)),
(a, b) -> TupleBuilder.tuple().of(TAG, a, COUNT, b))
)
// rank words by count
.toSortedList(TupleComparatorByCount.INSTANCE::compare)
// merge word counts together
.map(l -> l.subList(0, Math.min(topN, l.size())))
.map(l -> TupleBuilder.tuple().of(RANKINGS, l.toArray(new Tuple[l.size()]))));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.acme.toptags.intermediate;

import org.springframework.xd.module.options.spi.ModuleOption;

/**
* @author Marius Bogoevici
*/
public class IntermediateRankerOptions {

private int timeWindow = 6;

private int timeShift = 2;

private int topN = 5;

public int getTimeWindow() {
return timeWindow;
}

@ModuleOption("The length in seconds of the time window over with which top N tags are calculated")
public void setTimeWindow(int timeWindow) {
this.timeWindow = timeWindow;
}

public int getTopN() {
return topN;
}

@ModuleOption("The number of ranked items")
public void setTopN(int topN) {
this.topN = topN;
}

public int getTimeShift() {
return timeShift;
}

@ModuleOption("The length in seconds of the time window over with which top N tags are emmitted")
public void setTimeShift(int timeShift) {
this.timeShift = timeShift;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
options_class = com.acme.toptags.intermediate.IntermediateRankerOptions
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">


<bean id="processor" class="com.acme.toptags.intermediate.IntermediateRanker">
<constructor-arg index="0" value="${timeWindow}"/>
<constructor-arg index="1" value="${timeShift}"/>
<constructor-arg index="2" value="${topN}"/>
</bean>


<!-- The rest is boilerplate that XD 1.1 RC1 will avoid you having to provide -->

<int:channel id="input"/>

<bean name="messageHandler" class="org.springframework.xd.rxjava.SubjectMessageHandler">
<constructor-arg ref="processor"/>
</bean>


<int:service-activator input-channel="input" ref="messageHandler"
output-channel="output"/>

<int:channel id="output"/>

</beans>