Skip to content

Improves/Fixes TableBackedProgressManager#10

Open
hartmut-co-uk wants to merge 1 commit into
scylladb:masterfrom
hartmut-co-uk:master
Open

Improves/Fixes TableBackedProgressManager#10
hartmut-co-uk wants to merge 1 commit into
scylladb:masterfrom
hartmut-co-uk:master

Conversation

@hartmut-co-uk

Copy link
Copy Markdown
Contributor

Hi, I've been playing, testing and debugging with this lib + simple-printer example, also making use of TableBackedProgressManager + NewPeriodicProgressReporter in particular.

For some unexpected behaviour I came up with improvements/fixes I wanted to share and maybe discuss.
I've added my current findings + a new example into this PR.

Changes included

  • adds method Empty(..) to ChangeConsumer
    (allows for marking CDC log stream time as processed, fixes 'bug' where second start of a reader begins reading CDC log on generated_created timestamp instead of ChangeAgeLimit / actual latest processed state)
  • Simplify AdvancedReaderConfig options [PostEmptyQueryDelay, PostFailedQueryDelay] -> single 'PostQueryDelay' which is also considered when calculating next PollWindow
  • adds new example 'simple-printer-stateful' which correctly maintains the CDC reader's state using TableBackedProgressManager + NewPeriodicProgressReporter
  • adds some commented out 'logging' lines for debugging purposes

Options for further improvements:

  • gracefully shutdown the reader+consumers allowing to mark/save current progress before exit (~call consumer.End() / reporter.SaveAndStop(ctx context.Context))

Any questions, please reach out!

- adds method Empty(..) to ChangeConsumer
  (allows for marking CDC log stream time as processed, fixes 'bug' where second start of a reader begins reading CDC log on generated_created timestamp instead of ChangeAgeLimit / actual latest processed state)
- Simplify AdvancedReaderConfig options [PostEmptyQueryDelay, PostFailedQueryDelay] -> single 'PostQueryDelay' which is also considered when calculating next PollWindow
- adds new example 'simple-printer-stateful' which correctly maintains the CDC reader's state using TableBackedProgressManager + NewPeriodicProgressReporter
- adds some commented out 'logging' lines for debugging purposes
@hartmut-co-uk

Copy link
Copy Markdown
Contributor Author

cc: @avelanarius, @piodul

@hartmut-co-uk

Copy link
Copy Markdown
Contributor Author

cc: @haaawk

@hartmut-co-uk

hartmut-co-uk commented Dec 14, 2021

Copy link
Copy Markdown
Contributor Author

Latency (from 'write to table' ... 'Reader.Consume()') when the reader has fully caught up can be calculated as simple as
Latency = ConfidenceWindowSize + PostQueryDelay

e.g. for

Advanced: scyllacdc.AdvancedReaderConfig{
	ChangeAgeLimit:       15 * time.Minute,
	ConfidenceWindowSize: 10 * time.Second,
	PostQueryDelay:       5 * time.Second,
	PostFailedQueryDelay: 5 * time.Second,
	QueryTimeWindowSize:  5 * 60 * time.Second,
},

=> Latency: 15s
*with queries against scylla every 5s (PostQueryDelay) per [reader::table::vnodeId] ... -> with query windows of 5s
**also on initial startup or when behind + catching up QueryTimeWindowSize is used allowing for faster progress -> with query window size 5min (with above example).

@piodul piodul left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I apologize for leaving this PR unattended for so long.

The idea to introduce Empty() so that saving progress is possible when no rows are appearing on the stream looks good. I wasn't sure whether you are still interested in this PR, so I went ahead and used your idea in another PR: #13 (marking you as a co-author, of course). The most important change I made was moving Empty() from ChangeConsumer to a new, optional interface which extends ChangeConsumer, so that the changes are backwards-compatible.

I left review comments in this PR with some questions. I'm not sure whether you are still interested in the PR, so feel free to respond or disregard my comments.

Comment thread stream_batch.go
return pollWindow{
begin: windowStart,
end: gocql.MinTimeUUID(queryWindowRightEnd),
end: gocql.MinTimeUUID(queryWindowRightEnd.Add(sbr.config.Advanced.PostQueryDelay)),

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem correct. Due to the distributed nature of Scylla and CDC, rows with recent cdc$time may appear after some delay, so ConfidenceWindowSize is supposed to protect from that - we don't read rows with cdc$time newer than Now() - ConfidenceWindowSize. Here, it looks like you may break that assumption as queryWindowRightEnd + PostQueryDelay may be further than confidenceWindowStart.

Do you remember why you introduced that change?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.. after reading my code multiple times now - I think it was because the window is calculated first, then the delay is resolved and executed. I think this was very weird to understand and maybe some wider refactoring should be done?

see ref:

wnd = sbr.getPollWindow()
var delay time.Duration
if err != nil {
delay = sbr.config.Advanced.PostFailedQueryDelay
} else {
delay = sbr.config.Advanced.PostQueryDelay
}
delayUntil := windowProcessingStartTime.Add(delay)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was one of those details I only was able to discover while debugging with the added detailed logs. I remember this was confusing and time consuming. ^^

With this in place I was able to achieve and actually see the exact 15s expected latency, as I mentioned in my 4th comment.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.. after reading my code multiple times now - I think it was because the window is calculated first, then the delay is resolved and executed. I think this was very weird to understand and maybe some wider refactoring should be done?

Right, now I see that the delay is wrong. The window is calculated first, then there is a sleep, then the loop goes to the next iteration and uses the query window calculated before the sleep. This is clearly wrong, the window should be used in a query immediately after being computed. The proper fix would be to move wnd = sbr.getPollWindow() at the end of the loop.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or - even better - wnd := sbr.getPollWindow() should be put at the beginning of the for loop and the other calls to getPollWindow() removed. I really don't know why it wasn't written like that in the beginning...

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also agree that this code could use some refactoring :) Perhaps the logic responsible for tracking per-stream progress and calculating windows could be moved to an abstraction separate from the streamBatchReader.

@hartmut-co-uk hartmut-co-uk Oct 5, 2022

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, current state of the PR could be called a workaround at best 😅, glad I remembered why added this at least. Thanks for pointing out 🧐

Comment thread stream_batch.go
return pollWindow{
begin: windowStart,
end: gocql.MinTimeUUID(confidenceWindowStart),
end: gocql.MinTimeUUID(confidenceWindowStart.Add(sbr.config.Advanced.PostQueryDelay)),

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same issue as above - here, the end will be later than confidenceWindowStart.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(see above)

Comment thread reader.go
// If the parameter is left as 0, the library will automatically adjust
// the length of the delay.
PostEmptyQueryDelay time.Duration
PostQueryDelay time.Duration

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, this is a backwards-incompatible change and will require releasing a v2. I'd rather postpone it until more ideas for breaking changes accumulate and releasing v2 makes more sense.

Could you explain what is your motivation for unifying both parameters? The idea for having separate delays was that an empty query result was a signal that the library is polling too fast and wastes time on empty queries. Waiting longer for the next query should increase chances that the next query will have a non-empty result.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I can't remember. I think I couldn't see any valid use case.
For my own CDC scenarios I was keen to get/keep my latency from record write time to CDC event as low as possible.

I agree it's not a good idea to introduce a breaking change for this.

But I think it also had something to do with the problem mentioned in regards to getPollWindow() executed before the delay is happening... (see further down)

Comment thread stream_batch.go
if compareTimeuuid(wnd.begin, wnd.end) < 0 {
var iter *changeRowIterator
//sbr.config.Logger.Printf("queryRange: %s.%s :: %s (%s) [%s ... %s]",
// crq.keyspaceName, crq.tableName, crq.pkCondition, crq.bindArgs[0], wnd.begin.Time(), wnd.end.Time())

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather not leave commented out code in the repository. Perhaps the Logger interface could be extended so that it understand various verbosity level and this message could have low verbosity (debug/trace)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fine for me either way - I mostly wanted to help understand how I've been debugging this - so I committed this along with for review purposes.

If you actually would like to ultimately merge this PR, let me know if I should drop..

Comment thread utils.go
ppr.mu.Unlock()

// TODO: Log errors?
//ppr.logger.Printf("MarkProgress for %s: %s", ppr.reporter.streamID, timeToReport.Time())

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue with logging here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, same as above

// Make sure you create the following table before you run this example:
// CREATE TABLE ks.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH cdc = {'enabled': 'true'};

func main() {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what are the differences between simple-printer-stateful and complicated-consumer. Is there a good reason why you kept them separate? Perhaps the complicated-consumer could be improved in some way?

@hartmut-co-uk hartmut-co-uk Oct 5, 2022

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

complicated-consumer

What do you mean by 'complicated-consumer'?

I think this simply was the example I've put together and was working with while debugging and attempting to fix the behaviour. So I've added it for reference.

Please feel free to keep, improve or drop. :-)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind, "complicated-consumer" (which is actually called "complicated-printer") was an example that I apparently forgot to push. It's fine then, we can take your example instead.

@hartmut-co-uk

Copy link
Copy Markdown
Contributor Author

thx @piodul - good to see some movement on the CDC front...
I'm still interested to contribute / help with feedback, but I'm quite out of touch with this PR, so I'll need some focus time to review again myself... :-)
I'll try to make time soon.

Comment thread stream_batch.go
return pollWindow{
begin: windowStart,
end: gocql.MinTimeUUID(queryWindowRightEnd),
end: gocql.MinTimeUUID(queryWindowRightEnd.Add(sbr.config.Advanced.PostQueryDelay)),

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.. after reading my code multiple times now - I think it was because the window is calculated first, then the delay is resolved and executed. I think this was very weird to understand and maybe some wider refactoring should be done?

see ref:

wnd = sbr.getPollWindow()
var delay time.Duration
if err != nil {
delay = sbr.config.Advanced.PostFailedQueryDelay
} else {
delay = sbr.config.Advanced.PostQueryDelay
}
delayUntil := windowProcessingStartTime.Add(delay)

Comment thread stream_batch.go
return pollWindow{
begin: windowStart,
end: gocql.MinTimeUUID(confidenceWindowStart),
end: gocql.MinTimeUUID(confidenceWindowStart.Add(sbr.config.Advanced.PostQueryDelay)),

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(see above)

// Make sure you create the following table before you run this example:
// CREATE TABLE ks.tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH cdc = {'enabled': 'true'};

func main() {

@hartmut-co-uk hartmut-co-uk Oct 5, 2022

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

complicated-consumer

What do you mean by 'complicated-consumer'?

I think this simply was the example I've put together and was working with while debugging and attempting to fix the behaviour. So I've added it for reference.

Please feel free to keep, improve or drop. :-)

Comment thread progress.go
tbpm.concurrentQueryLimiter.Acquire(ctx, 1)
defer tbpm.concurrentQueryLimiter.Release(1)

//log.Printf("SaveProgress for %s = %s\n", streamID, progress.LastProcessedRecordTime)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: here's one more unused log

Comment thread reader.go
// If the parameter is left as 0, the library will automatically adjust
// the length of the delay.
PostEmptyQueryDelay time.Duration
PostQueryDelay time.Duration

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I can't remember. I think I couldn't see any valid use case.
For my own CDC scenarios I was keen to get/keep my latency from record write time to CDC event as low as possible.

I agree it's not a good idea to introduce a breaking change for this.

But I think it also had something to do with the problem mentioned in regards to getPollWindow() executed before the delay is happening... (see further down)

Comment thread stream_batch.go
if compareTimeuuid(wnd.begin, wnd.end) < 0 {
var iter *changeRowIterator
//sbr.config.Logger.Printf("queryRange: %s.%s :: %s (%s) [%s ... %s]",
// crq.keyspaceName, crq.tableName, crq.pkCondition, crq.bindArgs[0], wnd.begin.Time(), wnd.end.Time())

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fine for me either way - I mostly wanted to help understand how I've been debugging this - so I committed this along with for review purposes.

If you actually would like to ultimately merge this PR, let me know if I should drop..

Comment thread utils.go
ppr.mu.Unlock()

// TODO: Log errors?
//ppr.logger.Printf("MarkProgress for %s: %s", ppr.reporter.streamID, timeToReport.Time())

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, same as above

@hartmut-co-uk

Copy link
Copy Markdown
Contributor Author

@piodul shall I close this PR?
Or would you want to reuse anything else or some of the suggestions?
e.g.

Options for further improvements:

  • gracefully shutdown the reader+consumers allowing to mark/save current progress before exit (~call consumer.End() / reporter.SaveAndStop(ctx context.Context))

@dkropachev

Copy link
Copy Markdown
Collaborator

@hartmut-co-uk , thanks for your contribution, this project recently have changed it's maintainers, can you please let me know if you are willing to continue work on this PR

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants