From 35c6dbdef68361e837778a04a740f0c8f69f075e Mon Sep 17 00:00:00 2001 From: MukjepScarlet <93977077+mukjepscarlet@users.noreply.github.com> Date: Fri, 8 Aug 2025 16:23:40 +0800 Subject: [PATCH 01/21] Add okhttp-sse dependency --- gradle/libs.versions.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index c393145c69..df0fdce1ad 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -55,6 +55,7 @@ kotlinx-serialization-core = { module = "org.jetbrains.kotlinx:kotlinx-serializa kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version.ref = "kotlinx-serialization" } kotlinx-serialization-proto = { module = "org.jetbrains.kotlinx:kotlinx-serialization-protobuf", version.ref = "kotlinx-serialization" } okhttp-client = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp" } +okhttp-sse = { module = "com.squareup.okhttp3:okhttp-sse", version.ref = "okhttp" } okhttp-loggingInterceptor = { module = "com.squareup.okhttp3:logging-interceptor", version.ref = "okhttp" } okhttp-mockwebserver = { module = "com.squareup.okhttp3:mockwebserver", version.ref = "okhttp" } junit = { module = "junit:junit", version = "4.13.2" } From 3fc685e9677cf18a92f080aa09a4f281366f928f Mon Sep 17 00:00:00 2001 From: MukjepScarlet <93977077+mukjepscarlet@users.noreply.github.com> Date: Mon, 11 Aug 2025 17:50:26 +0800 Subject: [PATCH 02/21] SSE/Java9: implementation --- retrofit-adapters/sse-java9/README.md | 2 + retrofit-adapters/sse-java9/build.gradle | 25 ++++ retrofit-adapters/sse-java9/gradle.properties | 3 + .../retrofit2/adapter/sse/ServerSentEvent.kt | 30 +++++ .../sse/java9/SseJucFlowCallAdapter.kt | 114 ++++++++++++++++++ .../sse/java9/SseJucFlowCallAdapterFactory.kt | 72 +++++++++++ settings.gradle | 1 + 7 files changed, 247 insertions(+) create mode 100644 retrofit-adapters/sse-java9/README.md create mode 100644 retrofit-adapters/sse-java9/build.gradle create mode 100644 retrofit-adapters/sse-java9/gradle.properties create mode 100644 retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/ServerSentEvent.kt create mode 100644 retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt create mode 100644 retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt diff --git a/retrofit-adapters/sse-java9/README.md b/retrofit-adapters/sse-java9/README.md new file mode 100644 index 0000000000..0ec7dee527 --- /dev/null +++ b/retrofit-adapters/sse-java9/README.md @@ -0,0 +1,2 @@ +[//]: # (TODO) +[//]: # (TODO: test) diff --git a/retrofit-adapters/sse-java9/build.gradle b/retrofit-adapters/sse-java9/build.gradle new file mode 100644 index 0000000000..08c6cd6b81 --- /dev/null +++ b/retrofit-adapters/sse-java9/build.gradle @@ -0,0 +1,25 @@ +apply plugin: 'java-library' +apply plugin: 'org.jetbrains.kotlin.jvm' +apply plugin: 'com.vanniktech.maven.publish' +apply plugin: 'org.jetbrains.dokka' + +dependencies { + api projects.retrofit + api libs.okhttp.sse + compileOnly libs.findBugsAnnotations + +// testImplementation libs.junit +// testImplementation libs.truth +// testImplementation libs.guava + testImplementation libs.okhttp.mockwebserver +} + +jar { + manifest { + attributes 'Automatic-Module-Name': 'retrofit2.adapter.sse-java9' + } +} + +kotlin { + jvmToolchain 9 +} diff --git a/retrofit-adapters/sse-java9/gradle.properties b/retrofit-adapters/sse-java9/gradle.properties new file mode 100644 index 0000000000..56d6c7687f --- /dev/null +++ b/retrofit-adapters/sse-java9/gradle.properties @@ -0,0 +1,3 @@ +POM_ARTIFACT_ID=adapter-sse-java9 +POM_NAME=Adapter: SSE Java 9 +POM_DESCRIPTION=A Retrofit CallAdapter for server-sent event (SSE) with Java 9's Flow. diff --git a/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/ServerSentEvent.kt b/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/ServerSentEvent.kt new file mode 100644 index 0000000000..615ddcdde4 --- /dev/null +++ b/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/ServerSentEvent.kt @@ -0,0 +1,30 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse + +/** + * A server-sent event. + */ +data class ServerSentEvent( + @get:JvmName("id") + val id: ID?, + + @get:JvmName("type") + val type: TYPE?, + + @get:JvmName("data") + val data: DATA, +) diff --git a/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt b/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt new file mode 100644 index 0000000000..06fc5ef773 --- /dev/null +++ b/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt @@ -0,0 +1,114 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse.java9 + +import java.lang.reflect.ParameterizedType +import java.lang.reflect.Type +import java.util.concurrent.Executor +import java.util.concurrent.Executors +import java.util.concurrent.Flow +import java.util.concurrent.ForkJoinPool +import java.util.concurrent.SubmissionPublisher +import okhttp3.sse.EventSource +import okhttp3.sse.EventSourceListener +import okhttp3.sse.EventSources +import retrofit2.Call +import retrofit2.CallAdapter +import retrofit2.Callback +import retrofit2.Converter +import retrofit2.Response +import retrofit2.Retrofit +import retrofit2.adapter.sse.ServerSentEvent + +internal class SseJucFlowCallAdapter( + private val idType: Type, + private val typeType: Type, + private val dataType: Type, + private val idConverter: Converter, + private val typeConverter: Converter, + private val dataConverter: Converter, + private val retrofit: Retrofit, +) : CallAdapter>> { + + private val executor: Executor = + retrofit.callbackExecutor() + ?: ForkJoinPool.commonPool().takeIf { ForkJoinPool.getCommonPoolParallelism() > 1 } + ?: Executors.newCachedThreadPool() + + private val responseType = object : ParameterizedType { + override fun getActualTypeArguments(): Array { + return arrayOf(idType, typeType, dataType) + } + + override fun getOwnerType(): Type? { + return null + } + + override fun getRawType(): Type { + return Flow.Publisher::class.java + } + } + + override fun responseType(): Type = responseType + + override fun adapt(call: Call): Flow.Publisher> = SsePublisher(call) + + inner class SsePublisher( + private val call: Call, + ) : SubmissionPublisher>(executor, Flow.defaultBufferSize()) { + override fun subscribe(subscriber: Flow.Subscriber>?) { + super.subscribe(subscriber) + call.enqueue(PublisherCallback(this)) + } + + override fun close() { + call.cancel() + super.close() + } + } + + inner class PublisherCallback(private val publisher: SubmissionPublisher>) : + Callback { + override fun onResponse(call: Call, response: Response) { + EventSources.processResponse( + response.raw(), + object : EventSourceListener() { + override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) { + val convertedId = + if (id != null) idConverter.convert(id) ?: error("Failed to convert $id to $idType") else null + val convertedType = + if (type != null) typeConverter.convert(type) ?: error("Failed to convert $type to $typeType") else null + val convertedData = dataConverter.convert(data) ?: error("Failed to convert $data to $dataType") + publisher.submit(ServerSentEvent(convertedId, convertedType, convertedData)) + } + + override fun onClosed(eventSource: EventSource) { + publisher.close() + } + + override fun onFailure(eventSource: EventSource, t: Throwable?, response: okhttp3.Response?) { + publisher.closeExceptionally(t ?: RuntimeException()) // TODO + } + }, + ) + } + + override fun onFailure(call: Call, t: Throwable) { + publisher.closeExceptionally(t) + } + } + +} diff --git a/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt b/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt new file mode 100644 index 0000000000..6e5c3405d0 --- /dev/null +++ b/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse.java9 + +import java.lang.reflect.ParameterizedType +import java.lang.reflect.Type +import java.util.concurrent.Flow +import retrofit2.CallAdapter +import retrofit2.Retrofit +import retrofit2.adapter.sse.ServerSentEvent + +class SseJucFlowCallAdapterFactory : CallAdapter.Factory() { + companion object { + private val EMPTY_ARRAY = emptyArray() + } + + override fun get( + returnType: Type, + annotations: Array, + retrofit: Retrofit, + ): CallAdapter<*, *>? { + if (getRawType(returnType) != Flow.Publisher::class.java) { + return null + } + + if (returnType !is ParameterizedType) { + error( + "Flow.Publisher return type must be parameterized as Flow.Publisher or Flow.Publisher", + ) + } + + val innerType = getParameterUpperBound(0, returnType) + + if (getRawType(innerType) != ServerSentEvent::class.java) { + return null + } + + if (innerType !is ParameterizedType) { + error( + "ServerSentEvent must be parameterized as ServerSentEvent" + + " or ServerSentEvent", + ) + } + + val idType = getParameterUpperBound(0, innerType) + val typeType = getParameterUpperBound(1, innerType) + val dataType = getParameterUpperBound(2, innerType) + + return SseJucFlowCallAdapter( + idType, + typeType, + dataType, + retrofit.stringConverter(idType, EMPTY_ARRAY), + retrofit.stringConverter(typeType, EMPTY_ARRAY), + retrofit.stringConverter(dataType, EMPTY_ARRAY), + retrofit, + ) + } +} diff --git a/settings.gradle b/settings.gradle index e94afa5343..a19d9b25e8 100644 --- a/settings.gradle +++ b/settings.gradle @@ -30,6 +30,7 @@ include ':retrofit-adapters:rxjava' include ':retrofit-adapters:rxjava2' include ':retrofit-adapters:rxjava3' include ':retrofit-adapters:scala' +include ':retrofit-adapters:sse-java9' include ':retrofit-converters:gson' include ':retrofit-converters:guava' From e3e63df1f0d07d0141768ea9a9c6d9839c76f2df Mon Sep 17 00:00:00 2001 From: MukjepScarlet <93977077+mukjepscarlet@users.noreply.github.com> Date: Mon, 11 Aug 2025 18:48:49 +0800 Subject: [PATCH 03/21] SSE/Java9: fix type --- retrofit-adapters/sse-java9/build.gradle | 8 +- .../sse/java9/SseJucFlowCallAdapter.kt | 58 +++++----- .../sse/java9/SseJucFlowCallAdapterFactory.kt | 12 +- .../SseJucFlowCallAdapterFactoryTest.java | 109 ++++++++++++++++++ 4 files changed, 150 insertions(+), 37 deletions(-) create mode 100644 retrofit-adapters/sse-java9/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java diff --git a/retrofit-adapters/sse-java9/build.gradle b/retrofit-adapters/sse-java9/build.gradle index 08c6cd6b81..51115b03f0 100644 --- a/retrofit-adapters/sse-java9/build.gradle +++ b/retrofit-adapters/sse-java9/build.gradle @@ -8,10 +8,12 @@ dependencies { api libs.okhttp.sse compileOnly libs.findBugsAnnotations -// testImplementation libs.junit -// testImplementation libs.truth -// testImplementation libs.guava + testImplementation libs.junit + testImplementation libs.truth + testImplementation libs.guava testImplementation libs.okhttp.mockwebserver + testImplementation projects.retrofitConverters.scalars + testImplementation projects.retrofitConverters.gson } jar { diff --git a/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt b/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt index 06fc5ef773..85b477aefa 100644 --- a/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt +++ b/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt @@ -15,13 +15,13 @@ */ package retrofit2.adapter.sse.java9 -import java.lang.reflect.ParameterizedType import java.lang.reflect.Type import java.util.concurrent.Executor import java.util.concurrent.Executors import java.util.concurrent.Flow import java.util.concurrent.ForkJoinPool import java.util.concurrent.SubmissionPublisher +import okhttp3.ResponseBody import okhttp3.sse.EventSource import okhttp3.sse.EventSourceListener import okhttp3.sse.EventSources @@ -29,7 +29,6 @@ import retrofit2.Call import retrofit2.CallAdapter import retrofit2.Callback import retrofit2.Converter -import retrofit2.Response import retrofit2.Retrofit import retrofit2.adapter.sse.ServerSentEvent @@ -40,34 +39,22 @@ internal class SseJucFlowCallAdapter( private val idConverter: Converter, private val typeConverter: Converter, private val dataConverter: Converter, - private val retrofit: Retrofit, -) : CallAdapter>> { + retrofit: Retrofit, +) : CallAdapter>> { private val executor: Executor = retrofit.callbackExecutor() ?: ForkJoinPool.commonPool().takeIf { ForkJoinPool.getCommonPoolParallelism() > 1 } ?: Executors.newCachedThreadPool() - private val responseType = object : ParameterizedType { - override fun getActualTypeArguments(): Array { - return arrayOf(idType, typeType, dataType) - } - - override fun getOwnerType(): Type? { - return null - } - - override fun getRawType(): Type { - return Flow.Publisher::class.java - } - } - - override fun responseType(): Type = responseType + override fun responseType(): Type = ResponseBody::class.java - override fun adapt(call: Call): Flow.Publisher> = SsePublisher(call) + override fun adapt( + call: Call, + ): Flow.Publisher> = SsePublisher(call) inner class SsePublisher( - private val call: Call, + private val call: Call, ) : SubmissionPublisher>(executor, Flow.defaultBufferSize()) { override fun subscribe(subscriber: Flow.Subscriber>?) { super.subscribe(subscriber) @@ -80,18 +67,17 @@ internal class SseJucFlowCallAdapter( } } - inner class PublisherCallback(private val publisher: SubmissionPublisher>) : - Callback { - override fun onResponse(call: Call, response: Response) { + inner class PublisherCallback( + private val publisher: SubmissionPublisher>, + ) : Callback { + override fun onResponse(call: Call, response: retrofit2.Response) { EventSources.processResponse( response.raw(), object : EventSourceListener() { override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) { - val convertedId = - if (id != null) idConverter.convert(id) ?: error("Failed to convert $id to $idType") else null - val convertedType = - if (type != null) typeConverter.convert(type) ?: error("Failed to convert $type to $typeType") else null - val convertedData = dataConverter.convert(data) ?: error("Failed to convert $data to $dataType") + val convertedId = convertId(id) + val convertedType = convertType(type) + val convertedData = convertData(data) publisher.submit(ServerSentEvent(convertedId, convertedType, convertedData)) } @@ -106,9 +92,21 @@ internal class SseJucFlowCallAdapter( ) } - override fun onFailure(call: Call, t: Throwable) { + override fun onFailure(call: Call, t: Throwable) { publisher.closeExceptionally(t) } } + private fun convertId(id: String?): ID? { + return if (id != null) idConverter.convert(id) ?: error("Failed to convert $id to $idType, actual type is ${id.javaClass}") else null + } + + private fun convertType(type: String?): TYPE? { + return if (type != null) typeConverter.convert(type) ?: error("Failed to convert $type to $typeType, actual type is ${type.javaClass}") else null + } + + private fun convertData(data: String): DATA { + return dataConverter.convert(data) ?: error("Failed to convert $data to $dataType, actual type is ${data.javaClass}") + } + } diff --git a/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt b/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt index 6e5c3405d0..afe3bc0baf 100644 --- a/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt +++ b/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt @@ -21,11 +21,11 @@ import java.util.concurrent.Flow import retrofit2.CallAdapter import retrofit2.Retrofit import retrofit2.adapter.sse.ServerSentEvent +import retrofit2.http.Streaming -class SseJucFlowCallAdapterFactory : CallAdapter.Factory() { - companion object { - private val EMPTY_ARRAY = emptyArray() - } +private val EMPTY_ARRAY = emptyArray() + +object SseJucFlowCallAdapterFactory : CallAdapter.Factory() { override fun get( returnType: Type, @@ -55,6 +55,10 @@ class SseJucFlowCallAdapterFactory : CallAdapter.Factory() { ) } + if (annotations.none { it is Streaming }) { + error("SSE endpoint must be annotated with @Streaming") + } + val idType = getParameterUpperBound(0, innerType) val typeType = getParameterUpperBound(1, innerType) val dataType = getParameterUpperBound(2, innerType) diff --git a/retrofit-adapters/sse-java9/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java b/retrofit-adapters/sse-java9/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java new file mode 100644 index 0000000000..f188777ea6 --- /dev/null +++ b/retrofit-adapters/sse-java9/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java @@ -0,0 +1,109 @@ +/* + * Copyright (C) 2016 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse.java9; + +import static com.google.common.truth.Truth.assertThat; + +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import retrofit2.Retrofit; +import retrofit2.adapter.sse.ServerSentEvent; +import retrofit2.converter.scalars.ScalarsConverterFactory; +import retrofit2.http.GET; +import retrofit2.http.Streaming; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public final class SseJucFlowCallAdapterFactoryTest { + + @Rule + public final MockWebServer server = new MockWebServer(); + + interface Service { + @Streaming + @GET("/") + Flow.Publisher> sse(); + } + + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(ScalarsConverterFactory.create()) +// .addConverterFactory(GsonConverterFactory.create()) + .addCallAdapterFactory(SseJucFlowCallAdapterFactory.INSTANCE) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void simpleEvents() throws Exception { + MockResponse mockResponse = new MockResponse() + .setHeader("Content-Type", "text/event-stream") + .setBody("id: 1\ndata: foo\n\nid: 2\ndata: bar\n\n"); + server.enqueue(mockResponse); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + service.sse().subscribe(new Flow.Subscriber<>() { + private final AtomicInteger count = new AtomicInteger(); + + @Override + public void onSubscribe(Flow.Subscription subscription) { + subscription.request(1); + count.incrementAndGet(); + } + + @Override + public void onNext(ServerSentEvent serverSentEvent) { + switch (count.get()) { + case 1: + assertThat(serverSentEvent.id()).isEqualTo(1); + assertThat(serverSentEvent.type()).isEqualTo(null); + assertThat(serverSentEvent.data()).isEqualTo("foo"); + break; + case 2: + assertThat(serverSentEvent.id()).isEqualTo(2); + assertThat(serverSentEvent.type()).isEqualTo(null); + assertThat(serverSentEvent.data()).isEqualTo("bar"); + break; + } + } + + @Override + public void onError(Throwable throwable) { + completableFuture.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + completableFuture.complete(null); + } + }); + + completableFuture.get(5, TimeUnit.SECONDS); + } + +} From 4ee2217f55fb2c6c63b9266d9144b08a43584d0b Mon Sep 17 00:00:00 2001 From: MukjepScarlet <93977077+mukjepscarlet@users.noreply.github.com> Date: Mon, 11 Aug 2025 19:01:45 +0800 Subject: [PATCH 04/21] SSE/Java9: fix body reading --- .../sse/java9/SseJucFlowCallAdapter.kt | 2 +- .../SseJucFlowCallAdapterFactoryTest.java | 19 +++++++++---------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt b/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt index 85b477aefa..8469863046 100644 --- a/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt +++ b/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt @@ -72,7 +72,7 @@ internal class SseJucFlowCallAdapter( ) : Callback { override fun onResponse(call: Call, response: retrofit2.Response) { EventSources.processResponse( - response.raw(), + response.raw().newBuilder().body(response.body() ?: error("Response body is null")).build(), object : EventSourceListener() { override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) { val convertedId = convertId(id) diff --git a/retrofit-adapters/sse-java9/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java b/retrofit-adapters/sse-java9/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java index f188777ea6..5b79ad94b3 100644 --- a/retrofit-adapters/sse-java9/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java +++ b/retrofit-adapters/sse-java9/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java @@ -41,7 +41,7 @@ public final class SseJucFlowCallAdapterFactoryTest { interface Service { @Streaming @GET("/") - Flow.Publisher> sse(); + Flow.Publisher> sse(); } private Service service; @@ -62,30 +62,29 @@ public void setUp() { public void simpleEvents() throws Exception { MockResponse mockResponse = new MockResponse() .setHeader("Content-Type", "text/event-stream") - .setBody("id: 1\ndata: foo\n\nid: 2\ndata: bar\n\n"); + .setBody("id: 1\nevent: type1\ndata: foo\n\nid: 2\ndata: bar\n\n"); server.enqueue(mockResponse); CompletableFuture completableFuture = new CompletableFuture<>(); service.sse().subscribe(new Flow.Subscriber<>() { - private final AtomicInteger count = new AtomicInteger(); + private final AtomicInteger count = new AtomicInteger(0); @Override public void onSubscribe(Flow.Subscription subscription) { - subscription.request(1); - count.incrementAndGet(); + subscription.request(2); } @Override - public void onNext(ServerSentEvent serverSentEvent) { - switch (count.get()) { + public void onNext(ServerSentEvent serverSentEvent) { + switch (count.incrementAndGet()) { case 1: - assertThat(serverSentEvent.id()).isEqualTo(1); - assertThat(serverSentEvent.type()).isEqualTo(null); + assertThat(serverSentEvent.id()).isEqualTo("1"); + assertThat(serverSentEvent.type()).isEqualTo("type1"); assertThat(serverSentEvent.data()).isEqualTo("foo"); break; case 2: - assertThat(serverSentEvent.id()).isEqualTo(2); + assertThat(serverSentEvent.id()).isEqualTo("2"); assertThat(serverSentEvent.type()).isEqualTo(null); assertThat(serverSentEvent.data()).isEqualTo("bar"); break; From 50667b83f5a4ce7fd8988ed2142545e6a0fb40cf Mon Sep 17 00:00:00 2001 From: MukjepScarlet <93977077+mukjepscarlet@users.noreply.github.com> Date: Mon, 11 Aug 2025 19:05:10 +0800 Subject: [PATCH 05/21] SSE/Java9: error shortcut --- .../adapter/sse/java9/SseJucFlowCallAdapter.kt | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt b/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt index 8469863046..46cb246027 100644 --- a/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt +++ b/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt @@ -32,6 +32,10 @@ import retrofit2.Converter import retrofit2.Retrofit import retrofit2.adapter.sse.ServerSentEvent +@Suppress("NOTHING_TO_INLINE") +private inline fun conversionError(value: T, type: Type): Nothing = + error("Failed to convert $value to $type, actual type is ${value.javaClass}") + internal class SseJucFlowCallAdapter( private val idType: Type, private val typeType: Type, @@ -98,15 +102,15 @@ internal class SseJucFlowCallAdapter( } private fun convertId(id: String?): ID? { - return if (id != null) idConverter.convert(id) ?: error("Failed to convert $id to $idType, actual type is ${id.javaClass}") else null + return if (id != null) idConverter.convert(id) ?: conversionError(id, idType) else null } private fun convertType(type: String?): TYPE? { - return if (type != null) typeConverter.convert(type) ?: error("Failed to convert $type to $typeType, actual type is ${type.javaClass}") else null + return if (type != null) typeConverter.convert(type) ?: conversionError(type, typeType) else null } private fun convertData(data: String): DATA { - return dataConverter.convert(data) ?: error("Failed to convert $data to $dataType, actual type is ${data.javaClass}") + return dataConverter.convert(data) ?: conversionError(data, dataType) } } From 04411d3d6940a4082690f045c1d358594bacb3e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=A8=E8=91=89=20Scarlet?= <93977077+mukjepscarlet@users.noreply.github.com> Date: Mon, 11 Aug 2025 21:09:30 +0800 Subject: [PATCH 06/21] Reorganize files --- retrofit-adapters/sse-java9/README.md | 2 -- retrofit-adapters/sse/core/build.gradle | 23 +++++++++++++++++++ .../retrofit2/adapter/sse/ServerSentEvent.kt | 0 .../{sse-java9 => sse/java9}/build.gradle | 3 ++- .../java9}/gradle.properties | 0 .../sse/java9/SseJucFlowCallAdapter.kt | 0 .../sse/java9/SseJucFlowCallAdapterFactory.kt | 0 .../SseJucFlowCallAdapterFactoryTest.java | 0 settings.gradle | 3 ++- 9 files changed, 27 insertions(+), 4 deletions(-) delete mode 100644 retrofit-adapters/sse-java9/README.md create mode 100644 retrofit-adapters/sse/core/build.gradle rename retrofit-adapters/{sse-java9 => sse/core}/src/main/java/retrofit2/adapter/sse/ServerSentEvent.kt (100%) rename retrofit-adapters/{sse-java9 => sse/java9}/build.gradle (83%) rename retrofit-adapters/{sse-java9 => sse/java9}/gradle.properties (100%) rename retrofit-adapters/{sse-java9 => sse/java9}/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt (100%) rename retrofit-adapters/{sse-java9 => sse/java9}/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt (100%) rename retrofit-adapters/{sse-java9 => sse/java9}/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java (100%) diff --git a/retrofit-adapters/sse-java9/README.md b/retrofit-adapters/sse-java9/README.md deleted file mode 100644 index 0ec7dee527..0000000000 --- a/retrofit-adapters/sse-java9/README.md +++ /dev/null @@ -1,2 +0,0 @@ -[//]: # (TODO) -[//]: # (TODO: test) diff --git a/retrofit-adapters/sse/core/build.gradle b/retrofit-adapters/sse/core/build.gradle new file mode 100644 index 0000000000..c08cb69739 --- /dev/null +++ b/retrofit-adapters/sse/core/build.gradle @@ -0,0 +1,23 @@ +apply plugin: 'java-library' +apply plugin: 'org.jetbrains.kotlin.jvm' +apply plugin: 'com.vanniktech.maven.publish' +apply plugin: 'org.jetbrains.dokka' + +dependencies { + api projects.retrofit + api libs.okhttp.sse + compileOnly libs.findBugsAnnotations + +// testImplementation libs.junit +// testImplementation libs.truth +// testImplementation libs.guava +// testImplementation libs.okhttp.mockwebserver +// testImplementation projects.retrofitConverters.scalars +// testImplementation projects.retrofitConverters.gson +} + +jar { + manifest { + attributes 'Automatic-Module-Name': 'retrofit2.adapter.sse-core' + } +} diff --git a/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/ServerSentEvent.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/ServerSentEvent.kt similarity index 100% rename from retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/ServerSentEvent.kt rename to retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/ServerSentEvent.kt diff --git a/retrofit-adapters/sse-java9/build.gradle b/retrofit-adapters/sse/java9/build.gradle similarity index 83% rename from retrofit-adapters/sse-java9/build.gradle rename to retrofit-adapters/sse/java9/build.gradle index 51115b03f0..8e476f3a0f 100644 --- a/retrofit-adapters/sse-java9/build.gradle +++ b/retrofit-adapters/sse/java9/build.gradle @@ -5,6 +5,7 @@ apply plugin: 'org.jetbrains.dokka' dependencies { api projects.retrofit + api projects.retrofitAdapters.sse.core api libs.okhttp.sse compileOnly libs.findBugsAnnotations @@ -18,7 +19,7 @@ dependencies { jar { manifest { - attributes 'Automatic-Module-Name': 'retrofit2.adapter.sse-java9' + attributes 'Automatic-Module-Name': 'retrofit2.adapter.sse-java9' } } diff --git a/retrofit-adapters/sse-java9/gradle.properties b/retrofit-adapters/sse/java9/gradle.properties similarity index 100% rename from retrofit-adapters/sse-java9/gradle.properties rename to retrofit-adapters/sse/java9/gradle.properties diff --git a/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt b/retrofit-adapters/sse/java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt similarity index 100% rename from retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt rename to retrofit-adapters/sse/java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt diff --git a/retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt b/retrofit-adapters/sse/java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt similarity index 100% rename from retrofit-adapters/sse-java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt rename to retrofit-adapters/sse/java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt diff --git a/retrofit-adapters/sse-java9/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java b/retrofit-adapters/sse/java9/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java similarity index 100% rename from retrofit-adapters/sse-java9/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java rename to retrofit-adapters/sse/java9/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java diff --git a/settings.gradle b/settings.gradle index a19d9b25e8..55663d2385 100644 --- a/settings.gradle +++ b/settings.gradle @@ -30,7 +30,8 @@ include ':retrofit-adapters:rxjava' include ':retrofit-adapters:rxjava2' include ':retrofit-adapters:rxjava3' include ':retrofit-adapters:scala' -include ':retrofit-adapters:sse-java9' +include ':retrofit-adapters:sse:core' +include ':retrofit-adapters:sse:java9' include ':retrofit-converters:gson' include ':retrofit-converters:guava' From c2acec9211008dfa03ede019bb26a74cd7101dfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=A8=E8=91=89=20Scarlet?= <93977077+mukjepscarlet@users.noreply.github.com> Date: Mon, 11 Aug 2025 21:31:22 +0800 Subject: [PATCH 07/21] SSE/Java9: fix type adapter --- retrofit-adapters/sse/java9/build.gradle | 1 - .../adapter/sse/java9/SseJucFlowCallAdapter.kt | 13 +++++++------ .../sse/java9/SseJucFlowCallAdapterFactory.kt | 6 +++--- .../sse/java9/SseJucFlowCallAdapterFactoryTest.java | 9 ++++----- 4 files changed, 14 insertions(+), 15 deletions(-) diff --git a/retrofit-adapters/sse/java9/build.gradle b/retrofit-adapters/sse/java9/build.gradle index 8e476f3a0f..62c07dba4a 100644 --- a/retrofit-adapters/sse/java9/build.gradle +++ b/retrofit-adapters/sse/java9/build.gradle @@ -14,7 +14,6 @@ dependencies { testImplementation libs.guava testImplementation libs.okhttp.mockwebserver testImplementation projects.retrofitConverters.scalars - testImplementation projects.retrofitConverters.gson } jar { diff --git a/retrofit-adapters/sse/java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt b/retrofit-adapters/sse/java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt index 46cb246027..26498c7a0d 100644 --- a/retrofit-adapters/sse/java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt +++ b/retrofit-adapters/sse/java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt @@ -22,6 +22,7 @@ import java.util.concurrent.Flow import java.util.concurrent.ForkJoinPool import java.util.concurrent.SubmissionPublisher import okhttp3.ResponseBody +import okhttp3.ResponseBody.Companion.toResponseBody import okhttp3.sse.EventSource import okhttp3.sse.EventSourceListener import okhttp3.sse.EventSources @@ -40,9 +41,9 @@ internal class SseJucFlowCallAdapter( private val idType: Type, private val typeType: Type, private val dataType: Type, - private val idConverter: Converter, - private val typeConverter: Converter, - private val dataConverter: Converter, + private val idConverter: Converter, + private val typeConverter: Converter, + private val dataConverter: Converter, retrofit: Retrofit, ) : CallAdapter>> { @@ -102,15 +103,15 @@ internal class SseJucFlowCallAdapter( } private fun convertId(id: String?): ID? { - return if (id != null) idConverter.convert(id) ?: conversionError(id, idType) else null + return if (id != null) idConverter.convert(id.toResponseBody()) ?: conversionError(id, idType) else null } private fun convertType(type: String?): TYPE? { - return if (type != null) typeConverter.convert(type) ?: conversionError(type, typeType) else null + return if (type != null) typeConverter.convert(type.toResponseBody()) ?: conversionError(type, typeType) else null } private fun convertData(data: String): DATA { - return dataConverter.convert(data) ?: conversionError(data, dataType) + return dataConverter.convert(data.toResponseBody()) ?: conversionError(data, dataType) } } diff --git a/retrofit-adapters/sse/java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt b/retrofit-adapters/sse/java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt index afe3bc0baf..c87e69dfe5 100644 --- a/retrofit-adapters/sse/java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt +++ b/retrofit-adapters/sse/java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt @@ -67,9 +67,9 @@ object SseJucFlowCallAdapterFactory : CallAdapter.Factory() { idType, typeType, dataType, - retrofit.stringConverter(idType, EMPTY_ARRAY), - retrofit.stringConverter(typeType, EMPTY_ARRAY), - retrofit.stringConverter(dataType, EMPTY_ARRAY), + retrofit.responseBodyConverter(idType, EMPTY_ARRAY), + retrofit.responseBodyConverter(typeType, EMPTY_ARRAY), + retrofit.responseBodyConverter(dataType, EMPTY_ARRAY), retrofit, ) } diff --git a/retrofit-adapters/sse/java9/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java b/retrofit-adapters/sse/java9/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java index 5b79ad94b3..408cce5860 100644 --- a/retrofit-adapters/sse/java9/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java +++ b/retrofit-adapters/sse/java9/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java @@ -41,7 +41,7 @@ public final class SseJucFlowCallAdapterFactoryTest { interface Service { @Streaming @GET("/") - Flow.Publisher> sse(); + Flow.Publisher> sse(); } private Service service; @@ -52,7 +52,6 @@ public void setUp() { new Retrofit.Builder() .baseUrl(server.url("/")) .addConverterFactory(ScalarsConverterFactory.create()) -// .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(SseJucFlowCallAdapterFactory.INSTANCE) .build(); service = retrofit.create(Service.class); @@ -76,15 +75,15 @@ public void onSubscribe(Flow.Subscription subscription) { } @Override - public void onNext(ServerSentEvent serverSentEvent) { + public void onNext(ServerSentEvent serverSentEvent) { switch (count.incrementAndGet()) { case 1: - assertThat(serverSentEvent.id()).isEqualTo("1"); + assertThat(serverSentEvent.id()).isEqualTo(1); assertThat(serverSentEvent.type()).isEqualTo("type1"); assertThat(serverSentEvent.data()).isEqualTo("foo"); break; case 2: - assertThat(serverSentEvent.id()).isEqualTo("2"); + assertThat(serverSentEvent.id()).isEqualTo(2); assertThat(serverSentEvent.type()).isEqualTo(null); assertThat(serverSentEvent.data()).isEqualTo("bar"); break; From 3d916b7f8bdc807706b2cd46726de848e9316e03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=A8=E8=91=89=20Scarlet?= <93977077+mukjepscarlet@users.noreply.github.com> Date: Mon, 11 Aug 2025 21:35:50 +0800 Subject: [PATCH 08/21] SSE/Java9: rename module --- retrofit-adapters/sse/{java9 => juc-flow}/build.gradle | 6 ------ retrofit-adapters/sse/{java9 => juc-flow}/gradle.properties | 4 ++-- .../retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt | 0 .../adapter/sse/java9/SseJucFlowCallAdapterFactory.kt | 0 .../adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java | 0 settings.gradle | 2 +- 6 files changed, 3 insertions(+), 9 deletions(-) rename retrofit-adapters/sse/{java9 => juc-flow}/build.gradle (84%) rename retrofit-adapters/sse/{java9 => juc-flow}/gradle.properties (56%) rename retrofit-adapters/sse/{java9 => juc-flow}/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt (100%) rename retrofit-adapters/sse/{java9 => juc-flow}/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt (100%) rename retrofit-adapters/sse/{java9 => juc-flow}/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java (100%) diff --git a/retrofit-adapters/sse/java9/build.gradle b/retrofit-adapters/sse/juc-flow/build.gradle similarity index 84% rename from retrofit-adapters/sse/java9/build.gradle rename to retrofit-adapters/sse/juc-flow/build.gradle index 62c07dba4a..3023be2936 100644 --- a/retrofit-adapters/sse/java9/build.gradle +++ b/retrofit-adapters/sse/juc-flow/build.gradle @@ -16,12 +16,6 @@ dependencies { testImplementation projects.retrofitConverters.scalars } -jar { - manifest { - attributes 'Automatic-Module-Name': 'retrofit2.adapter.sse-java9' - } -} - kotlin { jvmToolchain 9 } diff --git a/retrofit-adapters/sse/java9/gradle.properties b/retrofit-adapters/sse/juc-flow/gradle.properties similarity index 56% rename from retrofit-adapters/sse/java9/gradle.properties rename to retrofit-adapters/sse/juc-flow/gradle.properties index 56d6c7687f..ce4c81fe2a 100644 --- a/retrofit-adapters/sse/java9/gradle.properties +++ b/retrofit-adapters/sse/juc-flow/gradle.properties @@ -1,3 +1,3 @@ -POM_ARTIFACT_ID=adapter-sse-java9 -POM_NAME=Adapter: SSE Java 9 +POM_ARTIFACT_ID=adapter-sse-juc-flow +POM_NAME=Adapter: SSE JUC flow POM_DESCRIPTION=A Retrofit CallAdapter for server-sent event (SSE) with Java 9's Flow. diff --git a/retrofit-adapters/sse/java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt similarity index 100% rename from retrofit-adapters/sse/java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt rename to retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt diff --git a/retrofit-adapters/sse/java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt similarity index 100% rename from retrofit-adapters/sse/java9/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt rename to retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt diff --git a/retrofit-adapters/sse/java9/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java b/retrofit-adapters/sse/juc-flow/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java similarity index 100% rename from retrofit-adapters/sse/java9/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java rename to retrofit-adapters/sse/juc-flow/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java diff --git a/settings.gradle b/settings.gradle index 55663d2385..073a23d176 100644 --- a/settings.gradle +++ b/settings.gradle @@ -31,7 +31,7 @@ include ':retrofit-adapters:rxjava2' include ':retrofit-adapters:rxjava3' include ':retrofit-adapters:scala' include ':retrofit-adapters:sse:core' -include ':retrofit-adapters:sse:java9' +include ':retrofit-adapters:sse:juc-flow' include ':retrofit-converters:gson' include ':retrofit-converters:guava' From 1cfe1722f8eb2f6fd70cd8cd0b3ed4e16e72e6a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=A8=E8=91=89=20Scarlet?= <93977077+mukjepscarlet@users.noreply.github.com> Date: Mon, 11 Aug 2025 21:58:43 +0800 Subject: [PATCH 09/21] SSE/Ktx: implementation --- retrofit-adapters/sse/juc-flow/build.gradle | 2 - retrofit-adapters/sse/ktx-flow/build.gradle | 16 +++ .../sse/ktx-flow/gradle.properties | 3 + .../sse/kotlinx/SseKtxFlowCallAdapter.kt | 100 ++++++++++++++++++ .../kotlinx/SseKtxFlowCallAdapterFactory.kt | 76 +++++++++++++ .../SseKtxFlowCallAdapterFactoryTest.kt | 82 ++++++++++++++ settings.gradle | 1 + 7 files changed, 278 insertions(+), 2 deletions(-) create mode 100644 retrofit-adapters/sse/ktx-flow/build.gradle create mode 100644 retrofit-adapters/sse/ktx-flow/gradle.properties create mode 100644 retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt create mode 100644 retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt create mode 100644 retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt diff --git a/retrofit-adapters/sse/juc-flow/build.gradle b/retrofit-adapters/sse/juc-flow/build.gradle index 3023be2936..2eac5c98d2 100644 --- a/retrofit-adapters/sse/juc-flow/build.gradle +++ b/retrofit-adapters/sse/juc-flow/build.gradle @@ -6,12 +6,10 @@ apply plugin: 'org.jetbrains.dokka' dependencies { api projects.retrofit api projects.retrofitAdapters.sse.core - api libs.okhttp.sse compileOnly libs.findBugsAnnotations testImplementation libs.junit testImplementation libs.truth - testImplementation libs.guava testImplementation libs.okhttp.mockwebserver testImplementation projects.retrofitConverters.scalars } diff --git a/retrofit-adapters/sse/ktx-flow/build.gradle b/retrofit-adapters/sse/ktx-flow/build.gradle new file mode 100644 index 0000000000..6ef4bdc7b1 --- /dev/null +++ b/retrofit-adapters/sse/ktx-flow/build.gradle @@ -0,0 +1,16 @@ +apply plugin: 'java-library' +apply plugin: 'org.jetbrains.kotlin.jvm' +apply plugin: 'com.vanniktech.maven.publish' +apply plugin: 'org.jetbrains.dokka' + +dependencies { + api projects.retrofit + api projects.retrofitAdapters.sse.core + api libs.kotlinx.coroutines + compileOnly libs.findBugsAnnotations + + testImplementation libs.junit + testImplementation libs.truth + testImplementation libs.okhttp.mockwebserver + testImplementation projects.retrofitConverters.scalars +} diff --git a/retrofit-adapters/sse/ktx-flow/gradle.properties b/retrofit-adapters/sse/ktx-flow/gradle.properties new file mode 100644 index 0000000000..96cd0fd4e1 --- /dev/null +++ b/retrofit-adapters/sse/ktx-flow/gradle.properties @@ -0,0 +1,3 @@ +POM_ARTIFACT_ID=adapter-sse-kotlinx-flow +POM_NAME=Adapter: SSE kotlinx flow +POM_DESCRIPTION=A Retrofit CallAdapter for server-sent event (SSE) with kotlinx-coroutine's Flow. diff --git a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt new file mode 100644 index 0000000000..e76687ebc6 --- /dev/null +++ b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt @@ -0,0 +1,100 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse.kotlinx + +import java.lang.reflect.Type +import java.util.concurrent.Executor +import java.util.concurrent.Executors +import java.util.concurrent.ForkJoinPool +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.channels.onFailure +import kotlinx.coroutines.channels.onSuccess +import kotlinx.coroutines.channels.trySendBlocking +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.flow +import okhttp3.ResponseBody +import okhttp3.ResponseBody.Companion.toResponseBody +import okhttp3.sse.EventSource +import okhttp3.sse.EventSourceListener +import okhttp3.sse.EventSources +import retrofit2.Call +import retrofit2.CallAdapter +import retrofit2.Callback +import retrofit2.Converter +import retrofit2.Response +import retrofit2.Retrofit +import retrofit2.adapter.sse.ServerSentEvent +import retrofit2.awaitResponse + +@Suppress("NOTHING_TO_INLINE") +private inline fun conversionError(value: T, type: Type): Nothing = + error("Failed to convert $value to $type, actual type is ${value.javaClass}") + +internal class SseKtxFlowCallAdapter( + private val idType: Type, + private val typeType: Type, + private val dataType: Type, + private val idConverter: Converter, + private val typeConverter: Converter, + private val dataConverter: Converter, + retrofit: Retrofit, +) : CallAdapter>> { + + override fun responseType(): Type = ResponseBody::class.java + + override fun adapt( + call: Call, + ): Flow> = callbackFlow { + val response = call.awaitResponse() + val okhttpResponse = response.raw().newBuilder().body(response.body() ?: error("Response body is null")).build() + + EventSources.processResponse( + okhttpResponse, + object : EventSourceListener() { + override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) { + val convertedId = convertId(id) + val convertedType = convertType(type) + val convertedData = convertData(data) + trySendBlocking(ServerSentEvent(convertedId, convertedType, convertedData)) + } + + override fun onClosed(eventSource: EventSource) { + close() + } + + override fun onFailure(eventSource: EventSource, t: Throwable?, response: okhttp3.Response?) { + close(t ?: RuntimeException()) // TODO + } + }, + ) + + awaitClose(call::cancel) + } + + private fun convertId(id: String?): ID? { + return if (id != null) idConverter.convert(id.toResponseBody()) ?: conversionError(id, idType) else null + } + + private fun convertType(type: String?): TYPE? { + return if (type != null) typeConverter.convert(type.toResponseBody()) ?: conversionError(type, typeType) else null + } + + private fun convertData(data: String): DATA { + return dataConverter.convert(data.toResponseBody()) ?: conversionError(data, dataType) + } + +} diff --git a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt new file mode 100644 index 0000000000..463a5a845b --- /dev/null +++ b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse.kotlinx + +import java.lang.reflect.ParameterizedType +import java.lang.reflect.Type +import kotlinx.coroutines.flow.Flow +import retrofit2.CallAdapter +import retrofit2.Retrofit +import retrofit2.adapter.sse.ServerSentEvent +import retrofit2.http.Streaming + +private val EMPTY_ARRAY = emptyArray() + +object SseKtxFlowCallAdapterFactory : CallAdapter.Factory() { + + override fun get( + returnType: Type, + annotations: Array, + retrofit: Retrofit, + ): CallAdapter<*, *>? { + if (getRawType(returnType) != Flow::class.java) { + return null + } + + if (returnType !is ParameterizedType) { + error( + "Flow return type must be parameterized as Flow or Flow", + ) + } + + val innerType = getParameterUpperBound(0, returnType) + + if (getRawType(innerType) != ServerSentEvent::class.java) { + return null + } + + if (innerType !is ParameterizedType) { + error( + "ServerSentEvent must be parameterized as ServerSentEvent" + + " or ServerSentEvent", + ) + } + + if (annotations.none { it is Streaming }) { + error("SSE endpoint must be annotated with @Streaming") + } + + val idType = getParameterUpperBound(0, innerType) + val typeType = getParameterUpperBound(1, innerType) + val dataType = getParameterUpperBound(2, innerType) + + return SseKtxFlowCallAdapter( + idType, + typeType, + dataType, + retrofit.responseBodyConverter(idType, EMPTY_ARRAY), + retrofit.responseBodyConverter(typeType, EMPTY_ARRAY), + retrofit.responseBodyConverter(dataType, EMPTY_ARRAY), + retrofit, + ) + } +} diff --git a/retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt b/retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt new file mode 100644 index 0000000000..b8fced19e5 --- /dev/null +++ b/retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse.kotlinx + +import com.google.common.truth.Truth.assertThat +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.runBlocking +import okhttp3.mockwebserver.MockResponse +import okhttp3.mockwebserver.MockWebServer +import org.junit.Before +import org.junit.Rule +import org.junit.Test +import retrofit2.Retrofit +import retrofit2.adapter.sse.ServerSentEvent +import retrofit2.converter.scalars.ScalarsConverterFactory +import retrofit2.create +import retrofit2.http.GET +import retrofit2.http.Streaming + +class SseKtxFlowCallAdapterFactoryTest { + + @Rule + @JvmField + val server: MockWebServer = MockWebServer() + + interface Service { + @Streaming + @GET("/") + fun sse(): Flow> + } + + private lateinit var service: Service + + @Before + fun setUp() { + val retrofit = + Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(ScalarsConverterFactory.create()) + .addCallAdapterFactory(SseKtxFlowCallAdapterFactory) + .build() + service = retrofit.create() + } + + @Test + fun simpleEvents() = runBlocking { + val mockResponse = MockResponse() + .setHeader("Content-Type", "text/event-stream") + .setBody("id: 1\nevent: type1\ndata: foo\n\nid: 2\ndata: bar\n\n") + server.enqueue(mockResponse) + + var count = 0 + service.sse().collect { serverSentEvent -> + when (++count) { + 1 -> { + assertThat(serverSentEvent.id).isEqualTo(1) + assertThat(serverSentEvent.type).isEqualTo("type1") + assertThat(serverSentEvent.data).isEqualTo("foo") + } + 2 -> { + assertThat(serverSentEvent.id).isEqualTo(2) + assertThat(serverSentEvent.type).isEqualTo(null) + assertThat(serverSentEvent.data).isEqualTo("bar") + } + } + } + } + +} diff --git a/settings.gradle b/settings.gradle index 073a23d176..6ab03cf3d9 100644 --- a/settings.gradle +++ b/settings.gradle @@ -32,6 +32,7 @@ include ':retrofit-adapters:rxjava3' include ':retrofit-adapters:scala' include ':retrofit-adapters:sse:core' include ':retrofit-adapters:sse:juc-flow' +include ':retrofit-adapters:sse:ktx-flow' include ':retrofit-converters:gson' include ':retrofit-converters:guava' From 07b44f9fb58ffea0eeaef28321b18b23594b7998 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=A8=E8=91=89=20Scarlet?= <93977077+mukjepscarlet@users.noreply.github.com> Date: Tue, 12 Aug 2025 00:16:43 +0800 Subject: [PATCH 10/21] Add base adapter --- .../sse/internal/AbstractSseCallAdapter.kt | 73 +++++++++++++++++++ .../sse/java9/SseJucFlowCallAdapter.kt | 42 ++--------- .../sse/java9/SseJucFlowCallAdapterFactory.kt | 9 +-- .../sse/kotlinx/SseKtxFlowCallAdapter.kt | 53 ++------------ .../kotlinx/SseKtxFlowCallAdapterFactory.kt | 9 +-- 5 files changed, 91 insertions(+), 95 deletions(-) create mode 100644 retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/AbstractSseCallAdapter.kt diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/AbstractSseCallAdapter.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/AbstractSseCallAdapter.kt new file mode 100644 index 0000000000..7584a0941c --- /dev/null +++ b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/AbstractSseCallAdapter.kt @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse.internal + +import java.lang.reflect.Type +import okhttp3.ResponseBody +import okhttp3.ResponseBody.Companion.toResponseBody +import okhttp3.sse.EventSource +import okhttp3.sse.EventSourceListener +import okhttp3.sse.EventSources +import retrofit2.CallAdapter +import retrofit2.Converter +import retrofit2.Response +import retrofit2.Retrofit +import retrofit2.adapter.sse.ServerSentEvent + +private val EMPTY_ARRAY = emptyArray() + +@Suppress("NOTHING_TO_INLINE") +private inline fun conversionError(value: T, type: Type): Nothing = + error("Failed to convert $value to $type, actual type is ${value.javaClass}") + +abstract class AbstractSseCallAdapter( + retrofit: Retrofit, + private val idType: Type, + private val typeType: Type, + private val dataType: Type, +) : CallAdapter { + + private val idConverter: Converter = retrofit.responseBodyConverter(idType, EMPTY_ARRAY) + private val typeConverter: Converter = retrofit.responseBodyConverter(typeType, EMPTY_ARRAY) + private val dataConverter: Converter = retrofit.responseBodyConverter(dataType, EMPTY_ARRAY) + + final override fun responseType(): Type = ResponseBody::class.java + + protected fun Response.asSse(listener: EventSourceListener) { + val okhttpResponse = raw().newBuilder().body(body() ?: error("Response body is null")).build() + EventSources.processResponse(okhttpResponse, listener) + } + + protected fun createTypedEvent(id: String?, type: String?, data: String): ServerSentEvent { + val convertedId = convertId(id) + val convertedType = convertType(type) + val convertedData = convertData(data) + return ServerSentEvent(convertedId, convertedType, convertedData) + } + + private fun convertId(id: String?): ID? { + return if (id != null) idConverter.convert(id.toResponseBody()) ?: conversionError(id, idType) else null + } + + private fun convertType(type: String?): TYPE? { + return if (type != null) typeConverter.convert(type.toResponseBody()) ?: conversionError(type, typeType) else null + } + + private fun convertData(data: String): DATA { + return dataConverter.convert(data.toResponseBody()) ?: conversionError(data, dataType) + } + +} diff --git a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt index 26498c7a0d..03df155895 100644 --- a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt +++ b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt @@ -22,38 +22,26 @@ import java.util.concurrent.Flow import java.util.concurrent.ForkJoinPool import java.util.concurrent.SubmissionPublisher import okhttp3.ResponseBody -import okhttp3.ResponseBody.Companion.toResponseBody import okhttp3.sse.EventSource import okhttp3.sse.EventSourceListener -import okhttp3.sse.EventSources import retrofit2.Call -import retrofit2.CallAdapter import retrofit2.Callback -import retrofit2.Converter import retrofit2.Retrofit import retrofit2.adapter.sse.ServerSentEvent - -@Suppress("NOTHING_TO_INLINE") -private inline fun conversionError(value: T, type: Type): Nothing = - error("Failed to convert $value to $type, actual type is ${value.javaClass}") +import retrofit2.adapter.sse.internal.AbstractSseCallAdapter internal class SseJucFlowCallAdapter( - private val idType: Type, - private val typeType: Type, - private val dataType: Type, - private val idConverter: Converter, - private val typeConverter: Converter, - private val dataConverter: Converter, retrofit: Retrofit, -) : CallAdapter>> { + idType: Type, + typeType: Type, + dataType: Type, +) : AbstractSseCallAdapter>>(retrofit, idType, typeType, dataType) { private val executor: Executor = retrofit.callbackExecutor() ?: ForkJoinPool.commonPool().takeIf { ForkJoinPool.getCommonPoolParallelism() > 1 } ?: Executors.newCachedThreadPool() - override fun responseType(): Type = ResponseBody::class.java - override fun adapt( call: Call, ): Flow.Publisher> = SsePublisher(call) @@ -76,14 +64,10 @@ internal class SseJucFlowCallAdapter( private val publisher: SubmissionPublisher>, ) : Callback { override fun onResponse(call: Call, response: retrofit2.Response) { - EventSources.processResponse( - response.raw().newBuilder().body(response.body() ?: error("Response body is null")).build(), + response.asSse( object : EventSourceListener() { override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) { - val convertedId = convertId(id) - val convertedType = convertType(type) - val convertedData = convertData(data) - publisher.submit(ServerSentEvent(convertedId, convertedType, convertedData)) + publisher.submit(createTypedEvent(id, type, data)) } override fun onClosed(eventSource: EventSource) { @@ -102,16 +86,4 @@ internal class SseJucFlowCallAdapter( } } - private fun convertId(id: String?): ID? { - return if (id != null) idConverter.convert(id.toResponseBody()) ?: conversionError(id, idType) else null - } - - private fun convertType(type: String?): TYPE? { - return if (type != null) typeConverter.convert(type.toResponseBody()) ?: conversionError(type, typeType) else null - } - - private fun convertData(data: String): DATA { - return dataConverter.convert(data.toResponseBody()) ?: conversionError(data, dataType) - } - } diff --git a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt index c87e69dfe5..6a1d9c4ba1 100644 --- a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt +++ b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt @@ -23,8 +23,6 @@ import retrofit2.Retrofit import retrofit2.adapter.sse.ServerSentEvent import retrofit2.http.Streaming -private val EMPTY_ARRAY = emptyArray() - object SseJucFlowCallAdapterFactory : CallAdapter.Factory() { override fun get( @@ -63,14 +61,11 @@ object SseJucFlowCallAdapterFactory : CallAdapter.Factory() { val typeType = getParameterUpperBound(1, innerType) val dataType = getParameterUpperBound(2, innerType) - return SseJucFlowCallAdapter( + return SseJucFlowCallAdapter( + retrofit, idType, typeType, dataType, - retrofit.responseBodyConverter(idType, EMPTY_ARRAY), - retrofit.responseBodyConverter(typeType, EMPTY_ARRAY), - retrofit.responseBodyConverter(dataType, EMPTY_ARRAY), - retrofit, ) } } diff --git a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt index e76687ebc6..9dae3c8aa1 100644 --- a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt +++ b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt @@ -16,60 +16,33 @@ package retrofit2.adapter.sse.kotlinx import java.lang.reflect.Type -import java.util.concurrent.Executor -import java.util.concurrent.Executors -import java.util.concurrent.ForkJoinPool import kotlinx.coroutines.channels.awaitClose -import kotlinx.coroutines.channels.onFailure -import kotlinx.coroutines.channels.onSuccess import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.callbackFlow -import kotlinx.coroutines.flow.flow import okhttp3.ResponseBody -import okhttp3.ResponseBody.Companion.toResponseBody import okhttp3.sse.EventSource import okhttp3.sse.EventSourceListener -import okhttp3.sse.EventSources import retrofit2.Call -import retrofit2.CallAdapter -import retrofit2.Callback -import retrofit2.Converter -import retrofit2.Response import retrofit2.Retrofit import retrofit2.adapter.sse.ServerSentEvent +import retrofit2.adapter.sse.internal.AbstractSseCallAdapter import retrofit2.awaitResponse -@Suppress("NOTHING_TO_INLINE") -private inline fun conversionError(value: T, type: Type): Nothing = - error("Failed to convert $value to $type, actual type is ${value.javaClass}") - internal class SseKtxFlowCallAdapter( - private val idType: Type, - private val typeType: Type, - private val dataType: Type, - private val idConverter: Converter, - private val typeConverter: Converter, - private val dataConverter: Converter, retrofit: Retrofit, -) : CallAdapter>> { - - override fun responseType(): Type = ResponseBody::class.java + idType: Type, + typeType: Type, + dataType: Type, +) : AbstractSseCallAdapter>>(retrofit, idType, typeType, dataType) { override fun adapt( call: Call, ): Flow> = callbackFlow { - val response = call.awaitResponse() - val okhttpResponse = response.raw().newBuilder().body(response.body() ?: error("Response body is null")).build() - - EventSources.processResponse( - okhttpResponse, + call.awaitResponse().asSse( object : EventSourceListener() { override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) { - val convertedId = convertId(id) - val convertedType = convertType(type) - val convertedData = convertData(data) - trySendBlocking(ServerSentEvent(convertedId, convertedType, convertedData)) + trySendBlocking(createTypedEvent(id, type, data)) } override fun onClosed(eventSource: EventSource) { @@ -85,16 +58,4 @@ internal class SseKtxFlowCallAdapter( awaitClose(call::cancel) } - private fun convertId(id: String?): ID? { - return if (id != null) idConverter.convert(id.toResponseBody()) ?: conversionError(id, idType) else null - } - - private fun convertType(type: String?): TYPE? { - return if (type != null) typeConverter.convert(type.toResponseBody()) ?: conversionError(type, typeType) else null - } - - private fun convertData(data: String): DATA { - return dataConverter.convert(data.toResponseBody()) ?: conversionError(data, dataType) - } - } diff --git a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt index 463a5a845b..9bd2607a3d 100644 --- a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt +++ b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt @@ -23,8 +23,6 @@ import retrofit2.Retrofit import retrofit2.adapter.sse.ServerSentEvent import retrofit2.http.Streaming -private val EMPTY_ARRAY = emptyArray() - object SseKtxFlowCallAdapterFactory : CallAdapter.Factory() { override fun get( @@ -63,14 +61,11 @@ object SseKtxFlowCallAdapterFactory : CallAdapter.Factory() { val typeType = getParameterUpperBound(1, innerType) val dataType = getParameterUpperBound(2, innerType) - return SseKtxFlowCallAdapter( + return SseKtxFlowCallAdapter( + retrofit, idType, typeType, dataType, - retrofit.responseBodyConverter(idType, EMPTY_ARRAY), - retrofit.responseBodyConverter(typeType, EMPTY_ARRAY), - retrofit.responseBodyConverter(dataType, EMPTY_ARRAY), - retrofit, ) } } From 23d10e7f3cca7a125234a8091650ac76f9cd97aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=A8=E8=91=89=20Scarlet?= <93977077+mukjepscarlet@users.noreply.github.com> Date: Tue, 12 Aug 2025 02:05:33 +0800 Subject: [PATCH 11/21] sse/juc: Make executor configurable --- .../adapter/sse/java9/SseJucFlowCallAdapter.kt | 4 ++-- .../sse/java9/SseJucFlowCallAdapterFactory.kt | 12 +++++++++++- .../sse/java9/SseJucFlowCallAdapterFactoryTest.java | 2 +- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt index 03df155895..95efd872be 100644 --- a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt +++ b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt @@ -31,14 +31,14 @@ import retrofit2.adapter.sse.ServerSentEvent import retrofit2.adapter.sse.internal.AbstractSseCallAdapter internal class SseJucFlowCallAdapter( + executor: Executor?, retrofit: Retrofit, idType: Type, typeType: Type, dataType: Type, ) : AbstractSseCallAdapter>>(retrofit, idType, typeType, dataType) { - private val executor: Executor = - retrofit.callbackExecutor() + private val executor: Executor = executor ?: retrofit.callbackExecutor() ?: ForkJoinPool.commonPool().takeIf { ForkJoinPool.getCommonPoolParallelism() > 1 } ?: Executors.newCachedThreadPool() diff --git a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt index 6a1d9c4ba1..3e77b4ccdc 100644 --- a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt +++ b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt @@ -17,13 +17,22 @@ package retrofit2.adapter.sse.java9 import java.lang.reflect.ParameterizedType import java.lang.reflect.Type +import java.util.concurrent.Executor import java.util.concurrent.Flow import retrofit2.CallAdapter import retrofit2.Retrofit import retrofit2.adapter.sse.ServerSentEvent import retrofit2.http.Streaming -object SseJucFlowCallAdapterFactory : CallAdapter.Factory() { +class SseJucFlowCallAdapterFactory private constructor( + private val executor: Executor?, +): CallAdapter.Factory() { + + companion object { + @JvmStatic + @JvmOverloads + fun create(executor: Executor? = null) = SseJucFlowCallAdapterFactory(executor) + } override fun get( returnType: Type, @@ -62,6 +71,7 @@ object SseJucFlowCallAdapterFactory : CallAdapter.Factory() { val dataType = getParameterUpperBound(2, innerType) return SseJucFlowCallAdapter( + executor, retrofit, idType, typeType, diff --git a/retrofit-adapters/sse/juc-flow/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java b/retrofit-adapters/sse/juc-flow/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java index 408cce5860..b8df1688bc 100644 --- a/retrofit-adapters/sse/juc-flow/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java +++ b/retrofit-adapters/sse/juc-flow/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java @@ -52,7 +52,7 @@ public void setUp() { new Retrofit.Builder() .baseUrl(server.url("/")) .addConverterFactory(ScalarsConverterFactory.create()) - .addCallAdapterFactory(SseJucFlowCallAdapterFactory.INSTANCE) + .addCallAdapterFactory(SseJucFlowCallAdapterFactory.create()) .build(); service = retrofit.create(Service.class); } From bff36d57f819f2a71497f22a642ba006d388efa2 Mon Sep 17 00:00:00 2001 From: MukjepScarlet <93977077+mukjepscarlet@users.noreply.github.com> Date: Tue, 12 Aug 2025 10:39:26 +0800 Subject: [PATCH 12/21] sse/kts: add other converters test --- retrofit-adapters/sse/ktx-flow/build.gradle | 1 + .../SseKtxFlowCallAdapterFactoryTest.kt | 26 +++++++++++++++---- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/retrofit-adapters/sse/ktx-flow/build.gradle b/retrofit-adapters/sse/ktx-flow/build.gradle index 6ef4bdc7b1..070c24a3da 100644 --- a/retrofit-adapters/sse/ktx-flow/build.gradle +++ b/retrofit-adapters/sse/ktx-flow/build.gradle @@ -13,4 +13,5 @@ dependencies { testImplementation libs.truth testImplementation libs.okhttp.mockwebserver testImplementation projects.retrofitConverters.scalars + testImplementation projects.retrofitConverters.gson } diff --git a/retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt b/retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt index b8fced19e5..eb7b85d06e 100644 --- a/retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt +++ b/retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt @@ -25,6 +25,7 @@ import org.junit.Rule import org.junit.Test import retrofit2.Retrofit import retrofit2.adapter.sse.ServerSentEvent +import retrofit2.converter.gson.GsonConverterFactory import retrofit2.converter.scalars.ScalarsConverterFactory import retrofit2.create import retrofit2.http.GET @@ -36,10 +37,14 @@ class SseKtxFlowCallAdapterFactoryTest { @JvmField val server: MockWebServer = MockWebServer() + data class EventData( + val data: String, + ) + interface Service { @Streaming @GET("/") - fun sse(): Flow> + fun sse(): Flow> } private lateinit var service: Service @@ -50,6 +55,7 @@ class SseKtxFlowCallAdapterFactoryTest { Retrofit.Builder() .baseUrl(server.url("/")) .addConverterFactory(ScalarsConverterFactory.create()) + .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(SseKtxFlowCallAdapterFactory) .build() service = retrofit.create() @@ -59,7 +65,17 @@ class SseKtxFlowCallAdapterFactoryTest { fun simpleEvents() = runBlocking { val mockResponse = MockResponse() .setHeader("Content-Type", "text/event-stream") - .setBody("id: 1\nevent: type1\ndata: foo\n\nid: 2\ndata: bar\n\n") + .setBody( + """ + |id: 1 + |event: TYPE1 + |data: {"data":"foo"} + | + |id: 2 + |data: {"data":"bar"} + | + """.trimMargin(), + ) server.enqueue(mockResponse) var count = 0 @@ -67,13 +83,13 @@ class SseKtxFlowCallAdapterFactoryTest { when (++count) { 1 -> { assertThat(serverSentEvent.id).isEqualTo(1) - assertThat(serverSentEvent.type).isEqualTo("type1") - assertThat(serverSentEvent.data).isEqualTo("foo") + assertThat(serverSentEvent.type).isEqualTo("TYPE1") + assertThat(serverSentEvent.data.data).isEqualTo("foo") } 2 -> { assertThat(serverSentEvent.id).isEqualTo(2) assertThat(serverSentEvent.type).isEqualTo(null) - assertThat(serverSentEvent.data).isEqualTo("bar") + assertThat(serverSentEvent.data.data).isEqualTo("bar") } } } From ca6bc13113ed31b80298e7f2ade14e617c0bd55c Mon Sep 17 00:00:00 2001 From: MukjepScarlet <93977077+mukjepscarlet@users.noreply.github.com> Date: Tue, 12 Aug 2025 10:59:48 +0800 Subject: [PATCH 13/21] spotless + move listener to abstract class --- .../sse/internal/AbstractSseCallAdapter.kt | 45 ++++++-- .../sse/java9/SseJucFlowCallAdapter.kt | 68 +++++------- .../sse/java9/SseJucFlowCallAdapterFactory.kt | 2 +- .../SseJucFlowCallAdapterFactoryTest.java | 101 +++++++++--------- .../sse/kotlinx/SseKtxFlowCallAdapter.kt | 38 +++---- .../SseKtxFlowCallAdapterFactoryTest.kt | 3 +- 6 files changed, 138 insertions(+), 119 deletions(-) diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/AbstractSseCallAdapter.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/AbstractSseCallAdapter.kt index 7584a0941c..b222fd2fdb 100644 --- a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/AbstractSseCallAdapter.kt +++ b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/AbstractSseCallAdapter.kt @@ -21,6 +21,7 @@ import okhttp3.ResponseBody.Companion.toResponseBody import okhttp3.sse.EventSource import okhttp3.sse.EventSourceListener import okhttp3.sse.EventSources +import retrofit2.Call import retrofit2.CallAdapter import retrofit2.Converter import retrofit2.Response @@ -30,15 +31,14 @@ import retrofit2.adapter.sse.ServerSentEvent private val EMPTY_ARRAY = emptyArray() @Suppress("NOTHING_TO_INLINE") -private inline fun conversionError(value: T, type: Type): Nothing = - error("Failed to convert $value to $type, actual type is ${value.javaClass}") +private inline fun conversionError(value: T, type: Type): Nothing = error("Failed to convert $value to $type, actual type is ${value.javaClass}") -abstract class AbstractSseCallAdapter( +abstract class AbstractSseCallAdapter( retrofit: Retrofit, private val idType: Type, private val typeType: Type, private val dataType: Type, -) : CallAdapter { +) : CallAdapter { private val idConverter: Converter = retrofit.responseBodyConverter(idType, EMPTY_ARRAY) private val typeConverter: Converter = retrofit.responseBodyConverter(typeType, EMPTY_ARRAY) @@ -46,12 +46,40 @@ abstract class AbstractSseCallAdapter.asSse(listener: EventSourceListener) { + protected fun Call.attachEventSourceListener(builder: I) { + this.enqueue( + object : retrofit2.Callback { + override fun onResponse(call: Call, response: Response) { + response.asSse( + object : EventSourceListener() { + override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) { + emit(builder, createTypedEvent(id, type, data)) + } + + override fun onClosed(eventSource: EventSource) { + close(builder) + } + + override fun onFailure(eventSource: EventSource, t: Throwable?, response: okhttp3.Response?) { + closeExceptionally(builder, t ?: RuntimeException()) // TODO: exception type + } + }, + ) + } + + override fun onFailure(call: Call, t: Throwable) { + closeExceptionally(builder, t) + } + }, + ) + } + + private fun Response.asSse(listener: EventSourceListener) { val okhttpResponse = raw().newBuilder().body(body() ?: error("Response body is null")).build() EventSources.processResponse(okhttpResponse, listener) } - protected fun createTypedEvent(id: String?, type: String?, data: String): ServerSentEvent { + private fun createTypedEvent(id: String?, type: String?, data: String): ServerSentEvent { val convertedId = convertId(id) val convertedType = convertType(type) val convertedData = convertData(data) @@ -70,4 +98,9 @@ abstract class AbstractSseCallAdapter) + + protected abstract fun close(builder: I) + + protected abstract fun closeExceptionally(builder: I, t: Throwable) } diff --git a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt index 95efd872be..8d05f2d768 100644 --- a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt +++ b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt @@ -22,10 +22,7 @@ import java.util.concurrent.Flow import java.util.concurrent.ForkJoinPool import java.util.concurrent.SubmissionPublisher import okhttp3.ResponseBody -import okhttp3.sse.EventSource -import okhttp3.sse.EventSourceListener import retrofit2.Call -import retrofit2.Callback import retrofit2.Retrofit import retrofit2.adapter.sse.ServerSentEvent import retrofit2.adapter.sse.internal.AbstractSseCallAdapter @@ -36,54 +33,43 @@ internal class SseJucFlowCallAdapter( idType: Type, typeType: Type, dataType: Type, -) : AbstractSseCallAdapter>>(retrofit, idType, typeType, dataType) { +) : AbstractSseCallAdapter>, SubmissionPublisher>>(retrofit, idType, typeType, dataType) { private val executor: Executor = executor ?: retrofit.callbackExecutor() - ?: ForkJoinPool.commonPool().takeIf { ForkJoinPool.getCommonPoolParallelism() > 1 } - ?: Executors.newCachedThreadPool() + ?: ForkJoinPool.commonPool().takeIf { ForkJoinPool.getCommonPoolParallelism() > 1 } + ?: Executors.newCachedThreadPool() override fun adapt( call: Call, - ): Flow.Publisher> = SsePublisher(call) + ): Flow.Publisher> { + return object : SubmissionPublisher>(executor, Flow.defaultBufferSize()) { + override fun subscribe(subscriber: Flow.Subscriber>?) { + super.subscribe(subscriber) + call.attachEventSourceListener(this) + } - inner class SsePublisher( - private val call: Call, - ) : SubmissionPublisher>(executor, Flow.defaultBufferSize()) { - override fun subscribe(subscriber: Flow.Subscriber>?) { - super.subscribe(subscriber) - call.enqueue(PublisherCallback(this)) - } - - override fun close() { - call.cancel() - super.close() + override fun close() { + call.cancel() + super.close() + } } } - inner class PublisherCallback( - private val publisher: SubmissionPublisher>, - ) : Callback { - override fun onResponse(call: Call, response: retrofit2.Response) { - response.asSse( - object : EventSourceListener() { - override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) { - publisher.submit(createTypedEvent(id, type, data)) - } - - override fun onClosed(eventSource: EventSource) { - publisher.close() - } - - override fun onFailure(eventSource: EventSource, t: Throwable?, response: okhttp3.Response?) { - publisher.closeExceptionally(t ?: RuntimeException()) // TODO - } - }, - ) - } + override fun emit( + builder: SubmissionPublisher>, + event: ServerSentEvent, + ) { + builder.submit(event) + } - override fun onFailure(call: Call, t: Throwable) { - publisher.closeExceptionally(t) - } + override fun close(builder: SubmissionPublisher>) { + builder.close() } + override fun closeExceptionally( + builder: SubmissionPublisher>, + t: Throwable, + ) { + builder.closeExceptionally(t) + } } diff --git a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt index 3e77b4ccdc..0ad8e80237 100644 --- a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt +++ b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt @@ -26,7 +26,7 @@ import retrofit2.http.Streaming class SseJucFlowCallAdapterFactory private constructor( private val executor: Executor?, -): CallAdapter.Factory() { +) : CallAdapter.Factory() { companion object { @JvmStatic diff --git a/retrofit-adapters/sse/juc-flow/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java b/retrofit-adapters/sse/juc-flow/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java index b8df1688bc..8f018b3b52 100644 --- a/retrofit-adapters/sse/juc-flow/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java +++ b/retrofit-adapters/sse/juc-flow/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java @@ -17,6 +17,10 @@ import static com.google.common.truth.Truth.assertThat; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import org.junit.Before; @@ -28,15 +32,9 @@ import retrofit2.http.GET; import retrofit2.http.Streaming; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Flow; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - public final class SseJucFlowCallAdapterFactoryTest { - @Rule - public final MockWebServer server = new MockWebServer(); + @Rule public final MockWebServer server = new MockWebServer(); interface Service { @Streaming @@ -49,59 +47,62 @@ interface Service { @Before public void setUp() { Retrofit retrofit = - new Retrofit.Builder() - .baseUrl(server.url("/")) - .addConverterFactory(ScalarsConverterFactory.create()) - .addCallAdapterFactory(SseJucFlowCallAdapterFactory.create()) - .build(); + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(ScalarsConverterFactory.create()) + .addCallAdapterFactory(SseJucFlowCallAdapterFactory.create()) + .build(); service = retrofit.create(Service.class); } @Test public void simpleEvents() throws Exception { - MockResponse mockResponse = new MockResponse() - .setHeader("Content-Type", "text/event-stream") - .setBody("id: 1\nevent: type1\ndata: foo\n\nid: 2\ndata: bar\n\n"); + MockResponse mockResponse = + new MockResponse() + .setHeader("Content-Type", "text/event-stream") + .setBody("id: 1\nevent: type1\ndata: foo\n\nid: 2\ndata: bar\n\n"); server.enqueue(mockResponse); CompletableFuture completableFuture = new CompletableFuture<>(); - service.sse().subscribe(new Flow.Subscriber<>() { - private final AtomicInteger count = new AtomicInteger(0); - - @Override - public void onSubscribe(Flow.Subscription subscription) { - subscription.request(2); - } - - @Override - public void onNext(ServerSentEvent serverSentEvent) { - switch (count.incrementAndGet()) { - case 1: - assertThat(serverSentEvent.id()).isEqualTo(1); - assertThat(serverSentEvent.type()).isEqualTo("type1"); - assertThat(serverSentEvent.data()).isEqualTo("foo"); - break; - case 2: - assertThat(serverSentEvent.id()).isEqualTo(2); - assertThat(serverSentEvent.type()).isEqualTo(null); - assertThat(serverSentEvent.data()).isEqualTo("bar"); - break; - } - } - - @Override - public void onError(Throwable throwable) { - completableFuture.completeExceptionally(throwable); - } - - @Override - public void onComplete() { - completableFuture.complete(null); - } - }); + service + .sse() + .subscribe( + new Flow.Subscriber<>() { + private final AtomicInteger count = new AtomicInteger(0); + + @Override + public void onSubscribe(Flow.Subscription subscription) { + subscription.request(2); + } + + @Override + public void onNext(ServerSentEvent serverSentEvent) { + switch (count.incrementAndGet()) { + case 1: + assertThat(serverSentEvent.id()).isEqualTo(1); + assertThat(serverSentEvent.type()).isEqualTo("type1"); + assertThat(serverSentEvent.data()).isEqualTo("foo"); + break; + case 2: + assertThat(serverSentEvent.id()).isEqualTo(2); + assertThat(serverSentEvent.type()).isEqualTo(null); + assertThat(serverSentEvent.data()).isEqualTo("bar"); + break; + } + } + + @Override + public void onError(Throwable throwable) { + completableFuture.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + completableFuture.complete(null); + } + }); completableFuture.get(5, TimeUnit.SECONDS); } - } diff --git a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt index 9dae3c8aa1..ff3884799d 100644 --- a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt +++ b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt @@ -16,46 +16,46 @@ package retrofit2.adapter.sse.kotlinx import java.lang.reflect.Type +import kotlinx.coroutines.channels.ProducerScope import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.callbackFlow import okhttp3.ResponseBody -import okhttp3.sse.EventSource -import okhttp3.sse.EventSourceListener import retrofit2.Call import retrofit2.Retrofit import retrofit2.adapter.sse.ServerSentEvent import retrofit2.adapter.sse.internal.AbstractSseCallAdapter -import retrofit2.awaitResponse internal class SseKtxFlowCallAdapter( retrofit: Retrofit, idType: Type, typeType: Type, dataType: Type, -) : AbstractSseCallAdapter>>(retrofit, idType, typeType, dataType) { +) : AbstractSseCallAdapter>, ProducerScope>>(retrofit, idType, typeType, dataType) { override fun adapt( call: Call, ): Flow> = callbackFlow { - call.awaitResponse().asSse( - object : EventSourceListener() { - override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) { - trySendBlocking(createTypedEvent(id, type, data)) - } - - override fun onClosed(eventSource: EventSource) { - close() - } + call.attachEventSourceListener(this) + awaitClose(call::cancel) + } - override fun onFailure(eventSource: EventSource, t: Throwable?, response: okhttp3.Response?) { - close(t ?: RuntimeException()) // TODO - } - }, - ) + override fun emit( + builder: ProducerScope>, + event: ServerSentEvent, + ) { + builder.trySendBlocking(event) + } - awaitClose(call::cancel) + override fun close(builder: ProducerScope>) { + builder.close() } + override fun closeExceptionally( + builder: ProducerScope>, + t: Throwable, + ) { + builder.close(t) + } } diff --git a/retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt b/retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt index eb7b85d06e..bb1be88c82 100644 --- a/retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt +++ b/retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt @@ -74,7 +74,7 @@ class SseKtxFlowCallAdapterFactoryTest { |id: 2 |data: {"data":"bar"} | - """.trimMargin(), + """.trimMargin(), ) server.enqueue(mockResponse) @@ -94,5 +94,4 @@ class SseKtxFlowCallAdapterFactoryTest { } } } - } From 18aa33d03b5ce115590dde323f1f3b6f29d0e4eb Mon Sep 17 00:00:00 2001 From: MukjepScarlet <93977077+mukjepscarlet@users.noreply.github.com> Date: Tue, 12 Aug 2025 11:40:16 +0800 Subject: [PATCH 14/21] sse/core: callback style adapter --- retrofit-adapters/sse/core/build.gradle | 10 +- .../java/retrofit2/adapter/sse/EventSource.kt | 27 +++++ .../adapter/sse/EventSourceCallAdapter.kt | 44 +++++++ .../sse/EventSourceCallAdapterFactory.kt | 57 +++++++++ .../java/retrofit2/adapter/sse/SseCallback.kt | 31 +++++ .../adapter/sse/internal/RealEventSource.kt | 97 +++++++++++++++ .../EventSourceCallAdapterFactoryTest.java | 114 ++++++++++++++++++ 7 files changed, 374 insertions(+), 6 deletions(-) create mode 100644 retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSource.kt create mode 100644 retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapter.kt create mode 100644 retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapterFactory.kt create mode 100644 retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/SseCallback.kt create mode 100644 retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/RealEventSource.kt create mode 100644 retrofit-adapters/sse/core/src/test/java/retrofit2/adapter/sse/EventSourceCallAdapterFactoryTest.java diff --git a/retrofit-adapters/sse/core/build.gradle b/retrofit-adapters/sse/core/build.gradle index c08cb69739..b6f72dc012 100644 --- a/retrofit-adapters/sse/core/build.gradle +++ b/retrofit-adapters/sse/core/build.gradle @@ -8,12 +8,10 @@ dependencies { api libs.okhttp.sse compileOnly libs.findBugsAnnotations -// testImplementation libs.junit -// testImplementation libs.truth -// testImplementation libs.guava -// testImplementation libs.okhttp.mockwebserver -// testImplementation projects.retrofitConverters.scalars -// testImplementation projects.retrofitConverters.gson + testImplementation libs.junit + testImplementation libs.truth + testImplementation libs.okhttp.mockwebserver + testImplementation projects.retrofitConverters.scalars } jar { diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSource.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSource.kt new file mode 100644 index 0000000000..c9d741f78b --- /dev/null +++ b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSource.kt @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse + +import okhttp3.Request + +interface EventSource { + + fun request(): Request + + fun cancel() + + fun subscribe(callback: SseCallback) +} diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapter.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapter.kt new file mode 100644 index 0000000000..67a5232f0d --- /dev/null +++ b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapter.kt @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse + +import java.lang.reflect.Type +import okhttp3.ResponseBody +import retrofit2.Call +import retrofit2.CallAdapter +import retrofit2.Converter +import retrofit2.Retrofit +import retrofit2.adapter.sse.internal.RealEventSource + +private val EMPTY_ARRAY = emptyArray() + +internal class EventSourceCallAdapter( + retrofit: Retrofit, + private val idType: Type, + private val typeType: Type, + private val dataType: Type, +) : CallAdapter> { + + private val idConverter: Converter = retrofit.responseBodyConverter(idType, EMPTY_ARRAY) + private val typeConverter: Converter = retrofit.responseBodyConverter(typeType, EMPTY_ARRAY) + private val dataConverter: Converter = retrofit.responseBodyConverter(dataType, EMPTY_ARRAY) + + override fun responseType(): Type = ResponseBody::class.java + + override fun adapt(call: Call): EventSource { + return RealEventSource(idType, typeType, dataType, idConverter, typeConverter, dataConverter, call) + } +} diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapterFactory.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapterFactory.kt new file mode 100644 index 0000000000..ceee5121aa --- /dev/null +++ b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapterFactory.kt @@ -0,0 +1,57 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse + +import java.lang.reflect.ParameterizedType +import java.lang.reflect.Type +import retrofit2.CallAdapter +import retrofit2.Retrofit +import retrofit2.http.Streaming + +object EventSourceCallAdapterFactory : CallAdapter.Factory() { + + override fun get( + returnType: Type, + annotations: Array, + retrofit: Retrofit, + ): CallAdapter<*, *>? { + if (getRawType(returnType) != EventSource::class.java) { + return null + } + + if (returnType !is ParameterizedType) { + error( + "EventSource return type must be parameterized as EventSource" + + " or EventSource", + ) + } + + if (annotations.none { it is Streaming }) { + error("SSE endpoint must be annotated with @Streaming") + } + + val idType = getParameterUpperBound(0, returnType) + val typeType = getParameterUpperBound(1, returnType) + val dataType = getParameterUpperBound(2, returnType) + + return EventSourceCallAdapter( + retrofit, + idType, + typeType, + dataType, + ) + } +} diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/SseCallback.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/SseCallback.kt new file mode 100644 index 0000000000..722c85b82b --- /dev/null +++ b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/SseCallback.kt @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse + +interface SseCallback { + + fun onOpen(eventSource: EventSource) { + } + + fun onEvent(eventSource: EventSource, id: ID?, type: TYPE?, data: DATA) { + } + + fun onClosed(eventSource: EventSource) { + } + + fun onFailure(eventSource: EventSource, t: Throwable?) { + } +} diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/RealEventSource.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/RealEventSource.kt new file mode 100644 index 0000000000..dd5c1e2e38 --- /dev/null +++ b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/RealEventSource.kt @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse.internal + +import java.lang.reflect.Type +import okhttp3.Request +import okhttp3.ResponseBody +import okhttp3.ResponseBody.Companion.toResponseBody +import okhttp3.sse.EventSourceListener +import okhttp3.sse.EventSources +import retrofit2.Call +import retrofit2.Converter +import retrofit2.Response +import retrofit2.adapter.sse.EventSource +import retrofit2.adapter.sse.SseCallback + +@Suppress("NOTHING_TO_INLINE") +private inline fun conversionError(value: T, type: Type): Nothing = error("Failed to convert $value to $type, actual type is ${value.javaClass}") + +private fun Response.asSse(listener: EventSourceListener) { + val okhttpResponse = raw().newBuilder().body(body() ?: error("Response body is null")).build() + EventSources.processResponse(okhttpResponse, listener) +} + +internal class RealEventSource( + private val idType: Type, + private val typeType: Type, + private val dataType: Type, + private val idConverter: Converter, + private val typeConverter: Converter, + private val dataConverter: Converter, + private val call: Call, +) : EventSource { + override fun request(): Request = call.request() + + override fun cancel() = call.cancel() + + override fun subscribe(callback: SseCallback) { + call.enqueue( + object : retrofit2.Callback { + override fun onResponse(call: Call, response: Response) { + response.asSse( + object : EventSourceListener() { + override fun onOpen(eventSource: okhttp3.sse.EventSource, response: okhttp3.Response) { + callback.onOpen(this@RealEventSource) + } + + override fun onEvent(eventSource: okhttp3.sse.EventSource, id: String?, type: String?, data: String) { + val convertedId = convertId(id) + val convertedType = convertType(type) + val convertedData = convertData(data) + callback.onEvent(this@RealEventSource, convertedId, convertedType, convertedData) + } + + override fun onClosed(eventSource: okhttp3.sse.EventSource) { + callback.onClosed(this@RealEventSource) + } + + override fun onFailure(eventSource: okhttp3.sse.EventSource, t: Throwable?, response: okhttp3.Response?) { + callback.onFailure(this@RealEventSource, t) + } + }, + ) + } + + override fun onFailure(call: Call, t: Throwable) { + callback.onFailure(this@RealEventSource, t) + } + }, + ) + } + + private fun convertId(id: String?): ID? { + return if (id != null) idConverter.convert(id.toResponseBody()) ?: conversionError(id, idType) else null + } + + private fun convertType(type: String?): TYPE? { + return if (type != null) typeConverter.convert(type.toResponseBody()) ?: conversionError(type, typeType) else null + } + + private fun convertData(data: String): DATA { + return dataConverter.convert(data.toResponseBody()) ?: conversionError(data, dataType) + } +} diff --git a/retrofit-adapters/sse/core/src/test/java/retrofit2/adapter/sse/EventSourceCallAdapterFactoryTest.java b/retrofit-adapters/sse/core/src/test/java/retrofit2/adapter/sse/EventSourceCallAdapterFactoryTest.java new file mode 100644 index 0000000000..560670a736 --- /dev/null +++ b/retrofit-adapters/sse/core/src/test/java/retrofit2/adapter/sse/EventSourceCallAdapterFactoryTest.java @@ -0,0 +1,114 @@ +/* + * Copyright (C) 2017 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.sse; + +import static com.google.common.truth.Truth.assertThat; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import retrofit2.Retrofit; +import retrofit2.converter.scalars.ScalarsConverterFactory; +import retrofit2.http.GET; +import retrofit2.http.Streaming; + +public class EventSourceCallAdapterFactoryTest { + + @Rule public final MockWebServer server = new MockWebServer(); + + interface Service { + @Streaming + @GET("/") + EventSource sse(); + } + + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(ScalarsConverterFactory.create()) + .addCallAdapterFactory(EventSourceCallAdapterFactory.INSTANCE) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void simpleEvents() throws Exception { + MockResponse mockResponse = + new MockResponse() + .setHeader("Content-Type", "text/event-stream") + .setBody("id: 1\nevent: type1\ndata: foo\n\nid: 2\ndata: bar\n\n"); + server.enqueue(mockResponse); + + CompletableFuture completableFuture = new CompletableFuture<>(); + + service + .sse() + .subscribe( + new SseCallback() { + private final AtomicInteger count = new AtomicInteger(0); + + @Override + public void onOpen(@NotNull EventSource eventSource) { + assertThat(count.get()).isEqualTo(0); + } + + @Override + public void onEvent( + @NotNull EventSource eventSource, + @Nullable Integer id, + @Nullable String type, + @NotNull String data) { + switch (count.incrementAndGet()) { + case 1: + assertThat(id).isEqualTo(1); + assertThat(type).isEqualTo("type1"); + assertThat(data).isEqualTo("foo"); + break; + case 2: + assertThat(id).isEqualTo(2); + assertThat(type).isEqualTo(null); + assertThat(data).isEqualTo("bar"); + break; + } + } + + @Override + public void onClosed(@NotNull EventSource eventSource) { + completableFuture.complete(null); + } + + @Override + public void onFailure( + @NotNull EventSource eventSource, + @Nullable Throwable t) { + completableFuture.completeExceptionally(t); + } + }); + + completableFuture.get(5, TimeUnit.SECONDS); + } +} From 63028d2ce001b3fa23637b9f8f74405a7a6ce07e Mon Sep 17 00:00:00 2001 From: MukjepScarlet <93977077+mukjepscarlet@users.noreply.github.com> Date: Tue, 12 Aug 2025 13:03:56 +0800 Subject: [PATCH 15/21] move abstract to delegation --- .../sse/EventSourceCallAdapterFactory.kt | 1 + .../sse/internal/AbstractSseCallAdapter.kt | 106 ------------------ .../{ => internal}/EventSourceCallAdapter.kt | 6 +- .../sse/java9/SseJucFlowCallAdapter.kt | 55 +++++---- .../sse/kotlinx/SseKtxFlowCallAdapter.kt | 59 ++++++---- 5 files changed, 76 insertions(+), 151 deletions(-) delete mode 100644 retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/AbstractSseCallAdapter.kt rename retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/{ => internal}/EventSourceCallAdapter.kt (90%) diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapterFactory.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapterFactory.kt index ceee5121aa..6384a3a100 100644 --- a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapterFactory.kt +++ b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapterFactory.kt @@ -19,6 +19,7 @@ import java.lang.reflect.ParameterizedType import java.lang.reflect.Type import retrofit2.CallAdapter import retrofit2.Retrofit +import retrofit2.adapter.sse.internal.EventSourceCallAdapter import retrofit2.http.Streaming object EventSourceCallAdapterFactory : CallAdapter.Factory() { diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/AbstractSseCallAdapter.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/AbstractSseCallAdapter.kt deleted file mode 100644 index b222fd2fdb..0000000000 --- a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/AbstractSseCallAdapter.kt +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright (C) 2017 Square, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package retrofit2.adapter.sse.internal - -import java.lang.reflect.Type -import okhttp3.ResponseBody -import okhttp3.ResponseBody.Companion.toResponseBody -import okhttp3.sse.EventSource -import okhttp3.sse.EventSourceListener -import okhttp3.sse.EventSources -import retrofit2.Call -import retrofit2.CallAdapter -import retrofit2.Converter -import retrofit2.Response -import retrofit2.Retrofit -import retrofit2.adapter.sse.ServerSentEvent - -private val EMPTY_ARRAY = emptyArray() - -@Suppress("NOTHING_TO_INLINE") -private inline fun conversionError(value: T, type: Type): Nothing = error("Failed to convert $value to $type, actual type is ${value.javaClass}") - -abstract class AbstractSseCallAdapter( - retrofit: Retrofit, - private val idType: Type, - private val typeType: Type, - private val dataType: Type, -) : CallAdapter { - - private val idConverter: Converter = retrofit.responseBodyConverter(idType, EMPTY_ARRAY) - private val typeConverter: Converter = retrofit.responseBodyConverter(typeType, EMPTY_ARRAY) - private val dataConverter: Converter = retrofit.responseBodyConverter(dataType, EMPTY_ARRAY) - - final override fun responseType(): Type = ResponseBody::class.java - - protected fun Call.attachEventSourceListener(builder: I) { - this.enqueue( - object : retrofit2.Callback { - override fun onResponse(call: Call, response: Response) { - response.asSse( - object : EventSourceListener() { - override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) { - emit(builder, createTypedEvent(id, type, data)) - } - - override fun onClosed(eventSource: EventSource) { - close(builder) - } - - override fun onFailure(eventSource: EventSource, t: Throwable?, response: okhttp3.Response?) { - closeExceptionally(builder, t ?: RuntimeException()) // TODO: exception type - } - }, - ) - } - - override fun onFailure(call: Call, t: Throwable) { - closeExceptionally(builder, t) - } - }, - ) - } - - private fun Response.asSse(listener: EventSourceListener) { - val okhttpResponse = raw().newBuilder().body(body() ?: error("Response body is null")).build() - EventSources.processResponse(okhttpResponse, listener) - } - - private fun createTypedEvent(id: String?, type: String?, data: String): ServerSentEvent { - val convertedId = convertId(id) - val convertedType = convertType(type) - val convertedData = convertData(data) - return ServerSentEvent(convertedId, convertedType, convertedData) - } - - private fun convertId(id: String?): ID? { - return if (id != null) idConverter.convert(id.toResponseBody()) ?: conversionError(id, idType) else null - } - - private fun convertType(type: String?): TYPE? { - return if (type != null) typeConverter.convert(type.toResponseBody()) ?: conversionError(type, typeType) else null - } - - private fun convertData(data: String): DATA { - return dataConverter.convert(data.toResponseBody()) ?: conversionError(data, dataType) - } - - protected abstract fun emit(builder: I, event: ServerSentEvent) - - protected abstract fun close(builder: I) - - protected abstract fun closeExceptionally(builder: I, t: Throwable) -} diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapter.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/EventSourceCallAdapter.kt similarity index 90% rename from retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapter.kt rename to retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/EventSourceCallAdapter.kt index 67a5232f0d..b521207bb2 100644 --- a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapter.kt +++ b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/EventSourceCallAdapter.kt @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package retrofit2.adapter.sse +package retrofit2.adapter.sse.internal import java.lang.reflect.Type import okhttp3.ResponseBody @@ -21,11 +21,11 @@ import retrofit2.Call import retrofit2.CallAdapter import retrofit2.Converter import retrofit2.Retrofit -import retrofit2.adapter.sse.internal.RealEventSource +import retrofit2.adapter.sse.EventSource private val EMPTY_ARRAY = emptyArray() -internal class EventSourceCallAdapter( +class EventSourceCallAdapter( retrofit: Retrofit, private val idType: Type, private val typeType: Type, diff --git a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt index 8d05f2d768..2b0def5459 100644 --- a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt +++ b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt @@ -23,9 +23,12 @@ import java.util.concurrent.ForkJoinPool import java.util.concurrent.SubmissionPublisher import okhttp3.ResponseBody import retrofit2.Call +import retrofit2.CallAdapter import retrofit2.Retrofit +import retrofit2.adapter.sse.EventSource import retrofit2.adapter.sse.ServerSentEvent -import retrofit2.adapter.sse.internal.AbstractSseCallAdapter +import retrofit2.adapter.sse.SseCallback +import retrofit2.adapter.sse.internal.EventSourceCallAdapter internal class SseJucFlowCallAdapter( executor: Executor?, @@ -33,7 +36,16 @@ internal class SseJucFlowCallAdapter( idType: Type, typeType: Type, dataType: Type, -) : AbstractSseCallAdapter>, SubmissionPublisher>>(retrofit, idType, typeType, dataType) { +) : CallAdapter>> { + + private val delegate = EventSourceCallAdapter( + retrofit, + idType, + typeType, + dataType, + ) + + override fun responseType(): Type = delegate.responseType() private val executor: Executor = executor ?: retrofit.callbackExecutor() ?: ForkJoinPool.commonPool().takeIf { ForkJoinPool.getCommonPoolParallelism() > 1 } @@ -42,34 +54,35 @@ internal class SseJucFlowCallAdapter( override fun adapt( call: Call, ): Flow.Publisher> { + val delegate = delegate.adapt(call) return object : SubmissionPublisher>(executor, Flow.defaultBufferSize()) { override fun subscribe(subscriber: Flow.Subscriber>?) { super.subscribe(subscriber) - call.attachEventSourceListener(this) + delegate.subscribe(object : SseCallback { + override fun onEvent( + eventSource: EventSource, + id: ID?, + type: TYPE?, + data: DATA, + ) { + submit(ServerSentEvent(id, type, data)) + } + + override fun onClosed(eventSource: EventSource) { + close() + } + + override fun onFailure(eventSource: EventSource, t: Throwable?) { + closeExceptionally(t ?: RuntimeException()) // TODO exception type + } + }) } override fun close() { - call.cancel() + delegate.cancel() super.close() } } } - override fun emit( - builder: SubmissionPublisher>, - event: ServerSentEvent, - ) { - builder.submit(event) - } - - override fun close(builder: SubmissionPublisher>) { - builder.close() - } - - override fun closeExceptionally( - builder: SubmissionPublisher>, - t: Throwable, - ) { - builder.closeExceptionally(t) - } } diff --git a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt index ff3884799d..a07ad6b219 100644 --- a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt +++ b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt @@ -16,46 +16,63 @@ package retrofit2.adapter.sse.kotlinx import java.lang.reflect.Type -import kotlinx.coroutines.channels.ProducerScope import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.callbackFlow import okhttp3.ResponseBody import retrofit2.Call +import retrofit2.CallAdapter import retrofit2.Retrofit +import retrofit2.adapter.sse.EventSource import retrofit2.adapter.sse.ServerSentEvent -import retrofit2.adapter.sse.internal.AbstractSseCallAdapter +import retrofit2.adapter.sse.SseCallback +import retrofit2.adapter.sse.internal.EventSourceCallAdapter internal class SseKtxFlowCallAdapter( retrofit: Retrofit, idType: Type, typeType: Type, dataType: Type, -) : AbstractSseCallAdapter>, ProducerScope>>(retrofit, idType, typeType, dataType) { +) : CallAdapter>> { + + private val delegate = EventSourceCallAdapter( + retrofit, + idType, + typeType, + dataType, + ) + + override fun responseType(): Type = delegate.responseType() override fun adapt( call: Call, - ): Flow> = callbackFlow { - call.attachEventSourceListener(this) - awaitClose(call::cancel) - } + ): Flow> { + val delegate = delegate.adapt(call) + return callbackFlow { + delegate.subscribe( + object : SseCallback { + override fun onEvent( + eventSource: EventSource, + id: ID?, + type: TYPE?, + data: DATA, + ) { + trySendBlocking(ServerSentEvent(id, type, data)) + } - override fun emit( - builder: ProducerScope>, - event: ServerSentEvent, - ) { - builder.trySendBlocking(event) - } + override fun onClosed(eventSource: EventSource) { + close() + } - override fun close(builder: ProducerScope>) { - builder.close() - } + override fun onFailure(eventSource: EventSource, t: Throwable?) { + close(t ?: RuntimeException()) // TODO exception type + } + }, + ) - override fun closeExceptionally( - builder: ProducerScope>, - t: Throwable, - ) { - builder.close(t) + awaitClose(call::cancel) + } } + } From 22f07cdd040cea8d5dc10c76ccce1ced7e77b817 Mon Sep 17 00:00:00 2001 From: MukjepScarlet <93977077+mukjepscarlet@users.noreply.github.com> Date: Tue, 12 Aug 2025 13:14:18 +0800 Subject: [PATCH 16/21] sse/juc: allow set maxBufferCapacity --- .../retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt | 3 ++- .../adapter/sse/java9/SseJucFlowCallAdapterFactory.kt | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt index 2b0def5459..d540c23541 100644 --- a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt +++ b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt @@ -32,6 +32,7 @@ import retrofit2.adapter.sse.internal.EventSourceCallAdapter internal class SseJucFlowCallAdapter( executor: Executor?, + private val maxBufferCapacity: Int, retrofit: Retrofit, idType: Type, typeType: Type, @@ -55,7 +56,7 @@ internal class SseJucFlowCallAdapter( call: Call, ): Flow.Publisher> { val delegate = delegate.adapt(call) - return object : SubmissionPublisher>(executor, Flow.defaultBufferSize()) { + return object : SubmissionPublisher>(executor, maxBufferCapacity) { override fun subscribe(subscriber: Flow.Subscriber>?) { super.subscribe(subscriber) delegate.subscribe(object : SseCallback { diff --git a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt index 0ad8e80237..3b3f357414 100644 --- a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt +++ b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt @@ -26,12 +26,14 @@ import retrofit2.http.Streaming class SseJucFlowCallAdapterFactory private constructor( private val executor: Executor?, + private val maxBufferCapacity: Int, ) : CallAdapter.Factory() { companion object { @JvmStatic @JvmOverloads - fun create(executor: Executor? = null) = SseJucFlowCallAdapterFactory(executor) + fun create(executor: Executor? = null, maxBufferCapacity: Int = Flow.defaultBufferSize()) = + SseJucFlowCallAdapterFactory(executor, maxBufferCapacity) } override fun get( @@ -72,6 +74,7 @@ class SseJucFlowCallAdapterFactory private constructor( return SseJucFlowCallAdapter( executor, + maxBufferCapacity, retrofit, idType, typeType, From 03ef06e5091c49819f372650a966238f50296426 Mon Sep 17 00:00:00 2001 From: MukjepScarlet <93977077+mukjepscarlet@users.noreply.github.com> Date: Tue, 12 Aug 2025 14:31:21 +0800 Subject: [PATCH 17/21] requires GET and Streaming on SSE adapters --- .../retrofit2/adapter/sse/EventSourceCallAdapterFactory.kt | 5 +++++ .../adapter/sse/java9/SseJucFlowCallAdapterFactory.kt | 5 +++++ .../adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt | 5 +++++ 3 files changed, 15 insertions(+) diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapterFactory.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapterFactory.kt index 6384a3a100..79b1ef2f72 100644 --- a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapterFactory.kt +++ b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapterFactory.kt @@ -20,6 +20,7 @@ import java.lang.reflect.Type import retrofit2.CallAdapter import retrofit2.Retrofit import retrofit2.adapter.sse.internal.EventSourceCallAdapter +import retrofit2.http.GET import retrofit2.http.Streaming object EventSourceCallAdapterFactory : CallAdapter.Factory() { @@ -44,6 +45,10 @@ object EventSourceCallAdapterFactory : CallAdapter.Factory() { error("SSE endpoint must be annotated with @Streaming") } + if (annotations.none { it is GET }) { + error("SSE endpoint must use @GET method") + } + val idType = getParameterUpperBound(0, returnType) val typeType = getParameterUpperBound(1, returnType) val dataType = getParameterUpperBound(2, returnType) diff --git a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt index 3b3f357414..495e3570c0 100644 --- a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt +++ b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt @@ -22,6 +22,7 @@ import java.util.concurrent.Flow import retrofit2.CallAdapter import retrofit2.Retrofit import retrofit2.adapter.sse.ServerSentEvent +import retrofit2.http.GET import retrofit2.http.Streaming class SseJucFlowCallAdapterFactory private constructor( @@ -68,6 +69,10 @@ class SseJucFlowCallAdapterFactory private constructor( error("SSE endpoint must be annotated with @Streaming") } + if (annotations.none { it is GET }) { + error("SSE endpoint must use @GET method") + } + val idType = getParameterUpperBound(0, innerType) val typeType = getParameterUpperBound(1, innerType) val dataType = getParameterUpperBound(2, innerType) diff --git a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt index 9bd2607a3d..7dd63d6535 100644 --- a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt +++ b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt @@ -21,6 +21,7 @@ import kotlinx.coroutines.flow.Flow import retrofit2.CallAdapter import retrofit2.Retrofit import retrofit2.adapter.sse.ServerSentEvent +import retrofit2.http.GET import retrofit2.http.Streaming object SseKtxFlowCallAdapterFactory : CallAdapter.Factory() { @@ -57,6 +58,10 @@ object SseKtxFlowCallAdapterFactory : CallAdapter.Factory() { error("SSE endpoint must be annotated with @Streaming") } + if (annotations.none { it is GET }) { + error("SSE endpoint must use @GET method") + } + val idType = getParameterUpperBound(0, innerType) val typeType = getParameterUpperBound(1, innerType) val dataType = getParameterUpperBound(2, innerType) From 3cd290846236b0e1d430ab869b394f13431b5b72 Mon Sep 17 00:00:00 2001 From: MukjepScarlet <93977077+mukjepscarlet@users.noreply.github.com> Date: Tue, 12 Aug 2025 16:54:24 +0800 Subject: [PATCH 18/21] refactor: use delegation from Retrofit Now it requires the default callback style adapter. --- .../sse/java9/SseJucFlowCallAdapter.kt | 26 +++-------------- .../sse/java9/SseJucFlowCallAdapterFactory.kt | 28 +++++++++++++++---- .../SseJucFlowCallAdapterFactoryTest.java | 2 ++ .../sse/kotlinx/SseKtxFlowCallAdapter.kt | 18 ++---------- .../kotlinx/SseKtxFlowCallAdapterFactory.kt | 19 +++++++++---- .../SseKtxFlowCallAdapterFactoryTest.kt | 2 ++ 6 files changed, 46 insertions(+), 49 deletions(-) diff --git a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt index d540c23541..75fc036801 100644 --- a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt +++ b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt @@ -17,45 +17,27 @@ package retrofit2.adapter.sse.java9 import java.lang.reflect.Type import java.util.concurrent.Executor -import java.util.concurrent.Executors import java.util.concurrent.Flow -import java.util.concurrent.ForkJoinPool import java.util.concurrent.SubmissionPublisher import okhttp3.ResponseBody import retrofit2.Call import retrofit2.CallAdapter -import retrofit2.Retrofit import retrofit2.adapter.sse.EventSource import retrofit2.adapter.sse.ServerSentEvent import retrofit2.adapter.sse.SseCallback -import retrofit2.adapter.sse.internal.EventSourceCallAdapter internal class SseJucFlowCallAdapter( - executor: Executor?, + private val executor: Executor, private val maxBufferCapacity: Int, - retrofit: Retrofit, - idType: Type, - typeType: Type, - dataType: Type, + private val delegation: CallAdapter>, ) : CallAdapter>> { - private val delegate = EventSourceCallAdapter( - retrofit, - idType, - typeType, - dataType, - ) - - override fun responseType(): Type = delegate.responseType() - - private val executor: Executor = executor ?: retrofit.callbackExecutor() - ?: ForkJoinPool.commonPool().takeIf { ForkJoinPool.getCommonPoolParallelism() > 1 } - ?: Executors.newCachedThreadPool() + override fun responseType(): Type = delegation.responseType() override fun adapt( call: Call, ): Flow.Publisher> { - val delegate = delegate.adapt(call) + val delegate = delegation.adapt(call) return object : SubmissionPublisher>(executor, maxBufferCapacity) { override fun subscribe(subscriber: Flow.Subscriber>?) { super.subscribe(subscriber) diff --git a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt index 495e3570c0..e15f034e31 100644 --- a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt +++ b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt @@ -18,10 +18,14 @@ package retrofit2.adapter.sse.java9 import java.lang.reflect.ParameterizedType import java.lang.reflect.Type import java.util.concurrent.Executor +import java.util.concurrent.Executors import java.util.concurrent.Flow +import java.util.concurrent.ForkJoinPool import retrofit2.CallAdapter import retrofit2.Retrofit +import retrofit2.adapter.sse.EventSource import retrofit2.adapter.sse.ServerSentEvent +import retrofit2.adapter.sse.internal.EventSourceCallAdapter import retrofit2.http.GET import retrofit2.http.Streaming @@ -77,13 +81,25 @@ class SseJucFlowCallAdapterFactory private constructor( val typeType = getParameterUpperBound(1, innerType) val dataType = getParameterUpperBound(2, innerType) - return SseJucFlowCallAdapter( - executor, + val returnType = object : ParameterizedType { + override fun getRawType(): Type = EventSource::class.java + override fun getOwnerType(): Type? = null + override fun getActualTypeArguments(): Array = arrayOf(idType, typeType, dataType) + } + + val delegation = runCatching { + retrofit.nextCallAdapter(this, returnType, annotations) as? EventSourceCallAdapter<*, *, *> + }.getOrNull() ?: return null + + return SseJucFlowCallAdapter( + executorOrDefault(retrofit), maxBufferCapacity, - retrofit, - idType, - typeType, - dataType, + delegation, ) } + + private fun executorOrDefault(retrofit: Retrofit): Executor = + executor ?: retrofit.callbackExecutor() + ?: ForkJoinPool.commonPool().takeIf { ForkJoinPool.getCommonPoolParallelism() > 1 } + ?: Executors.newCachedThreadPool() } diff --git a/retrofit-adapters/sse/juc-flow/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java b/retrofit-adapters/sse/juc-flow/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java index 8f018b3b52..da03277301 100644 --- a/retrofit-adapters/sse/juc-flow/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java +++ b/retrofit-adapters/sse/juc-flow/src/test/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactoryTest.java @@ -27,6 +27,7 @@ import org.junit.Rule; import org.junit.Test; import retrofit2.Retrofit; +import retrofit2.adapter.sse.EventSourceCallAdapterFactory; import retrofit2.adapter.sse.ServerSentEvent; import retrofit2.converter.scalars.ScalarsConverterFactory; import retrofit2.http.GET; @@ -51,6 +52,7 @@ public void setUp() { .baseUrl(server.url("/")) .addConverterFactory(ScalarsConverterFactory.create()) .addCallAdapterFactory(SseJucFlowCallAdapterFactory.create()) + .addCallAdapterFactory(EventSourceCallAdapterFactory.INSTANCE) .build(); service = retrofit.create(Service.class); } diff --git a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt index a07ad6b219..a0549fc27f 100644 --- a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt +++ b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt @@ -23,32 +23,20 @@ import kotlinx.coroutines.flow.callbackFlow import okhttp3.ResponseBody import retrofit2.Call import retrofit2.CallAdapter -import retrofit2.Retrofit import retrofit2.adapter.sse.EventSource import retrofit2.adapter.sse.ServerSentEvent import retrofit2.adapter.sse.SseCallback -import retrofit2.adapter.sse.internal.EventSourceCallAdapter internal class SseKtxFlowCallAdapter( - retrofit: Retrofit, - idType: Type, - typeType: Type, - dataType: Type, + private val delegation: CallAdapter>, ) : CallAdapter>> { - private val delegate = EventSourceCallAdapter( - retrofit, - idType, - typeType, - dataType, - ) - - override fun responseType(): Type = delegate.responseType() + override fun responseType(): Type = delegation.responseType() override fun adapt( call: Call, ): Flow> { - val delegate = delegate.adapt(call) + val delegate = delegation.adapt(call) return callbackFlow { delegate.subscribe( object : SseCallback { diff --git a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt index 7dd63d6535..056dca6c90 100644 --- a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt +++ b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt @@ -20,7 +20,9 @@ import java.lang.reflect.Type import kotlinx.coroutines.flow.Flow import retrofit2.CallAdapter import retrofit2.Retrofit +import retrofit2.adapter.sse.EventSource import retrofit2.adapter.sse.ServerSentEvent +import retrofit2.adapter.sse.internal.EventSourceCallAdapter import retrofit2.http.GET import retrofit2.http.Streaming @@ -66,11 +68,16 @@ object SseKtxFlowCallAdapterFactory : CallAdapter.Factory() { val typeType = getParameterUpperBound(1, innerType) val dataType = getParameterUpperBound(2, innerType) - return SseKtxFlowCallAdapter( - retrofit, - idType, - typeType, - dataType, - ) + val returnType = object : ParameterizedType { + override fun getRawType(): Type = EventSource::class.java + override fun getOwnerType(): Type? = null + override fun getActualTypeArguments(): Array = arrayOf(idType, typeType, dataType) + } + + val delegation = runCatching { + retrofit.nextCallAdapter(this, returnType, annotations) as? EventSourceCallAdapter<*, *, *> + }.getOrNull() ?: return null + + return SseKtxFlowCallAdapter(delegation) } } diff --git a/retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt b/retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt index bb1be88c82..52bdd332a8 100644 --- a/retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt +++ b/retrofit-adapters/sse/ktx-flow/src/test/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactoryTest.kt @@ -24,6 +24,7 @@ import org.junit.Before import org.junit.Rule import org.junit.Test import retrofit2.Retrofit +import retrofit2.adapter.sse.EventSourceCallAdapterFactory import retrofit2.adapter.sse.ServerSentEvent import retrofit2.converter.gson.GsonConverterFactory import retrofit2.converter.scalars.ScalarsConverterFactory @@ -57,6 +58,7 @@ class SseKtxFlowCallAdapterFactoryTest { .addConverterFactory(ScalarsConverterFactory.create()) .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(SseKtxFlowCallAdapterFactory) + .addCallAdapterFactory(EventSourceCallAdapterFactory) .build() service = retrofit.create() } From 12ae45b6094dace17a2356bc56ef4ebb1d75cfd2 Mon Sep 17 00:00:00 2001 From: MukjepScarlet <93977077+mukjepscarlet@users.noreply.github.com> Date: Tue, 12 Aug 2025 17:05:29 +0800 Subject: [PATCH 19/21] change nextCallAdapter to callAdapter --- .../retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt | 2 +- .../adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt index e15f034e31..182b89c688 100644 --- a/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt +++ b/retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapterFactory.kt @@ -88,7 +88,7 @@ class SseJucFlowCallAdapterFactory private constructor( } val delegation = runCatching { - retrofit.nextCallAdapter(this, returnType, annotations) as? EventSourceCallAdapter<*, *, *> + retrofit.callAdapter(returnType, annotations) as? EventSourceCallAdapter<*, *, *> }.getOrNull() ?: return null return SseJucFlowCallAdapter( diff --git a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt index 056dca6c90..0113630ec0 100644 --- a/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt +++ b/retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapterFactory.kt @@ -75,7 +75,7 @@ object SseKtxFlowCallAdapterFactory : CallAdapter.Factory() { } val delegation = runCatching { - retrofit.nextCallAdapter(this, returnType, annotations) as? EventSourceCallAdapter<*, *, *> + retrofit.callAdapter(returnType, annotations) as? EventSourceCallAdapter<*, *, *> }.getOrNull() ?: return null return SseKtxFlowCallAdapter(delegation) From b1f790f2806a0594e58d8d9846ac488c404d4bee Mon Sep 17 00:00:00 2001 From: MukjepScarlet <93977077+mukjepscarlet@users.noreply.github.com> Date: Tue, 12 Aug 2025 18:57:11 +0800 Subject: [PATCH 20/21] direct cast String --- .../adapter/sse/internal/RealEventSource.kt | 27 ++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/RealEventSource.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/RealEventSource.kt index dd5c1e2e38..6ab2be73ad 100644 --- a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/RealEventSource.kt +++ b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/RealEventSource.kt @@ -15,7 +15,9 @@ */ package retrofit2.adapter.sse.internal +import java.lang.reflect.ParameterizedType import java.lang.reflect.Type +import java.lang.reflect.WildcardType import okhttp3.Request import okhttp3.ResponseBody import okhttp3.ResponseBody.Companion.toResponseBody @@ -35,6 +37,11 @@ private fun Response.asSse(listener: EventSourceListener) { EventSources.processResponse(okhttpResponse, listener) } +private fun Type.acceptsString(): Boolean { + return this === String::class.java || this === Object::class.java || this === CharSequence::class.java || this === Comparable::class.java || + this is ParameterizedType && this.rawType === Comparable::class.java && this.actualTypeArguments[0].let { it === String::class.java || it is WildcardType && it.upperBounds[0] === String::class.java } +} + internal class RealEventSource( private val idType: Type, private val typeType: Type, @@ -84,14 +91,28 @@ internal class RealEventSource( } private fun convertId(id: String?): ID? { - return if (id != null) idConverter.convert(id.toResponseBody()) ?: conversionError(id, idType) else null + @Suppress("UNCHECKED_CAST") + return when { + idType.acceptsString() -> id as ID? + id != null -> idConverter.convert(id.toResponseBody()) ?: conversionError(id, idType) + else -> null + } } private fun convertType(type: String?): TYPE? { - return if (type != null) typeConverter.convert(type.toResponseBody()) ?: conversionError(type, typeType) else null + @Suppress("UNCHECKED_CAST") + return when { + typeType.acceptsString() -> type as TYPE? + type != null -> typeConverter.convert(type.toResponseBody()) ?: conversionError(type, typeType) + else -> null + } } private fun convertData(data: String): DATA { - return dataConverter.convert(data.toResponseBody()) ?: conversionError(data, dataType) + @Suppress("UNCHECKED_CAST") + return when { + dataType.acceptsString() -> data as DATA + else -> dataConverter.convert(data.toResponseBody()) ?: conversionError(data, dataType) + } } } From 23e0362aa5ee1e435b154dd24f0af898c7e368a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=A8=E8=91=89=20Scarlet?= <93977077+mukjepscarlet@users.noreply.github.com> Date: Tue, 12 Aug 2025 20:23:47 +0800 Subject: [PATCH 21/21] sse: improve String type check --- .../adapter/sse/internal/RealEventSource.kt | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/RealEventSource.kt b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/RealEventSource.kt index 6ab2be73ad..0fff69ed72 100644 --- a/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/RealEventSource.kt +++ b/retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/RealEventSource.kt @@ -37,10 +37,16 @@ private fun Response.asSse(listener: EventSourceListener) { EventSources.processResponse(okhttpResponse, listener) } -private fun Type.acceptsString(): Boolean { - return this === String::class.java || this === Object::class.java || this === CharSequence::class.java || this === Comparable::class.java || - this is ParameterizedType && this.rawType === Comparable::class.java && this.actualTypeArguments[0].let { it === String::class.java || it is WildcardType && it.upperBounds[0] === String::class.java } -} +private fun Type.acceptsString(): Boolean = + when (this) { + String::class.java -> true + Object::class.java -> true + CharSequence::class.java -> true + Comparable::class.java -> true + is ParameterizedType -> rawType === Comparable::class.java && actualTypeArguments[0].acceptsString() + is WildcardType -> upperBounds[0].acceptsString() + else -> false + } internal class RealEventSource( private val idType: Type,