Skip to content

Commit 7c96aa7

Browse files
committed
feat(slurm) add support for dynamic nodes
Signed-off-by: Dmitry Shmulevich <dshmulevich@nvidia.com>
1 parent 2d0e14e commit 7c96aa7

File tree

7 files changed

+235
-35
lines changed

7 files changed

+235
-35
lines changed

pkg/engines/slurm/slurm.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,19 @@ type BaseParams struct {
5555
BlockSizes string `mapstructure:"block_sizes"`
5656
FakeNodesEnabled bool `mapstructure:"fakeNodesEnabled"`
5757
FakeNodePool string `mapstructure:"fake_node_pool"`
58+
DynamicNodes []string `mapstructure:"dynamicNodes"`
59+
MinBlocks int `mapstructure:"minBlocks"`
5860
Topologies map[string]*Topology `mapstructure:"topologies,omitempty"`
5961
}
6062

6163
type Topology struct {
62-
Partition string `mapstructure:"partition"`
63-
Plugin string `mapstructure:"plugin"`
64-
BlockSizes []int `mapstructure:"blockSizes"`
65-
Nodes []string `mapstructure:"nodes"`
66-
Default bool `mapstructure:"clusterDefault"`
64+
Partition string `mapstructure:"partition"`
65+
Plugin string `mapstructure:"plugin"`
66+
BlockSizes []int `mapstructure:"blockSizes"`
67+
DynamicNodes []string `mapstructure:"dynamicNodes"`
68+
MinBlocks int `mapstructure:"minBlocks"`
69+
Nodes []string `mapstructure:"nodes"`
70+
Default bool `mapstructure:"clusterDefault"`
6771
}
6872

6973
type Params struct {
@@ -306,8 +310,10 @@ func GenerateOutputParams(ctx context.Context, root *topology.Vertex, params *Pa
306310

307311
func GetTranslateConfig(ctx context.Context, params *BaseParams, f *TopologyNodeFinder) (*translate.Config, error) {
308312
cfg := &translate.Config{
309-
Plugin: params.Plugin,
310-
BlockSizes: getBlockSizes(params.BlockSizes),
313+
Plugin: params.Plugin,
314+
BlockSizes: getBlockSizes(params.BlockSizes),
315+
DynamicNodes: params.DynamicNodes,
316+
MinBlocks: params.MinBlocks,
311317
}
312318

313319
// set fake nodes
@@ -332,6 +338,8 @@ func GetTranslateConfig(ctx context.Context, params *BaseParams, f *TopologyNode
332338
spec := &translate.TopologySpec{
333339
Plugin: sect.Plugin,
334340
BlockSizes: sect.BlockSizes,
341+
DynamicNodes: sect.DynamicNodes,
342+
MinBlocks: sect.MinBlocks,
335343
ClusterDefault: sect.Default,
336344
}
337345
klog.InfoS("Adding partition topology", "name", topo, "plugin", sect.Plugin, "default", sect.Default, "partition", sect.Partition)

pkg/engines/slurm/slurm_test.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -291,12 +291,16 @@ func TestGetTranslateConfig(t *testing.T) {
291291
{
292292
name: "Case 3: valid blocksize",
293293
params: &BaseParams{
294-
Plugin: topology.TopologyBlock,
295-
BlockSizes: "2,4,8",
294+
Plugin: topology.TopologyBlock,
295+
BlockSizes: "2,4,8",
296+
DynamicNodes: []string{"n[01-03]", "n[05,07-08]"},
297+
MinBlocks: 50,
296298
},
297299
cfg: &translate.Config{
298-
Plugin: topology.TopologyBlock,
299-
BlockSizes: []int{2, 4, 8},
300+
Plugin: topology.TopologyBlock,
301+
BlockSizes: []int{2, 4, 8},
302+
DynamicNodes: []string{"n[01-03]", "n[05,07-08]"},
303+
MinBlocks: 50,
300304
},
301305
},
302306
{
@@ -337,8 +341,9 @@ func TestGetTranslateConfig(t *testing.T) {
337341
Default: true,
338342
},
339343
"topo": {
340-
Plugin: topology.TopologyBlock,
341-
Nodes: []string{"node[001-100]"},
344+
Plugin: topology.TopologyBlock,
345+
DynamicNodes: []string{"n[01-03]"},
346+
Nodes: []string{"node[001-100]"},
342347
},
343348
},
344349
},
@@ -349,8 +354,9 @@ func TestGetTranslateConfig(t *testing.T) {
349354
ClusterDefault: true,
350355
},
351356
"topo": {
352-
Plugin: topology.TopologyBlock,
353-
Nodes: []string{"node[001-100]"},
357+
Plugin: topology.TopologyBlock,
358+
DynamicNodes: []string{"n[01-03]"},
359+
Nodes: []string{"node[001-100]"},
354360
},
355361
},
356362
},

pkg/translate/block.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414

1515
"k8s.io/klog/v2"
1616

17-
"github.com/NVIDIA/topograph/internal/cluset"
1817
"github.com/NVIDIA/topograph/internal/httperr"
1918
"github.com/NVIDIA/topograph/pkg/metrics"
2019
)
@@ -98,22 +97,36 @@ func (nt *NetworkTopology) toBlockTopology(wr io.Writer) *httperr.Error {
9897
fnc.baseBlockSize = finalBlockSizes[0]
9998
}
10099

100+
dynamicNodeMap := nodeList2map(nt.config.DynamicNodes)
101101
for _, bInfo := range nt.blocks {
102102
var comment string
103103
if len(bInfo.name) != 0 {
104104
comment = fmt.Sprintf("# %s=%s\n", bInfo.id, bInfo.name)
105105
}
106106

107-
outputNodeNames := strings.Join(cluset.Compact(bInfo.nodes), ",")
107+
static, dynamic := splitNodes(bInfo.nodes, dynamicNodeMap)
108108
if fnc != nil && len(bInfo.nodes) < fnc.baseBlockSize {
109109
fakeNodeNames, err := fnc.getFreeFakeNodes(fnc.baseBlockSize - len(bInfo.nodes))
110110
if err != nil {
111111
return httperr.NewError(http.StatusBadGateway, err.Error())
112112
}
113-
outputNodeNames = fmt.Sprintf("%s,%s", outputNodeNames, fakeNodeNames)
113+
static = fmt.Sprintf("%s,%s", static, fakeNodeNames)
114114
}
115115

116-
if _, err := fmt.Fprintf(wr, "%sBlockName=%s Nodes=%s\n", comment, bInfo.id, outputNodeNames); err != nil {
116+
// append the block line with the names of dynamic nodes, if present
117+
var suffix string
118+
if len(dynamic) != 0 {
119+
suffix = fmt.Sprintf(" # dynamic=%s", dynamic)
120+
}
121+
122+
if _, err := fmt.Fprintf(wr, "%sBlockName=%s Nodes=%s%s\n", comment, bInfo.id, static, suffix); err != nil {
123+
return httperr.NewError(http.StatusInternalServerError, err.Error())
124+
}
125+
}
126+
127+
// add empty blocks if needed
128+
for i := len(nt.blocks) + 1; i <= nt.config.MinBlocks; i++ {
129+
if _, err := fmt.Fprintf(wr, "BlockName=extraBlock%d Nodes=\n", i); err != nil {
117130
return httperr.NewError(http.StatusInternalServerError, err.Error())
118131
}
119132
}

pkg/translate/block_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,129 @@
66
package translate
77

88
import (
9+
"bytes"
910
"fmt"
11+
"strings"
1012
"testing"
1113

1214
"github.com/stretchr/testify/require"
1315
)
1416

17+
func TestBlockTopology(t *testing.T) {
18+
testCases := []struct {
19+
name string
20+
nt *NetworkTopology
21+
output string
22+
err string
23+
}{
24+
{
25+
name: "single block without name or dynamic nodes",
26+
nt: &NetworkTopology{
27+
config: &Config{
28+
BlockSizes: []int{2},
29+
},
30+
blocks: []*blockInfo{
31+
{
32+
id: "b1",
33+
nodes: []string{"n1", "n2"},
34+
},
35+
},
36+
},
37+
output: strings.Join([]string{
38+
"BlockName=b1 Nodes=n[1-2]",
39+
"BlockSizes=2",
40+
"",
41+
}, "\n"),
42+
},
43+
{
44+
name: "block with name and dynamic nodes",
45+
nt: &NetworkTopology{
46+
config: &Config{
47+
BlockSizes: []int{2},
48+
DynamicNodes: []string{"n2"},
49+
MinBlocks: 3,
50+
},
51+
blocks: []*blockInfo{
52+
{
53+
id: "b1",
54+
name: "block1",
55+
nodes: []string{"n1", "n2"},
56+
},
57+
},
58+
},
59+
output: `# b1=block1
60+
BlockName=b1 Nodes=n1 # dynamic=n2
61+
BlockName=extraBlock2 Nodes=
62+
BlockName=extraBlock3 Nodes=
63+
BlockSizes=2
64+
`,
65+
},
66+
{
67+
name: "fake nodes added to meet base block size",
68+
nt: &NetworkTopology{
69+
config: &Config{
70+
BlockSizes: []int{3},
71+
FakeNodePool: "fake[1-6]",
72+
MinBlocks: 3,
73+
},
74+
blocks: []*blockInfo{
75+
{
76+
id: "b1",
77+
nodes: []string{"n1"},
78+
},
79+
{
80+
id: "b2",
81+
nodes: []string{"n2"},
82+
},
83+
},
84+
},
85+
output: `BlockName=b1 Nodes=n1,fake[1-2]
86+
BlockName=b2 Nodes=n2,fake[3-4]
87+
BlockName=extraBlock3 Nodes=
88+
BlockSizes=3
89+
`,
90+
},
91+
{
92+
name: "multiple blocks with mixed settings",
93+
nt: &NetworkTopology{
94+
config: &Config{
95+
BlockSizes: []int{2, 4},
96+
DynamicNodes: []string{"n3"},
97+
},
98+
blocks: []*blockInfo{
99+
{
100+
id: "b1",
101+
nodes: []string{"n1", "n2"},
102+
},
103+
{
104+
id: "b2",
105+
name: "block2",
106+
nodes: []string{"n3"},
107+
},
108+
},
109+
},
110+
output: `BlockName=b1 Nodes=n[1-2]
111+
# b2=block2
112+
BlockName=b2 Nodes= # dynamic=n3
113+
BlockSizes=1,2
114+
`,
115+
},
116+
}
117+
118+
for _, tc := range testCases {
119+
t.Run(tc.name, func(t *testing.T) {
120+
var buf bytes.Buffer
121+
err := tc.nt.toBlockTopology(&buf)
122+
if len(tc.err) != 0 {
123+
require.EqualError(t, err, tc.err)
124+
} else {
125+
require.Nil(t, err)
126+
require.Equal(t, tc.output, buf.String())
127+
}
128+
})
129+
}
130+
}
131+
15132
func TestGetBlockSize(t *testing.T) {
16133
testCases := []struct {
17134
name string

pkg/translate/topology.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ import (
99
"fmt"
1010
"io"
1111
"sort"
12+
"strings"
1213

1314
"github.com/agrea/ptr"
1415
"k8s.io/klog/v2"
1516

17+
"github.com/NVIDIA/topograph/internal/cluset"
1618
"github.com/NVIDIA/topograph/internal/httperr"
1719
"github.com/NVIDIA/topograph/pkg/topology"
1820
)
@@ -21,13 +23,17 @@ type Config struct {
2123
Plugin string // topology plugin (cluster-wide)
2224
BlockSizes []int
2325
FakeNodePool string
26+
DynamicNodes []string
27+
MinBlocks int
2428
Topologies map[string]*TopologySpec // per-partiton topology settings
2529
}
2630

2731
// TopologySpec define topology for a partition
2832
type TopologySpec struct {
2933
Plugin string
3034
BlockSizes []int
35+
DynamicNodes []string
36+
MinBlocks int
3137
ClusterDefault bool
3238
Nodes []string
3339
}
@@ -255,3 +261,33 @@ func (nt *NetworkTopology) Generate(wr io.Writer) *httperr.Error {
255261
return nt.toTreeTopology(wr)
256262
}
257263
}
264+
265+
func nodeList2map(nodes []string) map[string]bool {
266+
if len(nodes) == 0 {
267+
return nil
268+
}
269+
270+
res := make(map[string]bool)
271+
for _, node := range cluset.Expand(nodes) {
272+
res[node] = true
273+
}
274+
275+
return res
276+
}
277+
278+
func splitNodes(nodes []string, dynamicNodeMap map[string]bool) (string, string) {
279+
if len(dynamicNodeMap) == 0 {
280+
return strings.Join(cluset.Compact(nodes), ","), ""
281+
}
282+
283+
var static, dynamic []string
284+
for _, node := range nodes {
285+
if dynamicNodeMap[node] {
286+
dynamic = append(dynamic, node)
287+
} else {
288+
static = append(static, node)
289+
}
290+
}
291+
292+
return strings.Join(cluset.Compact(static), ","), strings.Join(cluset.Compact(dynamic), ",")
293+
}

0 commit comments

Comments
 (0)