Skip to content

Commit 7be01f7

Browse files
committed
feat: add krun & bug fixes
1 parent 035d394 commit 7be01f7

File tree

19 files changed

+1038
-23
lines changed

19 files changed

+1038
-23
lines changed

PROJECT

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,12 @@ resources:
3737
webhooks:
3838
defaulting: true
3939
webhookVersion: v1
40+
- core: true
41+
group: batch
42+
kind: Job
43+
path: k8s.io/api/batch/v1
44+
version: v1
45+
webhooks:
46+
defaulting: true
47+
webhookVersion: v1
4048
version: "3"

cmd/krun/init.go

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
"fmt"
7+
"log"
8+
"net"
9+
"os"
10+
"strconv"
11+
"strings"
12+
"time"
13+
14+
"github.com/urfave/cli/v2"
15+
)
16+
17+
func determineServiceName() (string, error) {
18+
// Firstly get from environment variable KRUN_SERVICE
19+
if serviceName := os.Getenv("KRUN_SERVICE"); serviceName != "" {
20+
return serviceName, nil
21+
}
22+
// Secondly get from current hostname
23+
hostname, err := os.Hostname()
24+
if err != nil {
25+
return "", wrapError("determine_service_name", err)
26+
}
27+
firstElem := strings.Split(hostname, ".")[0]
28+
lastStashPos := strings.LastIndex(firstElem, "-")
29+
if lastStashPos != -1 {
30+
suffix := firstElem[lastStashPos+1:]
31+
if _, err := strconv.ParseInt(suffix, 10, 64); err != nil {
32+
return "", wrapError("determine_service_name", errors.New("hostname not indexed"))
33+
}
34+
firstElem = firstElem[:lastStashPos]
35+
return firstElem, nil
36+
}
37+
return "", wrapError("determine_service_name", errors.New("hostname not indexed"))
38+
}
39+
40+
func getIPsOfService(serviceName string) ([]string, error) {
41+
ips, err := net.LookupIP(serviceName)
42+
if err != nil {
43+
return nil, wrapError("get_ips_of_service", err)
44+
}
45+
ipStrings := make([]string, 0, len(ips))
46+
for _, ip := range ips {
47+
ipStrings = append(ipStrings, ip.String())
48+
}
49+
return ipStrings, nil
50+
}
51+
52+
func getRealHostnameAndIPOfPod(origIP string) (hostname, ip string, err error) {
53+
krunPath := os.Args[0]
54+
rawStr, err := waitGetOutput(sshRunner.Command(origIP, []string{krunPath, "_init"}))
55+
if err != nil {
56+
return "", "", wrapError("get_real_hostname_and_ip_of_pod", err)
57+
}
58+
var peerInfo InitPeerInfo
59+
err = json.Unmarshal([]byte(rawStr), &peerInfo)
60+
if err != nil {
61+
return "", "", wrapError("get_real_hostname_and_ip_of_pod", err)
62+
}
63+
return peerInfo.Hostname, peerInfo.IP, nil
64+
}
65+
66+
type InitPeerInfo struct {
67+
Hostname string `json:"h"`
68+
IP string `json:"i"`
69+
}
70+
71+
func initPeerInfoHandler(c *cli.Context) error {
72+
h, err := getShortHostname()
73+
if err != nil {
74+
return err
75+
}
76+
i, err := getFastestIP()
77+
if err != nil {
78+
return err
79+
}
80+
peerInfo := &InitPeerInfo{
81+
Hostname: h,
82+
IP: i,
83+
}
84+
peerInfoBytes, err := json.Marshal(peerInfo)
85+
if err != nil {
86+
return err
87+
}
88+
fmt.Println(string(peerInfoBytes))
89+
return nil
90+
}
91+
92+
// 200ms, adjust to your need
93+
const ReadinessWaitInterval = 200 * time.Millisecond
94+
const ReadinessDelay = 100 * time.Millisecond
95+
96+
func initCommandHandler(c *cli.Context) error {
97+
// waitMinStr := os.Getenv("KRUN_WAIT_MIN")
98+
// if waitMinStr == "" {
99+
// waitMinStr = "1"
100+
// }
101+
// waitMin, err := strconv.Atoi(waitMinStr)
102+
// if err != nil {
103+
// return wrapError("init_command_handler", err)
104+
// }
105+
waitMin := c.Int("wait-min")
106+
107+
serviceName, err := determineServiceName()
108+
if err != nil {
109+
return err
110+
}
111+
log.Println("Service name:", serviceName)
112+
log.Println("Wait min:", waitMin)
113+
114+
var ipsOfService []string
115+
for len(ipsOfService) < waitMin {
116+
ipsOfService, err = getIPsOfService(serviceName)
117+
if err != nil {
118+
return err
119+
}
120+
log.Println("Discovered IPs of service:", ipsOfService)
121+
if len(ipsOfService) < waitMin {
122+
log.Printf("%v out of %v are ready. Still waiting...\n", len(ipsOfService), waitMin)
123+
time.Sleep(ReadinessWaitInterval)
124+
}
125+
}
126+
127+
// Such delay is used to make sure underlying services are started up correctly
128+
time.Sleep(ReadinessDelay)
129+
130+
results, err := PoolExecute(func(ip string) (*InitPeerInfo, error) {
131+
hostname, realIP, err := getRealHostnameAndIPOfPod(ip)
132+
return &InitPeerInfo{
133+
Hostname: hostname,
134+
IP: realIP,
135+
}, err
136+
}, ipsOfService, c.Int("threads"))
137+
if err != nil {
138+
return err
139+
}
140+
141+
for _, result := range results {
142+
log.Printf("Peer (Hostname: %v, IP: %v)\n", result.Hostname, result.IP)
143+
}
144+
145+
noHosts := c.Bool("no-hosts")
146+
147+
if !noHosts {
148+
err = writeHosts(results)
149+
if err != nil {
150+
return err
151+
}
152+
}
153+
154+
log.Println("Saving state file to", c.String("state-file")+"...")
155+
r, err := json.Marshal(results)
156+
if err != nil {
157+
return wrapError("init_command_handler", err)
158+
}
159+
err = os.WriteFile(c.String("state-file"), r, 0644)
160+
if err != nil {
161+
return wrapError("init_command_handler", err)
162+
}
163+
164+
return nil
165+
}
166+
167+
func writeHosts(results []*InitPeerInfo) error {
168+
log.Println("Generating hosts file...")
169+
170+
myHostname, err := getShortHostname()
171+
if err != nil {
172+
return err
173+
}
174+
175+
orig, err := os.ReadFile("/etc/hosts")
176+
if err != nil {
177+
return wrapError("write_hosts", err)
178+
}
179+
180+
origStr := string(orig)
181+
182+
startBanner := "\n# --- KRUN GENERATED HOSTS START ---"
183+
startBanner += "\n# Generated by krun, do not modify manually\n\n"
184+
endedBanner := "\n# --- KRUN GENERATED HOSTS ENDED ---\n\n"
185+
186+
content := ""
187+
for _, result := range results {
188+
// FIX: skip the current pod, otherwise
189+
// intel MPI hydra fails to launch
190+
// processes across nodes.
191+
if result.Hostname == myHostname {
192+
continue
193+
}
194+
content += fmt.Sprintf("%v\t%v\n", result.IP, result.Hostname)
195+
}
196+
197+
startPos := strings.Index(origStr, startBanner)
198+
endPos := strings.Index(origStr, endedBanner)
199+
if startPos == -1 || endPos == -1 {
200+
origStr += startBanner + content + endedBanner
201+
} else {
202+
origStr = origStr[:startPos] + startBanner + content + origStr[endPos:]
203+
}
204+
205+
err = os.WriteFile("/etc/hosts", []byte(origStr), 0644)
206+
if err != nil {
207+
return wrapError("write_hosts", err)
208+
}
209+
210+
return nil
211+
}
212+
213+
func loadState(c *cli.Context) ([]*InitPeerInfo, error) {
214+
r, err := os.ReadFile(c.String("state-file"))
215+
if err != nil {
216+
if os.IsNotExist(err) {
217+
return nil, errors.New("failed to load state file, please run `krun init` first")
218+
}
219+
return nil, wrapError("load_state", err)
220+
}
221+
var results []*InitPeerInfo
222+
err = json.Unmarshal(r, &results)
223+
if err != nil {
224+
return nil, wrapError("load_state", err)
225+
}
226+
return results, nil
227+
}
228+
229+
func hostsCommandHandler(c *cli.Context) error {
230+
results, err := loadState(c)
231+
if err != nil {
232+
return err
233+
}
234+
235+
short := c.Bool("short")
236+
noHeaders := c.Bool("no-headers")
237+
hostname := c.Bool("hostname")
238+
ip := c.Bool("ip")
239+
suffix := c.String("suffix")
240+
241+
if !hostname && !ip {
242+
hostname = true
243+
if !short {
244+
ip = true
245+
}
246+
}
247+
248+
fieldDelim := "\t"
249+
itemDelim := "\n"
250+
251+
if short {
252+
fieldDelim = " "
253+
itemDelim = ","
254+
}
255+
256+
if !short && !noHeaders {
257+
if hostname {
258+
fmt.Print("HOSTNAME")
259+
}
260+
if hostname && ip {
261+
fmt.Print(fieldDelim)
262+
}
263+
if ip {
264+
fmt.Print("IP")
265+
}
266+
}
267+
268+
first := true
269+
270+
for _, result := range results {
271+
if first && (short || noHeaders) {
272+
first = false
273+
} else {
274+
fmt.Print(itemDelim)
275+
}
276+
if hostname {
277+
fmt.Print(result.Hostname)
278+
}
279+
if hostname && ip {
280+
fmt.Print(fieldDelim)
281+
}
282+
if ip {
283+
fmt.Print(result.IP)
284+
}
285+
fmt.Print(suffix)
286+
}
287+
288+
fmt.Println()
289+
290+
return nil
291+
}

cmd/krun/log.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package main
2+
3+
import "fmt"
4+
5+
func wrapError(wrapper string, err error) error {
6+
if err != nil {
7+
return fmt.Errorf("%s: %w", wrapper, err)
8+
}
9+
return nil
10+
}

0 commit comments

Comments
 (0)