Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion info/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ module github.com/labring-sigs/sealos-migrate/info

go 1.21

require github.com/modood/table v0.0.0-20220527013332-8d47e76dad33
require (
github.com/lib/pq v1.10.9
github.com/modood/table v0.0.0-20220527013332-8d47e76dad33
)

require github.com/smartystreets/goconvey v1.8.1 // indirect
2 changes: 2 additions & 0 deletions info/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25d
github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/modood/table v0.0.0-20220527013332-8d47e76dad33 h1:T5IbS9C1G2zeHb6eBy6OfIvj5tfQB23kGFpewCJuGDg=
github.com/modood/table v0.0.0-20220527013332-8d47e76dad33/go.mod h1:41qyXVI5QH9/ObyPj27CGCVau5v/njfc3Gjj7yzr0HQ=
github.com/smarty/assertions v1.15.0 h1:cR//PqUBUiQRakZWqBiFFQ9wb8emQGDb0HeGdqGByCY=
Expand Down
249 changes: 248 additions & 1 deletion info/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,31 @@ package main

import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"database/sql"
"encoding/base64"
"encoding/csv"
"encoding/json"
"errors"
"flag"
"fmt"
"net"
"net/url"
"os"
"os/exec"
"path/filepath"
"strings"
"time"

_ "github.com/lib/pq"
"github.com/modood/table"
)

const (
defaultSealosEnvPath = "/root/.sealos/cloud/sealos.env"
defaultGlobalsPath = "/root/.sealos/cloud/values/globals.yaml"
)

const (
Expand Down Expand Up @@ -959,12 +965,20 @@ func buildPath(stack []yamlKey, leaf string) string {
}

func lookupUserUID(dbURI, userID string) (string, error) {
resolvedURI, useDirectQuery, err := resolveLookupUserDBURI(dbURI)
if err != nil {
return "", err
}
if useDirectQuery {
return lookupUserUIDDirect(resolvedURI, userID)
}

query := fmt.Sprintf("SELECT id, uid FROM \"User\" WHERE id='%s' LIMIT 1;", escapeSQLLiteral(userID))
podName, err := findCockroachPod()
if err != nil {
return "", err
}
localURL, urlErr := rewriteCockroachURLForLocalhost(dbURI)
localURL, urlErr := rewriteCockroachURLForLocalhost(resolvedURI)
if urlErr != nil {
return "", urlErr
}
Expand Down Expand Up @@ -1033,6 +1047,239 @@ func parseUserUIDCSV(output, userID string) (string, error) {
return "", fmt.Errorf("数据库未找到用户 %s", userID)
}

func lookupUserUIDDirect(dbURI, userID string) (string, error) {
uid, err := queryUserUIDDirect(dbURI, userID)
if err == nil {
return uid, nil
}

if shouldRetryDirectQueryWithoutSSL(err, dbURI) {
retryURI, rewriteErr := forceDisableDirectDatabaseQuerySSL(dbURI)
if rewriteErr != nil {
return "", rewriteErr
}
return queryUserUIDDirect(retryURI, userID)
}

return "", err
}

func queryUserUIDDirect(dbURI, userID string) (string, error) {
db, err := sql.Open("postgres", dbURI)
if err != nil {
return "", fmt.Errorf("创建数据库连接失败: %v", err)
}
defer db.Close()

db.SetMaxOpenConns(1)
db.SetMaxIdleConns(0)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

var returnedID string
var uid string
err = db.QueryRowContext(ctx, `SELECT id, uid FROM "User" WHERE id=$1 LIMIT 1`, userID).Scan(&returnedID, &uid)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return "", fmt.Errorf("数据库未找到用户 %s", userID)
}
return "", fmt.Errorf("直连数据库查询失败: %v", err)
}
uid = strings.TrimSpace(uid)
if uid == "" {
return "", fmt.Errorf("数据库未找到用户 %s 的 UID", userID)
}
return uid, nil
}

func shouldRetryDirectQueryWithoutSSL(err error, dbURI string) bool {
if err == nil {
return false
}
if !strings.Contains(err.Error(), "SSL is not enabled on the server") {
return false
}
parsed, parseErr := url.Parse(dbURI)
if parseErr != nil {
return false
}
return parsed.Query().Get("sslmode") != "disable"
}

func resolveLookupUserDBURI(defaultURI string) (string, bool, error) {
return resolveLookupUserDBURIWithGlobalsPath(defaultURI, defaultGlobalsPath, resolveServiceClusterAddress)
}

func resolveLookupUserDBURIWithGlobalsPath(defaultURI, globalsPath string, resolver func(serviceName, namespace, port string) (string, error)) (string, bool, error) {
overrideURI, err := getGlobalDatabaseURIOverride(globalsPath)
if err != nil {
return "", false, fmt.Errorf("读取 globals.yaml 失败: %v", err)
}
if strings.TrimSpace(overrideURI) == "" {
return defaultURI, false, nil
}
if !isKubernetesServiceURI(overrideURI) {
normalizedURI, err := ensureDirectDatabaseQueryURI(overrideURI)
if err != nil {
return "", false, err
}
return normalizedURI, true, nil
}

resolvedURI, err := rewriteCockroachURLForClusterServiceWithResolver(overrideURI, resolver)
if err != nil {
return "", false, err
}
normalizedURI, err := ensureDirectDatabaseQueryURI(resolvedURI)
if err != nil {
return "", false, err
}
return normalizedURI, true, nil
}

func getGlobalDatabaseURIOverride(globalsPath string) (string, error) {
content, err := os.ReadFile(globalsPath)
if err != nil {
if os.IsNotExist(err) {
return "", nil
}
return "", err
}
return parseGlobalDatabaseURIFromGlobals(string(content)), nil
}

func parseGlobalDatabaseURIFromGlobals(content string) string {
lines := strings.Split(content, "\n")
stack := make([]yamlKey, 0, 8)
for _, rawLine := range lines {
line := strings.TrimRight(rawLine, " \t\r")
trimmed := strings.TrimSpace(line)
if trimmed == "" || strings.HasPrefix(trimmed, "#") {
continue
}

indent := leadingSpaces(rawLine)
parts := strings.SplitN(trimmed, ":", 2)
key := strings.TrimSpace(parts[0])
value := ""
if len(parts) > 1 {
value = strings.TrimSpace(parts[1])
}

for len(stack) > 0 && indent <= stack[len(stack)-1].indent {
stack = stack[:len(stack)-1]
}

if value == "" {
stack = append(stack, yamlKey{indent: indent, key: key})
continue
}

value = strings.Trim(value, `"'`)
if buildPath(stack, key) == "global.featureConfigs.globalDatabase.uri" {
return value
}
}
return ""
}

func isKubernetesServiceURI(dbURI string) bool {
parsed, err := url.Parse(dbURI)
if err != nil {
return false
}
return isKubernetesServiceHost(parsed.Hostname())
}

func isKubernetesServiceHost(host string) bool {
parts := strings.Split(strings.TrimSuffix(strings.TrimSpace(host), "."), ".")
return len(parts) >= 3 && parts[2] == "svc"
}

func rewriteCockroachURLForClusterServiceWithResolver(dbURI string, resolver func(serviceName, namespace, port string) (string, error)) (string, error) {
parsed, err := url.Parse(dbURI)
if err != nil {
return "", fmt.Errorf("解析数据库地址失败: %v", err)
}
serviceName, namespace, err := splitKubernetesServiceHost(parsed.Hostname())
if err != nil {
return "", err
}

address, err := resolver(serviceName, namespace, parsed.Port())
if err != nil {
return "", err
}
parsed.Host = address
return parsed.String(), nil
}

func ensureDirectDatabaseQueryURI(dbURI string) (string, error) {
parsed, err := url.Parse(dbURI)
if err != nil {
return "", fmt.Errorf("解析数据库地址失败: %v", err)
}
if parsed.Scheme == "" || parsed.Host == "" {
return "", fmt.Errorf("数据库地址格式不正确")
}

query := parsed.Query()
if query.Get("sslmode") == "" {
query.Set("sslmode", "disable")
}
parsed.RawQuery = query.Encode()
return parsed.String(), nil
}

func forceDisableDirectDatabaseQuerySSL(dbURI string) (string, error) {
parsed, err := url.Parse(dbURI)
if err != nil {
return "", fmt.Errorf("解析数据库地址失败: %v", err)
}
if parsed.Scheme == "" || parsed.Host == "" {
return "", fmt.Errorf("数据库地址格式不正确")
}

query := parsed.Query()
query.Set("sslmode", "disable")
parsed.RawQuery = query.Encode()
return parsed.String(), nil
}

func splitKubernetesServiceHost(host string) (string, string, error) {
parts := strings.Split(strings.TrimSuffix(strings.TrimSpace(host), "."), ".")
if len(parts) < 3 || parts[2] != "svc" {
return "", "", fmt.Errorf("数据库地址不是 k8s.svc Service 地址: %s", host)
}
return parts[0], parts[1], nil
}

func resolveServiceClusterAddress(serviceName, namespace, port string) (string, error) {
clusterIP, err := runCommand("kubectl", "get", "svc", serviceName, "-n", namespace, "-o", "jsonpath={.spec.clusterIP}")
if err != nil {
return "", fmt.Errorf("获取 Service %s/%s ClusterIP 失败: %v", namespace, serviceName, err)
}
clusterIP = strings.TrimSpace(clusterIP)
if clusterIP == "" || strings.EqualFold(clusterIP, "None") {
return "", fmt.Errorf("Service %s/%s 没有可用的 ClusterIP", namespace, serviceName)
}

resolvedPort := strings.TrimSpace(port)
if resolvedPort == "" {
resolvedPort, err = runCommand("kubectl", "get", "svc", serviceName, "-n", namespace, "-o", "jsonpath={.spec.ports[0].port}")
if err != nil {
return "", fmt.Errorf("获取 Service %s/%s 端口失败: %v", namespace, serviceName, err)
}
resolvedPort = strings.TrimSpace(resolvedPort)
if resolvedPort == "" {
return "", fmt.Errorf("Service %s/%s 没有可用的端口", namespace, serviceName)
}
}

return net.JoinHostPort(clusterIP, resolvedPort), nil
}

func findCockroachPod() (string, error) {
labels := []string{
"app.kubernetes.io/name=cockroachdb",
Expand Down
Loading