Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions agreement/demux.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,6 @@ func (d *demux) next(s *Service, deadline time.Duration, fastDeadline time.Durat
speculationDeadlineCh = s.Clock.TimeoutAt(speculationDeadline)
}

//d.log.Infof("demux deadline %d, fastD %d, specD %d, d.monitor %v", deadline, fastDeadline, speculationDeadline, d.monitor) // not threadsafe in some tests

d.UpdateEventsQueue(eventQueueDemux, 0)
d.monitor.dec(demuxCoserviceType)

Expand Down
3 changes: 2 additions & 1 deletion agreement/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ const (
// period.
//
// fastTimeout is like timeout but for fast partition recovery.
//
// speculation timeout marks when the player should start speculative
// block assembly.
timeout
Expand Down Expand Up @@ -374,7 +375,7 @@ func (e roundInterruptionEvent) AttachConsensusVersion(v ConsensusVersionView) e
}

type timeoutEvent struct {
// {timeout,fastTimeout,speculationTimeout}
// T in {timeout,fastTimeout,speculationTimeout}
T eventType

RandomEntropy uint64
Expand Down
10 changes: 5 additions & 5 deletions agreement/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (p *player) handle(r routerHandle, e event) []action {
}
}

// handleSpeculationTimeout TODO: rename this 'timeout' is the START of speculative assembly.
// handleSpeculationTimeout is when to _start_ speculative assembly.
func (p *player) handleSpeculationTimeout(r routerHandle, e timeoutEvent) []action {
if e.Proto.Err != nil {
r.t.log.Errorf("failed to read protocol version for speculationTimeout event (proto %v): %v", e.Proto.Version, e.Proto.Err)
Expand Down Expand Up @@ -178,7 +178,6 @@ func (p *player) issueSoftVote(r routerHandle) (actions []action) {
// If we arrive due to fast-forward/soft threshold; then answer.Bottom = false and answer.Proposal = bottom
// and we should soft-vote normally (not based on the starting value)
a.Proposal = nextStatus.Proposal
// TODO: how do we speculative block assemble based on nextStatus.Proposal?
return append(actions, a)
}

Expand Down Expand Up @@ -626,9 +625,10 @@ func (p *player) handleMessageEvent(r routerHandle, e messageEvent) (actions []a
actions = append(actions, a)
}

// StartSpeculativeBlockAssembly every time we validate a proposal
// TODO: maybe only do this if after speculation has started; interrupt speculation on a block when we get a better block
// TODO: maybe don't do this at all and just delete it?
// StartSpeculativeBlockAssembly when we validate a
// proposal, but only re-start when we're finding a
// better proposal than what we had when we started
// due to timer.
if ef.t() == payloadAccepted {
actions = p.startSpeculativeBlockAsm(r, actions, true)
}
Expand Down
8 changes: 0 additions & 8 deletions agreement/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func (s *Service) Start() {
input := make(chan externalEvent)
output := make(chan []action)
ready := make(chan externalDemuxSignals)
// TODO: use demuxOne() inside of mainLoop() instead of this nonsense pair of threads
go s.demuxLoop(ctx, input, output, ready)
go s.mainLoop(input, output, ready)
}
Expand Down Expand Up @@ -181,12 +180,6 @@ func (s *Service) demuxLoop(ctx context.Context, input chan<- externalEvent, out
close(s.done)
}

// TODO: use demuxOne() inside mainLoop() instead of having a pair of synchronous go threads trading off via chan
func (s *Service) demuxOne(ctx context.Context, a []action, extSignals externalDemuxSignals) (e externalEvent, ok bool) {
s.do(ctx, a)
return s.demux.next(s, extSignals.Deadline, extSignals.FastRecoveryDeadline, extSignals.SpeculativeBlockAsmDeadline, extSignals.CurrentRound)
}

// mainLoop drives the state machine.
//
// After possibly restoring from disk and then initializing, it does the following in a loop:
Expand Down Expand Up @@ -241,7 +234,6 @@ func (s *Service) mainLoop(input <-chan externalEvent, output chan<- []action, r
// set speculative block assembly based on the current local configuration
specClock := SpeculativeBlockAsmTime(status.Period, status.ConsensusVersion, s.parameters.Local.SpeculativeAsmTimeOffset)

// TODO: e, ok := s.demuxOne(ctx, a, externalDemuxSignals{Deadline: status.Deadline, FastRecoveryDeadline: status.FastRecoveryDeadline, SpeculativeBlockAsmDeadline: specClock, CurrentRound: status.Round})
output <- a
ready <- externalDemuxSignals{Deadline: status.Deadline, FastRecoveryDeadline: status.FastRecoveryDeadline, SpeculativeBlockAsmDeadline: specClock, CurrentRound: status.Round}
e, ok := <-input
Expand Down
Loading