diff --git a/api/allocations.go b/api/allocations.go index 08dd3fbed21..76d60ce23c7 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -436,12 +436,14 @@ type AllocatedTaskResources struct { Memory AllocatedMemoryResources Networks []*NetworkResource Devices []*AllocatedDeviceResource + Custom []*CustomResource } type AllocatedSharedResources struct { DiskMB int64 Networks []*NetworkResource Ports []PortMapping + Custom []*CustomResource } type PortMapping struct { diff --git a/api/nodes.go b/api/nodes.go index fda4763f7df..6359a9a6b2f 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -587,6 +587,7 @@ type NodeResources struct { Disk NodeDiskResources Networks []*NetworkResource Devices []*NodeDeviceResource + Custom []*NodeCustomResource MinDynamicPort int MaxDynamicPort int @@ -629,6 +630,18 @@ type NodeReservedNetworkResources struct { ReservedHostPorts string } +type NodeCustomResource struct { + Name string `hcl:"name,label"` + Version uint64 `hcl:"version,optional"` + Type CustomResourceType `hcl:"type,optional"` + Scope CustomResourceScope `hcl:"scope,optional"` + + Quantity int64 `hcl:"quantity,optional"` + Range string `hcl:"range,optional"` + Items []any `hcl:"items,optional"` + Meta map[string]string `hcl:"meta,block"` +} + type CSITopologyRequest struct { Required []*CSITopology `hcl:"required"` Preferred []*CSITopology `hcl:"preferred"` diff --git a/api/resources.go b/api/resources.go index f45a4615145..573981ccc6f 100644 --- a/api/resources.go +++ b/api/resources.go @@ -20,6 +20,7 @@ type Resources struct { Devices []*RequestedDevice `hcl:"device,block"` NUMA *NUMAResource `hcl:"numa,block"` SecretsMB *int `mapstructure:"secrets" hcl:"secrets,optional"` + Custom []*CustomResource `mapstructure:"custom" hcl:"custom,block"` // COMPAT(0.10) // XXX Deprecated. Please do not use. The field will be removed in Nomad @@ -330,3 +331,32 @@ func (d *RequestedDevice) Canonicalize() { a.Canonicalize() } } + +type CustomResource struct { + Name string `hcl:"name,label"` + Version uint64 `hcl:"version,optional"` + Type CustomResourceType `hcl:"type,optional"` + Scope CustomResourceScope `hcl:"scope,optional"` + + Quantity int64 `hcl:"quantity,optional"` + Range string `hcl:"range,optional"` + Items []any `hcl:"items,optional"` + Constraints []*Constraint `hcl:"constraint,block"` +} + +type CustomResourceScope string + +const ( + CustomResourceScopeGroup CustomResourceScope = "group" + CustomResourceScopeTask CustomResourceScope = "task" +) + +type CustomResourceType string + +const ( + CustomResourceTypeRatio CustomResourceType = "ratio" // ex. weight + CustomResourceTypeCappedRatio CustomResourceType = "capped-ratio" // ex. resource.cpu + CustomResourceTypeCountable CustomResourceType = "countable" // ex. memory, disk + CustomResourceTypeDynamicInstance CustomResourceType = "dynamic" // ex. ports, cores + CustomResourceTypeStaticInstance CustomResourceType = "static" // ex. ports, devices +) diff --git a/client/client.go b/client/client.go index 587d70b6a09..d428c84f1da 100644 --- a/client/client.go +++ b/client/client.go @@ -1649,12 +1649,14 @@ func (c *Client) setupNode() error { node.NodeResources.MinDynamicPort = newConfig.MinDynamicPort node.NodeResources.MaxDynamicPort = newConfig.MaxDynamicPort node.NodeResources.Processors = newConfig.Node.NodeResources.Processors + node.NodeResources.Custom = newConfig.CustomResources if node.NodeResources.Processors.Empty() { node.NodeResources.Processors = structs.NodeProcessorResources{ Topology: &numalib.Topology{}, } } + node.NodeResources.Custom = newConfig.CustomResources } if node.ReservedResources == nil { node.ReservedResources = &structs.NodeReservedResources{} @@ -1813,6 +1815,8 @@ func (c *Client) updateNodeFromFingerprint(response *fingerprint.FingerprintResp if cpu := response.NodeResources.Processors.TotalCompute(); cpu > 0 { newConfig.CpuCompute = cpu } + + response.NodeResources.Custom = newConfig.CustomResources } if nodeHasChanged { diff --git a/client/config/config.go b/client/config/config.go index 5b6525d1803..ad6bb25d7fd 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -395,6 +395,8 @@ type Config struct { // LogFile is used by MonitorExport to stream a server's log file LogFile string `hcl:"log_file"` + + CustomResources structs.CustomResources } type APIListenerRegistrar interface { @@ -896,6 +898,7 @@ func (c *Config) Copy() *Config { nc.ReservableCores = slices.Clone(c.ReservableCores) nc.Artifact = c.Artifact.Copy() nc.Users = c.Users.Copy() + nc.CustomResources = c.CustomResources.Copy() return &nc } diff --git a/command/agent/agent.go b/command/agent/agent.go index 56dc7089e36..9ca75bd8791 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -158,6 +158,7 @@ func NewAgent(config *Config, logger log.InterceptLogger, logOutput io.Writer, i if err := a.setupServer(); err != nil { return nil, err } + if err := a.setupClient(); err != nil { return nil, err } @@ -936,6 +937,7 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) { conf.Node.Meta = agentConfig.Client.Meta conf.Node.NodeClass = agentConfig.Client.NodeClass conf.Node.NodePool = agentConfig.Client.NodePool + conf.CustomResources = agentConfig.Client.CustomResources // Set up the HTTP advertise address conf.Node.HTTPAddr = agentConfig.AdvertiseAddrs.HTTP diff --git a/command/agent/config.go b/command/agent/config.go index 6aeb1d9b388..fbcd1655586 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -443,6 +443,10 @@ type ClientConfig struct { // LogFile is used by MonitorExport to stream a client's log file LogFile string `hcl:"log_file"` + + // CustomResources allows the user to define custom schedulable resources on + // this node + CustomResources structs.CustomResources `hcl:"custom_resource,block"` } func (c *ClientConfig) Copy() *ClientConfig { @@ -466,6 +470,7 @@ func (c *ClientConfig) Copy() *ClientConfig { nc.Drain = c.Drain.Copy() nc.Users = c.Users.Copy() nc.ExtraKeysHCL = slices.Clone(c.ExtraKeysHCL) + nc.CustomResources = c.CustomResources.Copy() return &nc } @@ -2883,6 +2888,10 @@ func (c *ClientConfig) Merge(b *ClientConfig) *ClientConfig { result.IntroToken = b.IntroToken } + if b.CustomResources != nil { + result.CustomResources.Merge(b.CustomResources) + } + return &result } diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go index afd023c667f..ee5cd45a17c 100644 --- a/command/agent/config_parse.go +++ b/command/agent/config_parse.go @@ -330,6 +330,11 @@ func extraKeys(c *Config) error { helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, "host_network") } + for _, cr := range c.Client.CustomResources { + helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, "custom_resource") + helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, cr.Name) + } + // Remove Template extra keys for _, t := range []string{"function_denylist", "disable_file_sandbox", "max_stale", "wait", "wait_bounds", "block_query_wait", "consul_retry", "vault_retry", "nomad_retry"} { helper.RemoveEqualFold(&c.Client.ExtraKeysHCL, t) diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index df055515ed7..d4b4f5d4774 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -1646,6 +1646,22 @@ func ApiResourcesToStructs(in *api.Resources) *structs.Resources { out.SecretsMB = *in.SecretsMB } + if len(in.Custom) > 0 { + out.Custom = []*structs.CustomResource{} + for _, apiCr := range in.Custom { + out.Custom = append(out.Custom, &structs.CustomResource{ + Name: apiCr.Name, + Version: apiCr.Version, + Type: structs.CustomResourceType(apiCr.Type), + Scope: structs.CustomResourceScope(apiCr.Scope), + Quantity: apiCr.Quantity, + Range: apiCr.Range, + Items: apiCr.Items, + Constraints: ApiConstraintsToStructs(apiCr.Constraints), + }) + } + } + return out } diff --git a/command/node_status.go b/command/node_status.go index df31b3c02fd..463325db148 100644 --- a/command/node_status.go +++ b/command/node_status.go @@ -569,6 +569,7 @@ func (c *NodeStatusCommand) formatNode(client *api.Client, node *api.Node) int { c.outputNodeNetworkInfo(node) c.outputNodeCSIVolumeInfo(client, node, runningAllocs) c.outputNodeDriverInfo(node) + c.outputNodeCustomResourceInfo(node) } // Emit node events @@ -788,6 +789,35 @@ func (c *NodeStatusCommand) outputNodeDriverInfo(node *api.Node) { c.Ui.Output(formatList(nodeDrivers)) } +func (c *NodeStatusCommand) outputNodeCustomResourceInfo(node *api.Node) { + c.Ui.Output(c.Colorize().Color("\n[bold]Custom Resources")) + + resources := node.NodeResources.Custom + if len(resources) == 0 { + c.Ui.Output("") + return + } + + out := make([]string, 0, len(resources)+1) + out = append(out, "Resource|Version|Type|Available") + + sort.Slice(resources, func(i int, j int) bool { return resources[i].Name < resources[j].Name }) + for _, resource := range resources { + switch resource.Type { + case api.CustomResourceTypeRatio, api.CustomResourceTypeCappedRatio, + api.CustomResourceTypeCountable: + out = append(out, fmt.Sprintf("%s|%d|%s|%d", + resource.Name, resource.Version, resource.Type, resource.Quantity)) + + case api.CustomResourceTypeStaticInstance, api.CustomResourceTypeDynamicInstance: + out = append(out, fmt.Sprintf("%s|%d|%s|%v", + resource.Name, resource.Version, resource.Type, resource.Items)) + + } + } + c.Ui.Output(formatList(out)) +} + func (c *NodeStatusCommand) outputNodeStatusEvents(node *api.Node) { c.Ui.Output(c.Colorize().Color("\n[bold]Node Events")) c.outputNodeEvent(node.Events) diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 59b6b3d38c7..4f6f51512c2 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -163,7 +163,8 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi cr := alloc.AllocatedResources.Comparable() used.Add(cr) - // Adding the comparable resource unions reserved core sets, need to check if reserved cores overlap + // Adding the comparable resource unions reserved core sets, need to + // check if reserved cores overlap for _, core := range cr.Flattened.Cpu.ReservedCores { if _, ok := reservedCores[core]; ok { coreOverlap = true diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go index 79db2d1bf07..aeffa2ada5a 100644 --- a/nomad/structs/funcs_test.go +++ b/nomad/structs/funcs_test.go @@ -638,6 +638,190 @@ func TestAllocsFit_MemoryOversubscription(t *testing.T) { must.Eq(t, 12000, used.Flattened.Memory.MemoryMaxMB) } +func TestAllocsFit_CustomResources(t *testing.T) { + ci.Parallel(t) + + n0 := node2k() + n0.NodeResources.Memory.MemoryMB = 2048 + n0.ReservedResources = nil + n0.NodeResources.Custom = []*CustomResource{ + { + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Range: "1-3,7-10", + Items: []any{1, 2, 3, 7, 8, 9}, + }, + { + Name: "bar_countable", + Type: CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 10_000, + }, + } + + a1 := &Allocation{ + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "web": { + Cpu: AllocatedCpuResources{CpuShares: 100}, + Memory: AllocatedMemoryResources{MemoryMB: 500}, + Custom: []*CustomResource{ + { + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Items: []any{3}, + }, + }, + }, + }, + }, + } + + ok, msg, used, err := AllocsFit(n0, []*Allocation{a1}, nil, false) + test.True(t, ok) + test.Eq(t, "", msg) + test.Eq(t, &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{CpuShares: 100, ReservedCores: []uint16{}}, + Memory: AllocatedMemoryResources{MemoryMB: 500, MemoryMaxMB: 500}, + Custom: CustomResources{{ + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Items: []any{3}, + }}, + }, + Shared: AllocatedSharedResources{}, + }, used) + test.NoError(t, err) + + a2 := &Allocation{ + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "web": { + Cpu: AllocatedCpuResources{CpuShares: 100}, + Memory: AllocatedMemoryResources{MemoryMB: 500}, + Custom: []*CustomResource{ + { + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Items: []any{5}, + }, + }, + }, + }, + }, + } + ok, msg, used, err = AllocsFit(n0, []*Allocation{a1, a2}, nil, false) + test.False(t, ok) + test.Eq(t, "custom resource: foo_dynamic", msg) + test.Eq(t, &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{CpuShares: 200, ReservedCores: []uint16{}}, + Memory: AllocatedMemoryResources{MemoryMB: 1000, MemoryMaxMB: 1000}, + Custom: CustomResources{{ + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Items: []any{3, 5}, // TODO(tgross): shows as used here even if not available? + }}, + }, + Shared: AllocatedSharedResources{}, + }, used) + test.NoError(t, err) + + a2.AllocatedResources.Tasks["web"].Custom[0].Items = []any{7} + ok, msg, used, err = AllocsFit(n0, []*Allocation{a1, a2}, nil, false) + test.True(t, ok) + test.Eq(t, "", msg) + test.Eq(t, &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{CpuShares: 200, ReservedCores: []uint16{}}, + Memory: AllocatedMemoryResources{MemoryMB: 1000, MemoryMaxMB: 1000}, + Custom: CustomResources{{ + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Items: []any{3, 7}, + }}, + }, + Shared: AllocatedSharedResources{}, + }, used) + test.NoError(t, err) + + a2.AllocatedResources.Shared.Custom = CustomResources{{ + Name: "bar_countable", + Version: 2, + Type: CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 10_000, + }} + ok, msg, used, err = AllocsFit(n0, []*Allocation{a1, a2}, nil, false) + test.False(t, ok) + test.Eq(t, "custom resources could not be compared: resource request 2 for \"bar_countable\" is newer than available version 0", msg) + test.Eq(t, &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{CpuShares: 200, ReservedCores: []uint16{}}, + Memory: AllocatedMemoryResources{MemoryMB: 1000, MemoryMaxMB: 1000}, + Custom: CustomResources{{ + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Items: []any{3, 7}, + }}, + }, + Shared: AllocatedSharedResources{ + Custom: CustomResources{{ + Name: "bar_countable", + Version: 2, + Type: CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 10_000, + }}, + }, + }, used) + test.NoError(t, err) + + a2.AllocatedResources.Shared.Custom[0].Version = 0 + ok, msg, used, err = AllocsFit(n0, []*Allocation{a1, a2}, nil, false) + test.True(t, ok) + test.Eq(t, "", msg) + test.Eq(t, &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{CpuShares: 200, ReservedCores: []uint16{}}, + Memory: AllocatedMemoryResources{MemoryMB: 1000, MemoryMaxMB: 1000}, + Custom: CustomResources{{ + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Items: []any{3, 7}, + }}, + }, + Shared: AllocatedSharedResources{ + Custom: CustomResources{{ + Name: "bar_countable", + Version: 2, + Type: CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 10_000, + }}, + }, + }, used) + test.NoError(t, err) + +} + func TestScoreFitBinPack(t *testing.T) { ci.Parallel(t) diff --git a/nomad/structs/resources.go b/nomad/structs/resources.go new file mode 100644 index 00000000000..9aba4e6cbb3 --- /dev/null +++ b/nomad/structs/resources.go @@ -0,0 +1,393 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package structs + +import ( + "cmp" + "errors" + "fmt" + "maps" + "math/rand" + "slices" + "strings" + + "github.com/hashicorp/go-set/v3" + "github.com/hashicorp/nomad/helper" +) + +type CustomResource struct { + // TODO(tgross): we need this old ",key" construction for HCL v1 parsing of + // the client config; should this even be the same struct as the config? + Name string `hcl:",key"` + Version uint64 // optional + Type CustomResourceType + Scope CustomResourceScope + + Quantity int64 // for countable or capped-ratio + Range string // for dynamic or static + Items []any // for dynamic or static + Meta map[string]string + + Constraints []*Constraint +} + +type CustomResourceScope string + +const ( + CustomResourceScopeGroup CustomResourceScope = "group" + CustomResourceScopeTask CustomResourceScope = "task" +) + +type CustomResourceType string + +const ( + CustomResourceTypeRatio CustomResourceType = "ratio" // ex. weight + CustomResourceTypeCappedRatio CustomResourceType = "capped-ratio" // ex. resource.cpu + CustomResourceTypeCountable CustomResourceType = "countable" // ex. memory, disk + CustomResourceTypeDynamicInstance CustomResourceType = "dynamic" // ex. ports, cores + CustomResourceTypeStaticInstance CustomResourceType = "static" // ex. ports, devices +) + +// Copy returns a deep clone of the CustomResource +func (cr *CustomResource) Copy() *CustomResource { + ncr := new(CustomResource) + *ncr = *cr + + ncr.Items = slices.Clone(cr.Items) + ncr.Meta = maps.Clone(cr.Meta) + ncr.Constraints = helper.CopySlice(cr.Constraints) + return ncr +} + +func (cr *CustomResource) Validate() error { + // TODO(tgross): what do we need to do to validate these? + return nil +} + +func (cr *CustomResource) Equal(or *CustomResource) bool { + if cr.Name != or.Name || + cr.Quantity != or.Quantity || + cr.Type != or.Type || + cr.Scope != or.Scope || + cr.Range != or.Range || + len(cr.Items) != len(or.Items) || + len(cr.Constraints) != len(or.Constraints) || + !slices.Equal(cr.Items, or.Items) || + !maps.Equal(cr.Meta, or.Meta) || + !slices.EqualFunc(cr.Constraints, or.Constraints, func(l, r *Constraint) bool { + return l.Equal(r) + }) { + return false + } + + return true +} + +var ( + ErrInvalidCustomResourceComparison = errors.New("custom resources could not be compared") + ErrCustomResourceExhausted = errors.New("custom resources exhausted") + ErrSubtractFromNothing = errors.New("custom resource request cannot be subtracted from non-existant base") +) + +func (cr *CustomResource) Superset(other *CustomResource) (bool, string) { + + if cr == nil || other == nil { + return false, fmt.Sprintf("custom resource: %s", cr.Name) + } + err := cr.compatible(other) + if err != nil { + return false, err.Error() // not ideal, but this matches the other Comparable APIs + } + + switch cr.Type { + case CustomResourceTypeRatio: + // fallthrough: ratios always fit? + + case CustomResourceTypeCappedRatio, CustomResourceTypeCountable: + if cr.Quantity-other.Quantity < 0 { + return false, fmt.Sprintf("custom resource: %s", cr.Name) + } + + case CustomResourceTypeDynamicInstance, CustomResourceTypeStaticInstance: + items := set.From(cr.Items) + if !items.ContainsSlice(other.Items) { + return false, fmt.Sprintf("custom resource: %s", cr.Name) + } + } + + return true, "" +} + +// Add mutates the CustomResource by the delta +func (cr *CustomResource) Add(delta *CustomResource) error { + if cr == nil || delta == nil { + return nil + } + err := cr.compatible(delta) + if err != nil { + return err + } + + switch cr.Type { + case CustomResourceTypeRatio: + return nil // ratios don't sum up + + case CustomResourceTypeCappedRatio, CustomResourceTypeCountable: + cr.Quantity += delta.Quantity + + case CustomResourceTypeDynamicInstance, CustomResourceTypeStaticInstance: + items := set.From(cr.Items) + cr.Items = items.Union(set.From(delta.Items)).Slice() + if len(cr.Items) > 1 { + slices.SortFunc(cr.Items, func(a, b any) int { + switch a.(type) { + case string: + if _, ok := b.(string); !ok { + return 0 + } + return strings.Compare(a.(string), b.(string)) + case int: + if _, ok := b.(int); !ok { + return 0 + } + return cmp.Compare(a.(int), b.(int)) + } + return 0 + }) + } + } + + return nil +} + +// Subtract mutates the CustomResource by the delta +func (cr *CustomResource) Subtract(delta *CustomResource) error { + if cr == nil || delta == nil { + return nil + } + err := cr.compatible(delta) + if err != nil { + return err + } + + switch cr.Type { + case CustomResourceTypeRatio: + return nil // ratios don't sum up + + case CustomResourceTypeCappedRatio, CustomResourceTypeCountable: + quantity := cr.Quantity - delta.Quantity + cr.Quantity = max(quantity, 0) + + case CustomResourceTypeDynamicInstance, CustomResourceTypeStaticInstance: + items := set.From(cr.Items) + items.RemoveSet(set.From(delta.Items)) + cr.Items = items.Slice() + } + + return nil +} + +func (cr *CustomResource) compatible(delta *CustomResource) error { + // TODO(tgross): these are programmer errors, I think? + if cr.Name != delta.Name { + return fmt.Errorf("%w: resource names %q and %q mismatch", + ErrInvalidCustomResourceComparison, cr.Name, delta.Name) + } + if cr.Version < delta.Version && delta.Version > 0 { + // requests for version 0 (default) can be considered compatible with + // any instance of the resource + return fmt.Errorf("%w: resource request %d for %q is newer than available version %d", + ErrInvalidCustomResourceComparison, delta.Version, delta.Name, cr.Version) + } + if cr.Type != delta.Type { + return fmt.Errorf("%w: resource types %q and %q for %q are not the same", + ErrInvalidCustomResourceComparison, cr.Type, delta.Type, delta.Name) + } + if cr.Scope != delta.Scope { + return fmt.Errorf("%w: resource scopes %q and %q for %q are not the same", + ErrInvalidCustomResourceComparison, cr.Scope, delta.Scope, delta.Name) + } + + return nil +} + +// CustomResources is a convenience wrapper around a slice of CustomResources +type CustomResources []*CustomResource + +func (cr CustomResources) Copy() CustomResources { + return helper.CopySlice(cr) +} + +func (cr *CustomResources) Select(available CustomResources) error { +NEXT: + for _, r := range *cr { + for _, base := range available { + if base.Name == r.Name && base.Version == r.Version { + if r.Type != CustomResourceTypeDynamicInstance { + if ok, exhausted := base.Superset(r); !ok { + return fmt.Errorf("%w: %s", ErrCustomResourceExhausted, exhausted) + } + } else { + // for dynamic instances, we need to select items for the + // quantity and mutate the Items of this CustomResource with + // the selected items + if r.Quantity > int64(len(base.Items)) { + return fmt.Errorf("%w: %s", ErrCustomResourceExhausted, r.Name) + } + + // selecting randomly by shuffling and slicing can be + // expensive, so this tries random picks similar to how we + // do port selection if we know the ratio of quantity to + // items is sparse. these values are selected by hunch, so + // this is a place for fine-tuning for sure + if len(base.Items) > 64 && r.Quantity < 8 { + r.Items = make([]any, 0, r.Quantity) + for int64(len(r.Items)) < r.Quantity { + item := base.Items[rand.Intn(len(base.Items))] + if !slices.Contains(r.Items, item) { + r.Items = append(r.Items, item) + } + // TODO(tgross): should we cap attempts? + } + } else { + baseItems := slices.Clone(base.Items) + rand.Shuffle(len(baseItems), func(i, j int) { + baseItems[i], baseItems[j] = baseItems[j], baseItems[i] + }) + items := base.Items[:r.Quantity] + r.Items = items + } + } + + continue NEXT + } + } + } + + return nil +} + +func (cr CustomResources) CopySharedOnly() CustomResources { + out := make([]*CustomResource, 0, len(cr)) + for _, r := range cr { + if r.Scope == CustomResourceScopeGroup { + out = append(out, r) + } + } + + return out +} + +func (cr CustomResources) CopyTaskOnly() CustomResources { + out := make([]*CustomResource, 0, len(cr)) + for _, r := range cr { + if r.Scope == CustomResourceScopeTask || r.Scope == "" { + out = append(out, r) + } + } + + return out +} + +// Merge combines other custom resources, overwriting the resources with +// matching names and versions +func (cr *CustomResources) Merge(other CustomResources) { + out := *cr + +NEXT: + for _, ocr := range other { + for i, base := range *cr { + if ocr.Name == base.Name && ocr.Version == base.Version { + out[i] = ocr + continue NEXT + } + } + out = append(out, ocr) + } + + *cr = out +} + +func (cr *CustomResources) Add(delta *CustomResources) error { + if cr == nil || delta == nil { + return nil + } + + // it's possible to get an error after we've mutated one *CR in the slice, + // so copy to make the function succeed entirely or not at all + out := cr.Copy() + seen := []int{} +NEXT: + for _, r := range out { + for i, o := range *delta { + if r.Name == o.Name { + seen = append(seen, i) + err := r.Add(o) + if err != nil { + return err + } + continue NEXT + } + } + } + for i, o := range *delta { + if !slices.Contains(seen, i) { + out = append(out, o) + } + } + *cr = out + return nil +} + +func (cr CustomResources) Superset(other CustomResources) (bool, string) { + if other == nil { + return true, "" + } + if cr == nil { + return false, ErrInvalidCustomResourceComparison.Error() + } + for _, ocr := range other { + for _, base := range cr { + if ocr.Name != base.Name { + continue + } + if ok, name := base.Superset(ocr); !ok { + return false, name + } + } + } + + return true, "" +} + +func (cr *CustomResources) Subtract(delta *CustomResources) error { + if cr == nil || delta == nil { + return nil + } + + // it's possible to get an error after we've mutated one *CR in the slice, + // so copy to make the function succeed entirely or not at all + out := cr.Copy() + seen := []int{} +NEXT: + for _, r := range out { + for i, o := range *delta { + if r.Name == o.Name { + seen = append(seen, i) + err := r.Subtract(o) + if err != nil { + return err + } + continue NEXT + } + } + } + for i := range *delta { + if !slices.Contains(seen, i) { + return ErrSubtractFromNothing + } + } + *cr = out + return nil +} diff --git a/nomad/structs/resources_test.go b/nomad/structs/resources_test.go new file mode 100644 index 00000000000..d7c796a0a7d --- /dev/null +++ b/nomad/structs/resources_test.go @@ -0,0 +1,323 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package structs + +import ( + "testing" + + "github.com/hashicorp/nomad/ci" + "github.com/shoenig/test" + "github.com/shoenig/test/must" +) + +func TestCustomResourcesMath(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + name string + have *CustomResource + delta *CustomResource + expect *CustomResource + method func(*CustomResource, *CustomResource) error + expectErr string + }{ + { + name: "incompatible", + have: &CustomResource{Name: "foo"}, + delta: &CustomResource{Name: "bar"}, + method: (*CustomResource).Add, + expectErr: `custom resources could not be compared: resource names "foo" and "bar" mismatch`, + }, + { + name: "countable add", + have: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 100_000, + }, + delta: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 10_000, + }, + method: (*CustomResource).Add, + expect: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 110_000, + }, + }, + { + name: "countable subtract floor", + have: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 10_000, + }, + delta: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 20_000, + }, + method: (*CustomResource).Subtract, + expect: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 0, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + got := tc.have.Copy() + delta := tc.delta.Copy() + err := tc.method(got, delta) + + if tc.expectErr == "" { + must.NoError(t, err) + must.Eq(t, tc.expect, got) + } else { + must.EqError(t, err, tc.expectErr) + must.Eq(t, got, tc.have, must.Sprint("expected unchanged on error")) + } + must.Eq(t, tc.delta, delta, must.Sprint("expected delta to be unchanged")) + }) + } + +} + +func TestCustomResourcesSuperset(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + name string + have *CustomResource + want *CustomResource + expectOk bool + expectMsg string + }{ + { + name: "incompatible", + have: &CustomResource{Name: "foo"}, + want: &CustomResource{Name: "bar"}, + expectMsg: `custom resources could not be compared: resource names "foo" and "bar" mismatch`, + }, + { + name: "countable ok", + have: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 100_000, + }, + want: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 10_000, + }, + expectOk: true, + }, + { + name: "countable exhausted", + have: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 100_000, + }, + want: &CustomResource{ + Name: "disk", + Type: CustomResourceTypeCountable, + Quantity: 200_000, + }, + expectMsg: "custom resource: disk", + }, + { + name: "dynamic ok", + have: &CustomResource{ + Name: "ports", + Type: CustomResourceTypeDynamicInstance, + Items: []any{8001, 8002, 8003}, + }, + want: &CustomResource{ + Name: "ports", + Type: CustomResourceTypeDynamicInstance, + Items: []any{8002}, + }, + expectOk: true, + }, + { + name: "dynamic exhausted", + have: &CustomResource{ + Name: "ports", + Type: CustomResourceTypeDynamicInstance, + Items: []any{8001, 8002, 8003}, + }, + want: &CustomResource{ + Name: "ports", + Type: CustomResourceTypeDynamicInstance, + Items: []any{8006}, + }, + expectMsg: "custom resource: ports", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ok, msg := tc.have.Superset(tc.want) + test.Eq(t, tc.expectOk, ok) + test.Eq(t, tc.expectMsg, msg) + }) + } + +} + +func TestCustomResourcesMerge(t *testing.T) { + ci.Parallel(t) + + base := CustomResources{ + { // should be updated + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Range: "1-3", + Items: []any{1, 2, 3}, + }, + { // different version, should be ignored + Name: "bar_countable", + Type: CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 10_000, + }, + { // should be ignored + Name: "quuz_ratio", + Type: CustomResourceTypeRatio, + Scope: CustomResourceScopeTask, + Quantity: 100, + }, + } + + other := CustomResources{ + { // should update + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Range: "2-4,7-8", + Items: []any{2, 3, 4, 7, 8}, + }, + { // different version, should be added + Name: "bar_countable", + Version: 2, + Type: CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 20_000, + }, + { // new resource, should be added + Name: "baz_static", + Type: CustomResourceTypeStaticInstance, + Scope: CustomResourceScopeGroup, + Items: []any{10, 20, 30}, + }, + } + + got := base.Copy() + got.Merge(other) + must.Eq(t, + CustomResources{ + { + Name: "foo_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Range: "2-4,7-8", + Items: []any{2, 3, 4, 7, 8}, + }, + { + Name: "bar_countable", + Type: CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 10_000, + }, + { + Name: "quuz_ratio", + Type: CustomResourceTypeRatio, + Scope: CustomResourceScopeTask, + Quantity: 100, + }, + { + Name: "bar_countable", + Version: 2, + Type: CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 20_000, + }, + { + Name: "baz_static", + Type: CustomResourceTypeStaticInstance, + Scope: CustomResourceScopeGroup, + Items: []any{10, 20, 30}, + }, + }, + got) + +} + +func TestCustomResources_Select(t *testing.T) { + + ci.Parallel(t) + + available := CustomResources{ + { + Name: "foo_countable", + Type: CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 10_000, + }, + { + Name: "bar_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Range: "1-10", + Items: []any{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + }, + } + + request := CustomResources{ + { + Name: "foo_countable", + Type: CustomResourceTypeCountable, + Scope: CustomResourceScopeGroup, + Quantity: 0, + }, + { + Name: "bar_dynamic", + Version: 1, + Type: CustomResourceTypeDynamicInstance, + Scope: CustomResourceScopeTask, + Quantity: 0, + }, + } + + request[0].Quantity = 10 + request[1].Quantity = 20 + err := request.Select(available) + must.EqError(t, err, "custom resources exhausted: bar_dynamic") + + request[0].Quantity = 10 + request[1].Quantity = 4 + err = request.Select(available) + must.NoError(t, err) + must.Len(t, 4, request[1].Items) + + available[1].Items = []any{} + for i := range 100 { + available[1].Items = append(available[1].Items, i) + } + err = request.Select(available) + must.NoError(t, err) + must.Len(t, 4, request[1].Items) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 615ea398d55..89206d5b8cc 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2386,6 +2386,7 @@ type Resources struct { Devices ResourceDevices NUMA *NUMA SecretsMB int + Custom CustomResources `hcl:"custom,block"` } const ( @@ -2480,6 +2481,13 @@ func (r *Resources) Validate() error { mErr.Errors = append(mErr.Errors, fmt.Errorf("SecretsMB value (%d) cannot be negative", r.SecretsMB)) } + for _, resource := range r.Custom { + err := resource.Validate() + if err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } + return mErr.ErrorOrNil() } @@ -2510,6 +2518,10 @@ func (r *Resources) Merge(other *Resources) { if other.SecretsMB != 0 { r.SecretsMB = other.SecretsMB } + if len(other.Custom) != 0 { + // TODO(tgross): this looks wrong but matches devices behavior + r.Custom = other.Custom + } } // Equal Resources. @@ -2530,7 +2542,11 @@ func (r *Resources) Equal(o *Resources) bool { r.IOPS == o.IOPS && r.Networks.Equal(&o.Networks) && r.Devices.Equal(&o.Devices) && - r.SecretsMB == o.SecretsMB + r.SecretsMB == o.SecretsMB && + slices.EqualFunc(r.Custom, o.Custom, + func(l, r *CustomResource) bool { + return l.Equal(r) + }) } // ResourceDevices are part of Resources. @@ -2619,6 +2635,7 @@ func (r *Resources) Copy() *Resources { if r == nil { return nil } + return &Resources{ CPU: r.CPU, Cores: r.Cores, @@ -2630,6 +2647,7 @@ func (r *Resources) Copy() *Resources { Devices: r.Devices.Copy(), NUMA: r.NUMA.Copy(), SecretsMB: r.SecretsMB, + Custom: helper.CopySlice(r.Custom), } } @@ -3188,6 +3206,12 @@ type NodeResources struct { // to select dynamic ports from across all networks. MinDynamicPort int MaxDynamicPort int + + // Custom defines any custom resources this node advertises + // + // TODO(tgross): we may want to split this out to a CustomNodeResources if + // we want more ability to define a schema for the resource request? + Custom CustomResources } func (n *NodeResources) Copy() *NodeResources { @@ -3216,6 +3240,8 @@ func (n *NodeResources) Copy() *NodeResources { } } + newN.Custom = n.Custom.Copy() + // COMPAT remove in 1.10+ // apply compatibility fixups covering node topology newN.Compatibility() @@ -3245,9 +3271,11 @@ func (n *NodeResources) Comparable() *ComparableResources { MemoryMB: n.Memory.MemoryMB, }, Networks: n.Networks, + Custom: n.Custom.CopyTaskOnly(), }, Shared: AllocatedSharedResources{ DiskMB: n.Disk.DiskMB, + Custom: n.Custom.CopySharedOnly(), }, } return c @@ -3280,6 +3308,8 @@ func (n *NodeResources) Merge(o *NodeResources) { } } + n.Custom.Merge(o.Custom) + // COMPAT remove in 1.10+ // apply compatibility fixups covering node topology n.Compatibility() @@ -3887,6 +3917,7 @@ type AllocatedTaskResources struct { Memory AllocatedMemoryResources Networks Networks Devices []*AllocatedDeviceResource + Custom CustomResources } func (a *AllocatedTaskResources) Copy() *AllocatedTaskResources { @@ -3908,6 +3939,8 @@ func (a *AllocatedTaskResources) Copy() *AllocatedTaskResources { } } + newA.Custom = a.Custom.Copy() + return newA } @@ -3943,6 +3976,8 @@ func (a *AllocatedTaskResources) Add(delta *AllocatedTaskResources) { a.Devices[idx].Add(d) } } + + _ = a.Custom.Add(&delta.Custom) } func (a *AllocatedTaskResources) Max(other *AllocatedTaskResources) { @@ -3972,6 +4007,9 @@ func (a *AllocatedTaskResources) Max(other *AllocatedTaskResources) { a.Devices[idx].Add(d) } } + + // TODO(tgross): what does this even mean here? + // a.CustomResources.Max(delta.CustomResources) } // Comparable turns AllocatedTaskResources into ComparableResources @@ -4002,6 +4040,7 @@ func (a *AllocatedTaskResources) Subtract(delta *AllocatedTaskResources) { a.Cpu.Subtract(&delta.Cpu) a.Memory.Subtract(&delta.Memory) + _ = a.Custom.Subtract(&delta.Custom) } // AllocatedSharedResources are the set of resources allocated to a task group. @@ -4009,6 +4048,7 @@ type AllocatedSharedResources struct { Networks Networks DiskMB int64 Ports AllocatedPorts + Custom CustomResources } func (a AllocatedSharedResources) Copy() AllocatedSharedResources { @@ -4016,6 +4056,7 @@ func (a AllocatedSharedResources) Copy() AllocatedSharedResources { Networks: a.Networks.Copy(), DiskMB: a.DiskMB, Ports: a.Ports, + Custom: a.Custom.Copy(), } } @@ -4025,7 +4066,7 @@ func (a *AllocatedSharedResources) Add(delta *AllocatedSharedResources) { } a.Networks = append(a.Networks, delta.Networks...) a.DiskMB += delta.DiskMB - + _ = a.Custom.Add(&delta.Custom) } func (a *AllocatedSharedResources) Subtract(delta *AllocatedSharedResources) { @@ -4045,6 +4086,7 @@ func (a *AllocatedSharedResources) Subtract(delta *AllocatedSharedResources) { } a.Networks = nets a.DiskMB -= delta.DiskMB + _ = a.Custom.Subtract(&delta.Custom) } func (a *AllocatedSharedResources) Canonicalize() { @@ -4276,6 +4318,16 @@ func (c *ComparableResources) Superset(other *ComparableResources) (bool, string if c.Shared.DiskMB < other.Shared.DiskMB { return false, "disk" } + + ok, exhaustedResource := c.Flattened.Custom.Superset(other.Flattened.Custom) + if !ok { + return false, exhaustedResource + } + ok, exhaustedResource = c.Shared.Custom.Superset(other.Shared.Custom) + if !ok { + return false, exhaustedResource + } + return true, "" } diff --git a/scheduler/feasible/custom_resources.go b/scheduler/feasible/custom_resources.go new file mode 100644 index 00000000000..c611c308837 --- /dev/null +++ b/scheduler/feasible/custom_resources.go @@ -0,0 +1,26 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package feasible + +import "github.com/hashicorp/nomad/nomad/structs" + +type customResourceChecker struct { + ask *structs.CustomResources + proposed *structs.CustomResources + available *structs.CustomResources +} + +func (crc *customResourceChecker) addProposed(proposed []*structs.Allocation) { + for _, alloc := range proposed { + for _, task := range alloc.AllocatedResources.Tasks { + proposedResources := []*structs.CustomResource(*crc.proposed) + proposedResources = append(proposedResources, task.Custom...) + } + } +} + +func (crc *customResourceChecker) Select(ask *structs.CustomResources) error { + crc.available.Subtract(crc.proposed) + return ask.Select(*crc.available) +} diff --git a/scheduler/feasible/rank.go b/scheduler/feasible/rank.go index e2c94161961..49720ab8ad2 100644 --- a/scheduler/feasible/rank.go +++ b/scheduler/feasible/rank.go @@ -8,6 +8,7 @@ import ( "math" "slices" + "github.com/davecgh/go-spew/spew" "github.com/hashicorp/go-set/v3" "github.com/hashicorp/nomad/client/lib/idset" "github.com/hashicorp/nomad/client/lib/numalib/hw" @@ -258,6 +259,12 @@ NEXTNODE: devAllocator := newDeviceAllocator(iter.ctx, option.Node) devAllocator.AddAllocs(proposed) + customAllocator := customResourceChecker{ + available: &option.Node.NodeResources.Custom, + } + + customAllocator.addProposed(proposed) + // Track the affinities of the devices totalDeviceAffinityWeight := 0.0 sumMatchingAffinities := 0.0 @@ -379,7 +386,21 @@ NEXTNODE: MemoryMB: safemath.Add( int64(task.Resources.MemoryMB), int64(task.Resources.SecretsMB)), }, + // TODO(tgross): how do we populate the AllocatedSharedResources here? + Custom: task.Resources.Custom.CopyTaskOnly(), + } + + if len(taskResources.Custom) > 0 { + // TODO(tgross): probably need to check option.AllocResources for group scope too? + spew.Dump(option) + err := customAllocator.Select(&taskResources.Custom) + + if err != nil { + iter.ctx.Metrics().ExhaustedNode(option.Node, err.Error()) + continue NEXTNODE + } } + if iter.memoryOversubscription { taskResources.Memory.MemoryMaxMB = safemath.Add( int64(task.Resources.MemoryMaxMB), int64(task.Resources.SecretsMB))