forked from taggledevel2/ratchet
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconcurrent_data_processor.go
More file actions
131 lines (120 loc) · 3.64 KB
/
Copy pathconcurrent_data_processor.go
File metadata and controls
131 lines (120 loc) · 3.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package ratchet
import (
"container/list"
"sync"
"github.com/rhansen2/ratchet/data"
"github.com/rhansen2/ratchet/logger"
)
// ConcurrentDataProcessor is a DataProcessor that also defines
// a level of concurrency. For example, if Concurrency() returns 2,
// then the pipeline will allow the stage to execute up to 2 ProcessData()
// calls concurrently.
//
// Note that the order of data processing is maintained, meaning that
// when a DataProcessor receives ProcessData calls d1, d2, ..., the resulting data
// payloads sent on the outputChan will be sent in the same order as received.
type ConcurrentDataProcessor interface {
DataProcessor
Concurrency() int
}
// IsConcurrent returns true if the given DataProcessor implements ConcurrentDataProcessor
func isConcurrent(p DataProcessor) bool {
_, ok := interface{}(p).(ConcurrentDataProcessor)
return ok
}
// dataProcessor embeds concurrentDataProcessor
type concurrentDataProcessor struct {
concurrency int
workThrottle chan workSignal
workList *list.List
doneChan chan bool
inputClosed bool
sync.Mutex
}
type workSignal struct{}
type result struct {
done bool
data []data.JSON
outputChan chan data.JSON
open bool
}
func (dp *dataProcessor) processData(d data.JSON, killChan chan error) {
logger.Debug("dataProcessor: processData", dp, "with concurrency =", dp.concurrency)
// If no concurrency is needed, simply call stage.ProcessData and return...
if dp.concurrency <= 1 {
dp.recordExecution(func() {
dp.ProcessData(d, dp.outputChan, killChan, dp.ctx)
})
return
}
// ... otherwise process the data in a concurrent queue/pool of goroutines
logger.Debug("dataProcessor: processData", dp, "waiting for work")
// wait for room in the queue
dp.workThrottle <- workSignal{}
logger.Debug("dataProcessor: processData", dp, "work obtained")
rc := make(chan data.JSON)
done := make(chan bool)
exit := make(chan bool)
// setup goroutine to handle result
go func() {
res := result{outputChan: dp.outputChan, data: []data.JSON{}, open: true}
dp.Lock()
dp.workList.PushBack(&res)
dp.Unlock()
logger.Debug("dataProcessor: processData", dp, "waiting to receive data on result chan")
for {
select {
case d, open := <-rc:
logger.Debug("dataProcessor: processData", dp, "received data on result chan")
res.data = append(res.data, d)
// outputChan will need to be closed if the rc chan was closed
res.open = open
case <-done:
res.done = true
logger.Debug("dataProcessor: processData", dp, "done, releasing work")
<-dp.workThrottle
dp.sendResults()
exit <- true
return
case <-dp.ctx.Done():
exit <- true
return
}
}
}()
// do normal data processing, passing in new result chan
// instead of the original outputChan
go dp.recordExecution(func() {
dp.ProcessData(d, rc, killChan, dp.ctx)
select {
case done <- true:
case <-dp.ctx.Done():
}
})
// wait on processing to complete
<-exit
}
// sendResults handles sending work that is completed, as well as
// guaranteeing a FIFO order of the resulting data sent over the
// original outputChan.
func (dp *dataProcessor) sendResults() {
dp.Lock()
logger.Debug("dataProcessor: sendResults checking for valid data to send")
e := dp.workList.Front()
for e != nil && e.Value.(*result).done {
logger.Debug("dataHandler: sendResults sending data")
res := dp.workList.Remove(e).(*result)
for _, d := range res.data {
res.outputChan <- d
}
if !res.open {
logger.Debug("dataProcessor: sendResults closing outputChan")
close(res.outputChan)
}
e = dp.workList.Front()
}
dp.Unlock()
if dp.inputClosed && dp.workList.Len() == 0 {
dp.doneChan <- true
}
}