forked from raystack/optimus
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathupstream_resolver.go
More file actions
139 lines (112 loc) · 5.99 KB
/
Copy pathupstream_resolver.go
File metadata and controls
139 lines (112 loc) · 5.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package resolver
import (
"context"
"fmt"
"github.com/goto/optimus/core/job"
"github.com/goto/optimus/core/resource"
"github.com/goto/optimus/core/tenant"
"github.com/goto/optimus/internal/errors"
"github.com/goto/optimus/internal/writer"
)
const (
ConcurrentTicketPerSec = 50
ConcurrentLimit = 100
)
type UpstreamResolver struct {
jobRepository JobRepository
externalUpstreamResolver ExternalUpstreamResolver
internalUpstreamResolver InternalUpstreamResolver
}
func NewUpstreamResolver(jobRepository JobRepository, externalUpstreamResolver ExternalUpstreamResolver, internalUpstreamResolver InternalUpstreamResolver) *UpstreamResolver {
return &UpstreamResolver{jobRepository: jobRepository, externalUpstreamResolver: externalUpstreamResolver, internalUpstreamResolver: internalUpstreamResolver}
}
type ExternalUpstreamResolver interface {
Resolve(ctx context.Context, jobWithUpstream *job.WithUpstream, lw writer.LogWriter) (*job.WithUpstream, error)
BulkResolve(context.Context, []*job.WithUpstream, writer.LogWriter) ([]*job.WithUpstream, error)
}
type InternalUpstreamResolver interface {
Resolve(context.Context, *job.WithUpstream) (*job.WithUpstream, error)
BulkResolve(context.Context, tenant.ProjectName, []*job.WithUpstream) ([]*job.WithUpstream, error)
}
type JobRepository interface {
ResolveUpstreams(ctx context.Context, projectName tenant.ProjectName, jobNames []job.Name) (map[job.Name][]*job.Upstream, error)
GetAllEnabledByResourceDestination(ctx context.Context, resourceDestination resource.URN) ([]*job.Job, error)
GetEnabledJobByName(ctx context.Context, projectName tenant.ProjectName, jobName job.Name) (*job.Job, error)
}
func (u UpstreamResolver) CheckStaticResolvable(ctx context.Context, tnnt tenant.Tenant, incomingJobs []*job.Job, logWriter writer.LogWriter) error {
me := errors.NewMultiError("check static resolvable incomingJobs errors")
incomingJobNameMap := make(map[job.Name]*job.Job)
for _, incomingJob := range incomingJobs {
incomingJobNameMap[incomingJob.Spec().Name()] = incomingJob
}
jobsWithUnresolvedStaticUpstream, err := job.Jobs(incomingJobs).GetJobsWithUnresolvedStaticUpstreams()
me.Append(err)
jobsWithResolvedStaticInternalUpstreams, err := u.internalUpstreamResolver.BulkResolve(ctx, tnnt.ProjectName(), jobsWithUnresolvedStaticUpstream)
me.Append(err)
jobsWithResolvedStaticExternalUpstreams, err := u.externalUpstreamResolver.BulkResolve(ctx, jobsWithResolvedStaticInternalUpstreams, logWriter)
me.Append(err)
me.Append(checkForUnresolvedStaticUpstreams(tnnt, incomingJobNameMap, jobsWithResolvedStaticExternalUpstreams, logWriter))
return me.ToErr()
}
func (u UpstreamResolver) BulkResolve(ctx context.Context, projectName tenant.ProjectName, jobs []*job.Job, logWriter writer.LogWriter) ([]*job.WithUpstream, error) {
me := errors.NewMultiError("bulk resolve jobs errors")
jobsWithUnresolvedUpstream, err := job.Jobs(jobs).GetJobsWithUnresolvedUpstreams()
if err != nil {
errorMsg := fmt.Sprintf("[%s] %s", jobs[0].Tenant().NamespaceName().String(), err.Error())
logWriter.Write(writer.LogLevelError, errorMsg)
me.Append(err)
}
jobsWithResolvedInternalUpstreams, err := u.internalUpstreamResolver.BulkResolve(ctx, projectName, jobsWithUnresolvedUpstream)
if err != nil {
errorMsg := fmt.Sprintf("unable to resolve upstream: %s", err.Error())
logWriter.Write(writer.LogLevelError, errorMsg)
me.Append(errors.NewError(errors.ErrInternalError, job.EntityJob, errorMsg))
return nil, me.ToErr()
}
jobsWithResolvedExternalUpstreams, err := u.externalUpstreamResolver.BulkResolve(ctx, jobsWithResolvedInternalUpstreams, logWriter)
me.Append(err)
me.Append(u.getUnresolvedUpstreamsErrors(jobsWithResolvedExternalUpstreams, logWriter))
return jobsWithResolvedExternalUpstreams, me.ToErr()
}
func (u UpstreamResolver) Resolve(ctx context.Context, subjectJob *job.Job, logWriter writer.LogWriter) ([]*job.Upstream, error) {
me := errors.NewMultiError("upstream resolution errors")
jobWithUnresolvedUpstream, err := subjectJob.GetJobWithUnresolvedUpstream()
me.Append(err)
jobWithInternalUpstream, err := u.internalUpstreamResolver.Resolve(ctx, jobWithUnresolvedUpstream)
me.Append(err)
jobWithInternalExternalUpstream, err := u.externalUpstreamResolver.Resolve(ctx, jobWithInternalUpstream, logWriter)
me.Append(err)
return jobWithInternalExternalUpstream.Upstreams(), me.ToErr()
}
func (UpstreamResolver) getUnresolvedUpstreamsErrors(jobsWithUpstreams []*job.WithUpstream, logWriter writer.LogWriter) error {
me := errors.NewMultiError("unresolved upstreams errors")
for _, jobWithUpstreams := range jobsWithUpstreams {
for _, unresolvedUpstream := range jobWithUpstreams.GetUnresolvedUpstreams() {
if unresolvedUpstream.Type() == job.UpstreamTypeStatic {
errMsg := fmt.Sprintf("[%s] found unknown upstream for job %s: %s", jobWithUpstreams.Job().Tenant().NamespaceName().String(), jobWithUpstreams.Name().String(), unresolvedUpstream.FullName())
logWriter.Write(writer.LogLevelError, errMsg)
me.Append(errors.NewError(errors.ErrNotFound, job.EntityJob, errMsg))
}
}
}
return me.ToErr()
}
func checkForUnresolvedStaticUpstreams(tnnt tenant.Tenant, incomingJobNameMap map[job.Name]*job.Job, jobs []*job.WithUpstream, logWriter writer.LogWriter) error {
me := errors.NewMultiError("check for unresolved static upstreams errors")
for _, jobObj := range jobs {
for _, upstream := range jobObj.Upstreams() {
if upstream.Type() == job.UpstreamTypeInferred {
continue
}
if upstream.State() == job.UpstreamStateResolved {
continue
}
if _, ok := incomingJobNameMap[upstream.Name()]; ok {
logWriter.Write(writer.LogLevelInfo, fmt.Sprintf("[%s] %s dependency: %s, for job: %s, is part of the incoming jobs themselves", tnnt.NamespaceName(), upstream.Type(), upstream.Name(), jobObj.GetName()))
continue
}
me.Append(errors.NewError(errors.ErrInvalidState, job.EntityJob, fmt.Sprintf("could not resolve for %s upstream: %s, for job: %s", upstream.Type(), upstream.FullName(), jobObj.GetName())))
}
}
return me.ToErr()
}