Skip to content

Commit d0f57cc

Browse files
authored
feat(connector): look up connector by project and domain (#7378)
Signed-off-by: Kevin Su <pingsutw@gmail.com> Signed-off-by: Kevin Su <pingsutw@apache.org>
1 parent a3093e7 commit d0f57cc

4 files changed

Lines changed: 36 additions & 18 deletions

File tree

flyteplugins/go/tasks/plugins/webapi/connector/client.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func updateRegistry(
132132
IsConnectorApp: isConnectorApp,
133133
}
134134
supportedCategoryName := supportedCategory.GetName()
135-
registryKey := RegistryKey{domain: connectorDeployment.Domain, taskTypeName: supportedCategoryName, taskTypeVersion: supportedCategory.GetVersion()}
135+
registryKey := RegistryKey{project: connectorDeployment.Project, domain: connectorDeployment.Domain, taskTypeName: supportedCategoryName, taskTypeVersion: supportedCategory.GetVersion()}
136136
newConnectorRegistry[registryKey] = connector
137137
connectorSupportedTaskCategories[supportedCategoryName] = struct{}{}
138138
}
@@ -162,7 +162,7 @@ func getConnectorRegistry(ctx context.Context, cs *ClientSet) Registry {
162162
// If the connector doesn't implement the metadata service, we construct the registry based on the configuration
163163
for taskType, connectorDeploymentID := range cfg.ConnectorForTaskTypes {
164164
if connectorDeployment, ok := cfg.ConnectorDeployments[connectorDeploymentID]; ok {
165-
registryKey := RegistryKey{domain: connectorDeployment.Domain, taskTypeName: taskType, taskTypeVersion: defaultTaskTypeVersion}
165+
registryKey := RegistryKey{project: connectorDeployment.Project, domain: connectorDeployment.Domain, taskTypeName: taskType, taskTypeVersion: defaultTaskTypeVersion}
166166
connector := &Connector{
167167
ConnectorDeployment: connectorDeployment,
168168
ConnectorID: connectorDeploymentID,
@@ -174,7 +174,7 @@ func getConnectorRegistry(ctx context.Context, cs *ClientSet) Registry {
174174

175175
// Ensure that the old configuration is backward compatible
176176
for _, taskType := range cfg.SupportedTaskTypes {
177-
registryKey := RegistryKey{domain: cfg.DefaultConnector.Domain, taskTypeName: taskType, taskTypeVersion: defaultTaskTypeVersion}
177+
registryKey := RegistryKey{project: cfg.DefaultConnector.Project, domain: cfg.DefaultConnector.Domain, taskTypeName: taskType, taskTypeVersion: defaultTaskTypeVersion}
178178
if _, ok := newConnectorRegistry[registryKey]; !ok {
179179
connector := &Connector{
180180
ConnectorDeployment: &cfg.DefaultConnector,

flyteplugins/go/tasks/plugins/webapi/connector/config.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,10 @@ type Deployment struct {
101101
// DefaultTimeout gives the default RPC timeout if a more specific one is not defined in Timeouts; if neither DefaultTimeout nor Timeouts is defined for an operation, RPC timeout will not be enforced
102102
DefaultTimeout config.Duration `json:"defaultTimeout,omitempty" yaml:"defaultTimeout,omitempty"`
103103

104-
// The tasks in this domain will be handled by this agent
104+
// This connector will handle the tasks in this project
105+
Project string `json:"project,omitempty" yaml:"project,omitempty"`
106+
107+
// This connector will handle the tasks in this domain
105108
Domain string `json:"domain,omitempty" yaml:"domain,omitempty"`
106109
}
107110

flyteplugins/go/tasks/plugins/webapi/connector/plugin.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ func (p *ConnectorService) SetSupportedTaskType(taskTypes []string) {
5454
}
5555

5656
type RegistryKey struct {
57+
project string
5758
domain string
5859
taskTypeName string
5960
taskTypeVersion int32
@@ -98,6 +99,7 @@ type ResourceMetaWrapper struct {
9899
OutputPrefix string
99100
ConnectorResourceMeta []byte
100101
TaskCategory *connectorPb.TaskCategory
102+
Project string
101103
Domain string
102104
Connection *flyteIdl.Connection
103105
}
@@ -152,7 +154,9 @@ func (p *Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContext
152154
outputPrefix := taskCtx.OutputWriter().GetOutputPrefixPath().String()
153155

154156
taskCategory := connectorPb.TaskCategory{Name: taskTemplate.GetType(), Version: taskTemplate.GetTaskTypeVersion()}
155-
connector, err := p.getFinalConnector(&taskCategory, p.cfg, taskTemplate.GetId().GetDomain())
157+
project := taskTemplate.GetId().GetProject()
158+
domain := taskTemplate.GetId().GetDomain()
159+
connector, err := p.getFinalConnector(&taskCategory, p.cfg, project, domain)
156160
if err != nil {
157161
return nil, nil, err
158162
}
@@ -215,13 +219,14 @@ func (p *Plugin) Create(ctx context.Context, taskCtx webapi.TaskExecutionContext
215219
ConnectorResourceMeta: res.GetResourceMeta(),
216220
TaskCategory: &taskCategory,
217221
Connection: &connection,
218-
Domain: taskTemplate.GetId().GetDomain(),
222+
Project: project,
223+
Domain: domain,
219224
}, nil, nil
220225
}
221226

222227
func (p *Plugin) Get(ctx context.Context, taskCtx webapi.GetContext) (latest webapi.Resource, err error) {
223228
metadata := taskCtx.ResourceMeta().(ResourceMetaWrapper)
224-
connector, err := p.getFinalConnector(metadata.TaskCategory, p.cfg, metadata.Domain)
229+
connector, err := p.getFinalConnector(metadata.TaskCategory, p.cfg, metadata.Project, metadata.Domain)
225230
if err != nil {
226231
return nil, err
227232
}
@@ -260,7 +265,7 @@ func (p *Plugin) Delete(ctx context.Context, taskCtx webapi.DeleteContext) error
260265
return nil
261266
}
262267
metadata := taskCtx.ResourceMeta().(ResourceMetaWrapper)
263-
connector, err := p.getFinalConnector(metadata.TaskCategory, p.cfg, metadata.Domain)
268+
connector, err := p.getFinalConnector(metadata.TaskCategory, p.cfg, metadata.Project, metadata.Domain)
264269
if err != nil {
265270
return err
266271
}
@@ -386,22 +391,32 @@ func (p *Plugin) watchConnectors(ctx context.Context, connectorService *Connecto
386391
}, p.cfg.PollInterval.Duration, ctx.Done())
387392
}
388393

389-
func (p *Plugin) getFinalConnector(taskCategory *connectorPb.TaskCategory, cfg *Config, domain string) (*Connector, error) {
394+
func (p *Plugin) getFinalConnector(taskCategory *connectorPb.TaskCategory, cfg *Config, project, domain string) (*Connector, error) {
390395
p.mu.RLock()
391396
defer p.mu.RUnlock()
392397

393-
registryKey := RegistryKey{domain: domain, taskTypeName: taskCategory.GetName(), taskTypeVersion: taskCategory.GetVersion()}
398+
taskTypeName := taskCategory.GetName()
399+
taskTypeVersion := taskCategory.GetVersion()
400+
401+
registryKey := RegistryKey{project: project, domain: domain, taskTypeName: taskTypeName, taskTypeVersion: taskTypeVersion}
402+
if connector, exists := p.registry[registryKey]; exists {
403+
return connector, nil
404+
}
405+
logger.Debugf(context.Background(), "No connector found for task type [%s] and version [%d] in project [%s] domain [%s].", taskTypeName, taskTypeVersion, project, domain)
406+
407+
// Fall back to a connector registered for the domain across all projects.
408+
registryKey = RegistryKey{project: "", domain: domain, taskTypeName: taskTypeName, taskTypeVersion: taskTypeVersion}
394409
if connector, exists := p.registry[registryKey]; exists {
395410
return connector, nil
396411
}
397-
logger.Debugf(context.Background(), "No connector found for task type [%s] and version [%d] in domain [%s].", taskCategory.GetName(), taskCategory.GetVersion(), domain)
412+
logger.Debugf(context.Background(), "No connector found for task type [%s] and version [%d] in domain [%s].", taskTypeName, taskTypeVersion, domain)
398413

399-
// Use the connector that supports across all domains.
400-
registryKey = RegistryKey{domain: "", taskTypeName: taskCategory.GetName(), taskTypeVersion: taskCategory.GetVersion()}
414+
// Fall back to a connector that supports across all projects and domains.
415+
registryKey = RegistryKey{project: "", domain: "", taskTypeName: taskTypeName, taskTypeVersion: taskTypeVersion}
401416
if connector, exists := p.registry[registryKey]; exists {
402417
return connector, nil
403418
}
404-
logger.Debugf(context.Background(), "No connector found for task type [%s] and version [%d] in any domain.", taskCategory.GetName(), taskCategory.GetVersion())
419+
logger.Debugf(context.Background(), "No connector found for task type [%s] and version [%d] in any project or domain.", taskTypeName, taskTypeVersion)
405420

406421
if len(cfg.DefaultConnector.Endpoint) != 0 {
407422
return &Connector{

flyteplugins/go/tasks/plugins/webapi/connector/plugin_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,16 @@ func TestPlugin(t *testing.T) {
6767
ray := &connectorPb.TaskCategory{Name: "ray", Version: defaultTaskTypeVersion}
6868
foo := &connectorPb.TaskCategory{Name: "foo", Version: defaultTaskTypeVersion}
6969
bar := &connectorPb.TaskCategory{Name: "bar", Version: defaultTaskTypeVersion}
70-
connector, err := plugin.getFinalConnector(spark, &cfg, "")
70+
connector, err := plugin.getFinalConnector(spark, &cfg, "", "")
7171
assert.NoError(t, err)
7272
assert.Equal(t, connector.ConnectorDeployment.Endpoint, "localhost:80")
73-
connector, err = plugin.getFinalConnector(foo, &cfg, "")
73+
connector, err = plugin.getFinalConnector(foo, &cfg, "", "")
7474
assert.NoError(t, err)
7575
assert.Equal(t, connector.ConnectorDeployment.Endpoint, cfg.DefaultConnector.Endpoint)
76-
connector, err = plugin.getFinalConnector(bar, &cfg, "")
76+
connector, err = plugin.getFinalConnector(bar, &cfg, "", "")
7777
assert.NoError(t, err)
7878
assert.Equal(t, connector.ConnectorDeployment.Endpoint, cfg.DefaultConnector.Endpoint)
79-
connector, err = plugin.getFinalConnector(ray, &cfg, "production")
79+
connector, err = plugin.getFinalConnector(ray, &cfg, "", "production")
8080
assert.NoError(t, err)
8181
assert.Equal(t, connector.ConnectorDeployment.Endpoint, rayConnector.ConnectorDeployment.Endpoint)
8282
})

0 commit comments

Comments
 (0)