Skip to content

Commit da6403e

Browse files
authored
Revert "health: Create API to wait for a stream to become active (#180)" (#181)
This reverts commit 5ab219c.
1 parent 5ab219c commit da6403e

File tree

5 files changed

+15
-97
lines changed

5 files changed

+15
-97
lines changed

api/handler.go

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -98,19 +98,14 @@ func (h *apiHandler) streamHealthHandler() chi.Router {
9898
if opts.AuthURL != "" {
9999
router.Use(authorization(opts.AuthURL))
100100
}
101-
102-
regionalMiddlewares := []middleware{
101+
router.Use(
103102
streamStatus(healthcore),
104-
regionProxy(opts.RegionalHostFormat, opts.OwnRegion),
105-
}
103+
regionProxy(opts.RegionalHostFormat, opts.OwnRegion))
104+
106105
h.withMetrics(router, "get_stream_health").
107-
With(regionalMiddlewares...).
108106
MethodFunc("GET", "/health", h.getStreamHealth)
109107
h.withMetrics(router, "stream_health_events").
110-
With(regionalMiddlewares...).
111108
MethodFunc("GET", "/events", h.subscribeEvents)
112-
h.withMetrics(router, "wait_stream_active").
113-
MethodFunc("GET", "/wait-active", h.waitStreamActive)
114109

115110
return router
116111
}
@@ -563,22 +558,6 @@ func (h *apiHandler) subscribeEvents(rw http.ResponseWriter, r *http.Request) {
563558
}
564559
}
565560

566-
func (h *apiHandler) waitStreamActive(rw http.ResponseWriter, r *http.Request) {
567-
if h.core == nil {
568-
respondError(rw, http.StatusNotImplemented, errors.New("stream healthcore is unavailable"))
569-
return
570-
}
571-
572-
streamID := apiParam(r, streamIDParam)
573-
err := h.core.WaitActive(r.Context(), streamID)
574-
if err != nil {
575-
respondError(rw, http.StatusInternalServerError, err)
576-
return
577-
}
578-
579-
rw.WriteHeader(http.StatusNoContent)
580-
}
581-
582561
func makeSSEEventChan(ctx context.Context, pastEvents []data.Event, subscription <-chan data.Event) <-chan jsse.Event {
583562
if subscription == nil {
584563
events := make(chan jsse.Event, len(pastEvents))

health/core.go

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -191,20 +191,6 @@ func (c *Core) handleSingleEvent(evt data.Event) (err error) {
191191
glog.Warningf("Buffer full for health event subscription, skipping message. streamId=%q, eventTs=%q", streamID, ts)
192192
}
193193
}
194-
195-
for _, cond := range record.LastStatus.Conditions {
196-
if cond.Type != ConditionActive {
197-
continue
198-
}
199-
// We flag the record as initialized unless, from the received events,
200-
// we know for sure that the stream is inactive.
201-
isInactive := cond.Status != nil && !*cond.Status
202-
if !isInactive {
203-
record.FlagInitialized()
204-
}
205-
break
206-
}
207-
208194
return nil
209195
}
210196

@@ -261,16 +247,6 @@ func (c *Core) SubscribeEvents(ctx context.Context, manifestID string, lastEvtID
261247
return pastEvents, subs, nil
262248
}
263249

264-
func (c *Core) WaitActive(ctx context.Context, manifestID string) error {
265-
// We actually create the record here if it doesn't exist, so that we can
266-
// wait for it to be initialized.
267-
record := c.storage.GetOrCreate(manifestID, c.conditionTypes)
268-
if err := record.WaitInitialized(ctx); err != nil {
269-
return err
270-
}
271-
return nil
272-
}
273-
274250
func getPastEventsLocked(record *Record, lastEvtID *uuid.UUID, from, to *time.Time) ([]data.Event, error) {
275251
fromIdx, toIdx := 0, len(record.PastEvents)
276252
if lastEvtID != nil {

health/record.go

Lines changed: 7 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ type Record struct {
1616
Conditions []data.ConditionType
1717

1818
sync.RWMutex
19-
initialized chan struct{}
20-
disposed chan struct{}
19+
disposed chan struct{}
2120

2221
PastEvents []data.Event
2322
EventsByID map[uuid.UUID]data.Event
@@ -33,43 +32,11 @@ func NewRecord(id string, conditionTypes []data.ConditionType) *Record {
3332
conditions[i] = data.NewCondition(cond, time.Time{}, nil, nil)
3433
}
3534
return &Record{
36-
ID: id,
37-
Conditions: conditionTypes,
38-
initialized: make(chan struct{}),
39-
disposed: make(chan struct{}),
40-
EventsByID: map[uuid.UUID]data.Event{},
41-
LastStatus: data.NewHealthStatus(id, conditions),
42-
}
43-
}
44-
45-
// FlagInitialized will flag the record as initialized. It is meant to be called
46-
// after the first event is processed, meaning the record is not empty anymore.
47-
//
48-
// This is used to allow waiting until a stream is started by creating its
49-
// record in an uninitialized state first and calling `WaitInitialized`. The
50-
// initialization flag is simply a channel that is closed, which will unblock
51-
// all goroutines waiting to receive from it (`WaitInitialized`).
52-
func (r *Record) FlagInitialized() {
53-
if !r.IsInitialized() {
54-
close(r.initialized)
55-
}
56-
}
57-
58-
func (r *Record) IsInitialized() bool {
59-
select {
60-
case <-r.initialized:
61-
return true
62-
default:
63-
return false
64-
}
65-
}
66-
67-
func (r *Record) WaitInitialized(ctx context.Context) error {
68-
select {
69-
case <-r.initialized:
70-
return nil
71-
case <-ctx.Done():
72-
return ctx.Err()
35+
ID: id,
36+
Conditions: conditionTypes,
37+
disposed: make(chan struct{}),
38+
EventsByID: map[uuid.UUID]data.Event{},
39+
LastStatus: data.NewHealthStatus(id, conditions),
7340
}
7441
}
7542

@@ -135,10 +102,7 @@ func (s *RecordStorage) StartCleanupLoop(ctx context.Context, ttl time.Duration)
135102

136103
func (s *RecordStorage) Get(id string) (*Record, bool) {
137104
if saved, ok := s.records.Load(id); ok {
138-
// Until Initialize is called, the record is considered inexistent
139-
if record := saved.(*Record); record.IsInitialized() {
140-
return record, true
141-
}
105+
return saved.(*Record), true
142106
}
143107
return nil, false
144108
}

health/reducer.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"github.com/livepeer/livepeer-data/pkg/event"
66
)
77

8-
const ConditionActive data.ConditionType = "Active"
9-
108
type Reducer interface {
119
Bindings() []event.BindingArgs
1210
Conditions() []data.ConditionType

health/reducers/stream_state.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@ import (
44
"time"
55

66
"github.com/golang/glog"
7-
"github.com/livepeer/livepeer-data/health"
87
"github.com/livepeer/livepeer-data/pkg/data"
98
"github.com/livepeer/livepeer-data/pkg/event"
109
)
1110

1211
const (
1312
streamStateBindingKey = "stream.state.#"
13+
14+
ConditionActive data.ConditionType = "Active"
1415
)
1516

1617
type ActiveConditionExtraData struct {
@@ -27,7 +28,7 @@ func (t StreamStateReducer) Bindings() []event.BindingArgs {
2728
}
2829

2930
func (t StreamStateReducer) Conditions() []data.ConditionType {
30-
return []data.ConditionType{health.ConditionActive}
31+
return []data.ConditionType{ConditionActive}
3132
}
3233

3334
func (t StreamStateReducer) Reduce(current *data.HealthStatus, _ interface{}, evtIface data.Event) (*data.HealthStatus, interface{}) {
@@ -54,7 +55,7 @@ func (t StreamStateReducer) Reduce(current *data.HealthStatus, _ interface{}, ev
5455
current = data.NewHealthStatus(current.ID, conditions)
5556
}
5657
for i, cond := range conditions {
57-
if cond.Type == health.ConditionActive {
58+
if cond.Type == ConditionActive {
5859
newCond := data.NewCondition(cond.Type, evt.Timestamp(), &isActive, cond)
5960
newCond.ExtraData = ActiveConditionExtraData{NodeID: evt.NodeID, Region: evt.Region}
6061
conditions[i] = newCond
@@ -74,7 +75,7 @@ func clearConditions(conditions []*data.Condition) []*data.Condition {
7475
}
7576

7677
func GetLastActiveData(status *data.HealthStatus) ActiveConditionExtraData {
77-
data, ok := status.Condition(health.ConditionActive).ExtraData.(ActiveConditionExtraData)
78+
data, ok := status.Condition(ConditionActive).ExtraData.(ActiveConditionExtraData)
7879
if !ok {
7980
return ActiveConditionExtraData{}
8081
}

0 commit comments

Comments
 (0)