Closed
Description
I have the following simple reproducer application:
package com.example.demo
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.messaging.Message
import org.springframework.messaging.MessageHeaders
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
@SpringBootApplication
class DemoApplication {
@Bean
fun kafkaProcessorBean(): (String) -> List<Message<Char>> = { it.map { MessageBuilder.createMessage(it, MessageHeaders(emptyMap())) } }
}
@Component
class KafkaProcessorComponent : (String) -> List<Message<Char>> {
override fun invoke(input: String): List<Message<Char>> {
return input.map { MessageBuilder.createMessage(it, MessageHeaders(emptyMap())) }
}
}
fun main(args: Array<String>) {
runApplication<DemoApplication>(*args)
}
with spring-cloud-stream-kafka on the classpath and this configuration:
spring.cloud.function.definition=kafkaProcessorBean;kafkaProcessorComponent
When I run this application with Spring Cloud 2024.0.1 and write a message into kafkaProcessorComponent-in-0
, I get only one output message in kafkaProcessorComponent-out-0
:
kcat -b localhost -t kafkaProcessorComponent-in-0 -P <<< bla
# kafkaProcessorComponent-out-0 now contains:
[{"payload":"b","headers":{"id":"70a871ab-556e-b618-fd62-72e790cc0c62","timestamp":1744104346397}},{"payload":"l","headers":{"id":"bb009a60-0f8f-15ac-bb7d-d14ef372f8b1","timestamp":1744104346397}},{"payload":"a","headers":{"id":"4064ed6a-21f0-308f-a7d7-5a6a545e7d7e","timestamp":1744104346397}}]
With 2024.0.0, I get (as expected) three output messages:
"b"
"l"
"a"
I played around with the versions and found out that Spring Cloud 2024.0.0 with spring-cloud-function 4.2.2 also has this problem, while Spring Cloud 2024.0.1 with spring-cloud-function 4.2.0 does not. That's why I'm reporting this here and not in spring-cloud-stream.
The kafkaProcessorBean
function works as expected in all versions:
kcat -b localhost -t kafkaProcessorBean-in-0 -P <<< bla
# kafkaProcessorBean-out-0 now contains:
"b"
"l"
"a"