feat(clustering): introduce incremental sync for clustering #13157
base: master
Conversation
Force-pushed from 88e177a to e64c085
Force-pushed from e64c085 to 2692ea9
Force-pushed from 856ca04 to 50528fd
Force-pushed from dbe6259 to 573852d
Force-pushed from bebd9ed to 68849c4
Force-pushed from a1be884 to 6307e91
This reverts commit 6307e91.
```lua
local function get_all_nodes_with_sync_cap()
  local res, err = kong.db.clustering_data_planes:page(DEFAULT_PAGE_SIZE)
  -- [...]
end
```

Should we not use `:each()` here instead of simply fetching one page? It seems that this function will only ever return `DEFAULT_PAGE_SIZE` entries even if there are more in the database.
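A minimal sketch of the reviewer's suggestion, assuming `clustering_data_planes` exposes the standard Kong DAO `:each()` iterator (the function name and `DEFAULT_PAGE_SIZE` are taken from the snippet above; the body is an assumption, not PR code):

```lua
-- Sketch: accumulate every row via the DAO iterator instead of a single page.
local function get_all_nodes_with_sync_cap()
  local res = {}
  for node, err in kong.db.clustering_data_planes:each(DEFAULT_PAGE_SIZE) do
    if err then
      return nil, err
    end
    res[#res + 1] = node
  end
  return res
end
```

Unlike `:page()`, the iterator fetches subsequent pages internally, so the result is no longer capped at `DEFAULT_PAGE_SIZE` entries.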
```lua
function _M:notify_all_nodes(new_version)
```

This `new_version` param appears to be unused.
```lua
manager.callbacks:register("kong.sync.v2.notify_new_version", function(node_id, new_versions)
    -- currently only default is supported, and anything else is ignored
    for namespace, new_version in pairs(new_versions) do
      if namespace == "default" then
        local version = new_version.new_version
        if not version then
          return nil, "'new_version' key does not exist"
        end

        local lmdb_ver = tonumber(declarative.get_current_hash()) or 0
        if lmdb_ver < version then
          return self:sync_once()
        end

        return true
      end
    end

    return nil, "default namespace does not exist inside params"
  end)
end
```
Is it possible for the value of `new_versions` to contain multiple entries for the `default` namespace, like this?

```lua
{
  { namespace = "default", new_version = 1 },
  { namespace = "default", new_version = 3 },
}
```

If so, then this logic might be faulty. If `lmdb_ver = 2`, then we will exit the loop after one iteration (because `lmdb_ver (2) < version (1)` is false, so we `return true`) instead of evaluating the next entry, which has `new_version = 3` and would trigger a sync operation.
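Under the reviewer's assumption that `new_versions` may carry several entries for the same namespace, one hedged sketch of a fix is to reduce to the highest advertised version before comparing (the entry shape follows the example above; this is not code from the PR):

```lua
-- Sketch: find the highest "default" version first, then compare once.
local max_version = 0
for _, entry in ipairs(new_versions) do
  if entry.namespace == "default" and entry.new_version then
    max_version = math.max(max_version, entry.new_version)
  end
end

local lmdb_ver = tonumber(declarative.get_current_hash()) or 0
if lmdb_ver < max_version then
  return self:sync_once()
end
return true
```

With this shape, an `lmdb_ver` of 2 would still trigger a sync, because the comparison runs against 3 rather than against whichever entry happens to be visited first.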
```lua
function _M:sync_once(delay)
  local hdl, err = ngx.timer.at(delay or 0, function(premature)
    -- [...]
  end)
  -- [...]
end
```

This timer handler doesn't capture any upvalues from the surrounding function scope of `_M:sync_once`. Should we transform it into a reusable function at the module level instead?
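A hedged sketch of the module-level refactor the reviewer suggests; the `sync_handler` name and its body are assumptions, not code from the PR:

```lua
-- Sketch: hoist the timer callback to module scope so no closure is
-- allocated on every call to sync_once().
local function sync_handler(premature)
  if premature then
    return
  end
  -- [...] perform the actual sync work here
end

function _M:sync_once(delay)
  local hdl, err = ngx.timer.at(delay or 0, sync_handler)
  -- [...]
end
```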
```lua
local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function()
  -- here must be 2 times
```

Can you explain the "2 times" in this comment?
```lua
if ns_delta.wipe then
  kong.core_cache:purge()
  kong.cache:purge()

else
  for _, event in ipairs(crud_events) do
    -- delta_type, crud_event_type, delta.row, old_entity
    db[event[1]]:post_crud_event(event[2], event[3], event[4])
  end
end
```

It's not clear to me why purging the cache and executing the event handlers are mutually exclusive outcomes. Can you explain why we skip the crud events when `ns_delta.wipe` is truthy?
```lua
-- TODO: what is the proper value?
local FIRST_SYNC_DELAY = 0.5 -- seconds
```

I would think we would want to sync as soon as possible, no?

Suggested change:

```lua
local FIRST_SYNC_DELAY = 0
```

In our test suites we may need to wait for the CP to start; here 0.5 seconds is perhaps a proper number, or else the next sync will happen 30 seconds later.

Sure, we need to find the appropriate number.
Ah, interesting. To me, this seems like it ought to be solved at the RPC level with events/callbacks:

1. The RPC subsystem (client/driver) establishes a connection as soon as possible (nginx `init_worker` / `ngx.timer.at(0, ...)`) with a retry/exponential back-off policy.
   a. Attempts to send an RPC call before the connection is established yield an error.
2. On establishing a successful connection, the RPC subsystem emits a worker-level event*.
3. During nginx `init`, the incremental sync subsystem can register a callback for this "on-connect" event and call `:sync_once()`, in addition to registering the recurring sync calls every `EACH_SYNC_DELAY` seconds. This way we also execute sync ASAP when a connection is lost and re-established. If we add a similar "on-disconnect" event, the incremental sync subsystem can use it to cancel the recurring sync timer.

\* "worker-level event" doesn't necessarily imply that we need to use `kong.worker_events` to drive this behavior. It could be a simple callback mechanism whereby `kong.rpc:on_connect("handler-name", callback)` adds the callback function to a table, and the RPC subsystem executes those functions at the appropriate time.
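A hedged sketch of the simple callback table described above; `kong.rpc:on_connect` and the surrounding names are the commenter's hypothetical API, not code that exists in the PR:

```lua
-- Sketch: a minimal on-connect callback registry for the RPC subsystem.
local _callbacks = {}

function _M:on_connect(name, callback)
  _callbacks[name] = callback
end

-- The RPC subsystem would call this once a connection is established.
function _M:emit_on_connect()
  for name, cb in pairs(_callbacks) do
    local ok, err = pcall(cb)
    if not ok then
      ngx.log(ngx.ERR, "on_connect callback '", name, "' failed: ", err)
    end
  end
end
```

The incremental sync subsystem could then register `_M:on_connect("sync", function() return sync:sync_once() end)` during init, so a sync runs immediately after every (re)connection rather than waiting for the next recurring timer tick.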
```lua
function _M:get_latest_version()
  local sql = "SELECT MAX(version) AS max_version FROM clustering_sync_version"
  return self.connector:query(sql)[1].max_version
end
```
The other methods in this file all return directly from `connector:query()`, so I assume they will return `nil, string` in case of error. On the other hand, this method may raise an error due to invalid table access if there is a failure in `connector:query()`.

I suggest we make the error handling more consistent:
```lua
local rows, err = self.connector:query(sql)
if not rows then
  return nil, err
end
-- should we raise/return an error if no rows/max_version are returned?
-- or should we return a default value of 0?
return rows[1] and rows[1].max_version
```
If it's intended to raise an error when `connector:query()` fails, then I suggest using `assert()` or `error()` to make the intention more explicit.

We should also codify the behavior in the case that `clustering_sync_version` is empty. I'm seeing code elsewhere that assumes it cannot return `nil`. Is the caller incorrect?
```lua
-- location: kong/clustering/services/sync/rpc.lua
function _M:init_cp(manager)
  -- [...]
  if default_namespace_version == 0 or
     self.strategy:get_latest_version() - default_namespace_version > FULL_SYNC_THRESHOLD
  then
  -- [...]
```
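If raising on a query failure is the intended behavior, a hedged sketch of the `assert()` variant the comment mentions (the `or 0` default for an empty `clustering_sync_version` table is an assumption, matching the caller's comparison against 0, not settled behavior):

```lua
-- Sketch: fail loudly on query errors; default to 0 when the table is empty
-- (MAX() over an empty table yields a row whose max_version is NULL/nil).
function _M:get_latest_version()
  local sql = "SELECT MAX(version) AS max_version FROM clustering_sync_version"
  local rows = assert(self.connector:query(sql))
  return rows[1] and rows[1].max_version or 0
end
```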
```sql
CREATE TABLE IF NOT EXISTS clustering_sync_delta (
  "version" INT NOT NULL,
  "type" TEXT NOT NULL,
  "id" UUID NOT NULL,
```

What will happen if there is an entity without an `id` attribute or with an `id` that isn't a UUID type? Is it possible to create such an entity in Kong via a 3rd-party plugin DAO, or will the metaschema prevent this condition?
```diff
@@ -1,26 +1,31 @@
-local declarative_config = require "kong.db.schema.others.declarative_config"
 local workspaces = require "kong.workspaces"
+local declarative_config = require("kong.db.schema.others.declarative_config")
```

Is there somewhere where I can read about the changes to declarative/dbless in this PR and why they are needed for incremental sync? This will help with reviewing.
KAG-4865
KAG-2986
KAG-2987
KAG-3502
KAG-3258
KAG-5283
Flaky (perhaps need rerun):