Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions pkg/edgeview/src/basics.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,19 +524,31 @@ func getJSONFileID(path string) string {
return ""
}

// we still keep this for backward compatibility, can remove this later when
// all the EVE versions are supporting full SHA256 hash
func getTokenHashString(token string) []byte {
if edgeviewInstID > 0 {
token = token + "." + strconv.Itoa(edgeviewInstID)
}
h := sha256.New()
_, err := h.Write([]byte(token))
if err != nil {
fmt.Printf("hash write error: %v\n", err)
}
h.Write([]byte(token))
hash16 := h.Sum(nil)[:16]
return []byte(base64.RawURLEncoding.EncodeToString(hash16))
}

// getTokenHashStringFull returns the full 32-byte SHA256 hash of the token
// encoded with base64 RawURLEncoding. Newer dispatchers should accept this
// stronger form but older ones may still expect the 16-byte truncated version.
func getTokenHashStringFull(token string) []byte {
if edgeviewInstID > 0 {
token = token + "." + strconv.Itoa(edgeviewInstID)
}
h := sha256.New()
h.Write([]byte(token))
full := h.Sum(nil)
return []byte(base64.RawURLEncoding.EncodeToString(full))
}

// in the format of yyyyMMDDhhmmss to use as part of the file name
func getFileTimeStr(t1 time.Time) string {
t := t1.UTC()
Expand Down
80 changes: 62 additions & 18 deletions pkg/edgeview/src/edge-view.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,21 @@ var (
)

const (
agentName = "edgeview"
closeMessage = "+++Done+++"
tarCopyDoneMsg = "+++TarCopyDone+++"
edgeViewVersion = "0.8.8"
cpLogFileString = "copy-logfiles"
clientIPMsg = "YourEndPointIPAddr:"
serverRateMsg = "ServerRateLimit:disable"
tcpPktRate = MbpsToBytes * 5 * 1.2 // 125k Bytes * 5 * 1.2, or 5Mbits add 20%
tcpPktBurst = 65536 // burst allow bytes
tarMinVersion = "0.8.5" // for tar operation, expect client to have newer version
encMinVersion = "0.8.8" // for encryption operation, expect client to have newer version
verifyFailed = "+++Verify failed+++"
keepaliveInterval = 30 * time.Second // interval for sending websocket ping messages
agentName = "edgeview"
closeMessage = "+++Done+++"
tarCopyDoneMsg = "+++TarCopyDone+++"
edgeViewVersion = "0.8.8"
cpLogFileString = "copy-logfiles"
clientIPMsg = "YourEndPointIPAddr:"
noDeviceStr = "no device online"
serverConnectedMsg = "server is connected"
serverRateMsg = "ServerRateLimit:disable"
tcpPktRate = MbpsToBytes * 5 * 1.2 // 125k Bytes * 5 * 1.2, or 5Mbits add 20%
tcpPktBurst = 65536 // burst allow bytes
tarMinVersion = "0.8.5" // for tar operation, expect client to have newer version
encMinVersion = "0.8.7" // for encryption operation, expect client to have newer version
verifyFailed = "+++Verify failed+++"
keepaliveInterval = 30 * time.Second // interval for sending websocket ping messages
)

type cmdOpt struct {
Expand Down Expand Up @@ -301,15 +303,57 @@ func main() {
urlWSS := url.URL{Scheme: "wss", Host: wsAddr, Path: pathStr}

var done chan struct{}
var tokenHash string
var tokenHash16 string
hostname, _ := os.Hostname()
if edgeviewInstID > 0 {
hostname = hostname + "-inst-" + strconv.Itoa(edgeviewInstID)
}
tokenHash16 = string(getTokenHashString(*ptoken))
// Prefer full 32-byte hash, but probe dispatcher to detect server compatibility.
fullHash := string(getTokenHashStringFull(*ptoken))
shortHash := string(getTokenHashString(*ptoken))
tokenHash16 = shortHash

fmt.Printf("%s connecting to %s\n", hostname, urlWSS.String())
// on server, the script will retry in some minutes later
ok := setupWebC(hostname, tokenHash16, urlWSS, runOnServer)

// When running as client (not runOnServer), probe the dispatcher to
// determine whether it accepts the full 32-byte hash. Try the full hash
// first, fall back to the 16-byte form if probe indicates "no device".
if !runOnServer {
// probe full hash and inspect response body
_, probeBody := setupWebC(hostname, fullHash, urlWSS, runOnServer, true)
if strings.Contains(probeBody, serverConnectedMsg) {
tokenHash = fullHash
} else if strings.Contains(probeBody, noDeviceStr) {
// Could be no device or hash mismatch; try short hash
_, probeBody2 := setupWebC(hostname, shortHash, urlWSS, runOnServer, true)
if strings.Contains(probeBody2, serverConnectedMsg) {
tokenHash = shortHash
} else {
// neither form returned serverConnectedMsg; print dispatcher response and exit
if probeBody2 != "" {
fmt.Printf("probe response: %s\n", probeBody2)
} else {
fmt.Printf("probe response: %s\n", probeBody)
}
return
}
} else {
// probe returned something else (e.g., error text) - print and exit
if probeBody != "" {
fmt.Printf("probe response: %s\n", probeBody)
} else {
fmt.Printf("probe failed for full hash and no fallback available\n")
}
return
}
} else {
// when running on server/device, prefer the full 32-byte hash
tokenHash = fullHash
}

// Now establish the real websocket connection (not a probe)
ok, _ := setupWebC(hostname, tokenHash, urlWSS, runOnServer, false)
if !ok {
return
}
Expand Down Expand Up @@ -383,7 +427,7 @@ func main() {
var recvCmds cmdOpt
isJSON, verifyOK, message, keyComment := verifyEnvelopeData(msg, mtype == websocket.TextMessage)
if !isJSON {
if strings.Contains(string(msg), "no device online") {
if strings.Contains(string(msg), noDeviceStr) {
log.Tracef("read: peer not there yet, continue")
lostClientPeer = true
if isTCPServer {
Expand Down Expand Up @@ -411,7 +455,7 @@ func main() {
lostClientPeer = false

if mtype == websocket.TextMessage {
if strings.Contains(string(message), "no device online") ||
if strings.Contains(string(message), noDeviceStr) ||
strings.Contains(string(message), closeMessage) {
log.Tracef("read: no device, continue")
continue
Expand Down
55 changes: 48 additions & 7 deletions pkg/edgeview/src/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ var (
pipeBufHalfSize int
)

func setupWebC(hostname, token string, u url.URL, isServer bool) bool {
// setupWebC establishes connection to the dispatcher. If probe is true
// it will perform a lightweight HTTP probe (no websocket upgrade) and
// return the response body. Returns (connected, probeBody).
func setupWebC(hostname, token string, u url.URL, isServer bool, probe bool) (bool, string) {
var pport int
var pIP, serverStr string
var useProxy int
Expand Down Expand Up @@ -115,8 +118,43 @@ func setupWebC(hostname, token string, u url.URL, isServer bool) bool {
}
tlsDialer, err := tlsDial(isServer, proxyIP, proxyPort, intfSrcs, idx)
if err != nil {
return false
return false, ""
}
// transport built from the configured dialer so proxy and TLS
// settings are honored.
if probe {
transport := &http.Transport{
TLSClientConfig: tlsDialer.TLSClientConfig,
Proxy: tlsDialer.Proxy,
DialContext: tlsDialer.NetDialContext,
}
client := &http.Client{Transport: transport, Timeout: 10 * time.Second}
// Use HTTPS for probe requests regardless of websocket scheme
// The probe part in dispatcher is before upgrade to websocket
probeURL := u
probeURL.Scheme = "https"
req, err := http.NewRequest("GET", probeURL.String(), nil)
if err != nil {
return false, ""
}
req.Header.Set("X-Session-Probe", "true")
req.Header.Set("X-Session-Token", token)
resp, err := client.Do(req)
if err != nil {
// try next interface/proxy option
durr = durr * (retry + 1)
if durr > maxReconnWait {
durr = maxReconnWait
}
time.Sleep(time.Duration(durr) * time.Millisecond)
continue
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
return true, string(body)
}

// Not a probe: perform the websocket dial (existing behavior)
c, resp, err := tlsDialer.Dial(u.String(),
http.Header{
"X-Session-Token": []string{token},
Expand All @@ -142,18 +180,22 @@ func setupWebC(hostname, token string, u url.URL, isServer bool) bool {
} else {
fmt.Printf("connect success to websocket server\n")
}
return true
return true, ""
}
if !isServer && retry > 1 {
return false
return false, ""
}
}
if probe { // only the client does probe, we don't need to retry, user can try again
return false, ""
}
// do exponential backoff after walking through all the interfaces
// to speed up the connection retrial
retry++
}
}

// probeDispatcher function removed as its logic is integrated into setupWebC
// TLS Dialer
func tlsDial(isServer bool, pIP string, pport int, src []net.IP, idx int) (*websocket.Dialer, error) {
tlsConfig := &tls.Config{}
Expand Down Expand Up @@ -308,14 +350,13 @@ func retryWebSocket(hostname, token string, urlWSS url.URL, err error) bool {
fmt.Printf("retryWebSocket: client timeout or reset, close and resetup websocket, %v\n", err)
}
time.Sleep(time.Duration(duration) * time.Millisecond)
ok := setupWebC(hostname, token, urlWSS, true)
ok, _ := setupWebC(hostname, token, urlWSS, true, false)
tcpRetryWait = false
if ok {
reconnectCnt = 0
return true
} else {
log.Noticef("retry failed.")
}
log.Noticef("retry failed.")
return false
}

Expand Down
Loading