Skip to content

Commit 2adac6d

Browse files
View Manager: handle view context with multiple sessions
Signed-off-by: Marcus Brandenburger <bur@zurich.ibm.com>
1 parent 5812b16 commit 2adac6d

File tree

1 file changed

+103
-84
lines changed

1 file changed

+103
-84
lines changed

platform/view/services/view/manager.go

Lines changed: 103 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ type Manager struct {
4141
metrics *Metrics
4242
localIdentityChecker LocalIdentityChecker
4343

44-
contextsSync sync.RWMutex
45-
ctx context.Context
46-
contexts map[string]disposableContext
44+
ctx context.Context
45+
contexts map[string]disposableContext
46+
contextsMu sync.RWMutex
4747
}
4848

4949
func NewManager(
@@ -119,16 +119,15 @@ func (cm *Manager) InitiateView(view view.View, ctx context.Context) (interface{
119119
return cm.InitiateViewWithIdentity(view, cm.me(), ctx)
120120
}
121121

122-
func (cm *Manager) InitiateViewWithIdentity(view view.View, id view.Identity, c context.Context) (interface{}, error) {
123-
// Create the context
124-
cm.contextsSync.Lock()
125-
ctx := cm.ctx
126-
cm.contextsSync.Unlock()
127-
if ctx == nil {
128-
ctx = context.Background()
122+
func (cm *Manager) InitiateViewWithIdentity(view view.View, id view.Identity, ctx context.Context) (interface{}, error) {
123+
// Get the managers context
124+
cm.contextsMu.Lock()
125+
cctx := cm.ctx
126+
cm.contextsMu.Unlock()
127+
if cctx == nil {
128+
cctx = context.Background()
129129
}
130-
ctx = trace.ContextWithSpanContext(ctx, trace.SpanContextFromContext(c))
131-
130+
ctx = trace.ContextWithSpanContext(cctx, trace.SpanContextFromContext(ctx))
132131
viewContext, err := NewContextForInitiator(
133132
"",
134133
ctx,
@@ -144,20 +143,20 @@ func (cm *Manager) InitiateViewWithIdentity(view view.View, id view.Identity, c
144143
if err != nil {
145144
return nil, err
146145
}
147-
childContext := &childContext{ParentContext: viewContext}
148-
cm.contextsSync.Lock()
149-
cm.contexts[childContext.ID()] = childContext
146+
c := &childContext{ParentContext: viewContext}
147+
cm.contextsMu.Lock()
148+
cm.contexts[c.ID()] = c
150149
cm.metrics.Contexts.Set(float64(len(cm.contexts)))
151-
cm.contextsSync.Unlock()
152-
defer cm.deleteContext(id, childContext.ID())
150+
cm.contextsMu.Unlock()
151+
defer cm.deleteContext(id, c.ID())
153152

154-
logger.DebugfContext(c, "[%s] InitiateView [view:%s], [ContextID:%s]", id, logging.Identifier(view), childContext.ID())
155-
res, err := childContext.RunView(view)
153+
logger.DebugfContext(ctx, "[%s] InitiateView [view:%s], [ContextID:%s]", id, logging.Identifier(view), c.ID())
154+
res, err := c.RunView(view)
156155
if err != nil {
157-
logger.DebugfContext(c, "[%s] InitiateView [view:%s], [ContextID:%s] failed [%s]", id, logging.Identifier(view), childContext.ID(), err)
156+
logger.DebugfContext(ctx, "[%s] InitiateView [view:%s], [ContextID:%s] failed [%s]", id, logging.Identifier(view), c.ID(), err)
158157
return nil, err
159158
}
160-
logger.DebugfContext(c, "[%s] InitiateView [view:%s], [ContextID:%s] terminated", id, logging.Identifier(view), childContext.ID())
159+
logger.DebugfContext(ctx, "[%s] InitiateView [view:%s], [ContextID:%s] terminated", id, logging.Identifier(view), c.ID())
161160
return res, nil
162161
}
163162

@@ -192,15 +191,15 @@ func (cm *Manager) InitiateContextFrom(ctx context.Context, view view.View, id v
192191
if err != nil {
193192
return nil, err
194193
}
195-
childContext := &childContext{ParentContext: viewContext}
196-
cm.contextsSync.Lock()
197-
cm.contexts[childContext.ID()] = childContext
194+
c := &childContext{ParentContext: viewContext}
195+
cm.contextsMu.Lock()
196+
cm.contexts[c.ID()] = c
198197
cm.metrics.Contexts.Set(float64(len(cm.contexts)))
199-
cm.contextsSync.Unlock()
198+
cm.contextsMu.Unlock()
200199

201-
logger.DebugfContext(ctx, "[%s] InitiateContext [view:%s], [ContextID:%s]\n", id, logging.Identifier(view), childContext.ID())
200+
logger.DebugfContext(ctx, "[%s] InitiateContext [view:%s], [ContextID:%s]\n", id, logging.Identifier(view), c.ID())
202201

203-
return childContext, nil
202+
return c, nil
204203
}
205204

206205
func (cm *Manager) Start(ctx context.Context) {
@@ -222,14 +221,15 @@ func (cm *Manager) Start(ctx context.Context) {
222221
}
223222
}
224223

224+
// Context returns a view.Context for a given contextID. If the context does not exist, an error is returned.
225225
func (cm *Manager) Context(contextID string) (view.Context, error) {
226-
cm.contextsSync.RLock()
227-
defer cm.contextsSync.RUnlock()
228-
context, ok := cm.contexts[contextID]
226+
cm.contextsMu.RLock()
227+
defer cm.contextsMu.RUnlock()
228+
viewCtx, ok := cm.contexts[contextID]
229229
if !ok {
230230
return nil, errors.Errorf("context %s not found", contextID)
231231
}
232-
return context, nil
232+
return viewCtx, nil
233233
}
234234

235235
func (cm *Manager) ResolveIdentities(endpoints ...string) ([]view.Identity, error) {
@@ -254,55 +254,61 @@ func (cm *Manager) ExistResponderForCaller(caller string) (view.View, view.Ident
254254
}
255255

256256
// respond executes a given responder view
257-
// the caller is responsible to use the cleanup method to free any resources
258-
func (cm *Manager) respond(responder view.View, id view.Identity, msg *view.Message) (ctx view.Context, res interface{}, cleanup func(), err error) {
257+
func (cm *Manager) respond(responder view.View, id view.Identity, msg *view.Message) (err error) {
259258
defer func() {
260259
if r := recover(); r != nil {
261260
logger.Errorf("respond triggered panic: %s\n%s\n", r, debug.Stack())
262261
err = errors.Errorf("failed responding [%s]", r)
263262
}
264263
}()
265264

266-
cleanup = func() {}
267-
268265
// get context
269-
var isNew bool
270-
ctx, isNew, err = cm.newContext(id, msg)
266+
viewCtx, isNew, err := cm.newContext(id, msg)
271267
if err != nil {
272-
return nil, nil, cleanup, errors.WithMessagef(err, "failed getting context for [%s,%s,%v]", msg.ContextID, id, msg)
268+
return errors.WithMessagef(err, "failed getting context for [%s,%s,%v]", msg.ContextID, id, msg)
273269
}
274270

275-
logger.DebugfContext(ctx.Context(), "[%s] Respond [from:%s], [sessionID:%s], [contextID:%s](%v), [view:%s]", id, msg.FromEndpoint, msg.SessionID, msg.ContextID, isNew, logging.Identifier(responder))
271+
logger.DebugfContext(viewCtx.Context(), "[%s] Respond [from:%s], [sessionID:%s], [contextID:%s](%v), [view:%s]", id, msg.FromEndpoint, msg.SessionID, msg.ContextID, isNew, logging.Identifier(responder))
276272

277273
// if a new context has been created to run the responder,
278274
// then dispose the context when not needed anymore
279275
if isNew {
280-
cleanup = func() {
281-
cm.deleteContext(id, ctx.ID())
282-
}
276+
defer cm.deleteContext(id, viewCtx.ID())
283277
}
284278

285279
// run view
286-
res, err = ctx.RunView(responder)
280+
_, err = viewCtx.RunView(responder)
287281
if err != nil {
288-
logger.DebugfContext(ctx.Context(), "[%s] Respond Failure [from:%s], [sessionID:%s], [contextID:%s] [%s]\n", id, msg.FromEndpoint, msg.SessionID, msg.ContextID, err)
282+
logger.DebugfContext(viewCtx.Context(), "[%s] Respond Failure [from:%s], [sessionID:%s], [contextID:%s] [%s]\n", id, msg.FromEndpoint, msg.SessionID, msg.ContextID, err)
283+
284+
// try to send error back to caller
285+
if err = viewCtx.Session().SendError([]byte(err.Error())); err != nil {
286+
logger.Error(err.Error())
287+
}
289288
}
290289

291-
return ctx, res, cleanup, err
290+
return nil
292291
}
293292

294293
func (cm *Manager) newContext(id view.Identity, msg *view.Message) (view.Context, bool, error) {
295-
cm.contextsSync.Lock()
296-
defer cm.contextsSync.Unlock()
294+
cm.contextsMu.Lock()
295+
defer cm.contextsMu.Unlock()
297296

297+
// get the caller identity
298298
caller, err := cm.endpointService.GetIdentity(msg.FromEndpoint, msg.FromPKID)
299299
if err != nil {
300300
return nil, false, err
301301
}
302302

303303
contextID := msg.ContextID
304+
305+
// check if a viewContext already exists for the given contextID
304306
viewContext, ok := cm.contexts[contextID]
305307
if ok && viewContext.Session() != nil && viewContext.Session().Info().ID != msg.SessionID {
308+
// this case covers the situation where we already have an existing context between two nodes and a new session
309+
// is established.
310+
// An example for this is given in the token SDK where a node has two identities, an auditor identity and an issuer identity.
311+
306312
if logger.IsEnabledFor(zapcore.DebugLevel) {
307313
logger.DebugfContext(viewContext.Context(),
308314
"[%s] Found context with different session id, recreate [contextID:%s, sessionIds:%s,%s]\n",
@@ -312,22 +318,49 @@ func (cm *Manager) newContext(id view.Identity, msg *view.Message) (view.Context
312318
viewContext.Session().Info().ID,
313319
)
314320
}
315-
viewContext.Dispose()
316-
delete(cm.contexts, contextID)
321+
322+
// we create a new session with the ID we received
323+
backend, err := cm.commLayer.NewSessionWithID(msg.SessionID, contextID, msg.FromEndpoint, msg.FromPKID, caller, msg)
324+
if err != nil {
325+
return nil, false, err
326+
}
327+
328+
// next we need to unwrap the actual context to store the session
329+
vCtx, ok := viewContext.(*childContext)
330+
if !ok {
331+
panic("Not a child!")
332+
}
333+
334+
vvCtx, ok := vCtx.ParentContext.(*ctx)
335+
if !ok {
336+
panic("Not a child!")
337+
}
338+
// TODO: replace this with `vCtx.PutSession`, however, that method requires a view as input but we only have the viewID
339+
vvCtx.sessions.Put(msg.Caller, caller, backend)
340+
341+
// we wrap our context and set our new session as the default session
342+
c := &childContext{
343+
ParentContext: vCtx,
344+
session: backend,
345+
}
346+
cm.contexts[contextID] = c
317347
cm.metrics.Contexts.Set(float64(len(cm.contexts)))
318-
ok = false
348+
349+
return c, false, nil
319350
}
320351
if ok {
321352
logger.DebugfContext(viewContext.Context(), "[%s] No new context to respond, reuse [contextID:%s]\n", id, msg.ContextID)
322353
return viewContext, false, nil
323354
}
324355

356+
// next we continue with creating a new context
325357
logger.Debugf("[%s] Create new context to respond [contextID:%s]\n", id, msg.ContextID)
326358
backend, err := cm.commLayer.NewSessionWithID(msg.SessionID, contextID, msg.FromEndpoint, msg.FromPKID, caller, msg)
327359
if err != nil {
328360
return nil, false, err
329361
}
330-
ctx := trace.ContextWithSpanContext(cm.ctx, trace.SpanContextFromContext(msg.Ctx))
362+
363+
ctx := trace.ContextWithSpanContext(context.Background(), trace.SpanContextFromContext(msg.Ctx))
331364
newCtx, err := NewContext(
332365
ctx,
333366
cm.serviceProvider,
@@ -344,59 +377,45 @@ func (cm *Manager) newContext(id view.Identity, msg *view.Message) (view.Context
344377
if err != nil {
345378
return nil, false, err
346379
}
347-
childContext := &childContext{ParentContext: newCtx}
348-
cm.contexts[contextID] = childContext
380+
381+
c := &childContext{ParentContext: newCtx}
382+
cm.contexts[contextID] = c
349383
cm.metrics.Contexts.Set(float64(len(cm.contexts)))
350-
viewContext = childContext
384+
viewContext = c
351385

352386
return viewContext, true, nil
353387
}
354388

389+
// deleteContext removes a context from the manager and calls Dispose on the context.
355390
func (cm *Manager) deleteContext(id view.Identity, contextID string) {
356-
cm.contextsSync.Lock()
357-
defer cm.contextsSync.Unlock()
391+
cm.contextsMu.Lock()
392+
defer cm.contextsMu.Unlock()
358393

359394
logger.Debugf("[%s] Delete context [contextID:%s]\n", id, contextID)
360395
// dispose context
361-
if context, ok := cm.contexts[contextID]; ok {
362-
context.Dispose()
396+
if viewCtx, ok := cm.contexts[contextID]; ok {
397+
viewCtx.Dispose()
363398
delete(cm.contexts, contextID)
364399
cm.metrics.Contexts.Set(float64(len(cm.contexts)))
365400
}
366401
}
367402

368-
func (cm *Manager) existResponder(msg *view.Message) (view.View, view.Identity, error) {
369-
return cm.ExistResponderForCaller(msg.Caller)
370-
}
371-
403+
// callView is meant to be used to invoke a view via the p2p comm stack
372404
func (cm *Manager) callView(msg *view.Message) {
373405
logger.Debugf("Will call responder view for context [%s]", msg.ContextID)
374-
responder, id, err := cm.existResponder(msg)
406+
responder, id, err := cm.ExistResponderForCaller(msg.Caller)
375407
if err != nil {
376-
// TODO: No responder exists for this message
377-
// Let's cache it for a while an re-post
408+
// dropping message
378409
logger.Errorf("[%s] No responder exists for [%s]: [%s]", cm.me(), msg.String(), err)
379410
return
380411
}
381412
if id.IsNone() {
382413
id = cm.me()
383414
}
384415

385-
ctx, _, cleanup, err := cm.respond(responder, id, msg)
386-
defer cleanup()
387-
if err != nil {
388-
logger.Errorf("failed responding [%v, %v], err: [%s]", logging.Identifier(responder), msg.String(), err)
389-
if ctx == nil {
390-
logger.Debugf("no context set, returning")
391-
return
392-
}
393-
394-
// Return the error to the caller
395-
logger.Debugf("return the error to the caller [%s]", err)
396-
err = ctx.Session().SendError([]byte(err.Error()))
397-
if err != nil {
398-
logger.Error(err.Error())
399-
}
416+
if err := cm.respond(responder, id, msg); err != nil {
417+
logger.Errorf("[%s] error during respond [%s]", cm.me(), err)
418+
return
400419
}
401420
}
402421

@@ -405,14 +424,14 @@ func (cm *Manager) me() view.Identity {
405424
}
406425

407426
func (cm *Manager) getCurrentContext() context.Context {
408-
cm.contextsSync.Lock()
427+
cm.contextsMu.Lock()
409428
ctx := cm.ctx
410-
cm.contextsSync.Unlock()
429+
cm.contextsMu.Unlock()
411430
return ctx
412431
}
413432

414433
func (cm *Manager) setCurrentContext(ctx context.Context) {
415-
cm.contextsSync.Lock()
434+
cm.contextsMu.Lock()
416435
cm.ctx = ctx
417-
cm.contextsSync.Unlock()
436+
cm.contextsMu.Unlock()
418437
}

0 commit comments

Comments
 (0)