-
Notifications
You must be signed in to change notification settings - Fork 14.3k
KAFKA-18988: Connect Multiversion Support (Updates to status and metrics) #17988
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
Hi @gharris1727. Apologies for the delay in getting this PR updated. Please review when you get some time. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @snehashisp! Similar to the last PR, can you open a JIRA for this PR?
And are there any further PRs after this one? More code, system tests, documentation, etc?
@@ -34,18 +34,29 @@ public enum State { | |||
private final State state; | |||
private final String trace; | |||
private final String workerId; | |||
private final String version; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this also be part of equals/hashCode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it should be, thanks for catching this.
public String connectorVersion(String connName) { | ||
if (connectors.get(connName) == null) { | ||
return null; | ||
} | ||
return connectors.get(connName).connectorVersion(); | ||
} | ||
|
||
public String taskVersion(ConnectorTaskId taskId) { | ||
if (tasks.get(taskId) == null) { | ||
return null; | ||
} | ||
return tasks.get(taskId).taskVersion(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are thread-unsafe and could cause NullPointerExceptions.
@@ -391,6 +391,11 @@ public void execute() { | |||
finalOffsetCommit(false); | |||
} | |||
|
|||
@Override | |||
public String taskVersion() { | |||
return task.version(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a call into the real task object, which requires a context classloader swap, can throw arbitary exceptions, and may block indefinitely. We should either use Plugins#pluginVersion to compute this, or compute it once during instantiation and then return it whenever necessary.
The same comment applies to WorkerConnector, WorkerSinkTask, and TaskPluginsMetadata.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Opted to compute it during initialization. Plugins don't have description entries for tasks, only connectors so fetching the task versions would be complicated and require us to somehow get the associated connector version. This may also not be accurate if the version overrides for connector and its task don't match.
public ConnectorStatus(String connector, State state, String msg, String workerUrl, int generation, String version) { | ||
super(connector, state, workerUrl, generation, msg, version); | ||
} | ||
|
||
public ConnectorStatus(String connector, State state, String msg, String workerUrl, int generation) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: This constructor is unused.
public TaskStatus(ConnectorTaskId id, State state, String workerUrl, int generation, String trace, String version) { | ||
super(id, state, workerUrl, generation, trace, version); | ||
} | ||
|
||
public TaskStatus(ConnectorTaskId id, State state, String workerUrl, int generation, String trace) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: This method is unused.
@@ -87,14 +90,26 @@ public String workerId() { | |||
public String trace() { | |||
return trace; | |||
} | |||
|
|||
@JsonProperty | |||
@JsonInclude(JsonInclude.Include.NON_EMPTY) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The other place in the REST API that surfaces the version is the PluginInfo object, which filters out "undefined"
. We should probably be consistent here.
That doesn't filter out null
as far as i can tell, maybe it should be modified to filter out both null and undefined.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can keep this consistent and add the custom filter here too. Wdyt of just showing the version (even if its null or undefined) with multi-versioning support. People who are unaware of the internals may be confused as to why its not showing up at all if the version is not set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wdyt of just showing the version (even if its null or undefined) with multi-versioning support.
I'm a bit ambivalent on explicit null
s, I think the more important thing to me is that "null"
and "undefined"
never appear in the results. That just looks like poor data quality to my eyes.
People who are unaware of the internals may be confused as to why its not showing up at all if the version is not set.
Can you elaborate? What is the situation where this happens and how might it mislead someone?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not so much as misleading users, rather it might cause them to wonder why some plugins don't show any information on what version is installed. Say if they have a bunch of legacy plugins that don't properly set a version. Of course, if they look at the code they can make an educated guess and figure out that they don't implement Versioned and likewise, but it might be good to show null/undefined to make it more readily apparent. Either way is fine by me and we can mention the behavior we choose in the docs.
this.connectorVersion = plugins.pluginVersion(connectorClass.getName(), connectorClass.getClassLoader(), PluginType.SINK, PluginType.SOURCE); | ||
this.connectorType = getConnectorType(connectorClass, pluginLoaderSwapper); | ||
this.taskClass = task.getClass().getName(); | ||
this.taskVersion = task.version(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a direct call to the task which would require a loader swap.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So this is being called during task build, after switching the context classloader to the connector loader in the calling methods.
@@ -379,6 +435,7 @@ public ConnectMetricsRegistry(Set<String> tags) { | |||
"The number of failed writes to the dead letter queue.", taskErrorHandlingTags); | |||
lastErrorTimestamp = createTemplate("last-error-timestamp", TASK_ERROR_HANDLING_GROUP_NAME, | |||
"The epoch timestamp when this task last encountered an error.", taskErrorHandlingTags); | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: newline
@@ -510,4 +512,16 @@ public static VersionRange connectorVersionRequirement(String version) throws In | |||
version = "[" + version + "]"; | |||
return VersionRange.createFromVersionSpec(version); | |||
} | |||
|
|||
public static <T> String getVersionOrUndefined(T obj, Function<ClassLoader, LoaderSwap> pluginLoaderSwapper) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO this is an unhealthy method to add. Arbitrary code should not be deriving the version of a plugin from the object, the version should be known at the time the object is being instantiated and should be tracked in parallel.
Essentially, I believe the Versioned#version method should only be called during plugin discovery, and everything downstream from that should use whatever result was returned then.
And interestingly enough, this method doesn't return the right answer for general Task instances :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, because task does not implement Versioned rather has explicitly defines it as a interface method. Anyway, I removed this as per suggestion and opted to used Plugins#PluginVersion instead.
@@ -61,15 +61,18 @@ public ConnectorType type() { | |||
return type; | |||
} | |||
|
|||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: newline
Hi @gharris1727. Thanks for the review. Will address the comments as soon as I get some time. Also yes, there are more PRs which adds tests for the newly added code in #18325, #18326 and #18360. I raised them quite some time ago but have not updated them with the latest changes. Will address those once this PR is merged. |
Hi @gharris1727. Please follow up on this review and the reviews for the subsequent test PRs, #18325, #18326 and #18360 |
The is one of a set of PRs for KIP-891. The list of total PRs given below all build one the previous one in the list. They can be reviewed individually, or if the complete set of changes is preferrable, please refer to the last PR.
This is PR#5 and contains updates to show the version of the running connector and task in the status endpoint. JMX task metrics have added connector, task and converter version. New metric groups are introduced for transformation.
Committer Checklist (excluded from commit message)