Description
Describe the bug
An error is logged when sending a message to a topic when message is ServiceBusMessage of string.
The error logged is "This method can be called if AMQP Message body type is 'VALUE'. The actual type is [DATA]."
The message seems to be sent even if the error is logged.
Exception or Stack Trace
Stack-Trace available in debug
java.lang.IllegalArgumentException: This method can be called if AMQP Message body type is 'VALUE'. The actual type is [DATA].
at com.azure.core.amqp.models.AmqpMessageBody.getValue(AmqpMessageBody.java:346)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:688)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:183)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:732)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:183)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:732)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:770)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:183)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:502)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:341)
at com.fasterxml.jackson.databind.ObjectMapper._writeValueAndClose(ObjectMapper.java:4799)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:4065)
at org.springframework.cloud.function.json.JacksonMapper.toJson(JacksonMapper.java:75)
at org.springframework.cloud.function.context.config.JsonMessageConverter.convertToInternal(JsonMessageConverter.java:131)
at org.springframework.cloud.stream.converter.CompositeMessageConverterFactory$2.convertToInternal(CompositeMessageConverterFactory.java:117)
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:201)
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:189)
at org.springframework.cloud.function.context.config.SmartCompositeMessageConverter.toMessage(SmartCompositeMessageConverter.java:158)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputMessageIfNecessary(SimpleFunctionRegistry.java:1451)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputIfNecessary(SimpleFunctionRegistry.java:1263)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.lambda$convertOutputPublisherIfNecessary$27(SimpleFunctionRegistry.java:1525)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.SinkManyUnicast.drainRegular(SinkManyUnicast.java:284)
at reactor.core.publisher.SinkManyUnicast.drain(SinkManyUnicast.java:365)
at reactor.core.publisher.SinkManyUnicast.tryEmitNext(SinkManyUnicast.java:239)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
To Reproduce
Utilize the code snippet below to send a message to a topic.
Code Snippet
@Bean
public Supplier<Flux<Message<ServiceBusMessage>>> publishTopicMessage() {
return () -> publisher.asFlux()
.doOnError(t -> log.error("An error occurred while sending a message. Cause: %s.".formatted(t.getLocalizedMessage())));
}
public synchronized void publishMessage(final String message, final String messageType) {
var tenantId = "id";
var principalId = "id";
var azureMessage = new ServiceBusMessage(message);
azureMessage.setCorrelationId(MDC.get(MDC_CORRELATION_ID_KEY));
azureMessage.getApplicationProperties().put(EVENT_HEADER, messageType);
azureMessage.getApplicationProperties().put(TENANT_ID_HEADER, tenantId);
azureMessage.getApplicationProperties().put(PRINCIPAL_ID_HEADER, principalId);
publisher.emitNext(
MessageBuilder.withPayload(azureMessage).build(),
Sinks.EmitFailureHandler.FAIL_FAST
);
}
Expected behavior
No error should be logged if the message was sent with success.
Screenshots
If applicable, add screenshots to help explain your problem.
Setup (please complete the following information):
- OS: Windows/Linux
- IDE: IntelliJ
- Library/Libraries: spring-cloud-azure-stream-binder-servicebus:5.15.0
- Java version: 17
- Frameworks: Spring Boot 3.3.3
Additional context
It seems the error is logged two times.
Information Checklist
Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report
- Bug Description Added
- Repro Steps Added
- Setup information Added