Skip to content

Commit 58f4a1f

Browse files
authored
fix: Added support of JSONDoc for realtime events + more logs (#4625)
### Description Realtime events are serialized/deserialized through Redis pub/sub, causing the original *vfs.FileDoc type to become *realtime.JSONDoc. This fix adds handling for real-time.JSONDoc.
2 parents a149730 + 69be64e commit 58f4a1f

File tree

1 file changed

+45
-0
lines changed

1 file changed

+45
-0
lines changed

web/sharings/realtime.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,14 @@ func wsWrite(ws *websocket.Conn, ch chan *wsResponse, errc chan *wsError) error
167167
}
168168

169169
func filterMapEvents(ds *realtime.Subscriber, ch chan *wsResponse, inst *instance.Instance, s *sharing.Sharing) {
170+
log := inst.Logger().WithNamespace("sharing-realtime")
171+
170172
var dir vfs.DirDoc
171173
if err := couchdb.GetDoc(inst, consts.Files, s.Rules[0].Values[0], &dir); err != nil {
174+
log.Errorf("filterMapEvents: failed to get root dir %s: %s", s.Rules[0].Values[0], err)
172175
return
173176
}
177+
log.Debugf("filterMapEvents: watching %s for sharing %s", dir.Fullpath, s.SID)
174178

175179
testPath := func(doc realtime.Doc) bool {
176180
if d, ok := doc.(*vfs.DirDoc); ok {
@@ -186,6 +190,27 @@ func filterMapEvents(ds *realtime.Subscriber, ch chan *wsResponse, inst *instanc
186190
}
187191
return strings.HasPrefix(p, dir.Fullpath+"/")
188192
}
193+
// Handle realtime.JSONDoc (happens when events come through Redis in multi-stack)
194+
if j, ok := doc.(*realtime.JSONDoc); ok {
195+
docType, _ := j.M["type"].(string)
196+
if docType == consts.DirType {
197+
fullpath, _ := j.M["path"].(string)
198+
return strings.HasPrefix(fullpath, dir.Fullpath+"/")
199+
}
200+
dirID, _ := j.M["dir_id"].(string)
201+
if dirID == "" {
202+
log.Warnf("filterMapEvents: failed to get dir_id in event doc with id:%s", doc.ID())
203+
return false
204+
}
205+
parentDir, err := inst.VFS().DirByID(dirID)
206+
if err != nil {
207+
log.Warnf("filterMapEvents: failed to get parent dir %s: %s", dirID, err)
208+
return false
209+
}
210+
name, _ := j.M["name"].(string)
211+
filePath := parentDir.Fullpath + "/" + name
212+
return strings.HasPrefix(filePath, dir.Fullpath+"/")
213+
}
189214
return false
190215
}
191216

@@ -220,6 +245,8 @@ func filterMapEvents(ds *realtime.Subscriber, ch chan *wsResponse, inst *instanc
220245
}
221246

222247
func wsOwner(c echo.Context, inst *instance.Instance, s *sharing.Sharing) error {
248+
log := inst.Logger().WithNamespace("sharing-realtime")
249+
223250
ws, err := upgradeWs(c)
224251
if err != nil {
225252
return nil
@@ -238,12 +265,14 @@ func wsOwner(c echo.Context, inst *instance.Instance, s *sharing.Sharing) error
238265
defer close(errc)
239266
pdoc, wsErr := getPermission(c, inst, ws)
240267
if wsErr != nil {
268+
log.Warnf("wsOwner: AUTH failed for sharing %s", s.SID)
241269
sendErr(ctx, errc, wsErr)
242270
return
243271
}
244272
for _, rule := range s.Rules {
245273
for _, value := range rule.Values {
246274
if !pdoc.Permissions.AllowID(permission.GET, rule.DocType, value) {
275+
log.Warnf("wsOwner: permission denied for sharing %s", s.SID)
247276
sendErr(ctx, errc, unauthorized(pdoc))
248277
return
249278
}
@@ -259,6 +288,8 @@ func wsOwner(c echo.Context, inst *instance.Instance, s *sharing.Sharing) error
259288
// Ws is the API handler for realtime via a websocket connection.
260289
func Ws(c echo.Context) error {
261290
inst := middlewares.GetInstance(c)
291+
log := inst.Logger().WithNamespace("sharing-realtime")
292+
262293
s, err := sharing.FindSharing(inst, c.Param("id"))
263294
if err != nil {
264295
return wrapErrors(err)
@@ -268,6 +299,7 @@ func Ws(c echo.Context) error {
268299
}
269300

270301
if s.Owner {
302+
log.Debugf("Ws: new connection for sharing %s (owner mode)", s.SID)
271303
return wsOwner(c, inst, s)
272304
}
273305

@@ -291,12 +323,16 @@ func Ws(c echo.Context) error {
291323
// instances are on the same domain (same stack), we can watch directly
292324
// the real-time events. It helps for performances.
293325
if owner, err := lifecycle.GetInstance(u.Host); err == nil {
326+
log.Debugf("Ws: new connection for sharing %s (hijack mode, owner=%s)", s.SID, owner.Domain)
294327
return wsHijack(c, inst, owner, s)
295328
}
329+
log.Debugf("Ws: new connection for sharing %s (proxy mode, owner=%s)", s.SID, u.Host)
296330
return wsProxy(c, inst, s, token)
297331
}
298332

299333
func wsHijack(c echo.Context, inst, owner *instance.Instance, s *sharing.Sharing) error {
334+
log := inst.Logger().WithNamespace("sharing-realtime")
335+
300336
ws, err := upgradeWs(c)
301337
if err != nil {
302338
return nil
@@ -315,10 +351,12 @@ func wsHijack(c echo.Context, inst, owner *instance.Instance, s *sharing.Sharing
315351
defer close(errc)
316352
pdoc, wsErr := getPermission(c, inst, ws)
317353
if wsErr != nil {
354+
log.Warnf("wsHijack: AUTH failed for sharing %s", s.SID)
318355
sendErr(ctx, errc, wsErr)
319356
return
320357
}
321358
if !pdoc.Permissions.AllowWholeType(permission.GET, consts.Files) {
359+
log.Warnf("wsHijack: permission denied for sharing %s", s.SID)
322360
sendErr(ctx, errc, unauthorized(pdoc))
323361
return
324362
}
@@ -330,6 +368,8 @@ func wsHijack(c echo.Context, inst, owner *instance.Instance, s *sharing.Sharing
330368
}
331369

332370
func wsProxy(c echo.Context, inst *instance.Instance, s *sharing.Sharing, token string) error {
371+
log := inst.Logger().WithNamespace("sharing-realtime")
372+
333373
ws, err := upgradeWs(c)
334374
if err != nil {
335375
return nil
@@ -346,10 +386,12 @@ func wsProxy(c echo.Context, inst *instance.Instance, s *sharing.Sharing, token
346386
defer close(errc)
347387
pdoc, wsErr := getPermission(c, inst, ws)
348388
if wsErr != nil {
389+
log.Warnf("wsProxy: AUTH failed for sharing %s", s.SID)
349390
sendErr(ctx, errc, wsErr)
350391
return
351392
}
352393
if !pdoc.Permissions.AllowWholeType(permission.GET, consts.Files) {
394+
log.Warnf("wsProxy: permission denied for sharing %s", s.SID)
353395
sendErr(ctx, errc, unauthorized(pdoc))
354396
return
355397
}
@@ -372,6 +414,7 @@ func wsProxy(c echo.Context, inst *instance.Instance, s *sharing.Sharing, token
372414

373415
ownerWS, _, err := websocket.DefaultDialer.Dial(wsURL.String(), nil)
374416
if err != nil {
417+
log.Errorf("wsProxy: failed to dial owner %s: %s", wsURL.String(), err)
375418
return
376419
}
377420
defer ownerWS.Close()
@@ -381,8 +424,10 @@ func wsProxy(c echo.Context, inst *instance.Instance, s *sharing.Sharing, token
381424
"payload": token,
382425
}
383426
if err := ownerWS.WriteJSON(authMsg); err != nil {
427+
log.Errorf("wsProxy: failed to send AUTH to owner: %s", err)
384428
return
385429
}
430+
log.Debugf("wsProxy: connected to owner %s for sharing %s", u.Host, s.SID)
386431

387432
for {
388433
var msg wsResponse

0 commit comments

Comments
 (0)