Skip to content

Commit 3af1432

Browse files
committed
feat(slurm) add support for dynamic nodes
Signed-off-by: Dmitry Shmulevich <dshmulevich@nvidia.com>
1 parent 2d0e14e commit 3af1432

File tree

6 files changed

+198
-24
lines changed

6 files changed

+198
-24
lines changed

pkg/engines/slurm/slurm.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,19 @@ type BaseParams struct {
5555
BlockSizes string `mapstructure:"block_sizes"`
5656
FakeNodesEnabled bool `mapstructure:"fakeNodesEnabled"`
5757
FakeNodePool string `mapstructure:"fake_node_pool"`
58+
DynamicNodes []string `mapstructure:"dynamicNodes"`
59+
MinBlocks int `mapstructure:"minBlocks"`
5860
Topologies map[string]*Topology `mapstructure:"topologies,omitempty"`
5961
}
6062

6163
type Topology struct {
62-
Partition string `mapstructure:"partition"`
63-
Plugin string `mapstructure:"plugin"`
64-
BlockSizes []int `mapstructure:"blockSizes"`
65-
Nodes []string `mapstructure:"nodes"`
66-
Default bool `mapstructure:"clusterDefault"`
64+
Partition string `mapstructure:"partition"`
65+
Plugin string `mapstructure:"plugin"`
66+
BlockSizes []int `mapstructure:"blockSizes"`
67+
DynamicNodes []string `mapstructure:"dynamicNodes"`
68+
MinBlocks int `mapstructure:"minBlocks"`
69+
Nodes []string `mapstructure:"nodes"`
70+
Default bool `mapstructure:"clusterDefault"`
6771
}
6872

6973
type Params struct {
@@ -306,8 +310,10 @@ func GenerateOutputParams(ctx context.Context, root *topology.Vertex, params *Pa
306310

307311
func GetTranslateConfig(ctx context.Context, params *BaseParams, f *TopologyNodeFinder) (*translate.Config, error) {
308312
cfg := &translate.Config{
309-
Plugin: params.Plugin,
310-
BlockSizes: getBlockSizes(params.BlockSizes),
313+
Plugin: params.Plugin,
314+
BlockSizes: getBlockSizes(params.BlockSizes),
315+
DynamicNodes: params.DynamicNodes,
316+
MinBlocks: params.MinBlocks,
311317
}
312318

313319
// set fake nodes
@@ -332,6 +338,8 @@ func GetTranslateConfig(ctx context.Context, params *BaseParams, f *TopologyNode
332338
spec := &translate.TopologySpec{
333339
Plugin: sect.Plugin,
334340
BlockSizes: sect.BlockSizes,
341+
DynamicNodes: sect.DynamicNodes,
342+
MinBlocks: sect.MinBlocks,
335343
ClusterDefault: sect.Default,
336344
}
337345
klog.InfoS("Adding partition topology", "name", topo, "plugin", sect.Plugin, "default", sect.Default, "partition", sect.Partition)

pkg/engines/slurm/slurm_test.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -291,12 +291,17 @@ func TestGetTranslateConfig(t *testing.T) {
291291
{
292292
name: "Case 3: valid blocksize",
293293
params: &BaseParams{
294-
Plugin: topology.TopologyBlock,
295-
BlockSizes: "2,4,8",
294+
Plugin: topology.TopologyBlock,
295+
BlockSizes: "2,4,8",
296+
DynamicNodes: []string{"n[01-03]", "n[05,07-08]"},
297+
MinBlocks: 50,
296298
},
297299
cfg: &translate.Config{
298-
Plugin: topology.TopologyBlock,
299-
BlockSizes: []int{2, 4, 8},
300+
Plugin: topology.TopologyBlock,
301+
BlockSizes: []int{2, 4, 8},
302+
DynamicNodes: []string{"n[01-03]", "n[05,07-08]"},
303+
//DynamicNodes: map[string]bool{"n01": true, "n02": true, "n03": true, "n05": true, "n07": true, "n08": true},
304+
MinBlocks: 50,
300305
},
301306
},
302307
{
@@ -337,8 +342,9 @@ func TestGetTranslateConfig(t *testing.T) {
337342
Default: true,
338343
},
339344
"topo": {
340-
Plugin: topology.TopologyBlock,
341-
Nodes: []string{"node[001-100]"},
345+
Plugin: topology.TopologyBlock,
346+
DynamicNodes: []string{"n[01-03]"},
347+
Nodes: []string{"node[001-100]"},
342348
},
343349
},
344350
},
@@ -349,8 +355,10 @@ func TestGetTranslateConfig(t *testing.T) {
349355
ClusterDefault: true,
350356
},
351357
"topo": {
352-
Plugin: topology.TopologyBlock,
353-
Nodes: []string{"node[001-100]"},
358+
Plugin: topology.TopologyBlock,
359+
DynamicNodes: []string{"n[01-03]"},
360+
//DynamicNodes: map[string]bool{"n01": true, "n02": true, "n03": true},
361+
Nodes: []string{"node[001-100]"},
354362
},
355363
},
356364
},

pkg/translate/block.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414

1515
"k8s.io/klog/v2"
1616

17-
"github.com/NVIDIA/topograph/internal/cluset"
1817
"github.com/NVIDIA/topograph/internal/httperr"
1918
"github.com/NVIDIA/topograph/pkg/metrics"
2019
)
@@ -98,22 +97,28 @@ func (nt *NetworkTopology) toBlockTopology(wr io.Writer) *httperr.Error {
9897
fnc.baseBlockSize = finalBlockSizes[0]
9998
}
10099

100+
dynamicNodeMap := nodes2map(nt.config.DynamicNodes)
101101
for _, bInfo := range nt.blocks {
102102
var comment string
103103
if len(bInfo.name) != 0 {
104104
comment = fmt.Sprintf("# %s=%s\n", bInfo.id, bInfo.name)
105105
}
106106

107-
outputNodeNames := strings.Join(cluset.Compact(bInfo.nodes), ",")
107+
static, dynamic := splitNodes(bInfo.nodes, dynamicNodeMap)
108108
if fnc != nil && len(bInfo.nodes) < fnc.baseBlockSize {
109109
fakeNodeNames, err := fnc.getFreeFakeNodes(fnc.baseBlockSize - len(bInfo.nodes))
110110
if err != nil {
111111
return httperr.NewError(http.StatusBadGateway, err.Error())
112112
}
113-
outputNodeNames = fmt.Sprintf("%s,%s", outputNodeNames, fakeNodeNames)
113+
static = fmt.Sprintf("%s,%s", static, fakeNodeNames)
114114
}
115115

116-
if _, err := fmt.Fprintf(wr, "%sBlockName=%s Nodes=%s\n", comment, bInfo.id, outputNodeNames); err != nil {
116+
// append the block line with the names of dynamic nodes, if present
117+
var suffix string
118+
if len(dynamic) != 0 {
119+
suffix = fmt.Sprintf(" # dynamic=%s", dynamic)
120+
}
121+
if _, err := fmt.Fprintf(wr, "%sBlockName=%s Nodes=%s%s\n", comment, bInfo.id, static, suffix); err != nil {
117122
return httperr.NewError(http.StatusInternalServerError, err.Error())
118123
}
119124
}

pkg/translate/block_test.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,125 @@
66
package translate
77

88
import (
9+
"bytes"
910
"fmt"
11+
"strings"
1012
"testing"
1113

1214
"github.com/stretchr/testify/require"
1315
)
1416

17+
func TestBlockTopology(t *testing.T) {
18+
testCases := []struct {
19+
name string
20+
nt *NetworkTopology
21+
output string
22+
err string
23+
}{
24+
{
25+
name: "single block without name or dynamic nodes",
26+
nt: &NetworkTopology{
27+
config: &Config{
28+
BlockSizes: []int{2},
29+
},
30+
blocks: []*blockInfo{
31+
{
32+
id: "b1",
33+
nodes: []string{"n1", "n2"},
34+
},
35+
},
36+
},
37+
output: strings.Join([]string{
38+
"BlockName=b1 Nodes=n[1-2]",
39+
"BlockSizes=2",
40+
"",
41+
}, "\n"),
42+
},
43+
{
44+
name: "block with name and dynamic nodes",
45+
nt: &NetworkTopology{
46+
config: &Config{
47+
BlockSizes: []int{2},
48+
DynamicNodes: []string{"n2"},
49+
},
50+
blocks: []*blockInfo{
51+
{
52+
id: "b1",
53+
name: "primary",
54+
nodes: []string{"n1", "n2"},
55+
},
56+
},
57+
},
58+
output: strings.Join([]string{
59+
"# b1=primary",
60+
"BlockName=b1 Nodes=n1 # dynamic=n2",
61+
"BlockSizes=2",
62+
"",
63+
}, "\n"),
64+
},
65+
{
66+
name: "fake nodes added to meet base block size",
67+
nt: &NetworkTopology{
68+
config: &Config{
69+
BlockSizes: []int{3},
70+
FakeNodePool: "fake-[1-3]",
71+
},
72+
blocks: []*blockInfo{
73+
{
74+
id: "b1",
75+
nodes: []string{"n1"},
76+
},
77+
},
78+
},
79+
output: strings.Join([]string{
80+
"BlockName=b1 Nodes=n1,fake-[1-2]",
81+
"BlockSizes=3",
82+
"",
83+
}, "\n"),
84+
},
85+
{
86+
name: "multiple blocks with mixed settings",
87+
nt: &NetworkTopology{
88+
config: &Config{
89+
BlockSizes: []int{2, 4},
90+
DynamicNodes: []string{"n3"},
91+
},
92+
blocks: []*blockInfo{
93+
{
94+
id: "b1",
95+
nodes: []string{"n1", "n2"},
96+
},
97+
{
98+
id: "b2",
99+
name: "secondary",
100+
nodes: []string{"n3"},
101+
},
102+
},
103+
},
104+
output: strings.Join([]string{
105+
"BlockName=b1 Nodes=n[1-2]",
106+
"# b2=secondary",
107+
"BlockName=b2 Nodes= # dynamic=n3",
108+
"BlockSizes=1,2",
109+
"",
110+
}, "\n"),
111+
},
112+
}
113+
114+
for _, tc := range testCases {
115+
t.Run(tc.name, func(t *testing.T) {
116+
var buf bytes.Buffer
117+
err := tc.nt.toBlockTopology(&buf)
118+
if len(tc.err) != 0 {
119+
require.EqualError(t, err, tc.err)
120+
} else {
121+
require.Nil(t, err)
122+
require.Equal(t, tc.output, buf.String())
123+
}
124+
})
125+
}
126+
}
127+
15128
func TestGetBlockSize(t *testing.T) {
16129
testCases := []struct {
17130
name string

pkg/translate/topology.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ import (
99
"fmt"
1010
"io"
1111
"sort"
12+
"strings"
1213

1314
"github.com/agrea/ptr"
1415
"k8s.io/klog/v2"
1516

17+
"github.com/NVIDIA/topograph/internal/cluset"
1618
"github.com/NVIDIA/topograph/internal/httperr"
1719
"github.com/NVIDIA/topograph/pkg/topology"
1820
)
@@ -21,13 +23,17 @@ type Config struct {
2123
Plugin string // topology plugin (cluster-wide)
2224
BlockSizes []int
2325
FakeNodePool string
26+
DynamicNodes []string
27+
MinBlocks int
2428
Topologies map[string]*TopologySpec // per-partiton topology settings
2529
}
2630

2731
// TopologySpec define topology for a partition
2832
type TopologySpec struct {
2933
Plugin string
3034
BlockSizes []int
35+
DynamicNodes []string
36+
MinBlocks int
3137
ClusterDefault bool
3238
Nodes []string
3339
}
@@ -255,3 +261,33 @@ func (nt *NetworkTopology) Generate(wr io.Writer) *httperr.Error {
255261
return nt.toTreeTopology(wr)
256262
}
257263
}
264+
265+
func nodes2map(nodes []string) map[string]bool {
266+
if len(nodes) == 0 {
267+
return nil
268+
}
269+
270+
res := make(map[string]bool)
271+
for _, node := range cluset.Expand(nodes) {
272+
res[node] = true
273+
}
274+
275+
return res
276+
}
277+
278+
func splitNodes(nodes []string, dynamicNodeMap map[string]bool) (string, string) {
279+
if len(dynamicNodeMap) == 0 {
280+
return strings.Join(cluset.Compact(nodes), ","), ""
281+
}
282+
283+
var static, dynamic []string
284+
for _, node := range nodes {
285+
if dynamicNodeMap[node] {
286+
dynamic = append(dynamic, node)
287+
} else {
288+
static = append(static, node)
289+
}
290+
}
291+
292+
return strings.Join(cluset.Compact(static), ","), strings.Join(cluset.Compact(dynamic), ",")
293+
}

pkg/translate/yaml.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,9 @@ type BlockTopo struct {
4343
}
4444

4545
type Block struct {
46-
Name string `yaml:"block"`
47-
Nodes string `yaml:"nodes"`
46+
Name string `yaml:"block"`
47+
Nodes string `yaml:"nodes"`
48+
Dynamic string `yaml:"dynamic,omitempty"`
4849
}
4950

5051
// toYamlTopology generates SLURM cluster topology config in YAML format
@@ -98,8 +99,9 @@ func (nt *NetworkTopology) toYamlTopology(wr io.Writer) *httperr.Error {
9899

99100
func (nt *NetworkTopology) getBlockTopologyUnit(topoName string, topoSpec *TopologySpec) *TopologyUnit {
100101
// populate map [block indx : blockInfo]
101-
nodeNames := cluset.Expand(topoSpec.Nodes)
102+
dynamicNodesMap := nodes2map(topoSpec.DynamicNodes)
102103
blockMap := make(map[int]*blockInfo)
104+
nodeNames := cluset.Expand(topoSpec.Nodes)
103105
for _, nodeName := range nodeNames {
104106
info, ok := nt.nodeInfo[nodeName]
105107
if !ok {
@@ -134,9 +136,11 @@ func (nt *NetworkTopology) getBlockTopologyUnit(topoName string, topoSpec *Topol
134136
// populate block topology units ordered by block indices
135137
blocks := make([]*Block, 0, len(bInfos))
136138
for indx, bInfo := range bInfos {
139+
static, dynamic := splitNodes(bInfo.nodes, dynamicNodesMap)
137140
blocks = append(blocks, &Block{
138-
Name: fmt.Sprintf("block%d", indx),
139-
Nodes: strings.Join(cluset.Compact(bInfo.nodes), ","),
141+
Name: fmt.Sprintf("block%d", indx),
142+
Nodes: static,
143+
Dynamic: dynamic,
140144
})
141145
}
142146

0 commit comments

Comments
 (0)