Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for KotlinX flow and deferred #6

Merged
merged 11 commits into from
Oct 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
anchors:
env_gradle: &env_gradle
environment:
# java doesn't play nice with containers, it tries to hog the entire machine
# https://circleci.com/blog/how-to-handle-java-oom-errors/
# try the experimental JVM option
_JAVA_OPTIONS: "-XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap"
GRADLE_OPTS: "-Dorg.gradle.workers.max=2" # and we're only allowed to use 2 vCPUs
docker:
- image: cimg/openjdk:8.0
- image: cimg/openjdk:11.0
restore_cache_wrapper: &restore_cache_wrapper
restore_cache:
key: gradle-wrapper-{{ checksum "gradle/wrapper/gradle-wrapper.properties" }}
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# IntelliJ stuff
.idea/

# mac stuff
*.DS_Store
Expand Down
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# DurianRx releases

## [Unreleased]
### Added
* Added support for kotlinx `Flow` and `Deferred`. ([#6](https://github.com/diffplug/durian-rx/pull/6))

## [3.0.2] - 2020-05-26
### Fixed
Expand Down
22 changes: 19 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
plugins {
id "com.diffplug.blowdryer"
id "com.diffplug.spotless-changelog"
id 'com.diffplug.blowdryer'
id 'com.diffplug.spotless-changelog'
id 'org.jetbrains.kotlin.jvm'
}

spotlessChangelog {
Expand All @@ -14,10 +15,25 @@ apply from: 干.file('base/osgi.gradle')
apply from: 干.file('spotless/freshmark.gradle')
apply from: 干.file('spotless/java.gradle')

spotless {
kotlin {
toggleOffOn()
licenseHeaderFile 干.file("spotless/license-${license}.java")
ktfmt('0.25')
indentWithTabs(2)
}
}

def VER_DURIAN='1.2.0'
def VER_DURIAN_DEBUG='1.0.0'
def VER_RXJAVA='2.0.0'
def VER_JUNIT='4.12'

dependencies {
implementation "com.diffplug.durian:durian-core:${VER_DURIAN}"
implementation "com.diffplug.durian:durian-collect:${VER_DURIAN}"
implementation "com.diffplug.durian:durian-concurrent:${VER_DURIAN}"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.5.2"
api "io.reactivex.rxjava2:rxjava:${VER_RXJAVA}"
testImplementation "junit:junit:${VER_JUNIT}"
testImplementation "com.diffplug.durian:durian-testlib:${VER_DURIAN}"
Expand All @@ -33,7 +49,7 @@ ext.javadoc_links = [
'https://docs.oracle.com/javase/8/docs/api/'
].join(' ')
apply from: 干.file('base/maven.gradle')
apply from: 干.file('base/bintray.gradle')
apply from: 干.file('base/sonatype.gradle')

////////////////////////
// SPOTBUGS (someday) //
Expand Down
8 changes: 0 additions & 8 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,3 @@ org=diffplug

# Build requirements
VER_JAVA=1.8

# Dependencies
VER_DURIAN=1.2.0
VER_DURIAN_DEBUG=1.0.0
VER_RXJAVA=2.0.0

# Testing
VER_JUNIT=4.12
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.0.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-7.2-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
27 changes: 15 additions & 12 deletions settings.gradle
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
pluginManagement {
plugins {
id 'com.diffplug.blowdryer' version '1.0.0'
id 'com.diffplug.blowdryerSetup' version '1.0.0'
id 'com.diffplug.gradle.eclipse.mavencentral' version '3.18.1'
id 'com.diffplug.gradle.spotless' version '3.27.0'
id 'com.diffplug.spotless-changelog' version '1.0.0'
id 'com.github.ben-manes.versions' version '0.27.0'
id 'com.gradle.plugin-publish' version '0.10.1'
id 'com.jfrog.bintray' version '1.8.4'
id 'com.diffplug.blowdryer' version '1.4.1'
id 'com.diffplug.blowdryerSetup' version '1.4.1'
id 'com.diffplug.eclipse.mavencentral' version '3.33.1'
id 'com.diffplug.spotless' version '5.17.0'
id 'com.diffplug.spotless-changelog' version '2.2.0'
id 'com.gradle.plugin-publish' version '0.16.0'
id 'io.github.gradle-nexus.publish-plugin' version '1.1.0'
id 'org.jdrupes.mdoclet' version '1.0.10'
id 'org.jetbrains.kotlin.jvm' version '1.5.31'
}
}
plugins {
id 'com.diffplug.blowdryerSetup'
id 'com.diffplug.gradle.eclipse.mavencentral' apply false
id 'com.diffplug.gradle.spotless' apply false
id 'com.diffplug.eclipse.mavencentral' apply false
id 'com.diffplug.spotless' apply false
id 'com.diffplug.spotless-changelog' apply false
id 'com.gradle.plugin-publish' apply false
id 'com.jfrog.bintray' apply false
id 'io.github.gradle-nexus.publish-plugin' apply false
id 'org.jdrupes.mdoclet' apply false
id 'org.jetbrains.kotlin.jvm' apply false
}
blowdryerSetup {
github 'diffplug/blowdryer-diffplug', 'tag', '3.1.0'
github 'diffplug/blowdryer-diffplug', 'tag', '5.1.1'
//devLocal '../blowdryer-diffplug'
}

Expand Down
24 changes: 23 additions & 1 deletion src/main/java/com/diffplug/common/rx/GuardedExecutor.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 DiffPlug
* Copyright (C) 2020-2021 DiffPlug
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,8 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Supplier;
import kotlinx.coroutines.Deferred;
import kotlinx.coroutines.flow.Flow;

/**
* GuardedExecutor is an {@link Executor} and {@link RxSubscriber}
Expand Down Expand Up @@ -72,6 +74,16 @@ private Disposable subscribe(Supplier<Disposable> subscriber) {
}
}

@Override
public <T> Disposable subscribeDisposable(Flow<? extends T> flow, RxListener<T> listener) {
return subscribe(() -> delegate.subscribeDisposable(flow, listener));
}

@Override
public <T> Disposable subscribeDisposable(Deferred<? extends T> deferred, RxListener<T> listener) {
return subscribe(() -> delegate.subscribeDisposable(deferred, listener));
}

@Override
public final <T> Disposable subscribeDisposable(Observable<? extends T> observable, RxListener<T> listener) {
return subscribe(() -> delegate.subscribeDisposable(observable, listener));
Expand All @@ -87,6 +99,16 @@ public final <T> Disposable subscribeDisposable(CompletionStage<? extends T> fut
return subscribe(() -> delegate.subscribeDisposable(future, listener));
}

@Override
public <T> void subscribe(Flow<? extends T> flow, RxListener<T> listener) {
subscribeDisposable(flow, listener);
}

@Override
public <T> void subscribe(Deferred<? extends T> deferred, RxListener<T> listener) {
subscribeDisposable(deferred, listener);
}

@Override
public final <T> void subscribe(Observable<? extends T> observable, RxListener<T> listener) {
subscribeDisposable(observable, listener);
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/com/diffplug/common/rx/Rx.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 DiffPlug
* Copyright (C) 2020-2021 DiffPlug
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,7 +54,7 @@
* // call unsubscribe() on the subscription to cancel it
* io.reactivex.disposables.Disposable subscription = Rx.subscribe(listenableOrObservable, val -> doSomething);
* </pre>
* Long version: `Rx` implements both the {@link rx.Observer} and {@link com.diffplug.common.util.concurrent.FutureCallback}
* Long version: `Rx` implements both the {@link io.reactivex.Observer} and {@link com.diffplug.common.util.concurrent.FutureCallback}
* interfaces by mapping them to two `Consumer`s:
* <ul>
* <li>`Consumer<T> onValue`</li>
Expand Down Expand Up @@ -95,8 +95,8 @@
* <ul>
* <li>`Rx.on(someExecutor).subscribe( ... )`</li>
* </ul>
* Because RxJava's Observables use {@link rx.Scheduler}s rather than {@link java.util.concurrent.Executor}s,
* a Scheduler is automatically created using {@link rx.Schedulers#from}. If you'd like to specify the Scheduler manually, you can use {@link Rx#callbackOn(Executor, Scheduler)}
* Because RxJava's Observables use {@link io.reactivex.Scheduler}s rather than {@link java.util.concurrent.Executor}s,
* a Scheduler is automatically created using {@link Schedulers#from(Executor)}. If you'd like to specify the Scheduler manually, you can use {@link Rx#callbackOn(Executor, Scheduler)}
* or you can create an executor which implements {@link RxExecutor.Has}.
*
* @see <a href="https://diffplug.github.io/durian-swt/javadoc/snapshot/com/diffplug/common/swt/SwtExec.html">SwtExec</a>
Expand Down Expand Up @@ -303,7 +303,7 @@ public static RxTracingPolicy getTracingPolicy() {
} else {
// if it isn't an RxListener, then we'll apply _tracing policy
@SuppressWarnings("unchecked")
RxListener<Object> listener = Rx.onValueOnTerminate(observer::onNext, errorOpt -> {
RxListener<Object> listener = Rx.<Object> onValueOnTerminate(observer::onNext, errorOpt -> {
if (errorOpt.isPresent()) {
observer.onError(errorOpt.get());
} else {
Expand Down
Loading