@@ -18,6 +18,7 @@ package discoverymanager
18
18
import (
19
19
"fmt"
20
20
"sync"
21
+ "time"
21
22
22
23
"github.com/arduino/arduino-cli/arduino/discovery"
23
24
"github.com/arduino/arduino-cli/i18n"
@@ -83,7 +84,12 @@ func (dm *DiscoveryManager) Start() {
83
84
return
84
85
}
85
86
86
- go dm .feeder ()
87
+ go func () {
88
+ // Feed all watchers with data coming from the discoveries
89
+ for ev := range dm .feed {
90
+ dm .feedEvent (ev )
91
+ }
92
+ }()
87
93
88
94
var wg sync.WaitGroup
89
95
for _ , d := range dm .discoveries {
@@ -136,13 +142,13 @@ func (dm *DiscoveryManager) Watch() (*PortWatcher, error) {
136
142
dm .Start ()
137
143
138
144
watcher := & PortWatcher {
139
- feed : make (chan * discovery.Event ),
145
+ feed : make (chan * discovery.Event , 10 ),
140
146
}
141
147
watcher .closeCB = func () {
142
148
dm .watchersMutex .Lock ()
143
149
delete (dm .watchers , watcher )
144
- dm .watchersMutex .Unlock ()
145
150
close (watcher .feed )
151
+ dm .watchersMutex .Unlock ()
146
152
}
147
153
go func () {
148
154
dm .watchersMutex .Lock ()
@@ -180,44 +186,43 @@ func (dm *DiscoveryManager) startDiscovery(d *discovery.PluggableDiscovery) (dis
180
186
return nil
181
187
}
182
188
183
- func (dm * DiscoveryManager ) feeder () {
184
- // Feed all watchers with data coming from the discoveries
185
- for ev := range dm .feed {
186
- dm .watchersMutex .Lock ()
187
- for watcher := range dm .watchers {
188
- select {
189
- case watcher .feed <- ev :
190
- // OK
191
- default :
192
- // If the watcher is not able to process event fast enough
193
- // remove the watcher from the list of watchers
194
- go watcher .Close ()
195
- }
189
+ func (dm * DiscoveryManager ) feedEvent (ev * discovery.Event ) {
190
+ dm .watchersMutex .Lock ()
191
+ defer dm .watchersMutex .Unlock ()
192
+
193
+ if ev .Type == "stop" {
194
+ // Remove all the cached events for the terminating discovery
195
+ delete (dm .watchersCache , ev .DiscoveryID )
196
+ return
197
+ }
198
+
199
+ // Send the event to all watchers
200
+ for watcher := range dm .watchers {
201
+ select {
202
+ case watcher .feed <- ev :
203
+ // OK
204
+ case <- time .After (time .Millisecond * 500 ):
205
+ // If the watcher is not able to process event fast enough
206
+ // remove the watcher from the list of watchers
207
+ logrus .Info ("Watcher is not able to process events fast enough, removing it from the list of watchers" )
208
+ delete (dm .watchers , watcher )
196
209
}
197
- dm .cacheEvent (ev )
198
- dm .watchersMutex .Unlock ()
199
210
}
200
- }
201
211
202
- func ( dm * DiscoveryManager ) cacheEvent ( ev * discovery. Event ) {
212
+ // Cache the event for the discovery
203
213
cache := dm .watchersCache [ev .DiscoveryID ]
204
214
if cache == nil {
205
215
cache = map [string ]* discovery.Event {}
206
216
dm .watchersCache [ev .DiscoveryID ] = cache
207
217
}
208
-
209
218
eventID := ev .Port .Address + "|" + ev .Port .Protocol
210
219
switch ev .Type {
211
220
case "add" :
212
221
cache [eventID ] = ev
213
222
case "remove" :
214
223
delete (cache , eventID )
215
- case "quit" :
216
- // Remove all the events for this discovery
217
- delete (dm .watchersCache , ev .DiscoveryID )
218
224
default :
219
225
logrus .Errorf ("Unhandled event from discovery: %s" , ev .Type )
220
- return
221
226
}
222
227
}
223
228
0 commit comments