@@ -5,21 +5,26 @@ package scraperhelper // import "go.opentelemetry.io/collector/scraper/scraperhe
55
66import (
77 "context"
8- "sync"
98 "time"
109
11- "go.uber.org/multierr"
12-
1310 "go.opentelemetry.io/collector/component"
1411 "go.opentelemetry.io/collector/consumer"
1512 "go.opentelemetry.io/collector/pdata/plog"
1613 "go.opentelemetry.io/collector/pdata/pmetric"
1714 "go.opentelemetry.io/collector/receiver"
18- "go.opentelemetry.io/collector/receiver/receiverhelper"
1915 "go.opentelemetry.io/collector/scraper"
2016 "go.opentelemetry.io/collector/scraper/scrapererror"
17+ "go.opentelemetry.io/collector/scraper/scraperhelper/internal/controller"
2118)
2219
20+ type ControllerConfig = controller.ControllerConfig
21+
22+ // NewDefaultControllerConfig returns default scraper controller
23+ // settings with a collection interval of one minute.
24+ func NewDefaultControllerConfig () ControllerConfig {
25+ return controller .NewDefaultControllerConfig ()
26+ }
27+
2328// ControllerOption apply changes to internal options.
2429type ControllerOption interface {
2530 apply (* controllerOptions )
@@ -75,111 +80,6 @@ type controllerOptions struct {
7580 factoriesWithConfig []factoryWithConfig
7681}
7782
78- type controller [T component.Component ] struct {
79- collectionInterval time.Duration
80- initialDelay time.Duration
81- timeout time.Duration
82-
83- scrapers []T
84- scrapeFunc func (* controller [T ])
85- tickerCh <- chan time.Time
86-
87- done chan struct {}
88- wg sync.WaitGroup
89-
90- obsrecv * receiverhelper.ObsReport
91- }
92-
93- func newController [T component.Component ](
94- cfg * ControllerConfig ,
95- rSet receiver.Settings ,
96- scrapers []T ,
97- scrapeFunc func (* controller [T ]),
98- tickerCh <- chan time.Time ,
99- ) (* controller [T ], error ) {
100- obsrecv , err := receiverhelper .NewObsReport (receiverhelper.ObsReportSettings {
101- ReceiverID : rSet .ID ,
102- Transport : "" ,
103- ReceiverCreateSettings : rSet ,
104- })
105- if err != nil {
106- return nil , err
107- }
108-
109- cs := & controller [T ]{
110- collectionInterval : cfg .CollectionInterval ,
111- initialDelay : cfg .InitialDelay ,
112- timeout : cfg .Timeout ,
113- scrapers : scrapers ,
114- scrapeFunc : scrapeFunc ,
115- done : make (chan struct {}),
116- tickerCh : tickerCh ,
117- obsrecv : obsrecv ,
118- }
119-
120- return cs , nil
121- }
122-
123- // Start the receiver, invoked during service start.
124- func (sc * controller [T ]) Start (ctx context.Context , host component.Host ) error {
125- for _ , scrp := range sc .scrapers {
126- if err := scrp .Start (ctx , host ); err != nil {
127- return err
128- }
129- }
130-
131- sc .startScraping ()
132- return nil
133- }
134-
135- // Shutdown the receiver, invoked during service shutdown.
136- func (sc * controller [T ]) Shutdown (ctx context.Context ) error {
137- // Signal the goroutine to stop.
138- close (sc .done )
139- sc .wg .Wait ()
140- var errs error
141- for _ , scrp := range sc .scrapers {
142- errs = multierr .Append (errs , scrp .Shutdown (ctx ))
143- }
144-
145- return errs
146- }
147-
148- // startScraping initiates a ticker that calls Scrape based on the configured
149- // collection interval.
150- func (sc * controller [T ]) startScraping () {
151- sc .wg .Add (1 )
152- go func () {
153- defer sc .wg .Done ()
154- if sc .initialDelay > 0 {
155- select {
156- case <- time .After (sc .initialDelay ):
157- case <- sc .done :
158- return
159- }
160- }
161-
162- if sc .tickerCh == nil {
163- ticker := time .NewTicker (sc .collectionInterval )
164- defer ticker .Stop ()
165-
166- sc .tickerCh = ticker .C
167- }
168- // Call scrape method during initialization to ensure
169- // that scrapers start from when the component starts
170- // instead of waiting for the full duration to start.
171- sc .scrapeFunc (sc )
172- for {
173- select {
174- case <- sc .tickerCh :
175- sc .scrapeFunc (sc )
176- case <- sc .done :
177- return
178- }
179- }
180- }()
181- }
182-
18383// NewLogsController creates a receiver.Logs with the configured options, that can control multiple scraper.Logs.
18484func NewLogsController (cfg * ControllerConfig ,
18585 rSet receiver.Settings ,
@@ -189,7 +89,7 @@ func NewLogsController(cfg *ControllerConfig,
18989 co := getOptions (options )
19090 scrapers := make ([]scraper.Logs , 0 , len (co .factoriesWithConfig ))
19191 for _ , fwc := range co .factoriesWithConfig {
192- set := getSettings (fwc .f .Type (), rSet )
92+ set := controller . GetSettings (fwc .f .Type (), rSet )
19393 s , err := fwc .f .CreateLogs (context .Background (), set , fwc .cfg )
19494 if err != nil {
19595 return nil , err
@@ -200,8 +100,8 @@ func NewLogsController(cfg *ControllerConfig,
200100 }
201101 scrapers = append (scrapers , s )
202102 }
203- return newController [scraper.Logs ](
204- cfg , rSet , scrapers , func (c * controller [scraper.Logs ]) { scrapeLogs (c , nextConsumer ) }, co .tickerCh )
103+ return controller . NewController [scraper.Logs ](
104+ cfg , rSet , scrapers , func (c * controller. Controller [scraper.Logs ]) { scrapeLogs (c , nextConsumer ) }, co .tickerCh )
205105}
206106
207107// NewMetricsController creates a receiver.Metrics with the configured options, that can control multiple scraper.Metrics.
@@ -213,7 +113,7 @@ func NewMetricsController(cfg *ControllerConfig,
213113 co := getOptions (options )
214114 scrapers := make ([]scraper.Metrics , 0 , len (co .factoriesWithConfig ))
215115 for _ , fwc := range co .factoriesWithConfig {
216- set := getSettings (fwc .f .Type (), rSet )
116+ set := controller . GetSettings (fwc .f .Type (), rSet )
217117 s , err := fwc .f .CreateMetrics (context .Background (), set , fwc .cfg )
218118 if err != nil {
219119 return nil , err
@@ -224,46 +124,46 @@ func NewMetricsController(cfg *ControllerConfig,
224124 }
225125 scrapers = append (scrapers , s )
226126 }
227- return newController [scraper.Metrics ](
228- cfg , rSet , scrapers , func (c * controller [scraper.Metrics ]) { scrapeMetrics (c , nextConsumer ) }, co .tickerCh )
127+ return controller . NewController [scraper.Metrics ](
128+ cfg , rSet , scrapers , func (c * controller. Controller [scraper.Metrics ]) { scrapeMetrics (c , nextConsumer ) }, co .tickerCh )
229129}
230130
231- func scrapeLogs (c * controller [scraper.Logs ], nextConsumer consumer.Logs ) {
232- ctx , done := withScrapeContext (c .timeout )
131+ func scrapeLogs (c * controller. Controller [scraper.Logs ], nextConsumer consumer.Logs ) {
132+ ctx , done := controller . WithScrapeContext (c .Timeout )
233133 defer done ()
234134
235135 logs := plog .NewLogs ()
236- for i := range c .scrapers {
237- md , err := c .scrapers [i ].ScrapeLogs (ctx )
136+ for i := range c .Scrapers {
137+ md , err := c .Scrapers [i ].ScrapeLogs (ctx )
238138 if err != nil && ! scrapererror .IsPartialScrapeError (err ) {
239139 continue
240140 }
241141 md .ResourceLogs ().MoveAndAppendTo (logs .ResourceLogs ())
242142 }
243143
244144 logRecordCount := logs .LogRecordCount ()
245- ctx = c .obsrecv .StartMetricsOp (ctx )
145+ ctx = c .Obsrecv .StartMetricsOp (ctx )
246146 err := nextConsumer .ConsumeLogs (ctx , logs )
247- c .obsrecv .EndMetricsOp (ctx , "" , logRecordCount , err )
147+ c .Obsrecv .EndMetricsOp (ctx , "" , logRecordCount , err )
248148}
249149
250- func scrapeMetrics (c * controller [scraper.Metrics ], nextConsumer consumer.Metrics ) {
251- ctx , done := withScrapeContext (c .timeout )
150+ func scrapeMetrics (c * controller. Controller [scraper.Metrics ], nextConsumer consumer.Metrics ) {
151+ ctx , done := controller . WithScrapeContext (c .Timeout )
252152 defer done ()
253153
254154 metrics := pmetric .NewMetrics ()
255- for i := range c .scrapers {
256- md , err := c .scrapers [i ].ScrapeMetrics (ctx )
155+ for i := range c .Scrapers {
156+ md , err := c .Scrapers [i ].ScrapeMetrics (ctx )
257157 if err != nil && ! scrapererror .IsPartialScrapeError (err ) {
258158 continue
259159 }
260160 md .ResourceMetrics ().MoveAndAppendTo (metrics .ResourceMetrics ())
261161 }
262162
263163 dataPointCount := metrics .DataPointCount ()
264- ctx = c .obsrecv .StartMetricsOp (ctx )
164+ ctx = c .Obsrecv .StartMetricsOp (ctx )
265165 err := nextConsumer .ConsumeMetrics (ctx , metrics )
266- c .obsrecv .EndMetricsOp (ctx , "" , dataPointCount , err )
166+ c .Obsrecv .EndMetricsOp (ctx , "" , dataPointCount , err )
267167}
268168
269169func getOptions (options []ControllerOption ) controllerOptions {
@@ -273,21 +173,3 @@ func getOptions(options []ControllerOption) controllerOptions {
273173 }
274174 return co
275175}
276-
277- func getSettings (sType component.Type , rSet receiver.Settings ) scraper.Settings {
278- return scraper.Settings {
279- ID : component .NewID (sType ),
280- TelemetrySettings : rSet .TelemetrySettings ,
281- BuildInfo : rSet .BuildInfo ,
282- }
283- }
284-
285- // withScrapeContext will return a context that has no deadline if timeout is 0
286- // which implies no explicit timeout had occurred, otherwise, a context
287- // with a deadline of the provided timeout is returned.
288- func withScrapeContext (timeout time.Duration ) (context.Context , context.CancelFunc ) {
289- if timeout == 0 {
290- return context .WithCancel (context .Background ())
291- }
292- return context .WithTimeout (context .Background (), timeout )
293- }
0 commit comments