@@ -5,7 +5,7 @@ package scraperhelper // import "go.opentelemetry.io/collector/receiver/scraperh
5
5
6
6
import (
7
7
"context"
8
- "errors "
8
+ "sync "
9
9
"time"
10
10
11
11
"go.uber.org/multierr"
@@ -63,12 +63,10 @@ type controller struct {
63
63
64
64
tickerCh <- chan time.Time
65
65
66
- initialized bool
67
- done chan struct {}
68
- terminated chan struct {}
66
+ done chan struct {}
67
+ wg sync.WaitGroup
69
68
70
- obsrecv * receiverhelper.ObsReport
71
- recvSettings receiver.Settings
69
+ obsrecv * receiverhelper.ObsReport
72
70
}
73
71
74
72
// NewScraperControllerReceiver creates a Receiver with the configured options, that can control multiple scrapers.
@@ -78,10 +76,6 @@ func NewScraperControllerReceiver(
78
76
nextConsumer consumer.Metrics ,
79
77
options ... ScraperControllerOption ,
80
78
) (component.Component , error ) {
81
- if cfg .CollectionInterval <= 0 {
82
- return nil , errors .New ("collection_interval must be a positive duration" )
83
- }
84
-
85
79
obsrecv , err := receiverhelper .NewObsReport (receiverhelper.ObsReportSettings {
86
80
ReceiverID : set .ID ,
87
81
Transport : "" ,
@@ -99,9 +93,7 @@ func NewScraperControllerReceiver(
99
93
timeout : cfg .Timeout ,
100
94
nextConsumer : nextConsumer ,
101
95
done : make (chan struct {}),
102
- terminated : make (chan struct {}),
103
96
obsrecv : obsrecv ,
104
- recvSettings : set ,
105
97
}
106
98
107
99
for _ , op := range options {
@@ -110,14 +102,11 @@ func NewScraperControllerReceiver(
110
102
111
103
sc .obsScrapers = make ([]* obsReport , len (sc .scrapers ))
112
104
for i , scraper := range sc .scrapers {
113
- scrp , err : = newScraper (obsReportSettings {
105
+ sc . obsScrapers [ i ] , err = newScraper (obsReportSettings {
114
106
ReceiverID : sc .id ,
115
107
Scraper : scraper .ID (),
116
- ReceiverCreateSettings : sc . recvSettings ,
108
+ ReceiverCreateSettings : set ,
117
109
})
118
-
119
- sc .obsScrapers [i ] = scrp
120
-
121
110
if err != nil {
122
111
return nil , err
123
112
}
@@ -134,20 +123,15 @@ func (sc *controller) Start(ctx context.Context, host component.Host) error {
134
123
}
135
124
}
136
125
137
- sc .initialized = true
138
126
sc .startScraping ()
139
127
return nil
140
128
}
141
129
142
130
// Shutdown the receiver, invoked during service shutdown.
143
131
func (sc * controller ) Shutdown (ctx context.Context ) error {
144
- sc .stopScraping ()
145
-
146
- // wait until scraping ticker has terminated
147
- if sc .initialized {
148
- <- sc .terminated
149
- }
150
-
132
+ // Signal the goroutine to stop.
133
+ close (sc .done )
134
+ sc .wg .Wait ()
151
135
var errs error
152
136
for _ , scraper := range sc .scrapers {
153
137
errs = multierr .Append (errs , scraper .Shutdown (ctx ))
@@ -159,12 +143,13 @@ func (sc *controller) Shutdown(ctx context.Context) error {
159
143
// startScraping initiates a ticker that calls Scrape based on the configured
160
144
// collection interval.
161
145
func (sc * controller ) startScraping () {
146
+ sc .wg .Add (1 )
162
147
go func () {
148
+ defer sc .wg .Done ()
163
149
if sc .initialDelay > 0 {
164
150
select {
165
151
case <- time .After (sc .initialDelay ):
166
152
case <- sc .done :
167
- sc .terminated <- struct {}{}
168
153
return
169
154
}
170
155
}
@@ -184,7 +169,6 @@ func (sc *controller) startScraping() {
184
169
case <- sc .tickerCh :
185
170
sc .scrapeMetricsAndReport ()
186
171
case <- sc .done :
187
- sc .terminated <- struct {}{}
188
172
return
189
173
}
190
174
}
@@ -222,11 +206,6 @@ func (sc *controller) scrapeMetricsAndReport() {
222
206
sc .obsrecv .EndMetricsOp (ctx , "" , dataPointCount , err )
223
207
}
224
208
225
- // stopScraping stops the ticker
226
- func (sc * controller ) stopScraping () {
227
- close (sc .done )
228
- }
229
-
230
209
// withScrapeContext will return a context that has no deadline if timeout is 0
231
210
// which implies no explicit timeout had occurred, otherwise, a context
232
211
// with a deadline of the provided timeout is returned.
0 commit comments