diff --git a/rxjava-top-tags/README.md b/rxjava-top-tags/README.md new file mode 100644 index 0000000..074c4ef --- /dev/null +++ b/rxjava-top-tags/README.md @@ -0,0 +1,57 @@ +Spring XD Reactor Stream Example +================================ + +This is an example of a custom module that uses RxJava's Observable API. + +## Requirements + +In order to install the module run it in your Spring XD installation, you will need to have installed: + +* Spring XD version 1.1.x ([Instructions](http://docs.spring.io/spring-xd/docs/current/reference/html/#getting-started)). You'll need to build Spring XD with Java 8+ to use this sample (which uses lambda expressions). + +## Code Tour + +The heart of the sample is the processing module named [TopTags.java](src/main/java/com/acme/TopTags.java). +This uses the Observable API to calculate the most referenced tags in a given time window. The[Tuple](http://docs.spring.io/spring-xd/docs/current/reference/html/#tuples) data type is used as a generic container for keyed data. + + +## Building + + $ mvn package + +## Using the Custom Module + +The uber-jar will be in `target/rxjava-top-tags-1.0.0.BUILD-SNAPSHOT.jar`. To install and register the module to your Spring XD distribution, use the `module upload` Spring XD shell command. Start Spring XD and the shell: + +``` + _____ __ _______ +/ ___| (-) \ \ / / _ \ +\ `--. _ __ _ __ _ _ __ __ _ \ V /| | | | +`--. \ '_ \| '__| | '_ \ / _` | / ^ \| | | | +/\__/ / |_) | | | | | | | (_| | / / \ \ |/ / +\____/| .__/|_| |_|_| |_|\__, | \/ \/___/ + | | __/ | + |_| |___/ +eXtreme Data +1.1.0.BUILD-SNAPSHOT | Admin Server Target: http://localhost:9393 +Welcome to the Spring XD shell. For assistance hit TAB or type "help". +xd:>module upload --file [path-to]/spring-xd-samples/rxjava-top-tags/target/rxjava-top-tags-1.0.0.BUILD-SNAPSHOT.jar --name rxjava-top-tags --type processor +Successfully uploaded module 'processor:reactor-top-tags' +xd:> +``` + +Now create an deploy a stream: + +``` +xd:>stream create reactor --definition "tweetstream | rxjava-top-tags | log" --deploy +``` + +The `rxjava-top-tags` processor also supports the `timeWindow` and `topN` parameters for customizing the processor's +behavior. + +You should see the stream output in the Spring XD log, indicating the top N tags for the given interval: + +``` +2015-02-15 20:13:49,077 1.1.0.RELEASE INFO RxComputationThreadPool-3 sink.top-tags - {"id":"8df84f9b-40ee-23c3-7473-fa611c43a19d","timestamp":1424049229077,"topTags":{"SNL40":18,"NBAAllStarNYC":4,"SpringXD":4}} + +``` diff --git a/rxjava-top-tags/pom.xml b/rxjava-top-tags/pom.xml new file mode 100644 index 0000000..720b838 --- /dev/null +++ b/rxjava-top-tags/pom.xml @@ -0,0 +1,61 @@ + + 4.0.0 + com.acme + rxjava-top-tags + 1.0.0.BUILD-SNAPSHOT + + org.springframework.xd + spring-xd-module-parent + 1.1.0.RELEASE + + + + + spring-milestones + Spring Milestones + http://repo.spring.io/libs-milestone + + false + + + + spring-releases + Spring Releases + http://repo.spring.io/release + + false + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.2 + + 1.8 + 1.8 + + + + + + + + org.springframework.xd. + spring-xd-rxjava + 1.1.0.RELEASE + + + + io.reactivex + rxjava + 1.0.0 + + + + + diff --git a/rxjava-top-tags/src/main/java/com/acme/TopTags.java b/rxjava-top-tags/src/main/java/com/acme/TopTags.java new file mode 100644 index 0000000..167765b --- /dev/null +++ b/rxjava-top-tags/src/main/java/com/acme/TopTags.java @@ -0,0 +1,84 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.acme; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.springframework.xd.tuple.TupleBuilder.tuple; + +import java.util.LinkedHashMap; +import java.util.stream.Collectors; + +import com.gs.collections.api.tuple.Pair; +import com.gs.collections.impl.tuple.Tuples; +import com.jayway.jsonpath.JsonPath; +import net.minidev.json.JSONArray; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import rx.Observable; + +import org.springframework.xd.rxjava.Processor; +import org.springframework.xd.tuple.Tuple; + +/** + * @author Marius Bogoevici + */ +public class TopTags implements Processor { + + private int timeWindow; + + private int timeShift; + + private int topN; + + public TopTags(int timeWindow, int timeShift, int topN) { + this.timeWindow = timeWindow; + this.timeShift = timeShift; + this.topN = topN; + } + + private static Log logger = LogFactory.getLog(TopTags.class); + + @Override + public Observable process(Observable inputStream) { + return inputStream.flatMap(tweet -> { + JSONArray array = JsonPath.read(tweet, "$.entities.hashtags[*].text"); + return Observable.from(array.toArray(new String[array.size()])); + }) + // create (tag,1) tuple for each incoming tag + .map(tag -> Tuples.pair(tag, 1)) + // batch all tags in the time window + .window(timeWindow, timeShift, SECONDS) + // with each time window stream + .flatMap(windowBuffer -> + windowBuffer + // reduce by tag, counting all entries with the same tag + .groupBy(Pair::getOne) + .flatMap( + groupedStream -> + groupedStream.reduce((acc, v) -> Tuples.pair(acc.getOne(), acc.getTwo() + v.getTwo())) + ) + // sort the results + .toSortedList((a, b) -> -a.getTwo().compareTo(b.getTwo())) + // convert the output to a friendlier format + .map(l -> tuple().of("topTags", + l.subList(0, Math.min(topN, l.size())) + .stream().collect(Collectors.toMap(Pair::getOne, Pair::getTwo, (v1, v2) -> v1, LinkedHashMap::new) + ) + ) + ) + ); + } +} diff --git a/rxjava-top-tags/src/main/java/com/acme/TopTagsOptionsMetadata.java b/rxjava-top-tags/src/main/java/com/acme/TopTagsOptionsMetadata.java new file mode 100644 index 0000000..d5310d3 --- /dev/null +++ b/rxjava-top-tags/src/main/java/com/acme/TopTagsOptionsMetadata.java @@ -0,0 +1,60 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.acme; + +import org.springframework.xd.module.options.spi.ModuleOption; + +/** + * Holds options for the TopTags module + * + * @author Mark Pollack + * @author Marius Bogoevici + */ +public class TopTagsOptionsMetadata { + + private int timeWindow = 1; + + private int timeShift = 1; + + private int topN = 10; + + public int getTopN() { + return topN; + } + + @ModuleOption("The number of entires to include in the top N listing") + public void setTopN(int topN) { + this.topN = topN; + } + + public int getTimeWindow() { + return timeWindow; + } + + @ModuleOption("The length in seconds of the time window over which the top N tags are calculated") + public void setTimeWindow(int timeWindow) { + this.timeWindow = timeWindow; + } + + public int getTimeShift() { + return timeShift; + } + + @ModuleOption("The frequency in seconds with which the top N tags are calculated") + public void setTimeShift(int timeShift) { + this.timeShift = timeShift; + } +} diff --git a/rxjava-top-tags/src/main/resources/config/rxjava-top-tags.xml b/rxjava-top-tags/src/main/resources/config/rxjava-top-tags.xml new file mode 100644 index 0000000..2ce2e36 --- /dev/null +++ b/rxjava-top-tags/src/main/resources/config/rxjava-top-tags.xml @@ -0,0 +1,30 @@ + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/rxjava-top-tags/src/main/resources/config/spring-module.properties b/rxjava-top-tags/src/main/resources/config/spring-module.properties new file mode 100644 index 0000000..4aea295 --- /dev/null +++ b/rxjava-top-tags/src/main/resources/config/spring-module.properties @@ -0,0 +1 @@ +options_class = com.acme.TopTagsOptionsMetadata