@@ -34,6 +34,8 @@ func NewGossipActor(requestTimeout time.Duration, myID string, getBlockedMembers
3434// Receive method.
3535func (ga * GossipActor ) Receive (ctx actor.Context ) {
3636 switch r := ctx .Message ().(type ) {
37+ case * actor.Started :
38+ //pass
3739 case * SetGossipStateKey :
3840 ga .onSetGossipStateKey (r , ctx )
3941 case * GetGossipStateRequest :
@@ -49,7 +51,7 @@ func (ga *GossipActor) Receive(ctx actor.Context) {
4951 case * ClusterTopology :
5052 ga .onClusterTopology (r )
5153 case * GossipResponse :
52- // noop: review after roger's work is done
54+ plog . Error ( "GossipResponse should not be received by GossipActor" ) //it should be a response to a request
5355 default :
5456 plog .Warn ("Gossip received unknown message request" , log .Message (r ))
5557 }
@@ -74,7 +76,7 @@ func (ga *GossipActor) onGetGossipStateKey(r *GetGossipStateRequest, ctx actor.C
7476}
7577
7678func (ga * GossipActor ) onGossipRequest (r * GossipRequest , ctx actor.Context ) {
77- plog .Debug ("Gossip request " , log .PID ("sender" , ctx .Sender ()))
79+ plog .Debug ("OnGossipRequest " , log .PID ("sender" , ctx .Sender ()))
7880 ga .ReceiveState (r .State , ctx )
7981
8082 if ! GetCluster (ctx .ActorSystem ()).MemberList .ContainsMemberID (r .MemberId ) {
@@ -97,26 +99,36 @@ func (ga *GossipActor) onGossipRequest(r *GossipRequest, ctx actor.Context) {
9799 return
98100 }
99101
100- msg := GossipResponse {
101- State : memberState .State ,
102- }
103- future := ctx .RequestFuture (ctx .Sender (), & msg , GetCluster (ctx .ActorSystem ()).Config .GossipRequestTimeout )
104-
105- // wait until we get a response or an error from the future
106- resp , err := future .Result ()
107- if err != nil {
108- plog .Error ("onSendGossipState failed" , log .Error (err ))
109-
110- return
111- }
112-
113- if _ , ok := resp .(* GossipResponseAck ); ok {
114- memberState .CommitOffsets ()
115-
116- return
117- }
118-
119- plog .Error ("onSendGossipState received unknown response message" , log .Message (r ))
102+ ctx .Respond (& GossipResponse {})
103+ return
104+
105+ //turn off acking for now
106+
107+ //msg := GossipResponse{
108+ // State: memberState.State,
109+ //}
110+ //future := ctx.RequestFuture(ctx.Sender(), &msg, GetCluster(ctx.ActorSystem()).Config.GossipRequestTimeout)
111+ //
112+ //ctx.ReenterAfter(future, func(res interface{}, err error) {
113+ // if err != nil {
114+ // plog.Warn("onGossipRequest failed", log.String("MemberId", r.MemberId), log.Error(err))
115+ // return
116+ // }
117+ //
118+ // if _, ok := res.(*GossipResponseAck); ok {
119+ // memberState.CommitOffsets()
120+ // return
121+ // }
122+ //
123+ // m, ok := res.(proto.Message)
124+ // if !ok {
125+ // plog.Warn("onGossipRequest failed", log.String("MemberId", r.MemberId), log.Error(err))
126+ // return
127+ // }
128+ // n := string(proto.MessageName(m).Name())
129+ //
130+ // plog.Error("onGossipRequest received unknown response message", log.String("type", n), log.Message(r))
131+ //})
120132}
121133
122134func (ga * GossipActor ) onSetGossipStateKey (r * SetGossipStateKey , ctx actor.Context ) {
@@ -145,43 +157,39 @@ func (ga *GossipActor) ReceiveState(remoteState *GossipState, ctx actor.Context)
145157
146158func (ga * GossipActor ) sendGossipForMember (member * Member , memberStateDelta * MemberStateDelta , ctx actor.Context ) {
147159 pid := actor .NewPID (member .Address (), DefaultGossipActorName )
148- plog .Info ("Sending GossipRequest" , log .String ("MemberId" , member .Id ))
160+ plog .Debug ("Sending GossipRequest" , log .String ("MemberId" , member .Id ))
149161
150162 // a short timeout is massively important, we cannot afford hanging around waiting
151163 // for timeout, blocking other gossips from getting through
152164
153165 msg := GossipRequest {
154- // TODO: Uncomment this line when we replace the current "address:port" as ID
155- // with the proper ActorSystem.ID after new API refactor changes
156- // Oscar Campos: 2022-04-09
157- // MemberId: ctx.ActorSystem().ID,
158- MemberId : member .Address (),
166+ MemberId : member .Id ,
159167 State : memberStateDelta .State ,
160168 }
161169 future := ctx .RequestFuture (pid , & msg , ga .gossipRequestTimeout )
162170
163- // wait until we get a response or an error from the future
164- r , err := future . Result ()
165- if err != nil {
166- plog . Error ( "onSendGossipState failed" , log . Error ( err ))
171+ ctx . ReenterAfter ( future , func ( res interface {}, err error ) {
172+ if ctx . Sender () != nil {
173+ ctx . Send ( ctx . Sender (), & GossipResponseAck {})
174+ }
167175
168- return
169- }
176+ if err != nil {
177+ plog .Warn ("sendGossipForMember failed" , log .String ("MemberId" , member .Id ), log .Error (err ))
178+ return
179+ }
170180
171- resp , ok := r .(* GossipResponse )
172- if ! ok {
173- plog .Error ("onSendGossipState received unknown response message" , log .Message (r ))
181+ resp , ok := res .(* GossipResponse )
182+ if ! ok {
183+ plog .Error ("sendGossipForMember received unknown response message" , log .Message (resp ))
174184
175- return
176- }
185+ return
186+ }
177187
178- memberStateDelta .CommitOffsets ()
188+ memberStateDelta .CommitOffsets ()
179189
180- if resp .State != nil {
181- ga .ReceiveState (resp .State , ctx )
190+ if resp .State != nil {
191+ ga .ReceiveState (resp .State , ctx )
182192
183- if ctx .Sender () != nil {
184- ctx .Send (ctx .Sender (), & GossipResponseAck {})
185193 }
186- }
194+ })
187195}
0 commit comments