Skip to content
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void setExtFields() {
super.getRocketmqProducer().setCompressMsgBodyOverHowmuch(10);
}

public SendResult send(CloudEvent cloudEvent) {
public SendResult send(CloudEvent cloudEvent) throws InterruptedException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your modification is not much different from the original code, because the catch here does not throw InterruptedException, so it still swallows InterruptedException. What I mean is to throw InterruptedException directly in catch(InterruptedException) or return empty SendResult in catch(InterruptedException).

您这样修改和原代码区别不大,因为这里的catch不会抛出InterruptedException,所以还是将InterruptedException吞掉了。我的意思是在catch(InterruptedException)中直接抛出InterruptedException或者返回空的SendResult

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm very sorry again. QAQ

this.checkProducerServiceState(rocketmqProducer.getDefaultMQProducerImpl());
org.apache.rocketmq.common.message.Message msg =
RocketMQMessageFactory.createWriter(Objects.requireNonNull(cloudEvent.getSubject())).writeBinary(cloudEvent);
Expand All @@ -94,6 +94,9 @@ public void sendOneway(CloudEvent cloudEvent) {
supplySysProp(msg, cloudEvent);
try {
this.rocketmqProducer.sendOneway(msg);
} catch (InterruptedException e) {
log.error("Send message oneway InterruptedException", e);
Thread.currentThread().interrupt(); // Restore interrupted status
} catch (Exception e) {
log.error(String.format("Send message oneway Exception, %s", msg), e);
throw this.checkProducerException(msg.getTopic(), MessageClientIDSetter.getUniqID(msg), e);
Expand All @@ -104,9 +107,12 @@ public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) {
this.checkProducerServiceState(this.rocketmqProducer.getDefaultMQProducerImpl());
org.apache.rocketmq.common.message.Message msg =
RocketMQMessageFactory.createWriter(Objects.requireNonNull(cloudEvent.getSubject())).writeBinary(cloudEvent);
msg = supplySysProp(msg, cloudEvent);
supplySysProp(msg, cloudEvent);
try {
this.rocketmqProducer.send(msg, this.sendCallbackConvert(msg, sendCallback));
} catch (InterruptedException e) {
log.error("Send message async InterruptedException", e);
Thread.currentThread().interrupt(); // Restore interrupted status
} catch (Exception e) {
log.error(String.format("Send message async Exception, %s", msg), e);
throw this.checkProducerException(msg.getTopic(), MessageClientIDSetter.getUniqID(msg), e);
Expand Down Expand Up @@ -134,6 +140,9 @@ public void reply(final CloudEvent cloudEvent, final SendCallback sendCallback)

try {
this.rocketmqProducer.send(msg, this.sendCallbackConvert(msg, sendCallback));
} catch (InterruptedException e) {
log.error("Send message async InterruptedException", e);
Thread.currentThread().interrupt(); // Restore interrupted status
} catch (Exception e) {
log.error(String.format("Send message async Exception, %s", msg), e);
throw this.checkProducerException(msg.getTopic(), MessageClientIDSetter.getUniqID(msg), e);
Expand Down