-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.go
More file actions
332 lines (300 loc) · 9.7 KB
/
main.go
File metadata and controls
332 lines (300 loc) · 9.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
package main
import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"net/http"
	"path/filepath"
	"sort"
	"strings"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	_ "modernc.org/sqlite"
)
// dsn is the data source name handed to sql.Open for the "sqlite" driver.
// It points at the file cmdb.db and carries the cache=shared and mode=rwc
// URI parameters.
const dsn = "file:cmdb.db?cache=shared&mode=rwc"
// ---------- DB ----------
// openDB opens a database handle for dsn using the "sqlite" driver and caps
// the pool at one open connection.
//
// It returns the handle, or a nil handle together with the error reported by
// sql.Open.
func openDB() (*sql.DB, error) {
	handle, openErr := sql.Open("sqlite", dsn)
	if openErr != nil {
		return nil, openErr
	}

	// A single connection is enough for SQLite.
	handle.SetMaxOpenConns(1)
	return handle, nil
}
// initSchema creates the pods and nodes tables if they do not exist yet.
//
// The pods statement is executed first. The error of the first failing
// statement is returned and any remaining statement is skipped; nil is
// returned when both statements succeed.
func initSchema(db *sql.DB) error {
	const createPods = `
CREATE TABLE IF NOT EXISTS pods(
uid TEXT PRIMARY KEY,
name TEXT,
namespace TEXT,
phase TEXT,
node_name TEXT,
pod_ip TEXT,
created_at TEXT,
updated_at TEXT
);`
	const createNodes = `
CREATE TABLE IF NOT EXISTS nodes(
name TEXT PRIMARY KEY,
labels TEXT,
capacity_cpu TEXT,
capacity_mem TEXT,
internal_ip TEXT,
created_at TEXT,
updated_at TEXT
);`

	for _, ddl := range [...]string{createPods, createNodes} {
		if _, err := db.Exec(ddl); err != nil {
			return err
		}
	}
	return nil
}
// upsertPod writes p into the pods table, keyed by the pod UID.
//
// A new row gets the current time (RFC 3339) in both created_at and
// updated_at. When a row with the same uid already exists, every column except
// uid and created_at is overwritten with the new values.
//
// It returns an error when p is nil or when the statement fails.
func upsertPod(db *sql.DB, p *corev1.Pod) error {
	if p == nil {
		return errors.New("nil pod")
	}

	const query = `
INSERT INTO pods(uid,name,namespace,phase,node_name,pod_ip,created_at,updated_at)
VALUES(?,?,?,?,?,?,?,?)
ON CONFLICT(uid) DO UPDATE SET
name=excluded.name,
namespace=excluded.namespace,
phase=excluded.phase,
node_name=excluded.node_name,
pod_ip=excluded.pod_ip,
updated_at=excluded.updated_at
`
	stamp := time.Now().Format(time.RFC3339)
	_, err := db.Exec(query,
		string(p.UID),
		p.Name,
		p.Namespace,
		string(p.Status.Phase),
		p.Spec.NodeName,
		p.Status.PodIP,
		stamp, // created_at, only used on first insert
		stamp, // updated_at
	)
	return err
}
// deletePod removes the pods row whose uid column equals uid and returns the
// error reported by the database, if any.
func deletePod(db *sql.DB, uid string) error {
	const stmt = `DELETE FROM pods WHERE uid=?`
	if _, err := db.Exec(stmt, uid); err != nil {
		return err
	}
	return nil
}
// upsertNode writes n into the nodes table, keyed by the node name.
//
// Stored columns:
//   - labels: the node labels flattened to "key=value" pairs, sorted and
//     joined with commas
//   - capacity_cpu / capacity_mem: the string form of the CPU and memory
//     entries of n.Status.Capacity
//   - internal_ip: the first address of type NodeInternalIP, or "" when the
//     node reports none
//
// A new row gets the current time (RFC 3339) in both created_at and
// updated_at. When a row with the same name already exists, every column
// except name and created_at is overwritten.
//
// It returns an error when n is nil or when the statement fails.
func upsertNode(db *sql.DB, n *corev1.Node) error {
	if n == nil {
		return errors.New("nil node")
	}

	// Simplification: CPU and memory are stored as plain strings.
	cpu := n.Status.Capacity.Cpu().String()
	mem := n.Status.Capacity.Memory().String()

	ip := ""
	for _, a := range n.Status.Addresses {
		if a.Type == corev1.NodeInternalIP {
			ip = a.Address
			break
		}
	}

	// Flatten the labels. Map iteration order is random in Go, so the pairs are
	// sorted to make the stored string identical for identical label sets.
	labels := make([]string, 0, len(n.Labels))
	for k, v := range n.Labels {
		labels = append(labels, fmt.Sprintf("%s=%s", k, v))
	}
	sort.Strings(labels)

	now := time.Now().Format(time.RFC3339)
	_, err := db.Exec(`
INSERT INTO nodes(name,labels,capacity_cpu,capacity_mem,internal_ip,created_at,updated_at)
VALUES(?,?,?,?,?,?,?)
ON CONFLICT(name) DO UPDATE SET
labels=excluded.labels,
capacity_cpu=excluded.capacity_cpu,
capacity_mem=excluded.capacity_mem,
internal_ip=excluded.internal_ip,
updated_at=excluded.updated_at
`, n.Name, strings.Join(labels, ","), cpu, mem, ip, now, now)
	return err
}
// deleteNode removes the nodes row whose name column equals name and returns
// the error reported by the database, if any.
func deleteNode(db *sql.DB, name string) error {
	const stmt = `DELETE FROM nodes WHERE name=?`
	if _, err := db.Exec(stmt, name); err != nil {
		return err
	}
	return nil
}
// ---------- K8s ----------
// getClientset builds a Kubernetes clientset from the kubeconfig file at
// /etc/rancher/k3s/k3s.yaml.
//
// It returns the error from loading the kubeconfig, or otherwise whatever
// kubernetes.NewForConfig returns.
func getClientset() (*kubernetes.Clientset, error) {
	const k3sKubeconfig = "/etc/rancher/k3s/k3s.yaml"

	restCfg, err := clientcmd.BuildConfigFromFlags("", filepath.Join(k3sKubeconfig))
	if err != nil {
		return nil, err
	}
	return kubernetes.NewForConfig(restCfg)
}
// ---------- HTTP DTO ----------
// PodRow is the JSON shape of one pods-table row as served by the pods API.
// The created_at column is not part of this shape.
type PodRow struct {
	// UID is the pod UID (primary key of the pods table).
	UID string `json:"uid"`
	// Name is the pod name.
	Name string `json:"name"`
	// Namespace is the namespace column of the row.
	Namespace string `json:"namespace"`
	// Phase is the stored pod phase string.
	Phase string `json:"phase"`
	// NodeName is the node_name column of the row.
	NodeName string `json:"nodeName"`
	// PodIP is the pod_ip column of the row.
	PodIP string `json:"podIP"`
	// UpdatedAt is the updated_at column of the row.
	UpdatedAt string `json:"updatedAt"`
}
// NodeRow is the JSON shape of one nodes-table row as served by the nodes API.
// The created_at column is not part of this shape.
type NodeRow struct {
	// Name is the node name (primary key of the nodes table).
	Name string `json:"name"`
	// Labels is the labels column of the row.
	Labels string `json:"labels"`
	// CPU is the capacity_cpu column of the row.
	CPU string `json:"cpu"`
	// Memory is the capacity_mem column of the row.
	Memory string `json:"memory"`
	// InternalIP is the internal_ip column of the row.
	InternalIP string `json:"internalIP"`
	// UpdatedAt is the updated_at column of the row.
	UpdatedAt string `json:"updatedAt"`
}
// ---------- HTTP Handlers ----------
// podsAPI returns a handler that serves the rows of the pods table as a JSON
// array of PodRow.
//
// With no "ns" query parameter all rows are returned, ordered by namespace and
// name. With "ns" set, only rows of that namespace are returned, ordered by
// name. An empty result is encoded as [] rather than null.
//
// Any query, scan or row-iteration error is answered with HTTP 500 and the
// error text.
func podsAPI(db *sql.DB) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ns := r.URL.Query().Get("ns")

		var (
			rows *sql.Rows
			err  error
		)
		if ns == "" {
			rows, err = db.Query(`SELECT uid,name,namespace,phase,node_name,pod_ip,updated_at FROM pods ORDER BY namespace,name`)
		} else {
			rows, err = db.Query(`SELECT uid,name,namespace,phase,node_name,pod_ip,updated_at FROM pods WHERE namespace=? ORDER BY name`, ns)
		}
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		defer rows.Close()

		// Start from a non-nil slice so that "no rows" encodes as [] and not null.
		out := []PodRow{}
		for rows.Next() {
			var p PodRow
			if err := rows.Scan(&p.UID, &p.Name, &p.Namespace, &p.Phase, &p.NodeName, &p.PodIP, &p.UpdatedAt); err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			out = append(out, p)
		}
		// rows.Next also returns false when iteration fails; without this check
		// a truncated list would be served as a success.
		if err := rows.Err(); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(out); err != nil {
			// The response has already been started, so the failure can only be logged.
			log.Printf("[api/pods] encode response: %v", err)
		}
	}
}
// nodesAPI returns a handler that serves all rows of the nodes table, ordered
// by name, as a JSON array of NodeRow. An empty result is encoded as [] rather
// than null.
//
// Any query, scan or row-iteration error is answered with HTTP 500 and the
// error text.
func nodesAPI(db *sql.DB) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		rows, err := db.Query(`SELECT name,labels,capacity_cpu,capacity_mem,internal_ip,updated_at FROM nodes ORDER BY name`)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		defer rows.Close()

		// Start from a non-nil slice so that "no rows" encodes as [] and not null.
		out := []NodeRow{}
		for rows.Next() {
			var n NodeRow
			if err := rows.Scan(&n.Name, &n.Labels, &n.CPU, &n.Memory, &n.InternalIP, &n.UpdatedAt); err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			out = append(out, n)
		}
		// rows.Next also returns false when iteration fails; without this check
		// a truncated list would be served as a success.
		if err := rows.Err(); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(out); err != nil {
			// The response has already been started, so the failure can only be logged.
			log.Printf("[api/nodes] encode response: %v", err)
		}
	}
}
// ---------- Bootstrap ----------
func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
// DB
db, err := openDB()
if err != nil {
log.Fatalf("open db: %v", err)
}
if err := initSchema(db); err != nil {
log.Fatalf("init schema: %v", err)
}
// K8s
client, err := getClientset()
if err != nil {
log.Fatalf("load kubeconfig: %v", err)
}
// Informers(全命名空间)
// 也可换成 factory := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithNamespace("default"))
factory := informers.NewSharedInformerFactory(client, 0)
// Pod Informer
podInformer := factory.Core().V1().Pods().Informer()
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
if err := upsertPod(db, pod); err != nil {
log.Printf("[pods/add] %s/%s err=%v", pod.Namespace, pod.Name, err)
} else {
log.Printf("[pods/add] %s/%s", pod.Namespace, pod.Name)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
pod := newObj.(*corev1.Pod)
if err := upsertPod(db, pod); err != nil {
log.Printf("[pods/update] %s/%s err=%v", pod.Namespace, pod.Name, err)
}
},
DeleteFunc: func(obj interface{}) {
// Delete 时 obj 可能是 DeletedFinalStateUnknown
switch t := obj.(type) {
case *corev1.Pod:
_ = deletePod(db, string(t.UID))
log.Printf("[pods/del] %s/%s", t.Namespace, t.Name)
case cache.DeletedFinalStateUnknown:
if p, ok := t.Obj.(*corev1.Pod); ok {
_ = deletePod(db, string(p.UID))
log.Printf("[pods/delDFSU] %s/%s", p.Namespace, p.Name)
}
}
},
})
// Node Informer(示例加了一个 field selector 的写法)
nodeInformer := factory.Core().V1().Nodes().Informer()
nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
n := obj.(*corev1.Node)
if err := upsertNode(db, n); err != nil {
log.Printf("[nodes/add] %s err=%v", n.Name, err)
} else {
log.Printf("[nodes/add] %s", n.Name)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
n := newObj.(*corev1.Node)
if err := upsertNode(db, n); err != nil {
log.Printf("[nodes/update] %s err=%v", n.Name, err)
}
},
DeleteFunc: func(obj interface{}) {
switch t := obj.(type) {
case *corev1.Node:
_ = deleteNode(db, t.Name)
log.Printf("[nodes/del] %s", t.Name)
case cache.DeletedFinalStateUnknown:
if n, ok := t.Obj.(*corev1.Node); ok {
_ = deleteNode(db, n.Name)
log.Printf("[nodes/delDFSU] %s", n.Name)
}
}
},
})
// 启动 informer
stop := make(chan struct{})
factory.Start(stop)
// 等待缓存同步
factory.WaitForCacheSync(stop)
// HTTP
mux := http.NewServeMux()
mux.HandleFunc("/cmdb/pods", podsAPI(db))
mux.HandleFunc("/cmdb/nodes", nodesAPI(db))
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("ok")) })
srv := &http.Server{
Addr: ":8080",
Handler: mux,
ReadHeaderTimeout: 5 * time.Second,
}
log.Println("LightCMDB Week3 started on :8080")
log.Fatal(srv.ListenAndServe())
// 优雅退出(保留示例)
_ = fields.Everything // 引用避免未使用(示例中没有真正用到)
_ = metav1.NamespaceAll
_ = context.Background()
}