-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DBZ-8300 Transaction handling #30
base: main
Are you sure you want to change the base?
Conversation
…trieveJournal.java commented out
Hi @jpechane. Thank you for your valuable contribution. |
Welcome as a new contributor to Debezium, @jpechane. Reviewers, please add missing author name(s) and alias name(s) to the COPYRIGHT.txt and Aliases.txt respectively. |
Hi @jpechane. Thank you for your valuable contribution. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aziza-calm Ho, this is a good start! Could you please take a look at the comments and polish the PR if possible? Thanks
@@ -16,12 +16,14 @@ | |||
|
|||
public class TestHelper { | |||
|
|||
private static final String DATABASE_NAME = "DTEST"; | |||
private static final String DATABASE_NAME = "AZIZACALM2"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There should be no changes in this class. These are just defaults and if you need to use different values then they should be passed via system properties from outside during runtime. OTH thinking about that it makes sense to keep HOTNAME/PORT change to point to PUB400 as the default.
"INSERT INTO " + TABLE + " VALUES (2, 'second')", | ||
records.print(); | ||
|
||
// JdbcConnection conn = TestHelper.testConnection().setAutoCommit(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please do not modify existng test but intorduce a new one similar to io.debezium.connector.postgresql.TransactionMetadataIT
to test the transaction handling in detail.
if (config.filtering() && !config.includeFiles().isEmpty()) { | ||
builder.withFileFilters(config.includeFiles()); | ||
} | ||
// if (config.filtering() && !config.includeFiles().isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove the commented out code if needed.
But I guess it should not be removed as such but change elswher is probbaly needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might consult @msillence and explain why you need it and you migh find a different solution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put in a flag to turn of filtering as it was causing so many problems with working out how to paginate though the journals. I think that's hopefully resolved now. Though I think it was only a few weeks ago I fixed another edge case, this time my mistake rather than a missunderstanding of how the API works. Still I'd like to keep the filtering flag
The is empty check probably belongs in RetrievalCriteria.withFile:207
|
||
log.debug("update event id {} tx {} table {}", nextOffset, txId, tableId); | ||
|
||
dispatcher.dispatchDataChangeEvent(partition, tableId, new As400ChangeRecordEmitter(partition, | ||
offsetContext, Operation.UPDATE, dataBefore, dataNext, clock, connectorConfig)); | ||
if ("0".equals(txId)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please intorduce a constant with name descirbing meaning of "0"
value.
builder.withFileFilters(config.includeFiles()); | ||
} | ||
// if (config.filtering() && !config.includeFiles().isEmpty()) { | ||
// builder.withFileFilters(config.includeFiles()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh we really do need this ability
We have clients with millions of events an hour and we absolutely need to filter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
entries in journal that correspond to start commit and end commit has null object, and they are not processed by connector because of this filter
Need to think about how to make a filter that will not cause commit/rollback entries filtering
dispatcher.dispatchDataChangeEvent(partition, tableId, new As400ChangeRecordEmitter(partition, | ||
offsetContext, Operation.UPDATE, dataBefore, dataNext, clock, connectorConfig)); | ||
if ("0".equals(txId)) { | ||
log.debug("update not in transaction, dispatching it"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these blocks of code look very similar - I wonder if they could be refactored into something common?
https://issues.redhat.com/browse/DBZ-8301