Skip to content

Commit 63028d2

Browse files
committed
move abstract to delegation
1 parent 18aa33d commit 63028d2

5 files changed

Lines changed: 76 additions & 151 deletions

File tree

retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapterFactory.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import java.lang.reflect.ParameterizedType
1919
import java.lang.reflect.Type
2020
import retrofit2.CallAdapter
2121
import retrofit2.Retrofit
22+
import retrofit2.adapter.sse.internal.EventSourceCallAdapter
2223
import retrofit2.http.Streaming
2324

2425
object EventSourceCallAdapterFactory : CallAdapter.Factory() {

retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/AbstractSseCallAdapter.kt

Lines changed: 0 additions & 106 deletions
This file was deleted.

retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/EventSourceCallAdapter.kt renamed to retrofit-adapters/sse/core/src/main/java/retrofit2/adapter/sse/internal/EventSourceCallAdapter.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,19 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package retrofit2.adapter.sse
16+
package retrofit2.adapter.sse.internal
1717

1818
import java.lang.reflect.Type
1919
import okhttp3.ResponseBody
2020
import retrofit2.Call
2121
import retrofit2.CallAdapter
2222
import retrofit2.Converter
2323
import retrofit2.Retrofit
24-
import retrofit2.adapter.sse.internal.RealEventSource
24+
import retrofit2.adapter.sse.EventSource
2525

2626
private val EMPTY_ARRAY = emptyArray<Annotation>()
2727

28-
internal class EventSourceCallAdapter<ID : Any, TYPE : Any, DATA : Any>(
28+
class EventSourceCallAdapter<ID : Any, TYPE : Any, DATA : Any>(
2929
retrofit: Retrofit,
3030
private val idType: Type,
3131
private val typeType: Type,

retrofit-adapters/sse/juc-flow/src/main/java/retrofit2/adapter/sse/java9/SseJucFlowCallAdapter.kt

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,29 @@ import java.util.concurrent.ForkJoinPool
2323
import java.util.concurrent.SubmissionPublisher
2424
import okhttp3.ResponseBody
2525
import retrofit2.Call
26+
import retrofit2.CallAdapter
2627
import retrofit2.Retrofit
28+
import retrofit2.adapter.sse.EventSource
2729
import retrofit2.adapter.sse.ServerSentEvent
28-
import retrofit2.adapter.sse.internal.AbstractSseCallAdapter
30+
import retrofit2.adapter.sse.SseCallback
31+
import retrofit2.adapter.sse.internal.EventSourceCallAdapter
2932

3033
internal class SseJucFlowCallAdapter<ID : Any, TYPE : Any, DATA : Any>(
3134
executor: Executor?,
3235
retrofit: Retrofit,
3336
idType: Type,
3437
typeType: Type,
3538
dataType: Type,
36-
) : AbstractSseCallAdapter<ID, TYPE, DATA, Flow.Publisher<ServerSentEvent<ID, TYPE, DATA>>, SubmissionPublisher<ServerSentEvent<ID, TYPE, DATA>>>(retrofit, idType, typeType, dataType) {
39+
) : CallAdapter<ResponseBody, Flow.Publisher<ServerSentEvent<ID, TYPE, DATA>>> {
40+
41+
private val delegate = EventSourceCallAdapter<ID, TYPE, DATA>(
42+
retrofit,
43+
idType,
44+
typeType,
45+
dataType,
46+
)
47+
48+
override fun responseType(): Type = delegate.responseType()
3749

3850
private val executor: Executor = executor ?: retrofit.callbackExecutor()
3951
?: ForkJoinPool.commonPool().takeIf { ForkJoinPool.getCommonPoolParallelism() > 1 }
@@ -42,34 +54,35 @@ internal class SseJucFlowCallAdapter<ID : Any, TYPE : Any, DATA : Any>(
4254
override fun adapt(
4355
call: Call<ResponseBody>,
4456
): Flow.Publisher<ServerSentEvent<ID, TYPE, DATA>> {
57+
val delegate = delegate.adapt(call)
4558
return object : SubmissionPublisher<ServerSentEvent<ID, TYPE, DATA>>(executor, Flow.defaultBufferSize()) {
4659
override fun subscribe(subscriber: Flow.Subscriber<in ServerSentEvent<ID, TYPE, DATA>>?) {
4760
super.subscribe(subscriber)
48-
call.attachEventSourceListener(this)
61+
delegate.subscribe(object : SseCallback<ID, TYPE, DATA> {
62+
override fun onEvent(
63+
eventSource: EventSource<ID, TYPE, DATA>,
64+
id: ID?,
65+
type: TYPE?,
66+
data: DATA,
67+
) {
68+
submit(ServerSentEvent(id, type, data))
69+
}
70+
71+
override fun onClosed(eventSource: EventSource<ID, TYPE, DATA>) {
72+
close()
73+
}
74+
75+
override fun onFailure(eventSource: EventSource<ID, TYPE, DATA>, t: Throwable?) {
76+
closeExceptionally(t ?: RuntimeException()) // TODO exception type
77+
}
78+
})
4979
}
5080

5181
override fun close() {
52-
call.cancel()
82+
delegate.cancel()
5383
super.close()
5484
}
5585
}
5686
}
5787

58-
override fun emit(
59-
builder: SubmissionPublisher<ServerSentEvent<ID, TYPE, DATA>>,
60-
event: ServerSentEvent<ID, TYPE, DATA>,
61-
) {
62-
builder.submit(event)
63-
}
64-
65-
override fun close(builder: SubmissionPublisher<ServerSentEvent<ID, TYPE, DATA>>) {
66-
builder.close()
67-
}
68-
69-
override fun closeExceptionally(
70-
builder: SubmissionPublisher<ServerSentEvent<ID, TYPE, DATA>>,
71-
t: Throwable,
72-
) {
73-
builder.closeExceptionally(t)
74-
}
7588
}

retrofit-adapters/sse/ktx-flow/src/main/java/retrofit2/adapter/sse/kotlinx/SseKtxFlowCallAdapter.kt

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,46 +16,63 @@
1616
package retrofit2.adapter.sse.kotlinx
1717

1818
import java.lang.reflect.Type
19-
import kotlinx.coroutines.channels.ProducerScope
2019
import kotlinx.coroutines.channels.awaitClose
2120
import kotlinx.coroutines.channels.trySendBlocking
2221
import kotlinx.coroutines.flow.Flow
2322
import kotlinx.coroutines.flow.callbackFlow
2423
import okhttp3.ResponseBody
2524
import retrofit2.Call
25+
import retrofit2.CallAdapter
2626
import retrofit2.Retrofit
27+
import retrofit2.adapter.sse.EventSource
2728
import retrofit2.adapter.sse.ServerSentEvent
28-
import retrofit2.adapter.sse.internal.AbstractSseCallAdapter
29+
import retrofit2.adapter.sse.SseCallback
30+
import retrofit2.adapter.sse.internal.EventSourceCallAdapter
2931

3032
internal class SseKtxFlowCallAdapter<ID : Any, TYPE : Any, DATA : Any>(
3133
retrofit: Retrofit,
3234
idType: Type,
3335
typeType: Type,
3436
dataType: Type,
35-
) : AbstractSseCallAdapter<ID, TYPE, DATA, Flow<ServerSentEvent<ID, TYPE, DATA>>, ProducerScope<ServerSentEvent<ID, TYPE, DATA>>>(retrofit, idType, typeType, dataType) {
37+
) : CallAdapter<ResponseBody, Flow<ServerSentEvent<ID, TYPE, DATA>>> {
38+
39+
private val delegate = EventSourceCallAdapter<ID, TYPE, DATA>(
40+
retrofit,
41+
idType,
42+
typeType,
43+
dataType,
44+
)
45+
46+
override fun responseType(): Type = delegate.responseType()
3647

3748
override fun adapt(
3849
call: Call<ResponseBody>,
39-
): Flow<ServerSentEvent<ID, TYPE, DATA>> = callbackFlow {
40-
call.attachEventSourceListener(this)
41-
awaitClose(call::cancel)
42-
}
50+
): Flow<ServerSentEvent<ID, TYPE, DATA>> {
51+
val delegate = delegate.adapt(call)
52+
return callbackFlow {
53+
delegate.subscribe(
54+
object : SseCallback<ID, TYPE, DATA> {
55+
override fun onEvent(
56+
eventSource: EventSource<ID, TYPE, DATA>,
57+
id: ID?,
58+
type: TYPE?,
59+
data: DATA,
60+
) {
61+
trySendBlocking(ServerSentEvent(id, type, data))
62+
}
4363

44-
override fun emit(
45-
builder: ProducerScope<ServerSentEvent<ID, TYPE, DATA>>,
46-
event: ServerSentEvent<ID, TYPE, DATA>,
47-
) {
48-
builder.trySendBlocking(event)
49-
}
64+
override fun onClosed(eventSource: EventSource<ID, TYPE, DATA>) {
65+
close()
66+
}
5067

51-
override fun close(builder: ProducerScope<ServerSentEvent<ID, TYPE, DATA>>) {
52-
builder.close()
53-
}
68+
override fun onFailure(eventSource: EventSource<ID, TYPE, DATA>, t: Throwable?) {
69+
close(t ?: RuntimeException()) // TODO exception type
70+
}
71+
},
72+
)
5473

55-
override fun closeExceptionally(
56-
builder: ProducerScope<ServerSentEvent<ID, TYPE, DATA>>,
57-
t: Throwable,
58-
) {
59-
builder.close(t)
74+
awaitClose(call::cancel)
75+
}
6076
}
77+
6178
}

0 commit comments

Comments
 (0)