From c0dbff4b56562bfe45a8f0999cfa40a630d0438d Mon Sep 17 00:00:00 2001 From: Alexandra Parker Date: Tue, 25 Jan 2022 19:23:32 -0800 Subject: [PATCH 01/14] parallel container startup with deferred values This commit tries to be as unobtrusive as possible, attaching new behavior to existing types where possible rather than building out new infrastructure. constructorNode returns a deferred value when called. On the first call, it asks paramList to start building an arg slice, which may also be deferred. Once the arg slice is resolved, constructorNode schedules its constructor function to be called. Once it's called, it resolves its own deferral. Multiple paramSingles can observe the same constructorNode before it's ready. If there's an error, they may all see the same error, which is a change in behavior. There are two schedulers: synchronous and parallel. The synchronous scheduler returns things in the same order as before. The parallel may not (and the tests that rely on shuffle order will fail). The scheduler needs to be flushed after deferred values are created. The synchronous scheduler does nothing on when flushing, but the parallel scheduler runs a pool of goroutines to resolve constructors. Calls to dig functions always happen on the same goroutine as Scope.Invoke(). Calls to constructor functions can happen on pooled goroutines. The choice of scheduler is up to the Scope. Whether constructor functions are safe to call in parallel seems most logically to be a property of the scope, and the scope is passed down the constructor/param call chain. --- constructor.go | 84 +++++++++++++++-------- constructor_test.go | 8 ++- container.go | 26 +++++++ deferred.go | 104 ++++++++++++++++++++++++++++ dig_test.go | 89 ++++++++++++++++++++++++ invoke.go | 9 ++- param.go | 161 +++++++++++++++++++++++++------------------- param_test.go | 3 +- provide.go | 2 +- scheduler.go | 153 +++++++++++++++++++++++++++++++++++++++++ scope.go | 7 ++ 11 files changed, 546 insertions(+), 100 deletions(-) create mode 100644 deferred.go create mode 100644 scheduler.go diff --git a/constructor.go b/constructor.go index bab711aa..ffdf455f 100644 --- a/constructor.go +++ b/constructor.go @@ -45,12 +45,18 @@ type constructorNode struct { // id uniquely identifies the constructor that produces a node. id dot.CtorID + // Whether this node is already building its paramList and calling the constructor + calling bool + // Whether the constructor owned by this node was already called. called bool // Type information about constructor parameters. paramList paramList + // The result of calling the constructor + deferred deferred + // Type information about constructor results. resultList resultList @@ -122,42 +128,66 @@ func (n *constructorNode) String() string { return fmt.Sprintf("deps: %v, ctor: %v", n.paramList, n.ctype) } -// Call calls this constructor if it hasn't already been called and -// injects any values produced by it into the provided container. -func (n *constructorNode) Call(c containerStore) error { - if n.called { - return nil +// Call calls this constructor if it hasn't already been called and injects any values produced by it into the container +// passed to newConstructorNode. +// +// If constructorNode has a unresolved deferred already in the process of building, it will return that one. If it has +// already been successfully called, it will return an already-resolved deferred. Together these mean it will try the +// call again if it failed last time. +// +// On failure, the returned pointer is not guaranteed to stay in a failed state; another call will reset it back to its +// zero value; don't store the returned pointer. (It will still call each observer only once.) +func (n *constructorNode) Call(c containerStore) *deferred { + if n.calling || n.called { + return &n.deferred } + n.calling = true + n.deferred = deferred{} + if err := shallowCheckDependencies(c, n.paramList); err != nil { - return errMissingDependencies{ + n.deferred.resolve(errMissingDependencies{ Func: n.location, Reason: err, - } + }) } - args, err := n.paramList.BuildList(c, false /* decorating */) - if err != nil { - return errArgumentsFailed{ - Func: n.location, - Reason: err, + var args []reflect.Value + d := n.paramList.BuildList(c, false /* decorating */, &args) + + d.observe(func(err error) { + if err != nil { + n.calling = false + n.deferred.resolve(errArgumentsFailed{ + Func: n.location, + Reason: err, + }) + return } - } - - receiver := newStagingContainerWriter() - results := c.invoker()(reflect.ValueOf(n.ctor), args) - if err := n.resultList.ExtractList(receiver, false /* decorating */, results); err != nil { - return errConstructorFailed{Func: n.location, Reason: err} - } - - // Commit the result to the original container that this constructor - // was supplied to. The provided constructor is only used for a view of - // the rest of the graph to instantiate the dependencies of this - // container. - receiver.Commit(n.s) - n.called = true - return nil + var results []reflect.Value + + c.scheduler().schedule(func() { + results = c.invoker()(reflect.ValueOf(n.ctor), args) + }).observe(func(_ error) { + n.calling = false + receiver := newStagingContainerWriter() + if err := n.resultList.ExtractList(receiver, false /* decorating */, results); err != nil { + n.deferred.resolve(errConstructorFailed{Func: n.location, Reason: err}) + return + } + + // Commit the result to the original container that this constructor + // was supplied to. The provided container is only used for a view of + // the rest of the graph to instantiate the dependencies of this + // container. + receiver.Commit(n.s) + n.called = true + n.deferred.resolve(nil) + }) + }) + + return &n.deferred } // stagingContainerWriter is a containerWriter that records the changes that diff --git a/constructor_test.go b/constructor_test.go index 16fc2af7..992d06a4 100644 --- a/constructor_test.go +++ b/constructor_test.go @@ -59,7 +59,11 @@ func TestNodeAlreadyCalled(t *testing.T) { require.False(t, n.called, "node must not have been called") c := New() - require.NoError(t, n.Call(c.scope), "invoke failed") + d := n.Call(c.scope) + c.scope.sched.flush() + require.NoError(t, d.err, "invoke failed") require.True(t, n.called, "node must be called") - require.NoError(t, n.Call(c.scope), "calling again should be okay") + d = n.Call(c.scope) + c.scope.sched.flush() + require.NoError(t, d.err, "calling again should be okay") } diff --git a/container.go b/container.go index 7ab4575b..ac2decc7 100644 --- a/container.go +++ b/container.go @@ -142,6 +142,9 @@ type containerStore interface { // Returns invokerFn function to use when calling arguments. invoker() invokerFn + + // Returns the scheduler to use for this scope. + scheduler() scheduler } // New constructs a Container. @@ -231,6 +234,29 @@ func dryInvoker(fn reflect.Value, _ []reflect.Value) []reflect.Value { return results } +type maxConcurrencyOption int + +// MaxConcurrency run constructors in this container with a fixed pool of executor +// goroutines. max is the number of goroutines to start. +func MaxConcurrency(max int) Option { + return maxConcurrencyOption(max) +} + +func (m maxConcurrencyOption) applyOption(container *Container) { + container.scope.sched = ¶llelScheduler{concurrency: int(m)} +} + +type unboundedConcurrency struct{} + +// UnboundedConcurrency run constructors in this container as concurrently as possible. +// Go's resource limits like GOMAXPROCS will inherently limit how much can happen in +// parallel. +var UnboundedConcurrency Option = unboundedConcurrency{} + +func (u unboundedConcurrency) applyOption(container *Container) { + container.scope.sched = &unboundedScheduler{} +} + // String representation of the entire Container func (c *Container) String() string { return c.scope.String() diff --git a/deferred.go b/deferred.go new file mode 100644 index 00000000..001a6bd0 --- /dev/null +++ b/deferred.go @@ -0,0 +1,104 @@ +package dig + +type observer func(error) + +// A deferred is an observable future result that may fail. Its zero value is unresolved and has no observers. It can +// be resolved once, at which point every observer will be called. +type deferred struct { + observers []observer + settled bool + err error +} + +// alreadyResolved is a deferred that has already been resolved with a nil error. +var alreadyResolved = deferred{settled: true} + +// failedDeferred returns a deferred that is resolved with the given error. +func failedDeferred(err error) *deferred { + return &deferred{settled: true, err: err} +} + +// observe registers an observer to receive a callback when this deferred is resolved. It will be called at most one +// time. If this deferred is already resolved, the observer is called immediately, before observe returns. +func (d *deferred) observe(obs observer) { + if d.settled { + obs(d.err) + return + } + + d.observers = append(d.observers, obs) +} + +// resolve sets the status of this deferred and notifies all observers if it's not already resolved. +func (d *deferred) resolve(err error) { + if d.settled { + return + } + + d.settled = true + d.err = err + for _, obs := range d.observers { + obs(err) + } + d.observers = nil +} + +// then returns a new deferred that is either resolved with the same error as this deferred, or any error returned from +// the supplied function. The supplied function is only called if this deferred is resolved without error. +func (d *deferred) then(res func() error) *deferred { + d2 := new(deferred) + d.observe(func(err error) { + if err != nil { + d2.resolve(err) + return + } + d2.resolve(res()) + }) + return d2 +} + +// catch maps any error from this deferred using the supplied function. The supplied function is only called if this +// deferred is resolved with an error. If the supplied function returns a nil error, the new deferred will resolve +// successfully. +func (d *deferred) catch(rej func(error) error) *deferred { + d2 := new(deferred) + d.observe(func(err error) { + if err != nil { + err = rej(err) + } + d2.resolve(err) + }) + return d2 +} + +// whenAll returns a new deferred that resolves when all the supplied deferreds resolve. It resolves with the first +// error reported by any deferred, or nil if they all succeed. +func whenAll(others ...*deferred) *deferred { + if len(others) == 0 { + return &alreadyResolved + } + + d := new(deferred) + count := len(others) + + onResolved := func(err error) { + if d.settled { + return + } + + if err != nil { + d.resolve(err) + } + + count-- + if count == 0 { + d.resolve(nil) + } + } + + for _, other := range others { + other.observe(onResolved) + } + + return d +} diff --git a/dig_test.go b/dig_test.go index 2709466a..021ea95f 100644 --- a/dig_test.go +++ b/dig_test.go @@ -29,6 +29,7 @@ import ( "math/rand" "os" "reflect" + "sync/atomic" "testing" "time" @@ -3566,3 +3567,91 @@ func TestEndToEndSuccessWithAliases(t *testing.T) { }) } + +func TestConcurrency(t *testing.T) { + // Ensures providers will run at the same time + t.Run("TestMaxConcurrency", func(t *testing.T) { + t.Parallel() + + type ( + A int + B int + C int + ) + + var ( + timer = time.NewTimer(10 * time.Second) + max int32 = 3 + done = make(chan struct{}) + running int32 = 0 + waitForUs = func() error { + if atomic.AddInt32(&running, 1) == max { + close(done) + } + select { + case <-timer.C: + return errors.New("timeout expired") + case <-done: + return nil + } + } + c = digtest.New(t, dig.MaxConcurrency(int(max))) + ) + + c.RequireProvide(func() (A, error) { return 0, waitForUs() }) + c.RequireProvide(func() (B, error) { return 1, waitForUs() }) + c.RequireProvide(func() (C, error) { return 2, waitForUs() }) + + c.RequireInvoke(func(a A, b B, c C) { + require.Equal(t, a, A(0)) + require.Equal(t, b, B(1)) + require.Equal(t, c, C(2)) + require.Equal(t, running, int32(3)) + }) + }) + + t.Run("TestUnboundConcurrency", func(t *testing.T) { + t.Parallel() + + var ( + timer = time.NewTimer(10 * time.Second) + max int32 = 20 + done = make(chan struct{}) + running int32 = 0 + waitForUs = func() error { + if atomic.AddInt32(&running, 1) >= max { + close(done) + } + select { + case <-timer.C: + return errors.New("timeout expired") + case <-done: + return nil + } + } + c = digtest.New(t, dig.UnboundedConcurrency) + expected []int + ) + + for i := 0; i < int(max); i++ { + i := i + expected = append(expected, i) + type out struct { + dig.Out + + Value int `group:"a"` + } + c.RequireProvide(func() (out, error) { return out{Value: i}, waitForUs() }) + } + + type in struct { + dig.In + + Values []int `group:"a"` + } + + c.RequireInvoke(func(i in) { + require.ElementsMatch(t, expected, i.Values) + }) + }) +} diff --git a/invoke.go b/invoke.go index 7506a509..d5f2566d 100644 --- a/invoke.go +++ b/invoke.go @@ -82,7 +82,14 @@ func (s *Scope) Invoke(function interface{}, opts ...InvokeOption) error { s.isVerifiedAcyclic = true } - args, err := pl.BuildList(s, false /* decorating */) + var args []reflect.Value + + d := pl.BuildList(s, &args, false /* decorating */) + d.observe(func(err2 error) { + err = err2 + }) + s.sched.flush() + if err != nil { return errArgumentsFailed{ Func: digreflect.InspectFunc(function), diff --git a/param.go b/param.go index 943b3216..1ce5ed29 100644 --- a/param.go +++ b/param.go @@ -46,10 +46,13 @@ type param interface { fmt.Stringer // Build this dependency and any of its dependencies from the provided - // Container. + // Container. It stores the result in the pointed-to reflect.Value, allocating + // it first if it points to an invalid reflect.Value. + // + // Build returns a deferred that resolves once the reflect.Value is filled in. // // This MAY panic if the param does not produce a single value. - Build(store containerStore, decorating bool) (reflect.Value, error) + Build(store containerStore, decorating bool, target *reflect.Value) *deferred // DotParam returns a slice of dot.Param(s). DotParam() []*dot.Param @@ -137,23 +140,21 @@ func newParamList(ctype reflect.Type, c containerStore) (paramList, error) { return pl, nil } -func (pl paramList) Build(containerStore, bool) (reflect.Value, error) { +func (pl paramList) Build(containerStore, bool, *reflect.Value) *deferred { digerror.BugPanicf("paramList.Build() must never be called") panic("") // Unreachable, as BugPanicf above will panic. } -// BuildList returns an ordered list of values which may be passed directly -// to the underlying constructor. -func (pl paramList) BuildList(c containerStore, decorating bool) ([]reflect.Value, error) { - args := make([]reflect.Value, len(pl.Params)) +// BuildList builds an ordered list of values which may be passed directly +// to the underlying constructor and stores them in the pointed-to slice. +// It returns a deferred that resolves when the slice is filled out. +func (pl paramList) BuildList(c containerStore, decorating bool, targets *[]reflect.Value) *deferred { + children := make([]*deferred, len(pl.Params)) + *targets = make([]reflect.Value, len(pl.Params)) for i, p := range pl.Params { - var err error - args[i], err = p.Build(c, decorating) - if err != nil { - return nil, err - } + children[i] = p.Build(c, decorating, &(*targets)[i]) } - return args, nil + return whenAll(children...) } // paramSingle is an explicitly requested type, optionally with a name. @@ -244,7 +245,11 @@ func (ps paramSingle) buildWithDecorators(c containerStore) (v reflect.Value, fo return } -func (ps paramSingle) Build(c containerStore, decorating bool) (reflect.Value, error) { +func (ps paramSingle) Build(c containerStore, decorating bool, target *reflect.Value) *deferred { + if !target.IsValid() { + *target = reflect.New(ps.Type).Elem() + } + if !decorating { v, found, err := ps.buildWithDecorators(c) if found { @@ -258,7 +263,8 @@ func (ps paramSingle) Build(c containerStore, decorating bool) (reflect.Value, e } if v, ok := ps.getValue(c); ok { - return v, nil + target.Set(v) + return &alreadyResolved } // Starting at the given container and working our way up its parents, @@ -277,34 +283,52 @@ func (ps paramSingle) Build(c containerStore, decorating bool) (reflect.Value, e if len(providers) == 0 { if ps.Optional { - return reflect.Zero(ps.Type), nil + target.Set(reflect.Zero(ps.Type)) + return &alreadyResolved + } else { + return failedDeferred(newErrMissingTypes(c, key{name: ps.Name, t: ps.Type})) } - return _noValue, newErrMissingTypes(c, key{name: ps.Name, t: ps.Type}) } - for _, n := range providers { - err := n.Call(c) - if err == nil { - continue - } + var ( + doNext func(i int) + d = new(deferred) + ) - // If we're missing dependencies but the parameter itself is optional, - // we can just move on. - if _, ok := err.(errMissingDependencies); ok && ps.Optional { - return reflect.Zero(ps.Type), nil + doNext = func(i int) { + if i == len(providers) { + // If we get here, it's impossible for the value to be absent from the + // container. + v, _ := ps.getValue(c) + if v.IsValid() { + // Not valid during a dry run + target.Set(v) + } + d.resolve(nil) + return } - return _noValue, errParamSingleFailed{ - CtorID: n.ID(), - Key: key{t: ps.Type, name: ps.Name}, - Reason: err, - } + n := providers[i] + + n.Call(c).observe(func(err error) { + if err != nil { + // If we're missing dependencies but the parameter itself is optional, + // we can just move on. + if _, ok := err.(errMissingDependencies); !ok || !ps.Optional { + d.resolve(errParamSingleFailed{ + CtorID: n.ID(), + Key: key{t: ps.Type, name: ps.Name}, + Reason: err, + }) + return + } + } + doNext(i + 1) + }) } - // If we get here, it's impossible for the value to be absent from the - // container. - v, _ := ps.getValue(c) - return v, nil + doNext(0) + return d } // paramObject is a dig.In struct where each field is another param. @@ -391,16 +415,19 @@ func newParamObject(t reflect.Type, c containerStore) (paramObject, error) { return po, nil } -func (po paramObject) Build(c containerStore, decorating bool) (reflect.Value, error) { - dest := reflect.New(po.Type).Elem() - for _, f := range po.Fields { - v, err := f.Build(c, decorating) - if err != nil { - return dest, err - } - dest.Field(f.FieldIndex).Set(v) +func (po paramObject) Build(c containerStore, decorating bool, target *reflect.Value) *deferred { + if !target.IsValid() { + *target = reflect.New(po.Type).Elem() } - return dest, nil + + children := make([]*deferred, len(po.Fields)) + for i, f := range po.Fields { + f := f + field := target.Field(f.FieldIndex) + children[i] = f.Build(c, decorating, &field) + } + + return whenAll(children...) } // paramObjectField is a single field of a dig.In struct. @@ -466,12 +493,8 @@ func newParamObjectField(idx int, f reflect.StructField, c containerStore) (para return pof, nil } -func (pof paramObjectField) Build(c containerStore, decorating bool) (reflect.Value, error) { - v, err := pof.Param.Build(c, decorating) - if err != nil { - return v, err - } - return v, nil +func (pof paramObjectField) Build(c containerStore, decorating bool, target *reflect.Value) *deferred { + return pof.Param.Build(c, decorating, target) } // paramGroupedSlice is a param which produces a slice of values with the same @@ -573,25 +596,26 @@ func (pt paramGroupedSlice) callGroupDecorators(c containerStore) error { // search the given container and its parent for matching group providers and // call them to commit values. If an error is encountered, return the number // of providers called and a non-nil error from the first provided. -func (pt paramGroupedSlice) callGroupProviders(c containerStore) (int, error) { - itemCount := 0 +func (pt paramGroupedSlice) callGroupProviders(c containerStore) []*deferred { + var children []*deferred for _, c := range c.storesToRoot() { providers := c.getGroupProviders(pt.Group, pt.Type.Elem()) - itemCount += len(providers) for _, n := range providers { - if err := n.Call(c); err != nil { - return 0, errParamGroupFailed{ + n := n + child := n.Call(c) + children = append(children, child.catch(func(err error) error { + return errParamGroupFailed{ CtorID: n.ID(), Key: key{group: pt.Group, t: pt.Type.Elem()}, Reason: err, } - } + })) } } - return itemCount, nil + return children } -func (pt paramGroupedSlice) Build(c containerStore, decorating bool) (reflect.Value, error) { +func (pt paramGroupedSlice) Build(c containerStore, decorating bool, target *reflect.Value) *deferred { // do not call this if we are already inside a decorator since // it will result in an infinite recursion. (i.e. decorate -> params.BuildList() -> Decorate -> params.BuildList...) // this is safe since a value can be decorated at most once in a given scope. @@ -608,17 +632,18 @@ func (pt paramGroupedSlice) Build(c containerStore, decorating bool) (reflect.Va // If we do not have any decorated values, find the // providers and call them. - itemCount, err := pt.callGroupProviders(c) - if err != nil { - return _noValue, err - } + children := pt.callGroupProviders(c) - stores := c.storesToRoot() - result := reflect.MakeSlice(pt.Type, 0, itemCount) - for _, c := range stores { - result = reflect.Append(result, c.getValueGroup(pt.Group, pt.Type.Elem())...) - } - return result, nil + return whenAll(children...).then(func() error { + if !target.IsValid() { + *target = reflect.MakeSlice(pt.Type, 0, len(children)) + } + + for _, c := range c.storesToRoot() { + target.Set(reflect.Append(*target, c.getValueGroup(pt.Group, pt.Type.Elem())...)) + } + return nil + }) } // Checks if ignoring unexported files in an In struct is allowed. diff --git a/param_test.go b/param_test.go index 2ee90a8a..040c9626 100644 --- a/param_test.go +++ b/param_test.go @@ -33,7 +33,8 @@ func TestParamListBuild(t *testing.T) { p, err := newParamList(reflect.TypeOf(func() io.Writer { return nil }), newScope()) require.NoError(t, err) assert.Panics(t, func() { - p.Build(newScope(), false /* decorating */) + var target reflect.Value + p.Build(newScope(), false /* decorating */, &target) }) } diff --git a/provide.go b/provide.go index a8234c05..0ad9cd95 100644 --- a/provide.go +++ b/provide.go @@ -355,7 +355,7 @@ type provider interface { // // The values produced by this provider should be submitted into the // containerStore. - Call(containerStore) error + Call(store containerStore) *deferred CType() reflect.Type } diff --git a/scheduler.go b/scheduler.go new file mode 100644 index 00000000..7010023e --- /dev/null +++ b/scheduler.go @@ -0,0 +1,153 @@ +package dig + +// A scheduler queues work during resolution of params. +// constructorNode uses it to call its constructor function. +// This may happen in parallel with other calls (parallelScheduler) or +// synchronously, right when enqueued. +// +// Work is enqueued when building a paramList, but the user of scheduler +// must call flush() for asynchronous calls to proceed after the top-level +// paramList.BuildList() is called. +type scheduler interface { + // schedule will call a the supplied func. The deferred will resolve + // after the func is called. The func may be called before schedule + // returns. The deferred will be resolved on the "main" goroutine, so + // it's safe to mutate containerStore during its resolution. It will + // always be resolved with a nil error. + schedule(func()) *deferred + + // flush processes enqueued work. This may in turn enqueue more work; + // flush will keep processing the work until it's empty. After flush is + // called, every deferred returned from schedule will have been resolved. + // Asynchronous deferred values returned from schedule are resolved on the + // same goroutine as the one calling this method. + // + // The scheduler is ready for re-use after flush is called. + flush() +} + +// synchronousScheduler is stateless and calls funcs as soon as they are schedule. It produces +// the exact same results as the code before deferred was introduced. +type synchronousScheduler struct{} + +// schedule calls func and returns an already-resolved deferred. +func (s synchronousScheduler) schedule(fn func()) *deferred { + fn() + return &alreadyResolved +} + +// flush does nothing. All returned deferred values are already resolved. +func (s synchronousScheduler) flush() { + +} + +// task is used by parallelScheduler to remember which function to +// call and which deferred to notify afterwards. +type task struct { + fn func() + d *deferred +} + +// parallelScheduler processes enqueued work using a fixed-size worker pool. +// The pool is started and stopped during the call to flush. +type parallelScheduler struct { + concurrency int + tasks []task +} + +// schedule enqueues a task and returns an unresolved deferred. It will be +// resolved during flush. +func (p *parallelScheduler) schedule(fn func()) *deferred { + d := &deferred{} + p.tasks = append(p.tasks, task{fn, d}) + return d +} + +// flush processes enqueued work. concurrency controls how many executor +// goroutines are started and thus the maximum number of calls that may +// proceed in parallel. The real level of concurrency may be lower for +// CPU-heavy workloads if Go doesn't assign these goroutines to OS threads. +func (p *parallelScheduler) flush() { + inFlight := 0 + taskChan := make(chan task) + resultChan := make(chan *deferred) + + for n := 0; n < p.concurrency; n++ { + go func() { + for t := range taskChan { + t.fn() + resultChan <- t.d + } + }() + } + + for inFlight > 0 || len(p.tasks) > 0 { + var t task + var outChan chan<- task + + if len(p.tasks) > 0 { + t = p.tasks[len(p.tasks)-1] + outChan = taskChan + } + + select { + case outChan <- t: + inFlight++ + p.tasks = p.tasks[0 : len(p.tasks)-1] + case d := <-resultChan: + inFlight-- + d.resolve(nil) + } + } + + close(taskChan) + close(resultChan) + + p.tasks = nil +} + +// unboundedScheduler starts a goroutine per task. Maximum concurrency is +// controlled by Go's allocation of OS threads to goroutines. +type unboundedScheduler struct { + tasks []task +} + +// schedule enqueues a task and returns an unresolved deferred. It will be +// resolved during flush. +func (p *unboundedScheduler) schedule(fn func()) *deferred { + d := &deferred{} + p.tasks = append(p.tasks, task{fn, d}) + return d +} + +// flush processes enqueued work with unlimited concurrency. The actual limit +// is up to Go's allocation of OS resources to goroutines. +func (p *unboundedScheduler) flush() { + inFlight := 0 + resultChan := make(chan *deferred) + + for inFlight > 0 || len(p.tasks) > 0 { + if len(p.tasks) > 0 { + t := p.tasks[len(p.tasks)-1] + p.tasks = p.tasks[0 : len(p.tasks)-1] + + go func() { + t.fn() + resultChan <- t.d + }() + + inFlight++ + continue + } + + select { + case d := <-resultChan: + inFlight-- + d.resolve(nil) + } + } + + close(resultChan) + + p.tasks = nil +} diff --git a/scope.go b/scope.go index a96e15ae..079a981e 100644 --- a/scope.go +++ b/scope.go @@ -78,6 +78,8 @@ type Scope struct { // invokerFn calls a function with arguments provided to Provide or Invoke. invokerFn invokerFn + sched scheduler + // graph of this Scope. Note that this holds the dependency graph of all the // nodes that affect this Scope, not just the ones provided directly to this Scope. gh *graphHolder @@ -99,6 +101,7 @@ func newScope() *Scope { decoratedGroups: make(map[key]reflect.Value), invokerFn: defaultInvoker, rand: rand.New(rand.NewSource(time.Now().UnixNano())), + sched: synchronousScheduler{}, } s.gh = newGraphHolder(s) return s @@ -265,6 +268,10 @@ func (s *Scope) invoker() invokerFn { return s.invokerFn } +func (s *Scope) scheduler() scheduler { + return s.sched +} + // adds a new graphNode to this Scope and all of its descendent // scope. func (s *Scope) newGraphNode(wrapped interface{}, orders map[*Scope]int) { From 3790411b0b386351efa4cc1269637086b64194ce Mon Sep 17 00:00:00 2001 From: Alexandra Parker Date: Thu, 27 Jan 2022 14:14:15 -0800 Subject: [PATCH 02/14] parallel container startup with deferred values This commit tries to be as unobtrusive as possible, attaching new behavior to existing types where possible rather than building out new infrastructure. constructorNode returns a deferred value when called. On the first call, it asks paramList to start building an arg slice, which may also be deferred. Once the arg slice is resolved, constructorNode schedules its constructor function to be called. Once it's called, it resolves its own deferral. Multiple paramSingles can observe the same constructorNode before it's ready. If there's an error, they may all see the same error, which is a change in behavior. There are two schedulers: synchronous and parallel. The synchronous scheduler returns things in the same order as before. The parallel may not (and the tests that rely on shuffle order will fail). The scheduler needs to be flushed after deferred values are created. The synchronous scheduler does nothing on when flushing, but the parallel scheduler runs a pool of goroutines to resolve constructors. Calls to dig functions always happen on the same goroutine as Scope.Invoke(). Calls to constructor functions can happen on pooled goroutines. The choice of scheduler is up to the Scope. Whether constructor functions are safe to call in parallel seems most logically to be a property of the scope, and the scope is passed down the constructor/param call chain. --- constructor.go | 56 ++++++++-------- decorate.go | 70 ++++++++++++++----- deferred.go | 21 ++++-- invoke.go | 2 +- param.go | 178 ++++++++++++++++++++++++------------------------- 5 files changed, 185 insertions(+), 142 deletions(-) diff --git a/constructor.go b/constructor.go index ffdf455f..0c337f86 100644 --- a/constructor.go +++ b/constructor.go @@ -153,38 +153,36 @@ func (n *constructorNode) Call(c containerStore) *deferred { } var args []reflect.Value - d := n.paramList.BuildList(c, false /* decorating */, &args) - - d.observe(func(err error) { - if err != nil { - n.calling = false - n.deferred.resolve(errArgumentsFailed{ - Func: n.location, - Reason: err, - }) - return - } - - var results []reflect.Value + var results []reflect.Value - c.scheduler().schedule(func() { + n.paramList.BuildList(c, false /* decorating */, &args).catch(func(err error) error { + return errArgumentsFailed{ + Func: n.location, + Reason: err, + } + }).then(func() *deferred { + return c.scheduler().schedule(func() { results = c.invoker()(reflect.ValueOf(n.ctor), args) - }).observe(func(_ error) { - n.calling = false - receiver := newStagingContainerWriter() - if err := n.resultList.ExtractList(receiver, false /* decorating */, results); err != nil { - n.deferred.resolve(errConstructorFailed{Func: n.location, Reason: err}) - return - } - - // Commit the result to the original container that this constructor - // was supplied to. The provided container is only used for a view of - // the rest of the graph to instantiate the dependencies of this - // container. - receiver.Commit(n.s) - n.called = true - n.deferred.resolve(nil) }) + }).then(func() *deferred { + receiver := newStagingContainerWriter() + if err := n.resultList.ExtractList(receiver, false /* decorating */, results); err != nil { + return failedDeferred(errConstructorFailed{Func: n.location, Reason: err}) + } + + // Commit the result to the original container that this constructor + // was supplied to. The provided container is only used for a view of + // the rest of the graph to instantiate the dependencies of this + // container. + receiver.Commit(n.s) + n.calling = false + n.called = true + n.deferred.resolve(nil) + return &alreadyResolved + }).catch(func(err error) error { + n.calling = false + n.deferred.resolve(err) + return nil }) return &n.deferred diff --git a/decorate.go b/decorate.go index 6a81d843..cc5905a8 100644 --- a/decorate.go +++ b/decorate.go @@ -29,7 +29,7 @@ import ( ) type decorator interface { - Call(c containerStore) error + Call(c containerStore) *deferred ID() dot.CtorID } @@ -42,12 +42,18 @@ type decoratorNode struct { // Location where this function was defined. location *digreflect.Func + // Whether this node is already building its paramList and calling the constructor + calling bool + // Whether the decorator owned by this node was already called. called bool // Parameters of the decorator. params paramList + // The result of calling the constructor + deferred deferred + // Results of the decorator. results resultList @@ -86,32 +92,60 @@ func newDecoratorNode(dcor interface{}, s *Scope) (*decoratorNode, error) { return n, nil } -func (n *decoratorNode) Call(s containerStore) error { - if n.called { - return nil +// Call calls this decorator if it hasn't already been called and injects any values produced by it into the container +// passed to newConstructorNode. +// +// If constructorNode has a unresolved deferred already in the process of building, it will return that one. If it has +// already been successfully called, it will return an already-resolved deferred. Together these mean it will try the +// call again if it failed last time. +// +// On failure, the returned pointer is not guaranteed to stay in a failed state; another call will reset it back to its +// zero value; don't store the returned pointer. (It will still call each observer only once.) +func (n *decoratorNode) Call(s containerStore) *deferred { + if n.calling || n.called { + return &n.deferred } + n.calling = true + n.deferred = deferred{} + if err := shallowCheckDependencies(s, n.params); err != nil { - return errMissingDependencies{ + n.deferred.resolve(errMissingDependencies{ Func: n.location, Reason: err, - } + }) } - args, err := n.params.BuildList(n.s, true /* decorating */) - if err != nil { - return errArgumentsFailed{ - Func: n.location, - Reason: err, + var args []reflect.Value + d := n.params.BuildList(s, true /* decorating */, &args) + + d.observe(func(err error) { + if err != nil { + n.calling = false + n.deferred.resolve(errArgumentsFailed{ + Func: n.location, + Reason: err, + }) + return } - } - results := reflect.ValueOf(n.dcor).Call(args) - if err := n.results.ExtractList(n.s, true /* decorated */, results); err != nil { - return err - } - n.called = true - return nil + var results []reflect.Value + + s.scheduler().schedule(func() { + results = s.invoker()(reflect.ValueOf(n.dcor), args) + }).observe(func(_ error) { + n.calling = false + if err := n.results.ExtractList(n.s, true /* decorated */, results); err != nil { + n.deferred.resolve(err) + return + } + + n.called = true + n.deferred.resolve(nil) + }) + }) + + return &n.deferred } func (n *decoratorNode) ID() dot.CtorID { return n.id } diff --git a/deferred.go b/deferred.go index 001a6bd0..726c2384 100644 --- a/deferred.go +++ b/deferred.go @@ -43,16 +43,27 @@ func (d *deferred) resolve(err error) { d.observers = nil } -// then returns a new deferred that is either resolved with the same error as this deferred, or any error returned from -// the supplied function. The supplied function is only called if this deferred is resolved without error. -func (d *deferred) then(res func() error) *deferred { +// then returns a new deferred that is either resolved with the same error as this deferred or the eventual result of +// the deferred returned by res. +func (d *deferred) then(res func() *deferred) *deferred { + // Shortcut: if we're settled... + if d.settled { + if d.err == nil { + // ...successfully, then return the other deferred + return res() + } else { + // ...with an error, then return us + return d + } + } + d2 := new(deferred) d.observe(func(err error) { if err != nil { d2.resolve(err) - return + } else { + res().observe(d2.resolve) } - d2.resolve(res()) }) return d2 } diff --git a/invoke.go b/invoke.go index d5f2566d..98c3984e 100644 --- a/invoke.go +++ b/invoke.go @@ -84,7 +84,7 @@ func (s *Scope) Invoke(function interface{}, opts ...InvokeOption) error { var args []reflect.Value - d := pl.BuildList(s, &args, false /* decorating */) + d := pl.BuildList(s, false /* decorating */, &args) d.observe(func(err2 error) { err = err2 }) diff --git a/param.go b/param.go index 1ce5ed29..097b6d57 100644 --- a/param.go +++ b/param.go @@ -218,75 +218,39 @@ func (ps paramSingle) getValue(c containerStore) (reflect.Value, bool) { return _noValue, false } -// builds the parameter using decorators, if any. If there are no decorators associated -// with this parameter, _noValue is returned. -func (ps paramSingle) buildWithDecorators(c containerStore) (v reflect.Value, found bool, err error) { - decorators := c.getValueDecorators(ps.Name, ps.Type) - if len(decorators) == 0 { - return _noValue, false, nil - } - found = true - for _, d := range decorators { - err := d.Call(c) - if err == nil { - continue - } - if _, ok := err.(errMissingDependencies); ok && ps.Optional { - continue - } - v, err = _noValue, errParamSingleFailed{ - CtorID: 1, - Key: key{t: ps.Type, name: ps.Name}, - Reason: err, - } - return v, found, err - } - v, _ = c.getDecoratedValue(ps.Name, ps.Type) - return -} +// builds the parameter using decorators, if any. useDecorators controls whether to use decorator functions (true) or +// provider functions (false). +func (ps paramSingle) buildWith(c containerStore, useDecorators bool, target *reflect.Value) *deferred { + var decorators []decorator -func (ps paramSingle) Build(c containerStore, decorating bool, target *reflect.Value) *deferred { - if !target.IsValid() { - *target = reflect.New(ps.Type).Elem() - } + if useDecorators { + decorators = c.getValueDecorators(ps.Name, ps.Type) - if !decorating { - v, found, err := ps.buildWithDecorators(c) - if found { - return v, err + if len(decorators) == 0 { + return &alreadyResolved + } + } else { + // A provider is-a decorator ({methods of decorator} ⊆ {methods of provider}) + var providers []provider + for _, container := range c.storesToRoot() { + providers = container.getValueProviders(ps.Name, ps.Type) + if len(providers) > 0 { + break + } } - } - - // Check whether the value is a decorated value first. - if v, ok := ps.getDecoratedValue(c); ok { - return v, nil - } - - if v, ok := ps.getValue(c); ok { - target.Set(v) - return &alreadyResolved - } - // Starting at the given container and working our way up its parents, - // find one that provides this dependency. - // - // Once found, we'll use that container for the rest of the invocation. - // Dependencies of this type will begin searching at that container, - // rather than starting at base. - var providers []provider - for _, container := range c.storesToRoot() { - providers = container.getValueProviders(ps.Name, ps.Type) - if len(providers) > 0 { - break + if len(providers) == 0 { + if ps.Optional { + target.Set(reflect.Zero(ps.Type)) + return &alreadyResolved + } else { + return failedDeferred(newErrMissingTypes(c, key{name: ps.Name, t: ps.Type})) + } } - } - if len(providers) == 0 { - if ps.Optional { - target.Set(reflect.Zero(ps.Type)) - return &alreadyResolved - } else { - return failedDeferred(newErrMissingTypes(c, key{name: ps.Name, t: ps.Type})) + decorators = make([]decorator, len(providers)) + for i, provider := range providers { + decorators[i] = provider.(decorator) } } @@ -296,7 +260,7 @@ func (ps paramSingle) Build(c containerStore, decorating bool, target *reflect.V ) doNext = func(i int) { - if i == len(providers) { + if i == len(decorators) { // If we get here, it's impossible for the value to be absent from the // container. v, _ := ps.getValue(c) @@ -308,7 +272,7 @@ func (ps paramSingle) Build(c containerStore, decorating bool, target *reflect.V return } - n := providers[i] + n := decorators[i] n.Call(c).observe(func(err error) { if err != nil { @@ -331,6 +295,34 @@ func (ps paramSingle) Build(c containerStore, decorating bool, target *reflect.V return d } +func (ps paramSingle) Build(c containerStore, decorating bool, target *reflect.Value) *deferred { + if !target.IsValid() { + *target = reflect.New(ps.Type).Elem() + } + + d := &alreadyResolved + + if !decorating { + d = ps.buildWith(c, true, target) + } + + return d.then(func() *deferred { + // Check whether the value is a decorated value first. + if v, ok := ps.getDecoratedValue(c); ok { + target.Set(v) + return &alreadyResolved + } + + // See if it's already in the store + if v, ok := ps.getValue(c); ok { + target.Set(v) + return &alreadyResolved + } + + return ps.buildWith(c, false, target) + }) +} + // paramObject is a dig.In struct where each field is another param. // // This object is not expected in the graph as-is. @@ -576,27 +568,30 @@ func (pt paramGroupedSlice) getDecoratedValues(c containerStore) (reflect.Value, // The order in which the decorators are invoked is from the top level scope to // the current scope, to account for decorators that decorate values that were // already decorated. -func (pt paramGroupedSlice) callGroupDecorators(c containerStore) error { +func (pt paramGroupedSlice) callGroupDecorators(c containerStore) *deferred { + var children []*deferred stores := c.storesToRoot() for i := len(stores) - 1; i >= 0; i-- { c := stores[i] for _, d := range c.getGroupDecorators(pt.Group, pt.Type.Elem()) { - if err := d.Call(c); err != nil { + d := d + child := d.Call(c) + children = append(children, child.catch(func(err error) error { return errParamGroupFailed{ CtorID: d.ID(), Key: key{group: pt.Group, t: pt.Type.Elem()}, Reason: err, } - } + })) } } - return nil + return whenAll(children...) } // search the given container and its parent for matching group providers and // call them to commit values. If an error is encountered, return the number // of providers called and a non-nil error from the first provided. -func (pt paramGroupedSlice) callGroupProviders(c containerStore) []*deferred { +func (pt paramGroupedSlice) callGroupProviders(c containerStore) *deferred { var children []*deferred for _, c := range c.storesToRoot() { providers := c.getGroupProviders(pt.Group, pt.Type.Elem()) @@ -612,37 +607,42 @@ func (pt paramGroupedSlice) callGroupProviders(c containerStore) []*deferred { })) } } - return children + return whenAll(children...) } func (pt paramGroupedSlice) Build(c containerStore, decorating bool, target *reflect.Value) *deferred { + d := &alreadyResolved + // do not call this if we are already inside a decorator since // it will result in an infinite recursion. (i.e. decorate -> params.BuildList() -> Decorate -> params.BuildList...) // this is safe since a value can be decorated at most once in a given scope. if !decorating { - if err := pt.callGroupDecorators(c); err != nil { - return _noValue, err - } - } - - // Check if we have decorated values - if decoratedItems, ok := pt.getDecoratedValues(c); ok { - return decoratedItems, nil + d = pt.callGroupDecorators(c) } - // If we do not have any decorated values, find the - // providers and call them. - children := pt.callGroupProviders(c) + return d.then(func() *deferred { + // Check if we have decorated values + if decoratedItems, ok := pt.getDecoratedValues(c); ok { + if !target.IsValid() { + newCap := 0 + if decoratedItems.Kind() == reflect.Slice { + newCap = decoratedItems.Len() + } + *target = reflect.MakeSlice(pt.Type, 0, newCap) + } - return whenAll(children...).then(func() error { - if !target.IsValid() { - *target = reflect.MakeSlice(pt.Type, 0, len(children)) + target.Set(decoratedItems) + return &alreadyResolved } - for _, c := range c.storesToRoot() { - target.Set(reflect.Append(*target, c.getValueGroup(pt.Group, pt.Type.Elem())...)) - } - return nil + // If we do not have any decorated values, find the + // providers and call them. + return pt.callGroupProviders(c).then(func() *deferred { + for _, c := range c.storesToRoot() { + target.Set(reflect.Append(*target, c.getValueGroup(pt.Group, pt.Type.Elem())...)) + } + return &alreadyResolved + }) }) } From 81a0bad938240211ed6a918633143fa5d105f91d Mon Sep 17 00:00:00 2001 From: Alexandra Parker Date: Thu, 27 Jan 2022 17:36:09 -0800 Subject: [PATCH 03/14] Clean up concurrency tests --- dig_test.go | 36 +++++++++++++++++++----------------- go.mod | 5 ++++- go.sum | 6 +++++- 3 files changed, 28 insertions(+), 19 deletions(-) diff --git a/dig_test.go b/dig_test.go index 021ea95f..5463fb04 100644 --- a/dig_test.go +++ b/dig_test.go @@ -29,12 +29,12 @@ import ( "math/rand" "os" "reflect" - "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "go.uber.org/dig" "go.uber.org/dig/internal/digtest" ) @@ -3579,13 +3579,14 @@ func TestConcurrency(t *testing.T) { C int ) + const max = 3 + var ( - timer = time.NewTimer(10 * time.Second) - max int32 = 3 - done = make(chan struct{}) - running int32 = 0 - waitForUs = func() error { - if atomic.AddInt32(&running, 1) == max { + timer = time.NewTimer(10 * time.Second) + done = make(chan struct{}) + running = atomic.Int32{} + waitForUs = func() error { + if running.Inc() == max { close(done) } select { @@ -3595,7 +3596,7 @@ func TestConcurrency(t *testing.T) { return nil } } - c = digtest.New(t, dig.MaxConcurrency(int(max))) + c = digtest.New(t, dig.MaxConcurrency(max)) ) c.RequireProvide(func() (A, error) { return 0, waitForUs() }) @@ -3606,20 +3607,21 @@ func TestConcurrency(t *testing.T) { require.Equal(t, a, A(0)) require.Equal(t, b, B(1)) require.Equal(t, c, C(2)) - require.Equal(t, running, int32(3)) + require.Equal(t, running.Load(), int32(max)) }) }) t.Run("TestUnboundConcurrency", func(t *testing.T) { t.Parallel() + const max = 20 + var ( - timer = time.NewTimer(10 * time.Second) - max int32 = 20 - done = make(chan struct{}) - running int32 = 0 - waitForUs = func() error { - if atomic.AddInt32(&running, 1) >= max { + timer = time.NewTimer(10 * time.Second) + done = make(chan struct{}) + running = atomic.NewInt32(0) + waitForUs = func() error { + if running.Inc() == max { close(done) } select { @@ -3629,11 +3631,11 @@ func TestConcurrency(t *testing.T) { return nil } } - c = digtest.New(t, dig.UnboundedConcurrency) + c = digtest.New(t, dig.MaxConcurrency(-1)) expected []int ) - for i := 0; i < int(max); i++ { + for i := 0; i < max; i++ { i := i expected = append(expected, i) type out struct { diff --git a/go.mod b/go.mod index fd99adc6..14bae251 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module go.uber.org/dig go 1.13 -require github.com/stretchr/testify v1.4.0 +require ( + github.com/stretchr/testify v1.4.0 + go.uber.org/atomic v1.9.0 +) diff --git a/go.sum b/go.sum index 8fdee585..5580b163 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,14 @@ -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= From 0c029bd4508ea20d2f253cf96aa36765f8702291 Mon Sep 17 00:00:00 2001 From: Alexandra Parker Date: Thu, 27 Jan 2022 17:36:30 -0800 Subject: [PATCH 04/14] Unify concurrency options into one --- container.go | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/container.go b/container.go index ac2decc7..1a068453 100644 --- a/container.go +++ b/container.go @@ -236,25 +236,31 @@ func dryInvoker(fn reflect.Value, _ []reflect.Value) []reflect.Value { type maxConcurrencyOption int -// MaxConcurrency run constructors in this container with a fixed pool of executor -// goroutines. max is the number of goroutines to start. +// MaxConcurrency run constructors in this container with the given level of +// concurrency: +// +// - max = 0 or 1: run one constructor at a time (this is the default) +// +// - max > 1: run at most 'max' constructors at a time +// +// - max < 0: run an unlimited number of constructors at a time +// +// Concurrency is limited by how many constructors' dependencies are satisfied at +// once and Go's own allocation of OS threads to Goroutines. This is useful for +// applications that have many slow, independent constructors. func MaxConcurrency(max int) Option { return maxConcurrencyOption(max) } func (m maxConcurrencyOption) applyOption(container *Container) { - container.scope.sched = ¶llelScheduler{concurrency: int(m)} -} - -type unboundedConcurrency struct{} - -// UnboundedConcurrency run constructors in this container as concurrently as possible. -// Go's resource limits like GOMAXPROCS will inherently limit how much can happen in -// parallel. -var UnboundedConcurrency Option = unboundedConcurrency{} - -func (u unboundedConcurrency) applyOption(container *Container) { - container.scope.sched = &unboundedScheduler{} + switch { + case m == 0, m == 1: + container.scope.sched = synchronousScheduler{} + case m > 1: + container.scope.sched = ¶llelScheduler{concurrency: int(m)} + case m < 0: + container.scope.sched = &unboundedScheduler{} + } } // String representation of the entire Container From 9cf903be1a4c01b55abbe8f0a36a95a1d9e79edb Mon Sep 17 00:00:00 2001 From: Alexandra Parker Date: Thu, 27 Jan 2022 17:52:16 -0800 Subject: [PATCH 05/14] concurrency: fix linter issues --- deferred.go | 26 +++++++++++++++++++++++--- param.go | 4 ++-- scheduler.go | 28 +++++++++++++++++++++++----- scope.go | 2 +- 4 files changed, 49 insertions(+), 11 deletions(-) diff --git a/deferred.go b/deferred.go index 726c2384..e5c2e93a 100644 --- a/deferred.go +++ b/deferred.go @@ -1,3 +1,23 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package dig type observer func(error) @@ -51,10 +71,10 @@ func (d *deferred) then(res func() *deferred) *deferred { if d.err == nil { // ...successfully, then return the other deferred return res() - } else { - // ...with an error, then return us - return d } + + // ...with an error, then return us + return d } d2 := new(deferred) diff --git a/param.go b/param.go index 097b6d57..378d551c 100644 --- a/param.go +++ b/param.go @@ -243,9 +243,9 @@ func (ps paramSingle) buildWith(c containerStore, useDecorators bool, target *re if ps.Optional { target.Set(reflect.Zero(ps.Type)) return &alreadyResolved - } else { - return failedDeferred(newErrMissingTypes(c, key{name: ps.Name, t: ps.Type})) } + + return failedDeferred(newErrMissingTypes(c, key{name: ps.Name, t: ps.Type})) } decorators = make([]decorator, len(providers)) diff --git a/scheduler.go b/scheduler.go index 7010023e..bb6687e3 100644 --- a/scheduler.go +++ b/scheduler.go @@ -1,3 +1,23 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package dig // A scheduler queues work during resolution of params. @@ -140,11 +160,9 @@ func (p *unboundedScheduler) flush() { continue } - select { - case d := <-resultChan: - inFlight-- - d.resolve(nil) - } + d := <-resultChan + inFlight-- + d.resolve(nil) } close(resultChan) diff --git a/scope.go b/scope.go index 079a981e..cc231f88 100644 --- a/scope.go +++ b/scope.go @@ -101,7 +101,7 @@ func newScope() *Scope { decoratedGroups: make(map[key]reflect.Value), invokerFn: defaultInvoker, rand: rand.New(rand.NewSource(time.Now().UnixNano())), - sched: synchronousScheduler{}, + sched: synchronousScheduler{}, } s.gh = newGraphHolder(s) return s From 08ffabf4e7253e9d7982f2b5f6af94b9d00fbf95 Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Mon, 31 Jan 2022 15:36:48 -0800 Subject: [PATCH 06/14] Move deferred to internal/promise Move deferred to an internal/promise package. Rename deferred to Deferred, export methods, and rename alreadyResolved and failedDeferred to Done and Fail. --- constructor.go | 25 +++--- constructor_test.go | 11 ++- decorate.go | 21 ++--- deferred.go | 135 ------------------------------- internal/promise/deferred.go | 151 +++++++++++++++++++++++++++++++++++ invoke.go | 2 +- param.go | 73 ++++++++--------- provide.go | 3 +- scheduler.go | 26 +++--- 9 files changed, 238 insertions(+), 209 deletions(-) delete mode 100644 deferred.go create mode 100644 internal/promise/deferred.go diff --git a/constructor.go b/constructor.go index 0c337f86..8725f7e9 100644 --- a/constructor.go +++ b/constructor.go @@ -27,6 +27,7 @@ import ( "go.uber.org/dig/internal/digerror" "go.uber.org/dig/internal/digreflect" "go.uber.org/dig/internal/dot" + "go.uber.org/dig/internal/promise" ) // constructorNode is a node in the dependency graph that represents @@ -55,7 +56,7 @@ type constructorNode struct { paramList paramList // The result of calling the constructor - deferred deferred + deferred promise.Deferred // Type information about constructor results. resultList resultList @@ -137,16 +138,16 @@ func (n *constructorNode) String() string { // // On failure, the returned pointer is not guaranteed to stay in a failed state; another call will reset it back to its // zero value; don't store the returned pointer. (It will still call each observer only once.) -func (n *constructorNode) Call(c containerStore) *deferred { +func (n *constructorNode) Call(c containerStore) *promise.Deferred { if n.calling || n.called { return &n.deferred } n.calling = true - n.deferred = deferred{} + n.deferred = promise.Deferred{} if err := shallowCheckDependencies(c, n.paramList); err != nil { - n.deferred.resolve(errMissingDependencies{ + n.deferred.Resolve(errMissingDependencies{ Func: n.location, Reason: err, }) @@ -155,19 +156,19 @@ func (n *constructorNode) Call(c containerStore) *deferred { var args []reflect.Value var results []reflect.Value - n.paramList.BuildList(c, false /* decorating */, &args).catch(func(err error) error { + n.paramList.BuildList(c, false /* decorating */, &args).Catch(func(err error) error { return errArgumentsFailed{ Func: n.location, Reason: err, } - }).then(func() *deferred { + }).Then(func() *promise.Deferred { return c.scheduler().schedule(func() { results = c.invoker()(reflect.ValueOf(n.ctor), args) }) - }).then(func() *deferred { + }).Then(func() *promise.Deferred { receiver := newStagingContainerWriter() if err := n.resultList.ExtractList(receiver, false /* decorating */, results); err != nil { - return failedDeferred(errConstructorFailed{Func: n.location, Reason: err}) + return promise.Fail(errConstructorFailed{Func: n.location, Reason: err}) } // Commit the result to the original container that this constructor @@ -177,11 +178,11 @@ func (n *constructorNode) Call(c containerStore) *deferred { receiver.Commit(n.s) n.calling = false n.called = true - n.deferred.resolve(nil) - return &alreadyResolved - }).catch(func(err error) error { + n.deferred.Resolve(nil) + return promise.Done + }).Catch(func(err error) error { n.calling = false - n.deferred.resolve(err) + n.deferred.Resolve(err) return nil }) diff --git a/constructor_test.go b/constructor_test.go index 992d06a4..8edb5d0d 100644 --- a/constructor_test.go +++ b/constructor_test.go @@ -61,9 +61,16 @@ func TestNodeAlreadyCalled(t *testing.T) { c := New() d := n.Call(c.scope) c.scope.sched.flush() - require.NoError(t, d.err, "invoke failed") + + ok, err := d.Resolved() + require.True(t, ok, "deferred must be resolved") + require.NoError(t, err, "invoke failed") + require.True(t, n.called, "node must be called") d = n.Call(c.scope) c.scope.sched.flush() - require.NoError(t, d.err, "calling again should be okay") + + ok, err = d.Resolved() + require.True(t, ok, "deferred must be resolved") + require.NoError(t, err, "calling again should be okay") } diff --git a/decorate.go b/decorate.go index cc5905a8..4b96f9f0 100644 --- a/decorate.go +++ b/decorate.go @@ -26,10 +26,11 @@ import ( "go.uber.org/dig/internal/digreflect" "go.uber.org/dig/internal/dot" + "go.uber.org/dig/internal/promise" ) type decorator interface { - Call(c containerStore) *deferred + Call(c containerStore) *promise.Deferred ID() dot.CtorID } @@ -52,7 +53,7 @@ type decoratorNode struct { params paramList // The result of calling the constructor - deferred deferred + deferred promise.Deferred // Results of the decorator. results resultList @@ -101,16 +102,16 @@ func newDecoratorNode(dcor interface{}, s *Scope) (*decoratorNode, error) { // // On failure, the returned pointer is not guaranteed to stay in a failed state; another call will reset it back to its // zero value; don't store the returned pointer. (It will still call each observer only once.) -func (n *decoratorNode) Call(s containerStore) *deferred { +func (n *decoratorNode) Call(s containerStore) *promise.Deferred { if n.calling || n.called { return &n.deferred } n.calling = true - n.deferred = deferred{} + n.deferred = promise.Deferred{} if err := shallowCheckDependencies(s, n.params); err != nil { - n.deferred.resolve(errMissingDependencies{ + n.deferred.Resolve(errMissingDependencies{ Func: n.location, Reason: err, }) @@ -119,10 +120,10 @@ func (n *decoratorNode) Call(s containerStore) *deferred { var args []reflect.Value d := n.params.BuildList(s, true /* decorating */, &args) - d.observe(func(err error) { + d.Observe(func(err error) { if err != nil { n.calling = false - n.deferred.resolve(errArgumentsFailed{ + n.deferred.Resolve(errArgumentsFailed{ Func: n.location, Reason: err, }) @@ -133,15 +134,15 @@ func (n *decoratorNode) Call(s containerStore) *deferred { s.scheduler().schedule(func() { results = s.invoker()(reflect.ValueOf(n.dcor), args) - }).observe(func(_ error) { + }).Observe(func(_ error) { n.calling = false if err := n.results.ExtractList(n.s, true /* decorated */, results); err != nil { - n.deferred.resolve(err) + n.deferred.Resolve(err) return } n.called = true - n.deferred.resolve(nil) + n.deferred.Resolve(nil) }) }) diff --git a/deferred.go b/deferred.go deleted file mode 100644 index e5c2e93a..00000000 --- a/deferred.go +++ /dev/null @@ -1,135 +0,0 @@ -// Copyright (c) 2021 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package dig - -type observer func(error) - -// A deferred is an observable future result that may fail. Its zero value is unresolved and has no observers. It can -// be resolved once, at which point every observer will be called. -type deferred struct { - observers []observer - settled bool - err error -} - -// alreadyResolved is a deferred that has already been resolved with a nil error. -var alreadyResolved = deferred{settled: true} - -// failedDeferred returns a deferred that is resolved with the given error. -func failedDeferred(err error) *deferred { - return &deferred{settled: true, err: err} -} - -// observe registers an observer to receive a callback when this deferred is resolved. It will be called at most one -// time. If this deferred is already resolved, the observer is called immediately, before observe returns. -func (d *deferred) observe(obs observer) { - if d.settled { - obs(d.err) - return - } - - d.observers = append(d.observers, obs) -} - -// resolve sets the status of this deferred and notifies all observers if it's not already resolved. -func (d *deferred) resolve(err error) { - if d.settled { - return - } - - d.settled = true - d.err = err - for _, obs := range d.observers { - obs(err) - } - d.observers = nil -} - -// then returns a new deferred that is either resolved with the same error as this deferred or the eventual result of -// the deferred returned by res. -func (d *deferred) then(res func() *deferred) *deferred { - // Shortcut: if we're settled... - if d.settled { - if d.err == nil { - // ...successfully, then return the other deferred - return res() - } - - // ...with an error, then return us - return d - } - - d2 := new(deferred) - d.observe(func(err error) { - if err != nil { - d2.resolve(err) - } else { - res().observe(d2.resolve) - } - }) - return d2 -} - -// catch maps any error from this deferred using the supplied function. The supplied function is only called if this -// deferred is resolved with an error. If the supplied function returns a nil error, the new deferred will resolve -// successfully. -func (d *deferred) catch(rej func(error) error) *deferred { - d2 := new(deferred) - d.observe(func(err error) { - if err != nil { - err = rej(err) - } - d2.resolve(err) - }) - return d2 -} - -// whenAll returns a new deferred that resolves when all the supplied deferreds resolve. It resolves with the first -// error reported by any deferred, or nil if they all succeed. -func whenAll(others ...*deferred) *deferred { - if len(others) == 0 { - return &alreadyResolved - } - - d := new(deferred) - count := len(others) - - onResolved := func(err error) { - if d.settled { - return - } - - if err != nil { - d.resolve(err) - } - - count-- - if count == 0 { - d.resolve(nil) - } - } - - for _, other := range others { - other.observe(onResolved) - } - - return d -} diff --git a/internal/promise/deferred.go b/internal/promise/deferred.go new file mode 100644 index 00000000..8e01edfb --- /dev/null +++ b/internal/promise/deferred.go @@ -0,0 +1,151 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package promise + +type Observer func(error) + +// Deferred is an observable future result that may fail. +// Its zero value is unresolved and has no observers. +// It can be resolved once, at which point every observer will be called. +type Deferred struct { + observers []Observer + settled bool + err error +} + +// Resolved reports whether this Deferred has resolved, +// and if so, with what error. +// +// err is undefined if the Deferred has not yet resolved. +func (d *Deferred) Resolved() (resolved bool, err error) { + return d.settled, d.err +} + +// Done is a Deferred that has already been resolved with a nil error. +var Done = &Deferred{settled: true} + +// Fail returns a Deferred that has resolved with the given error. +func Fail(err error) *Deferred { + return &Deferred{settled: true, err: err} +} + +// Observe registers an observer to receive a callback when this deferred is +// resolved. +// It will be called at most one time. +// If this deferred is already resolved, the observer is called immediately, +// before Observe returns. +func (d *Deferred) Observe(obs Observer) { + if d.settled { + obs(d.err) + } else { + d.observers = append(d.observers, obs) + } +} + +// Resolve sets the status of this deferred and notifies all observers. +// This is a no-op if the Deferred has already resolved. +func (d *Deferred) Resolve(err error) { + if d.settled { + return + } + + d.settled = true + d.err = err + for _, obs := range d.observers { + obs(err) + } + d.observers = nil +} + +// Then returns a new Deferred that resolves with the same error as this +// Deferred or the eventual result of the Deferred returned by res. +func (d *Deferred) Then(res func() *Deferred) *Deferred { + // Shortcut: if we're settled... + if d.settled { + if d.err == nil { + // ...successfully, then return the other deferred + return res() + } + + // ...with an error, then return us + return d + } + + d2 := new(Deferred) + d.Observe(func(err error) { + if err != nil { + d2.Resolve(err) + } else { + res().Observe(d2.Resolve) + } + }) + return d2 +} + +// Catch maps any error from this deferred using the supplied function. +// The supplied function is only called if this deferred is resolved with an +// error. +// If the supplied function returns a nil error, the new deferred will resolve +// successfully. +func (d *Deferred) Catch(rej func(error) error) *Deferred { + d2 := new(Deferred) + d.Observe(func(err error) { + if err != nil { + err = rej(err) + } + d2.Resolve(err) + }) + return d2 +} + +// WhenAll returns a new Deferred that resolves when all the supplied deferreds +// resolve. +// It resolves with the first error reported by any deferred, or nil if they +// all succeed. +func WhenAll(others ...*Deferred) *Deferred { + if len(others) == 0 { + return Done + } + + d := new(Deferred) + count := len(others) + + onResolved := func(err error) { + if d.settled { + return + } + + if err != nil { + d.Resolve(err) + } + + count-- + if count == 0 { + d.Resolve(nil) + } + } + + for _, other := range others { + other.Observe(onResolved) + } + + return d +} diff --git a/invoke.go b/invoke.go index 98c3984e..43366955 100644 --- a/invoke.go +++ b/invoke.go @@ -85,7 +85,7 @@ func (s *Scope) Invoke(function interface{}, opts ...InvokeOption) error { var args []reflect.Value d := pl.BuildList(s, false /* decorating */, &args) - d.observe(func(err2 error) { + d.Observe(func(err2 error) { err = err2 }) s.sched.flush() diff --git a/param.go b/param.go index 378d551c..233f227d 100644 --- a/param.go +++ b/param.go @@ -29,6 +29,7 @@ import ( "go.uber.org/dig/internal/digerror" "go.uber.org/dig/internal/dot" + "go.uber.org/dig/internal/promise" ) // The param interface represents a dependency for a constructor. @@ -52,7 +53,7 @@ type param interface { // Build returns a deferred that resolves once the reflect.Value is filled in. // // This MAY panic if the param does not produce a single value. - Build(store containerStore, decorating bool, target *reflect.Value) *deferred + Build(store containerStore, decorating bool, target *reflect.Value) *promise.Deferred // DotParam returns a slice of dot.Param(s). DotParam() []*dot.Param @@ -140,7 +141,7 @@ func newParamList(ctype reflect.Type, c containerStore) (paramList, error) { return pl, nil } -func (pl paramList) Build(containerStore, bool, *reflect.Value) *deferred { +func (pl paramList) Build(containerStore, bool, *reflect.Value) *promise.Deferred { digerror.BugPanicf("paramList.Build() must never be called") panic("") // Unreachable, as BugPanicf above will panic. } @@ -148,13 +149,13 @@ func (pl paramList) Build(containerStore, bool, *reflect.Value) *deferred { // BuildList builds an ordered list of values which may be passed directly // to the underlying constructor and stores them in the pointed-to slice. // It returns a deferred that resolves when the slice is filled out. -func (pl paramList) BuildList(c containerStore, decorating bool, targets *[]reflect.Value) *deferred { - children := make([]*deferred, len(pl.Params)) +func (pl paramList) BuildList(c containerStore, decorating bool, targets *[]reflect.Value) *promise.Deferred { + children := make([]*promise.Deferred, len(pl.Params)) *targets = make([]reflect.Value, len(pl.Params)) for i, p := range pl.Params { children[i] = p.Build(c, decorating, &(*targets)[i]) } - return whenAll(children...) + return promise.WhenAll(children...) } // paramSingle is an explicitly requested type, optionally with a name. @@ -220,14 +221,14 @@ func (ps paramSingle) getValue(c containerStore) (reflect.Value, bool) { // builds the parameter using decorators, if any. useDecorators controls whether to use decorator functions (true) or // provider functions (false). -func (ps paramSingle) buildWith(c containerStore, useDecorators bool, target *reflect.Value) *deferred { +func (ps paramSingle) buildWith(c containerStore, useDecorators bool, target *reflect.Value) *promise.Deferred { var decorators []decorator if useDecorators { decorators = c.getValueDecorators(ps.Name, ps.Type) if len(decorators) == 0 { - return &alreadyResolved + return promise.Done } } else { // A provider is-a decorator ({methods of decorator} ⊆ {methods of provider}) @@ -242,10 +243,10 @@ func (ps paramSingle) buildWith(c containerStore, useDecorators bool, target *re if len(providers) == 0 { if ps.Optional { target.Set(reflect.Zero(ps.Type)) - return &alreadyResolved + return promise.Done } - return failedDeferred(newErrMissingTypes(c, key{name: ps.Name, t: ps.Type})) + return promise.Fail(newErrMissingTypes(c, key{name: ps.Name, t: ps.Type})) } decorators = make([]decorator, len(providers)) @@ -256,7 +257,7 @@ func (ps paramSingle) buildWith(c containerStore, useDecorators bool, target *re var ( doNext func(i int) - d = new(deferred) + d = new(promise.Deferred) ) doNext = func(i int) { @@ -268,18 +269,18 @@ func (ps paramSingle) buildWith(c containerStore, useDecorators bool, target *re // Not valid during a dry run target.Set(v) } - d.resolve(nil) + d.Resolve(nil) return } n := decorators[i] - n.Call(c).observe(func(err error) { + n.Call(c).Observe(func(err error) { if err != nil { // If we're missing dependencies but the parameter itself is optional, // we can just move on. if _, ok := err.(errMissingDependencies); !ok || !ps.Optional { - d.resolve(errParamSingleFailed{ + d.Resolve(errParamSingleFailed{ CtorID: n.ID(), Key: key{t: ps.Type, name: ps.Name}, Reason: err, @@ -295,28 +296,28 @@ func (ps paramSingle) buildWith(c containerStore, useDecorators bool, target *re return d } -func (ps paramSingle) Build(c containerStore, decorating bool, target *reflect.Value) *deferred { +func (ps paramSingle) Build(c containerStore, decorating bool, target *reflect.Value) *promise.Deferred { if !target.IsValid() { *target = reflect.New(ps.Type).Elem() } - d := &alreadyResolved + d := promise.Done if !decorating { d = ps.buildWith(c, true, target) } - return d.then(func() *deferred { + return d.Then(func() *promise.Deferred { // Check whether the value is a decorated value first. if v, ok := ps.getDecoratedValue(c); ok { target.Set(v) - return &alreadyResolved + return promise.Done } // See if it's already in the store if v, ok := ps.getValue(c); ok { target.Set(v) - return &alreadyResolved + return promise.Done } return ps.buildWith(c, false, target) @@ -407,19 +408,19 @@ func newParamObject(t reflect.Type, c containerStore) (paramObject, error) { return po, nil } -func (po paramObject) Build(c containerStore, decorating bool, target *reflect.Value) *deferred { +func (po paramObject) Build(c containerStore, decorating bool, target *reflect.Value) *promise.Deferred { if !target.IsValid() { *target = reflect.New(po.Type).Elem() } - children := make([]*deferred, len(po.Fields)) + children := make([]*promise.Deferred, len(po.Fields)) for i, f := range po.Fields { f := f field := target.Field(f.FieldIndex) children[i] = f.Build(c, decorating, &field) } - return whenAll(children...) + return promise.WhenAll(children...) } // paramObjectField is a single field of a dig.In struct. @@ -485,7 +486,7 @@ func newParamObjectField(idx int, f reflect.StructField, c containerStore) (para return pof, nil } -func (pof paramObjectField) Build(c containerStore, decorating bool, target *reflect.Value) *deferred { +func (pof paramObjectField) Build(c containerStore, decorating bool, target *reflect.Value) *promise.Deferred { return pof.Param.Build(c, decorating, target) } @@ -568,15 +569,15 @@ func (pt paramGroupedSlice) getDecoratedValues(c containerStore) (reflect.Value, // The order in which the decorators are invoked is from the top level scope to // the current scope, to account for decorators that decorate values that were // already decorated. -func (pt paramGroupedSlice) callGroupDecorators(c containerStore) *deferred { - var children []*deferred +func (pt paramGroupedSlice) callGroupDecorators(c containerStore) *promise.Deferred { + var children []*promise.Deferred stores := c.storesToRoot() for i := len(stores) - 1; i >= 0; i-- { c := stores[i] for _, d := range c.getGroupDecorators(pt.Group, pt.Type.Elem()) { d := d child := d.Call(c) - children = append(children, child.catch(func(err error) error { + children = append(children, child.Catch(func(err error) error { return errParamGroupFailed{ CtorID: d.ID(), Key: key{group: pt.Group, t: pt.Type.Elem()}, @@ -585,20 +586,20 @@ func (pt paramGroupedSlice) callGroupDecorators(c containerStore) *deferred { })) } } - return whenAll(children...) + return promise.WhenAll(children...) } // search the given container and its parent for matching group providers and // call them to commit values. If an error is encountered, return the number // of providers called and a non-nil error from the first provided. -func (pt paramGroupedSlice) callGroupProviders(c containerStore) *deferred { - var children []*deferred +func (pt paramGroupedSlice) callGroupProviders(c containerStore) *promise.Deferred { + var children []*promise.Deferred for _, c := range c.storesToRoot() { providers := c.getGroupProviders(pt.Group, pt.Type.Elem()) for _, n := range providers { n := n child := n.Call(c) - children = append(children, child.catch(func(err error) error { + children = append(children, child.Catch(func(err error) error { return errParamGroupFailed{ CtorID: n.ID(), Key: key{group: pt.Group, t: pt.Type.Elem()}, @@ -607,11 +608,11 @@ func (pt paramGroupedSlice) callGroupProviders(c containerStore) *deferred { })) } } - return whenAll(children...) + return promise.WhenAll(children...) } -func (pt paramGroupedSlice) Build(c containerStore, decorating bool, target *reflect.Value) *deferred { - d := &alreadyResolved +func (pt paramGroupedSlice) Build(c containerStore, decorating bool, target *reflect.Value) *promise.Deferred { + d := promise.Done // do not call this if we are already inside a decorator since // it will result in an infinite recursion. (i.e. decorate -> params.BuildList() -> Decorate -> params.BuildList...) @@ -620,7 +621,7 @@ func (pt paramGroupedSlice) Build(c containerStore, decorating bool, target *ref d = pt.callGroupDecorators(c) } - return d.then(func() *deferred { + return d.Then(func() *promise.Deferred { // Check if we have decorated values if decoratedItems, ok := pt.getDecoratedValues(c); ok { if !target.IsValid() { @@ -632,16 +633,16 @@ func (pt paramGroupedSlice) Build(c containerStore, decorating bool, target *ref } target.Set(decoratedItems) - return &alreadyResolved + return promise.Done } // If we do not have any decorated values, find the // providers and call them. - return pt.callGroupProviders(c).then(func() *deferred { + return pt.callGroupProviders(c).Then(func() *promise.Deferred { for _, c := range c.storesToRoot() { target.Set(reflect.Append(*target, c.getValueGroup(pt.Group, pt.Type.Elem())...)) } - return &alreadyResolved + return promise.Done }) }) } diff --git a/provide.go b/provide.go index 0ad9cd95..4df296ce 100644 --- a/provide.go +++ b/provide.go @@ -30,6 +30,7 @@ import ( "go.uber.org/dig/internal/digreflect" "go.uber.org/dig/internal/dot" "go.uber.org/dig/internal/graph" + "go.uber.org/dig/internal/promise" ) // A ProvideOption modifies the default behavior of Provide. @@ -355,7 +356,7 @@ type provider interface { // // The values produced by this provider should be submitted into the // containerStore. - Call(store containerStore) *deferred + Call(store containerStore) *promise.Deferred CType() reflect.Type } diff --git a/scheduler.go b/scheduler.go index bb6687e3..98f8246f 100644 --- a/scheduler.go +++ b/scheduler.go @@ -20,6 +20,8 @@ package dig +import "go.uber.org/dig/internal/promise" + // A scheduler queues work during resolution of params. // constructorNode uses it to call its constructor function. // This may happen in parallel with other calls (parallelScheduler) or @@ -34,7 +36,7 @@ type scheduler interface { // returns. The deferred will be resolved on the "main" goroutine, so // it's safe to mutate containerStore during its resolution. It will // always be resolved with a nil error. - schedule(func()) *deferred + schedule(func()) *promise.Deferred // flush processes enqueued work. This may in turn enqueue more work; // flush will keep processing the work until it's empty. After flush is @@ -51,9 +53,9 @@ type scheduler interface { type synchronousScheduler struct{} // schedule calls func and returns an already-resolved deferred. -func (s synchronousScheduler) schedule(fn func()) *deferred { +func (s synchronousScheduler) schedule(fn func()) *promise.Deferred { fn() - return &alreadyResolved + return promise.Done } // flush does nothing. All returned deferred values are already resolved. @@ -65,7 +67,7 @@ func (s synchronousScheduler) flush() { // call and which deferred to notify afterwards. type task struct { fn func() - d *deferred + d *promise.Deferred } // parallelScheduler processes enqueued work using a fixed-size worker pool. @@ -77,8 +79,8 @@ type parallelScheduler struct { // schedule enqueues a task and returns an unresolved deferred. It will be // resolved during flush. -func (p *parallelScheduler) schedule(fn func()) *deferred { - d := &deferred{} +func (p *parallelScheduler) schedule(fn func()) *promise.Deferred { + d := new(promise.Deferred) p.tasks = append(p.tasks, task{fn, d}) return d } @@ -90,7 +92,7 @@ func (p *parallelScheduler) schedule(fn func()) *deferred { func (p *parallelScheduler) flush() { inFlight := 0 taskChan := make(chan task) - resultChan := make(chan *deferred) + resultChan := make(chan *promise.Deferred) for n := 0; n < p.concurrency; n++ { go func() { @@ -116,7 +118,7 @@ func (p *parallelScheduler) flush() { p.tasks = p.tasks[0 : len(p.tasks)-1] case d := <-resultChan: inFlight-- - d.resolve(nil) + d.Resolve(nil) } } @@ -134,8 +136,8 @@ type unboundedScheduler struct { // schedule enqueues a task and returns an unresolved deferred. It will be // resolved during flush. -func (p *unboundedScheduler) schedule(fn func()) *deferred { - d := &deferred{} +func (p *unboundedScheduler) schedule(fn func()) *promise.Deferred { + d := new(promise.Deferred) p.tasks = append(p.tasks, task{fn, d}) return d } @@ -144,7 +146,7 @@ func (p *unboundedScheduler) schedule(fn func()) *deferred { // is up to Go's allocation of OS resources to goroutines. func (p *unboundedScheduler) flush() { inFlight := 0 - resultChan := make(chan *deferred) + resultChan := make(chan *promise.Deferred) for inFlight > 0 || len(p.tasks) > 0 { if len(p.tasks) > 0 { @@ -162,7 +164,7 @@ func (p *unboundedScheduler) flush() { d := <-resultChan inFlight-- - d.resolve(nil) + d.Resolve(nil) } close(resultChan) From 58b540a20120b97d1eac2c5d1467a06411cc3d4b Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Mon, 31 Jan 2022 15:44:57 -0800 Subject: [PATCH 07/14] scheduler: move into internal/, split into files Move scheduler.go into internal/scheduler, rename and export things, and split the implementations across files. --- constructor.go | 2 +- constructor_test.go | 4 +- container.go | 9 +- decorate.go | 2 +- internal/scheduler/parallel.go | 97 ++++++++++++++++++ internal/scheduler/scheduler.go | 69 +++++++++++++ internal/scheduler/unbounded.go | 70 +++++++++++++ invoke.go | 2 +- scheduler.go | 173 -------------------------------- scope.go | 8 +- 10 files changed, 251 insertions(+), 185 deletions(-) create mode 100644 internal/scheduler/parallel.go create mode 100644 internal/scheduler/scheduler.go create mode 100644 internal/scheduler/unbounded.go delete mode 100644 scheduler.go diff --git a/constructor.go b/constructor.go index 8725f7e9..c278347e 100644 --- a/constructor.go +++ b/constructor.go @@ -162,7 +162,7 @@ func (n *constructorNode) Call(c containerStore) *promise.Deferred { Reason: err, } }).Then(func() *promise.Deferred { - return c.scheduler().schedule(func() { + return c.scheduler().Schedule(func() { results = c.invoker()(reflect.ValueOf(n.ctor), args) }) }).Then(func() *promise.Deferred { diff --git a/constructor_test.go b/constructor_test.go index 8edb5d0d..9dd2c721 100644 --- a/constructor_test.go +++ b/constructor_test.go @@ -60,7 +60,7 @@ func TestNodeAlreadyCalled(t *testing.T) { c := New() d := n.Call(c.scope) - c.scope.sched.flush() + c.scope.sched.Flush() ok, err := d.Resolved() require.True(t, ok, "deferred must be resolved") @@ -68,7 +68,7 @@ func TestNodeAlreadyCalled(t *testing.T) { require.True(t, n.called, "node must be called") d = n.Call(c.scope) - c.scope.sched.flush() + c.scope.sched.Flush() ok, err = d.Resolved() require.True(t, ok, "deferred must be resolved") diff --git a/container.go b/container.go index 1a068453..682cbfe9 100644 --- a/container.go +++ b/container.go @@ -26,6 +26,7 @@ import ( "reflect" "go.uber.org/dig/internal/dot" + "go.uber.org/dig/internal/scheduler" ) const ( @@ -144,7 +145,7 @@ type containerStore interface { invoker() invokerFn // Returns the scheduler to use for this scope. - scheduler() scheduler + scheduler() scheduler.Scheduler } // New constructs a Container. @@ -255,11 +256,11 @@ func MaxConcurrency(max int) Option { func (m maxConcurrencyOption) applyOption(container *Container) { switch { case m == 0, m == 1: - container.scope.sched = synchronousScheduler{} + container.scope.sched = scheduler.Synchronous case m > 1: - container.scope.sched = ¶llelScheduler{concurrency: int(m)} + container.scope.sched = scheduler.NewParallel(int(m)) case m < 0: - container.scope.sched = &unboundedScheduler{} + container.scope.sched = new(scheduler.Unbounded) } } diff --git a/decorate.go b/decorate.go index 4b96f9f0..98943c0d 100644 --- a/decorate.go +++ b/decorate.go @@ -132,7 +132,7 @@ func (n *decoratorNode) Call(s containerStore) *promise.Deferred { var results []reflect.Value - s.scheduler().schedule(func() { + s.scheduler().Schedule(func() { results = s.invoker()(reflect.ValueOf(n.dcor), args) }).Observe(func(_ error) { n.calling = false diff --git a/internal/scheduler/parallel.go b/internal/scheduler/parallel.go new file mode 100644 index 00000000..2017bbc8 --- /dev/null +++ b/internal/scheduler/parallel.go @@ -0,0 +1,97 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package scheduler + +import "go.uber.org/dig/internal/promise" + +// task is used by parallelScheduler to remember which function to +// call and which deferred to notify afterwards. +type task struct { + fn func() + d *promise.Deferred +} + +// Parallel processes enqueued work using a fixed-size worker pool. +// The pool is started and stopped during the call to flush. +type Parallel struct { + concurrency int + tasks []task +} + +var _ Scheduler = (*Parallel)(nil) + +// NewParallel builds a new parallel scheduler that will use the specified +// number of goroutines to run tasks. +func NewParallel(concurrency int) *Parallel { + return &Parallel{concurrency: concurrency} +} + +// Schedule enqueues a task and returns an unresolved deferred. +// It will be resolved during flush. +func (p *Parallel) Schedule(fn func()) *promise.Deferred { + d := new(promise.Deferred) + p.tasks = append(p.tasks, task{fn, d}) + return d +} + +// Flush processes enqueued work. +// concurrency controls how many executor goroutines are started and thus the +// maximum number of calls that may proceed in parallel. +// The real level of concurrency may be lower for CPU-heavy workloads if Go +// doesn't assign these goroutines to OS threads. +func (p *Parallel) Flush() { + inFlight := 0 + taskChan := make(chan task) + resultChan := make(chan *promise.Deferred) + + for n := 0; n < p.concurrency; n++ { + go func() { + for t := range taskChan { + t.fn() + resultChan <- t.d + } + }() + } + + for inFlight > 0 || len(p.tasks) > 0 { + var t task + var outChan chan<- task + + if len(p.tasks) > 0 { + t = p.tasks[len(p.tasks)-1] + outChan = taskChan + } + + select { + case outChan <- t: + inFlight++ + p.tasks = p.tasks[0 : len(p.tasks)-1] + case d := <-resultChan: + inFlight-- + d.Resolve(nil) + } + } + + close(taskChan) + close(resultChan) + + p.tasks = nil +} diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go new file mode 100644 index 00000000..f3292fce --- /dev/null +++ b/internal/scheduler/scheduler.go @@ -0,0 +1,69 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package scheduler + +import "go.uber.org/dig/internal/promise" + +// A scheduler queues work during resolution of params. +// constructorNode uses it to call its constructor function. +// This may happen in parallel with other calls (parallelScheduler) or +// synchronously, right when enqueued. +// +// Work is enqueued when building a paramList, but the user of scheduler +// must call flush() for asynchronous calls to proceed after the top-level +// paramList.BuildList() is called. +type Scheduler interface { + // schedule will call a the supplied func. The deferred will resolve + // after the func is called. The func may be called before schedule + // returns. The deferred will be resolved on the "main" goroutine, so + // it's safe to mutate containerStore during its resolution. It will + // always be resolved with a nil error. + Schedule(func()) *promise.Deferred + + // flush processes enqueued work. This may in turn enqueue more work; + // flush will keep processing the work until it's empty. After flush is + // called, every deferred returned from schedule will have been resolved. + // Asynchronous deferred values returned from schedule are resolved on the + // same goroutine as the one calling this method. + // + // The scheduler is ready for re-use after flush is called. + Flush() +} + +// Synchronous is a stateless synchronous scheduler. +// It invokes functions as soon as they are scheduled. +// This is equivalent to not using a concurrent scheduler at all. +var Synchronous = synchronous{} + +// synchronous is stateless and calls funcs as soon as they are schedule. It produces +// the exact same results as the code before deferred was introduced. +type synchronous struct{} + +var _ Scheduler = synchronous{} + +// schedule calls func and returns an already-resolved deferred. +func (s synchronous) Schedule(fn func()) *promise.Deferred { + fn() + return promise.Done +} + +// flush does nothing. All returned deferred values are already resolved. +func (s synchronous) Flush() {} diff --git a/internal/scheduler/unbounded.go b/internal/scheduler/unbounded.go new file mode 100644 index 00000000..bb4beabd --- /dev/null +++ b/internal/scheduler/unbounded.go @@ -0,0 +1,70 @@ +// Copyright (c) 2022 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package scheduler + +import "go.uber.org/dig/internal/promise" + +// Unbounded starts a goroutine per task. +// Maximum concurrency is controlled by Go's allocation of OS threads to +// goroutines. +type Unbounded struct { + tasks []task +} + +var _ Scheduler = (*Unbounded)(nil) + +// Schedule enqueues a task and returns an unresolved deferred. +// It will be resolved during flush. +func (p *Unbounded) Schedule(fn func()) *promise.Deferred { + d := new(promise.Deferred) + p.tasks = append(p.tasks, task{fn, d}) + return d +} + +// Flush processes enqueued work with unlimited concurrency. +// The actual limit is up to Go's allocation of OS resources to goroutines. +func (p *Unbounded) Flush() { + inFlight := 0 + resultChan := make(chan *promise.Deferred) + + for inFlight > 0 || len(p.tasks) > 0 { + if len(p.tasks) > 0 { + t := p.tasks[len(p.tasks)-1] + p.tasks = p.tasks[0 : len(p.tasks)-1] + + go func() { + t.fn() + resultChan <- t.d + }() + + inFlight++ + continue + } + + d := <-resultChan + inFlight-- + d.Resolve(nil) + } + + close(resultChan) + + p.tasks = nil +} diff --git a/invoke.go b/invoke.go index 43366955..217ebe1c 100644 --- a/invoke.go +++ b/invoke.go @@ -88,7 +88,7 @@ func (s *Scope) Invoke(function interface{}, opts ...InvokeOption) error { d.Observe(func(err2 error) { err = err2 }) - s.sched.flush() + s.sched.Flush() if err != nil { return errArgumentsFailed{ diff --git a/scheduler.go b/scheduler.go deleted file mode 100644 index 98f8246f..00000000 --- a/scheduler.go +++ /dev/null @@ -1,173 +0,0 @@ -// Copyright (c) 2021 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package dig - -import "go.uber.org/dig/internal/promise" - -// A scheduler queues work during resolution of params. -// constructorNode uses it to call its constructor function. -// This may happen in parallel with other calls (parallelScheduler) or -// synchronously, right when enqueued. -// -// Work is enqueued when building a paramList, but the user of scheduler -// must call flush() for asynchronous calls to proceed after the top-level -// paramList.BuildList() is called. -type scheduler interface { - // schedule will call a the supplied func. The deferred will resolve - // after the func is called. The func may be called before schedule - // returns. The deferred will be resolved on the "main" goroutine, so - // it's safe to mutate containerStore during its resolution. It will - // always be resolved with a nil error. - schedule(func()) *promise.Deferred - - // flush processes enqueued work. This may in turn enqueue more work; - // flush will keep processing the work until it's empty. After flush is - // called, every deferred returned from schedule will have been resolved. - // Asynchronous deferred values returned from schedule are resolved on the - // same goroutine as the one calling this method. - // - // The scheduler is ready for re-use after flush is called. - flush() -} - -// synchronousScheduler is stateless and calls funcs as soon as they are schedule. It produces -// the exact same results as the code before deferred was introduced. -type synchronousScheduler struct{} - -// schedule calls func and returns an already-resolved deferred. -func (s synchronousScheduler) schedule(fn func()) *promise.Deferred { - fn() - return promise.Done -} - -// flush does nothing. All returned deferred values are already resolved. -func (s synchronousScheduler) flush() { - -} - -// task is used by parallelScheduler to remember which function to -// call and which deferred to notify afterwards. -type task struct { - fn func() - d *promise.Deferred -} - -// parallelScheduler processes enqueued work using a fixed-size worker pool. -// The pool is started and stopped during the call to flush. -type parallelScheduler struct { - concurrency int - tasks []task -} - -// schedule enqueues a task and returns an unresolved deferred. It will be -// resolved during flush. -func (p *parallelScheduler) schedule(fn func()) *promise.Deferred { - d := new(promise.Deferred) - p.tasks = append(p.tasks, task{fn, d}) - return d -} - -// flush processes enqueued work. concurrency controls how many executor -// goroutines are started and thus the maximum number of calls that may -// proceed in parallel. The real level of concurrency may be lower for -// CPU-heavy workloads if Go doesn't assign these goroutines to OS threads. -func (p *parallelScheduler) flush() { - inFlight := 0 - taskChan := make(chan task) - resultChan := make(chan *promise.Deferred) - - for n := 0; n < p.concurrency; n++ { - go func() { - for t := range taskChan { - t.fn() - resultChan <- t.d - } - }() - } - - for inFlight > 0 || len(p.tasks) > 0 { - var t task - var outChan chan<- task - - if len(p.tasks) > 0 { - t = p.tasks[len(p.tasks)-1] - outChan = taskChan - } - - select { - case outChan <- t: - inFlight++ - p.tasks = p.tasks[0 : len(p.tasks)-1] - case d := <-resultChan: - inFlight-- - d.Resolve(nil) - } - } - - close(taskChan) - close(resultChan) - - p.tasks = nil -} - -// unboundedScheduler starts a goroutine per task. Maximum concurrency is -// controlled by Go's allocation of OS threads to goroutines. -type unboundedScheduler struct { - tasks []task -} - -// schedule enqueues a task and returns an unresolved deferred. It will be -// resolved during flush. -func (p *unboundedScheduler) schedule(fn func()) *promise.Deferred { - d := new(promise.Deferred) - p.tasks = append(p.tasks, task{fn, d}) - return d -} - -// flush processes enqueued work with unlimited concurrency. The actual limit -// is up to Go's allocation of OS resources to goroutines. -func (p *unboundedScheduler) flush() { - inFlight := 0 - resultChan := make(chan *promise.Deferred) - - for inFlight > 0 || len(p.tasks) > 0 { - if len(p.tasks) > 0 { - t := p.tasks[len(p.tasks)-1] - p.tasks = p.tasks[0 : len(p.tasks)-1] - - go func() { - t.fn() - resultChan <- t.d - }() - - inFlight++ - continue - } - - d := <-resultChan - inFlight-- - d.Resolve(nil) - } - - close(resultChan) - - p.tasks = nil -} diff --git a/scope.go b/scope.go index cc231f88..74c169ee 100644 --- a/scope.go +++ b/scope.go @@ -27,6 +27,8 @@ import ( "reflect" "sort" "time" + + "go.uber.org/dig/internal/scheduler" ) // A ScopeOption modifies the default behavior of Scope; currently, @@ -78,7 +80,7 @@ type Scope struct { // invokerFn calls a function with arguments provided to Provide or Invoke. invokerFn invokerFn - sched scheduler + sched scheduler.Scheduler // graph of this Scope. Note that this holds the dependency graph of all the // nodes that affect this Scope, not just the ones provided directly to this Scope. @@ -101,7 +103,7 @@ func newScope() *Scope { decoratedGroups: make(map[key]reflect.Value), invokerFn: defaultInvoker, rand: rand.New(rand.NewSource(time.Now().UnixNano())), - sched: synchronousScheduler{}, + sched: scheduler.Synchronous, } s.gh = newGraphHolder(s) return s @@ -268,7 +270,7 @@ func (s *Scope) invoker() invokerFn { return s.invokerFn } -func (s *Scope) scheduler() scheduler { +func (s *Scope) scheduler() scheduler.Scheduler { return s.sched } From c8adb75247b0eda2332a1a1978fcf51963c1b394 Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Mon, 31 Jan 2022 15:47:52 -0800 Subject: [PATCH 08/14] lint --- internal/promise/deferred.go | 1 + internal/scheduler/scheduler.go | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/promise/deferred.go b/internal/promise/deferred.go index 8e01edfb..41c8b367 100644 --- a/internal/promise/deferred.go +++ b/internal/promise/deferred.go @@ -20,6 +20,7 @@ package promise +// Observer is a function that gets called when a Deferred resolves. type Observer func(error) // Deferred is an observable future result that may fail. diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index f3292fce..8c6025c2 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -22,7 +22,8 @@ package scheduler import "go.uber.org/dig/internal/promise" -// A scheduler queues work during resolution of params. +// Scheduler queues work during resolution of params. +// // constructorNode uses it to call its constructor function. // This may happen in parallel with other calls (parallelScheduler) or // synchronously, right when enqueued. From 3359a6c849c9c5e15704aefc945d4e45a5322291 Mon Sep 17 00:00:00 2001 From: Sung Yoon Whang Date: Sat, 4 Jun 2022 21:06:33 -0700 Subject: [PATCH 09/14] Fix scope scheduler copying issue The scope scheduler wasn't being copied upon a new scope being created. This caused multiple schedulers to exist in the app each with different settings, causing some funky behavior. Align the scheduler behavior by copying parents scheduler over to the child scopes as new scopes are created. --- constructor.go | 1 + internal/scheduler/parallel.go | 3 +-- internal/scheduler/unbounded.go | 7 +------ scope.go | 1 + 4 files changed, 4 insertions(+), 8 deletions(-) diff --git a/constructor.go b/constructor.go index da3bae33..3cb55942 100644 --- a/constructor.go +++ b/constructor.go @@ -155,6 +155,7 @@ func (n *constructorNode) Call(c containerStore) *promise.Deferred { Func: n.location, Reason: err, }) + return &n.deferred } var args []reflect.Value diff --git a/internal/scheduler/parallel.go b/internal/scheduler/parallel.go index 2017bbc8..548389ae 100644 --- a/internal/scheduler/parallel.go +++ b/internal/scheduler/parallel.go @@ -83,7 +83,7 @@ func (p *Parallel) Flush() { select { case outChan <- t: inFlight++ - p.tasks = p.tasks[0 : len(p.tasks)-1] + p.tasks = p.tasks[:len(p.tasks)-1] case d := <-resultChan: inFlight-- d.Resolve(nil) @@ -92,6 +92,5 @@ func (p *Parallel) Flush() { close(taskChan) close(resultChan) - p.tasks = nil } diff --git a/internal/scheduler/unbounded.go b/internal/scheduler/unbounded.go index bb4beabd..af001a08 100644 --- a/internal/scheduler/unbounded.go +++ b/internal/scheduler/unbounded.go @@ -48,23 +48,18 @@ func (p *Unbounded) Flush() { for inFlight > 0 || len(p.tasks) > 0 { if len(p.tasks) > 0 { t := p.tasks[len(p.tasks)-1] - p.tasks = p.tasks[0 : len(p.tasks)-1] - + p.tasks = p.tasks[:len(p.tasks)-1] go func() { t.fn() resultChan <- t.d }() - inFlight++ continue } - d := <-resultChan inFlight-- d.Resolve(nil) } - close(resultChan) - p.tasks = nil } diff --git a/scope.go b/scope.go index 50defa7a..d84ab6e4 100644 --- a/scope.go +++ b/scope.go @@ -120,6 +120,7 @@ func (s *Scope) Scope(name string, opts ...ScopeOption) *Scope { child.parentScope = s child.invokerFn = s.invokerFn child.deferAcyclicVerification = s.deferAcyclicVerification + child.sched = s.sched // child copies the parent's graph nodes. child.gh.nodes = append(child.gh.nodes, s.gh.nodes...) From 162e85fd0940c83b3ecb857d1f5918015f868fab Mon Sep 17 00:00:00 2001 From: EstebanOlmedo Date: Fri, 12 Aug 2022 14:23:06 -0700 Subject: [PATCH 10/14] Fix issue while decorating paramObject fields While building the fields of a paramObject struct, there could be the case where two fields use the same decorator, but one of them isn't decorated. This fixes that behavior. This also fixes a bug that created a data race when there was a decorated value. --- constructor.go | 9 ++++++--- decorate.go | 10 +++++++--- param.go | 31 +++++++++++++++++++------------ 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/constructor.go b/constructor.go index 3cb55942..5c2e9728 100644 --- a/constructor.go +++ b/constructor.go @@ -147,7 +147,7 @@ func (n *constructorNode) Call(c containerStore) *promise.Deferred { return &n.deferred } - n.state = functionOnStack + n.state = functionVisited n.deferred = promise.Deferred{} if err := shallowCheckDependencies(c, n.paramList); err != nil { @@ -161,7 +161,11 @@ func (n *constructorNode) Call(c containerStore) *promise.Deferred { var args []reflect.Value var results []reflect.Value - n.paramList.BuildList(c, &args).Catch(func(err error) error { + d := n.paramList.BuildList(c, &args) + + n.state = functionOnStack + + d.Catch(func(err error) error { if err != nil { return errArgumentsFailed{ Func: n.location, @@ -192,7 +196,6 @@ func (n *constructorNode) Call(c containerStore) *promise.Deferred { n.deferred.Resolve(err) return nil }) - return &n.deferred } diff --git a/decorate.go b/decorate.go index 9700692f..7362b03c 100644 --- a/decorate.go +++ b/decorate.go @@ -33,8 +33,9 @@ import ( type functionState int const ( - functionReady functionState = iota - functionOnStack + functionReady functionState = iota + functionVisited // For avoiding cycles + functionOnStack // For telling that this function is already scheduled functionCalled ) @@ -115,7 +116,8 @@ func (n *decoratorNode) Call(s containerStore) *promise.Deferred { return &n.deferred } - n.state = functionOnStack + // We mark it as "visited" to avoid cycles + n.state = functionVisited n.deferred = promise.Deferred{} if err := shallowCheckDependencies(s, n.params); err != nil { @@ -128,6 +130,8 @@ func (n *decoratorNode) Call(s containerStore) *promise.Deferred { var args []reflect.Value d := n.params.BuildList(s, &args) + n.state = functionOnStack + d.Observe(func(err error) { if err != nil { n.state = functionCalled diff --git a/param.go b/param.go index e7610be5..a0d469cc 100644 --- a/param.go +++ b/param.go @@ -212,7 +212,10 @@ func (ps paramSingle) buildWithDecorators(c containerStore, target *reflect.Valu if d, found = s.getValueDecorator(ps.Name, ps.Type); !found { continue } - if d.State() == functionOnStack { + // This is for avoiding cycles i.e decorator -> function + // ^ | + // \ ------- / + if d.State() == functionVisited { d = nil continue } @@ -275,6 +278,7 @@ func (ps paramSingle) build(c containerStore, target *reflect.Value) *promise.De if target.IsValid() { target.Set(v) } + def.Resolve(nil) return } @@ -290,7 +294,6 @@ func (ps paramSingle) build(c containerStore, target *reflect.Value) *promise.De }) }) } - def.Resolve(nil) return def } @@ -412,16 +415,22 @@ func (po paramObject) Build(c containerStore, target *reflect.Value) *promise.De } fields = append(fields, f) } - fields = append(fields, softGroupsQueue...) - children := make([]*promise.Deferred, len(fields)) - for i, f := range fields { - f := f - field := target.Field(f.FieldIndex) - children[i] = f.Build(c, &field) + buildFields := func(fields []paramObjectField) *promise.Deferred { + children := make([]*promise.Deferred, len(fields)) + + for i, f := range fields { + f := f + field := target.Field(f.FieldIndex) + children[i] = f.Build(c, &field) + } + + return promise.WhenAll(children...) } - return promise.WhenAll(children...) + return buildFields(fields).Then(func() *promise.Deferred { + return buildFields(softGroupsQueue) + }) } // paramObjectField is a single field of a dig.In struct. @@ -575,8 +584,6 @@ func (pt paramGroupedSlice) getDecoratedValues(c containerStore) (reflect.Value, } // search the given container and its parents for matching group decorators -// and call them to commit values. If any decorators return an error, -// that error is returned immediately. If all decorators succeeds, nil is returned. // The order in which the decorators are invoked is from the top level scope to // the current scope, to account for decorators that decorate values that were // already decorated. @@ -587,7 +594,7 @@ func (pt paramGroupedSlice) callGroupDecorators(c containerStore) *promise.Defer c := stores[i] if d, ok := c.getGroupDecorator(pt.Group, pt.Type.Elem()); ok { - if d.State() == functionOnStack { + if d.State() == functionVisited { // This decorator is already being run. Avoid cycle // and look further. continue From 7449537eed1da3376c2322de70828fb5f6e552ae Mon Sep 17 00:00:00 2001 From: EstebanOlmedo Date: Tue, 16 Aug 2022 11:32:48 -0700 Subject: [PATCH 11/14] Fix issue while building a paramSingle This fixes a bug that ocurred while calling paramSingle.build that could lead in panicking because of invalid memory address or nil pointer dereference introduced in: 8f06d1de2381a8467abf8acf8eb6ee3cf71c7c01 --- param.go | 46 +++++++++++++++++++++++----------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/param.go b/param.go index a0d469cc..7b7d624a 100644 --- a/param.go +++ b/param.go @@ -246,13 +246,13 @@ func (ps paramSingle) buildWithDecorators(c containerStore, target *reflect.Valu func (ps paramSingle) build(c containerStore, target *reflect.Value) *promise.Deferred { var providingContainer containerStore var providers []provider - def := new(promise.Deferred) for _, container := range c.storesToRoot() { // First we check if the value it's stored in the current store if v, ok := container.getValue(ps.Name, ps.Type); ok { - target.Set(v) - def.Resolve(nil) - return def + if v.IsValid() { + target.Set(v) + } + return promise.Done } providers = container.getValueProviders(ps.Name, ps.Type) @@ -264,37 +264,37 @@ func (ps paramSingle) build(c containerStore, target *reflect.Value) *promise.De if len(providers) == 0 { if ps.Optional { target.Set(reflect.Zero(ps.Type)) - def.Resolve(nil) + return promise.Done } - def.Resolve(newErrMissingTypes(c, key{name: ps.Name, t: ps.Type})) - return def + return promise.Fail(newErrMissingTypes(c, key{name: ps.Name, t: ps.Type})) } + var children []*promise.Deferred + def := new(promise.Deferred) for _, n := range providers { - n.Call(n.OrigScope()).Observe(func(err error) { - if err == nil { - // If we get here, it's impossible for the value to be absent from the - // container. - v, _ := providingContainer.getValue(ps.Name, ps.Type) - if target.IsValid() { - target.Set(v) - } - def.Resolve(nil) - return - } - + child := n.Call(n.OrigScope()).Catch(func(err error) error { // If we're missing dependencies but the parameter itself is optional, // we can just move on. if _, ok := err.(errMissingDependencies); ok && ps.Optional { - return + return nil } - def.Resolve(errParamSingleFailed{ + return errParamSingleFailed{ CtorID: n.ID(), Key: key{t: ps.Type, name: ps.Name}, Reason: err, - }) + } }) + children = append(children, child) } - return def + return promise.WhenAll(children...).Then(func() *promise.Deferred { + // If we get here, it's impossible for the value to be absent from the + // container. + v, _ := providingContainer.getValue(ps.Name, ps.Type) + if v.IsValid() { + target.Set(v) + } + def.Resolve(nil) + return def + }) } func (ps paramSingle) Build(c containerStore, target *reflect.Value) *promise.Deferred { From 1a8f019bfd7647baf4c6f51716cd4e72f6cb65ee Mon Sep 17 00:00:00 2001 From: Alexandra Parker Date: Wed, 30 Nov 2022 11:51:24 -0800 Subject: [PATCH 12/14] Callback to Deferred.Catch is only ever called with non-nil error --- constructor.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/constructor.go b/constructor.go index 5c2e9728..91dbe082 100644 --- a/constructor.go +++ b/constructor.go @@ -166,13 +166,10 @@ func (n *constructorNode) Call(c containerStore) *promise.Deferred { n.state = functionOnStack d.Catch(func(err error) error { - if err != nil { - return errArgumentsFailed{ - Func: n.location, - Reason: err, - } + return errArgumentsFailed{ + Func: n.location, + Reason: err, } - return nil }).Then(func() *promise.Deferred { return c.scheduler().Schedule(func() { results = c.invoker()(reflect.ValueOf(n.ctor), args) From 14f1bbfb2bdd238e861749a0082988d752bc05ae Mon Sep 17 00:00:00 2001 From: Alexandra Parker Date: Wed, 30 Nov 2022 12:27:33 -0800 Subject: [PATCH 13/14] Some documentation fixes --- constructor.go | 7 +++---- doc.go | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/constructor.go b/constructor.go index 91dbe082..2b69006e 100644 --- a/constructor.go +++ b/constructor.go @@ -137,11 +137,10 @@ func (n *constructorNode) String() string { // passed to newConstructorNode. // // If constructorNode has a unresolved deferred already in the process of building, it will return that one. If it has -// already been successfully called, it will return an already-resolved deferred. Together these mean it will try the -// call again if it failed last time. +// already been called, it will return an already-resolved deferred. errMissingDependencies is non-fatal; any other +// errors means this node is permanently in an error state. // -// On failure, the returned pointer is not guaranteed to stay in a failed state; another call will reset it back to its -// zero value; don't store the returned pointer. (It will still call each observer only once.) +// Don't store the returned pointer; it points into a field that may be reused on non-fatal errors. func (n *constructorNode) Call(c containerStore) *promise.Deferred { if n.State() == functionCalled || n.State() == functionOnStack { return &n.deferred diff --git a/doc.go b/doc.go index b8268eb4..3f9ab85c 100644 --- a/doc.go +++ b/doc.go @@ -98,7 +98,7 @@ // // # Invoke // -// Types added to to the container may be consumed by using the Invoke method. +// Types added to the container may be consumed by using the Invoke method. // Invoke accepts any function that accepts one or more parameters and // optionally, returns an error. Dig calls the function with the requested // type, instantiating only those types that were requested by the function. From 58a2c6472574ec0f08ef3efdf2fe02e90a8dd44c Mon Sep 17 00:00:00 2001 From: Alexandra Parker Date: Wed, 30 Nov 2022 12:37:50 -0800 Subject: [PATCH 14/14] Remove unnecessary rebind f.Build has a struct receiver, not a pointer, so that method call copies f anyway. --- param.go | 1 - 1 file changed, 1 deletion(-) diff --git a/param.go b/param.go index 7b7d624a..4d4f9507 100644 --- a/param.go +++ b/param.go @@ -420,7 +420,6 @@ func (po paramObject) Build(c containerStore, target *reflect.Value) *promise.De children := make([]*promise.Deferred, len(fields)) for i, f := range fields { - f := f field := target.Field(f.FieldIndex) children[i] = f.Build(c, &field) }