How to implement ndjson Response converter #756
Answered
by
ag2s20150909
ag2s20150909
asked this question in
Q&A
-
With Retrofit, this can be done with something like the following:
class NdJsonResponseBodyConverter<T>(
private val loader: DeserializationStrategy<T>,
private val format: StringFormat
) : Converter<ResponseBody, Flow<T>> {
/**
 * Exposes a line-delimited response body as a [Flow] with one decoded element per non-empty line.
 *
 * A line starting with `data:` has that prefix stripped before it is handed to [format];
 * any other non-empty line is decoded as-is.
 *
 * The returned flow completes once the body has been read to the end. A decoding or I/O
 * failure fails the flow, and [value] is closed in every case.
 *
 * @param value response body to read line by line
 * @return a flow of the decoded elements, in the order the lines appear in the body
 */
override fun convert(value: ResponseBody): Flow<T> = callbackFlow {
    try {
        // useLines is inline, so the suspending send() is legal inside its lambda;
        // it also closes the reader when the block exits, normally or not.
        value.charStream().useLines { lines ->
            for (line in lines) {
                if (line.isEmpty()) continue
                val payload = if (line.startsWith("data:")) line.substringAfter("data:") else line
                // send() suspends while the buffer is full and throws once the collector is
                // gone, which stops reading instead of draining the rest of the body.
                send(format.decodeFromString(loader, payload))
            }
        }
    } finally {
        value.close()
    }
    // Without an explicit close() the flow would stay open forever after the last line.
    close()
}
}
/**
 * Retrofit [Converter.Factory] that handles `Flow<...>` response types by delegating to
 * [NdJsonResponseBodyConverter]; every other type is declined by returning `null`.
 *
 * @property format string format whose serializers module resolves the element serializer
 */
class NdJsonConverterFactory(private val format: StringFormat) : Converter.Factory() {

    override fun responseBodyConverter(
        type: Type,
        annotations: Array<out Annotation>,
        retrofit: Retrofit
    ): Converter<ResponseBody, *>? {
        // Anything that is not a Flow is declined.
        val isFlow = getRawType(type) == Flow::class.java
        if (!isFlow) return null

        check(type is ParameterizedType) { "Flow return type must be parameterized as Flow<Foo> or Flow<out Foo>" }

        // The element type is the upper bound of Flow's single type argument.
        val elementType = getParameterUpperBound(0, type)
        val elementSerializer = format.serializersModule.serializer(elementType)
        return NdJsonResponseBodyConverter(elementSerializer, format)
    }
}
/** Creates an [NdJsonConverterFactory] backed by this [StringFormat]. */
fun StringFormat.newNdJsonFactory(): NdJsonConverterFactory {
    return NdJsonConverterFactory(this)
}
/** Sample payload with two string fields, serialized under the names `aa` and `bb`. */
@Serializable
data class FooResponse(
    @SerialName("aa") val aa: String,
    @SerialName("bb") val bb: String
)
/** Sample service whose single endpoint is declared to return a [Flow] of [FooResponse]. */
interface TestService {

    /** `GET /api/test`. */
    @GET("/api/test")
    suspend fun test(): Flow<FooResponse>
}
/** Shared [Json] instance used by the converter examples. */
val ktjson = Json {
    // Input-side options.
    ignoreUnknownKeys = true
    isLenient = true
    coerceInputValues = true

    // Output and map-key options.
    allowStructuredMapKeys = true
    encodeDefaults = false
}
// Builder fragment (the `....` line stands for configuration elided by the author).
// The NDJSON factory is added before the regular JSON factory. It returns null for any
// type that is not a Flow, which presumably lets Retrofit move on to the JSON factory
// for those types — confirm against Retrofit's converter lookup order.
val retrofit= Retrofit.Builder()
....
.addConverterFactory(ktjson.newNdJsonFactory())
.addConverterFactory(ktjson.asConverterFactory("application/json; charset=UTF8".toMediaType()))
|
Beta Was this translation helpful? Give feedback.
Answered by
ag2s20150909
Jan 6, 2025
Replies: 2 comments
-
The current solution is to write it manually; it would be nice to have it generated automatically.
interface TestService{
// Raw endpoint: hands back the HttpStatement whose body testFlow() reads line by line.
@Streaming
@GET("/api/test")
suspend fun _test(): HttpStatement
}
/**
 * Streams the response of [TestService._test] as a [Flow] of [FooResponse], one element per
 * non-blank line.
 *
 * Lines starting with `data:` (SSE, `text/event-stream`) have that prefix stripped before
 * decoding; other lines are decoded as-is (NDJSON, `application/x-ndjson`).
 *
 * The flow completes when the channel is closed for reading. A decoding or read failure
 * fails the flow.
 */
fun TestService.testFlow(): Flow<FooResponse> = callbackFlow {
    val channel: ByteReadChannel = _test().body()
    while (!channel.isClosedForRead) {
        val line = channel.readUTF8Line()
        if (line.isNullOrBlank()) continue
        val payload = if (line.startsWith("data:")) line.substringAfter("data:") else line
        // Suspending send() instead of trySendBlocking(): it does not block the thread and
        // it throws once the collector has gone away, which ends the read loop.
        send(ktjson.decodeFromString(FooResponse.serializer(), payload))
    }
    // Explicit close(): ending on an empty awaitClose { } left the flow open forever
    // after the last line had been delivered.
    close()
}
or
/**
 * Executes this statement and exposes its body as a [Flow] with one decoded element per
 * non-empty line.
 *
 * Lines starting with `data:` (SSE, `text/event-stream`) have that prefix stripped before
 * decoding; other lines are decoded as-is (NDJSON, `application/x-ndjson`).
 *
 * Lines that fail to decode are skipped (best effort). Failures raised while emitting —
 * collector exceptions and cancellation — are NOT caught and propagate normally.
 *
 * @param format string format used to decode each line; defaults to [ktjson]
 * @return a cold flow of the decoded elements
 */
inline fun <reified T> HttpStatement.asFlow(format: StringFormat = ktjson): Flow<T> = flow {
    val content: ByteReadChannel = this@asFlow.body()
    while (!content.isClosedForRead) {
        val line = content.readUTF8Line()
        if (line.isNullOrEmpty()) continue
        val payload = if (line.startsWith("data:")) line.substringAfter("data:") else line
        // Only decoding is guarded. emit() must stay outside the try: catching Exception
        // around it would swallow CancellationException and keep the loop running after
        // the collector has stopped.
        val element = try {
            format.decodeFromString<T>(payload)
        } catch (e: Exception) {
            // Malformed line: skip it and keep reading.
            continue
        }
        emit(element)
    }
}
|
Beta Was this translation helpful? Give feedback.
0 replies
Answer selected by
ag2s20150909
-
Another attempt. It works, but it's not streaming. .......
install(ContentNegotiation) {
    json(ktjson)
    // Both line-oriented content types get their own NdjsonContentConverter instance.
    for (lineDelimitedType in listOf(NDJson, SSE)) {
        register(lineDelimitedType, NdjsonContentConverter(ktjson))
    }
}
.....
/** Content type `application/x-ndjson`. */
private val NDJson = ContentType("application", "x-ndjson")

/** Content type `text/event-stream`. */
private val SSE = ContentType("text", "event-stream")
class NdjsonContentConverter(private val format: StringFormat): ContentConverter{
/**
 * Wraps [content] in a [Flow] that yields one decoded element per non-empty line.
 *
 * Lines starting with `data:` (SSE, `text/event-stream`) have that prefix stripped before
 * decoding; other lines are decoded as-is (NDJSON, `application/x-ndjson`). Decoding uses the
 * converter's own `format`, the same one the element serializer is resolved from.
 *
 * The flow completes when [content] is closed for reading; a decoding or read failure
 * fails the flow.
 *
 * @param charset not used by this implementation
 * @param typeInfo requested type; must be `Flow`, with the element type as its first argument
 * @param content channel the lines are read from
 * @return a `Flow` of decoded elements
 * @throws IllegalStateException if the requested type is not `Flow`
 */
override suspend fun deserialize(
    charset: Charset,
    typeInfo: TypeInfo,
    content: ByteReadChannel
): Any? {
    check(typeInfo.type == Flow::class) { "For NdjsonContentConverter the return type must be kotlinx.coroutines.flow.Flow ,like Flow<Foo> or Flow<out Foo>" }
    val responseType = typeInfo.upperBoundType(0)!!.type.javaObjectType
    val loader = format.serializersModule.serializer(responseType)
    return callbackFlow<Any> {
        while (!content.isClosedForRead) {
            val line = content.readUTF8Line()
            if (line.isNullOrEmpty()) continue
            val payload = if (line.startsWith("data:")) line.substringAfter("data:") else line
            // Decode with the injected format rather than a global Json instance, so the
            // format and the serializer looked up from its module always match.
            // send() suspends instead of blocking and throws once the collector is gone.
            send(format.decodeFromString(loader, payload))
        }
        // Explicit close(): ending on an empty awaitClose { } kept the flow open forever
        // after the channel had been fully read.
        close()
    }
}
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The current solution is to write it manually; it would be nice to have it generated automatically.