Skip to content

Commit 263c690

Browse files
committed
fix: correct push protocol and WebSocket message handling
- Enable WebSocket compression (permessage-deflate) to match server - Add `extension` field to push metadata (required by server) - Fix push protocol: read metadata response before sending chunks, read per-chunk acks, handle "ok"/"next" server responses - Drain pending push notifications before pushing in push command - Handle server error field `err` in addition to `error` - Add channel-based message dispatcher (readLoop) to safely multiplex push notifications, responses, and binary frames across goroutines Previously pushes silently failed: the server rejected them but errors were swallowed due to missing error field mapping and wrong message ordering. The WebSocket connection also dropped every ~20s due to unhandled compression frames.
1 parent 0ba53e5 commit 263c690

4 files changed

Lines changed: 225 additions & 91 deletions

File tree

internal/cmd/push.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,17 @@ func (c *PushCmd) Run(ctx context.Context, flags *RootFlags) error {
165165
}
166166
defer sc.Close()
167167

168+
// Drain pending push notifications until "ready" before pushing.
169+
for {
170+
msg, err := sc.ReceivePush(ctx)
171+
if err != nil {
172+
return fmt.Errorf("receive push: %w", err)
173+
}
174+
if msg.Op == "ready" {
175+
break
176+
}
177+
}
178+
168179
// Push changed/new files.
169180
var pushCount int
170181
for _, path := range pushPaths {

internal/cmd/push_test.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ func TestPushCmd_Success(t *testing.T) {
2525

2626
var pushedPaths []string
2727
cleanup := mockSyncServer(t, vaults, func(conn *websocket.Conn) {
28+
// Send ready so client drains initial sync.
29+
conn.WriteJSON(map[string]any{"op": "ready", "version": int64(0)})
30+
2831
// Read push metadata.
2932
var meta map[string]any
3033
if err := conn.ReadJSON(&meta); err != nil {
@@ -37,6 +40,9 @@ func TestPushCmd_Success(t *testing.T) {
3740
plainPath, _ := crypto.DecryptPath(key, encPath)
3841
pushedPaths = append(pushedPaths, plainPath)
3942

43+
// Respond to metadata.
44+
conn.WriteJSON(map[string]any{})
45+
4046
// Read binary chunk(s).
4147
pieces := int(meta["pieces"].(float64))
4248
for i := 0; i < pieces; i++ {
@@ -45,10 +51,9 @@ func TestPushCmd_Success(t *testing.T) {
4551
t.Logf("read chunk: %v", err)
4652
return
4753
}
54+
// Ack chunk.
55+
conn.WriteJSON(map[string]any{"res": "ok"})
4856
}
49-
50-
// Send ack.
51-
conn.WriteJSON(map[string]any{"res": "ok"})
5257
})
5358
defer cleanup()
5459

@@ -104,6 +109,9 @@ func TestPushCmd_DeletedFile(t *testing.T) {
104109

105110
var deletedPaths []string
106111
cleanup := mockSyncServer(t, vaults, func(conn *websocket.Conn) {
112+
// Send ready so client drains initial sync.
113+
conn.WriteJSON(map[string]any{"op": "ready", "version": int64(10)})
114+
107115
// Read delete metadata.
108116
var meta map[string]any
109117
if err := conn.ReadJSON(&meta); err != nil {

0 commit comments

Comments
 (0)