diff --git a/pkg/monitor/fd.go b/pkg/monitor/fd.go index f5fea312..43de826c 100644 --- a/pkg/monitor/fd.go +++ b/pkg/monitor/fd.go @@ -7,6 +7,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/gptscript-ai/gptscript/pkg/runner" @@ -22,7 +23,8 @@ type Event struct { } type fileFactory struct { - file *os.File + file *os.File + runningCount atomic.Int32 } // NewFileFactory creates a new monitor factory that writes events to the location specified. @@ -44,23 +46,26 @@ func NewFileFactory(loc string) (runner.MonitorFactory, error) { file = os.NewFile(uintptr(fd), "events") } else { - file, err = os.OpenFile(loc, os.O_WRONLY|os.O_CREATE, 0) + file, err = os.OpenFile(loc, os.O_WRONLY|os.O_CREATE, 0644) if err != nil { return nil, err } } return &fileFactory{ - file: file, + file: file, + runningCount: atomic.Int32{}, }, nil } -func (s fileFactory) Start(_ context.Context, prg *types.Program, env []string, input string) (runner.Monitor, error) { +func (s *fileFactory) Start(_ context.Context, prg *types.Program, env []string, input string) (runner.Monitor, error) { + s.runningCount.Add(1) fd := &fd{ - prj: prg, - env: env, - input: input, - file: s.file, + prj: prg, + env: env, + input: input, + file: s.file, + factory: s, } fd.event(Event{ @@ -74,12 +79,21 @@ func (s fileFactory) Start(_ context.Context, prg *types.Program, env []string, return fd, nil } +func (s *fileFactory) close() { + if count := s.runningCount.Add(-1); count == 0 { + if err := s.file.Close(); err != nil { + log.Errorf("error closing monitor file: %v", err) + } + } +} + type fd struct { prj *types.Program env []string input string file *os.File runLock sync.Mutex + factory *fileFactory } func (f *fd) Event(event runner.Event) { @@ -117,9 +131,7 @@ func (f *fd) Stop(output string, err error) { } f.event(e) - if err = f.file.Close(); err != nil { - log.Errorf("Failed to close file: %v", err) - } + f.factory.close() } func (f *fd) Pause() func() {