Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 37 additions & 53 deletions src/core/Akka.Remote/RemoteActorRefProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -681,38 +681,11 @@ public Internals(RemoteTransport transport, Akka.Serialization.Serialization ser

#region RemotingTerminator

/// <summary>
/// Describes the FSM states of the <see cref="RemotingTerminator"/>
/// </summary>
private enum TerminatorState
{
/// <summary>
/// TBD
/// </summary>
Uninitialized,
/// <summary>
/// TBD
/// </summary>
Idle,
/// <summary>
/// TBD
/// </summary>
WaitDaemonShutdown,
/// <summary>
/// TBD
/// </summary>
WaitTransportShutdown,
/// <summary>
/// TBD
/// </summary>
Finished
}

/// <summary>
/// Responsible for shutting down the <see cref="RemoteDaemon"/> and all transports
/// when the <see cref="ActorSystem"/> is being shutdown.
/// </summary>
private class RemotingTerminator : FSM<TerminatorState, Internals>, IRequiresMessageQueue<IUnboundedMessageQueueSemantics>
private class RemotingTerminator : ActorBase, IRequiresMessageQueue<IUnboundedMessageQueueSemantics>
{
private readonly IActorRef _systemGuardian;
private readonly ILoggingAdapter _log;
Expand All @@ -723,56 +696,67 @@ public RemotingTerminator(IActorRef systemGuardian)

// can't use normal Logger.GetLogger(this IActorContext) here due to https://github.com/akkadotnet/akka.net/issues/4530
_log = Logging.GetLogger(Context.System.EventStream, "remoting-terminator");
InitFSM();
Become(Uninitialized());
}

private void InitFSM()
protected override bool Receive(object message) => EmptyReceive(message);

private Receive Uninitialized()
{
When(TerminatorState.Uninitialized, @event =>
return (object message) =>
{
if (@event.FsmEvent is Internals internals)
if (message is Internals internals)
{
_systemGuardian.Tell(RegisterTerminationHook.Instance);
return GoTo(TerminatorState.Idle).Using(internals);
Become(Idle(internals));
}
return null;
});
return false;
};
}

When(TerminatorState.Idle, @event =>
private Receive Idle(Internals internals)
{
return (object message) =>
{
if (@event.StateData != null && @event.FsmEvent is TerminationHook)
if (message is TerminationHook)
{
_log.Info("Shutting down remote daemon.");
@event.StateData.RemoteDaemon.Tell(TerminationHook.Instance);
return GoTo(TerminatorState.WaitDaemonShutdown);
internals.RemoteDaemon.Tell(TerminationHook.Instance);
Become(WaitDaemonShutdown(internals));
return true;
}
return null;
});
return false;
};
}

private Receive WaitDaemonShutdown(Internals internals)
{
// TODO: state timeout
When(TerminatorState.WaitDaemonShutdown, @event =>
return (object message) =>
{
if (@event.StateData != null && @event.FsmEvent is TerminationHookDone)
if (message is TerminationHookDone)
{
_log.Info("Remote daemon shut down; proceeding with flushing remote transports.");
@event.StateData.Transport.Shutdown()
internals.Transport.Shutdown()
.ContinueWith(_ => TransportShutdown.Instance,
TaskContinuationOptions.ExecuteSynchronously)
.PipeTo(Self);
return GoTo(TerminatorState.WaitTransportShutdown);
Become(WaitTransportShutdown());
return true;
}
return false;
};
}

return null;
});

When(TerminatorState.WaitTransportShutdown, _ =>
private Receive WaitTransportShutdown()
{
return (object message) =>
{
_log.Info("Remoting shut down.");
_systemGuardian.Tell(TerminationHookDone.Instance);
return Stop();
});

StartWith(TerminatorState.Uninitialized, null);
Context.Stop(Self);
return true;
};
}

public sealed class TransportShutdown
Expand Down
Loading