Open
Description
Discussed in #9309
Originally posted by wimdeblauwe July 8, 2024
According to the documentation, you can use JSON as the serialization format for your JDBC channel message store by overriding the ChannelMessageStorePreparedStatementSetter
. However, I found it was not that easy. If you only do that, the MessageRowMapper
throws an exception that it cannot read what was written.
These are all the things I had to do to make it work:
- Inject a custom queryProvider, preparedStatementSetter and messageRowMapper into the configuration of the
JdbcChannelMessageStore
:
@Bean
JdbcChannelMessageStore jdbcChannelMessageStore(
DataSource dataSource,
ChannelMessageStoreQueryProvider queryProvider,
ChannelMessageStorePreparedStatementSetter preparedStatementSetter,
MessageRowMapper messageRowMapper) {
JdbcChannelMessageStore jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
jdbcChannelMessageStore.setTablePrefix(CONCURRENT_METADATA_STORE_PREFIX);
jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(
new PostgresChannelMessageStoreQueryProvider());
jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(queryProvider);
jdbcChannelMessageStore.setPreparedStatementSetter(preparedStatementSetter);
jdbcChannelMessageStore.setMessageRowMapper(messageRowMapper);
return jdbcChannelMessageStore;
}
- Custom
ChannelMessageStorePreparedStatementSetter
that writes 2 colums: one with the type of the payload, and one with the serialized JSON of the payload.
@Bean
ChannelMessageStorePreparedStatementSetter channelMessageStorePreparedStatementSetter(ObjectMapper objectMapper) {
return new ChannelMessageStorePreparedStatementSetter() {
@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage, Object groupId, String region,
boolean priorityEnabled) throws SQLException {
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
try {
preparedStatement.setString(6, requestMessage.getPayload().getClass().getName());
String json = objectMapper.writeValueAsString(requestMessage.getPayload());
preparedStatement.setObject(7, json, java.sql.Types.OTHER);
} catch (JsonProcessingException e) {
throw new RuntimeException("Unable to store message", e);
}
}
};
}
- Custom
MessageRowMapper
. Unfortunately, the setter atJdbcChannelMessageStore
only accepts theMessageRowMapper
class, not theRowMapper
interface, but overriden themapRow
is luckily possible.
@Bean
MessageRowMapper messageRowMapper(ObjectMapper objectMapper) {
return new MessageRowMapper(null, null) {
@Override
public Message<?> mapRow(ResultSet rs, int rowNum) throws SQLException {
try {
String payloadType = rs.getString(rs.findColumn("MESSAGE_PAYLOAD_TYPE"));
String s = rs.getString(rs.findColumn("MESSAGE_BYTES"));
Object o = objectMapper.readValue(s, Class.forName(payloadType));
return new MutableMessage<>(o);
} catch (JsonProcessingException | ClassNotFoundException e) {
throw new RuntimeException("Unable to read message", e);
}
}
};
}
- To support the extra column for the payload type, I needed to provide a custom query provider:
@Bean
ChannelMessageStoreQueryProvider queryProvider() {
return new PostgresChannelMessageStoreQueryProvider(){
@Override
public String getCreateMessageQuery() {
return """
INSERT into %PREFIX%CHANNEL_MESSAGE(
MESSAGE_ID,
GROUP_KEY,
REGION,
CREATED_DATE,
MESSAGE_PRIORITY,
MESSAGE_PAYLOAD_TYPE,
MESSAGE_BYTES)
values (?, ?, ?, ?, ?, ?, ?)
""";
}
@Override
public String getPollFromGroupQuery() {
return """
delete
from %PREFIX%CHANNEL_MESSAGE
where CTID = (select CTID
from %PREFIX%CHANNEL_MESSAGE
where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key
and %PREFIX%CHANNEL_MESSAGE.REGION = :region
order by CREATED_DATE, MESSAGE_SEQUENCE
limit 1 for update skip locked)
returning MESSAGE_ID, MESSAGE_PAYLOAD_TYPE, MESSAGE_BYTES;
""";
}
};
}
I also updated my Flyway script that creates the tables to add the MESSAGE_PAYLOAD_TYPE
column:
CREATE TABLE _spring_integration_CHANNEL_MESSAGE
(
MESSAGE_ID CHAR(36) NOT NULL,
GROUP_KEY CHAR(36) NOT NULL,
CREATED_DATE BIGINT NOT NULL,
MESSAGE_PRIORITY BIGINT,
MESSAGE_SEQUENCE BIGINT NOT NULL DEFAULT NEXTVAL('_spring_integration_MESSAGE_SEQ'),
MESSAGE_PAYLOAD_TYPE VARCHAR NOT NULL,
MESSAGE_BYTES JSON,
REGION VARCHAR(100) NOT NULL,
CONSTRAINT _spring_integration_CHANNEL_MESSAGE_PK PRIMARY KEY (REGION, GROUP_KEY, CREATED_DATE, MESSAGE_SEQUENCE)
);
I wonder if this is the good approach to have (readable) JSON in the database instead of (unreadable) Java serialization in there? If so, maybe the documentation should be updated to provide a complete solution?