From 693c970d24f79a61cee02abe12ec2ef9280ac19c Mon Sep 17 00:00:00 2001 From: Donnie Adams Date: Thu, 30 May 2024 10:39:49 -0400 Subject: [PATCH] fix: make --events-stream-to work with chat When chatting, the file passed to --events-stream-to is closed and the events for chat messages aren't appended. This change will reopen the file when necessary and continue streaming events to the file. Signed-off-by: Donnie Adams --- pkg/monitor/fd.go | 68 ++++++++++++++++++++++++++++++----------------- 1 file changed, 43 insertions(+), 25 deletions(-) diff --git a/pkg/monitor/fd.go b/pkg/monitor/fd.go index 43de826c..08b73ed3 100644 --- a/pkg/monitor/fd.go +++ b/pkg/monitor/fd.go @@ -7,7 +7,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "github.com/gptscript-ai/gptscript/pkg/runner" @@ -23,8 +22,10 @@ type Event struct { } type fileFactory struct { + fileName string file *os.File - runningCount atomic.Int32 + lock sync.Mutex + runningCount int } // NewFileFactory creates a new monitor factory that writes events to the location specified. @@ -33,33 +34,23 @@ type fileFactory struct { // 2. a file name // 3. a named pipe in the form "\\.\pipe\my-pipe" func NewFileFactory(loc string) (runner.MonitorFactory, error) { - var ( - file *os.File - err error - ) - - if strings.HasPrefix(loc, "fd://") { - fd, err := strconv.Atoi(strings.TrimPrefix(loc, "fd://")) - if err != nil { - return nil, err - } - - file = os.NewFile(uintptr(fd), "events") - } else { - file, err = os.OpenFile(loc, os.O_WRONLY|os.O_CREATE, 0644) - if err != nil { - return nil, err - } - } - return &fileFactory{ - file: file, - runningCount: atomic.Int32{}, + fileName: loc, }, nil } func (s *fileFactory) Start(_ context.Context, prg *types.Program, env []string, input string) (runner.Monitor, error) { - s.runningCount.Add(1) + s.lock.Lock() + s.runningCount++ + if s.runningCount == 1 { + if err := s.openFile(); err != nil { + s.runningCount-- + s.lock.Unlock() + return nil, err + } + } + s.lock.Unlock() + fd := &fd{ prj: prg, env: env, @@ -80,13 +71,40 @@ func (s *fileFactory) Start(_ context.Context, prg *types.Program, env []string, } func (s *fileFactory) close() { - if count := s.runningCount.Add(-1); count == 0 { + s.lock.Lock() + defer s.lock.Unlock() + + s.runningCount-- + if s.runningCount == 0 { if err := s.file.Close(); err != nil { log.Errorf("error closing monitor file: %v", err) } } } +func (s *fileFactory) openFile() error { + var ( + err error + file *os.File + ) + if strings.HasPrefix(s.fileName, "fd://") { + fd, err := strconv.Atoi(strings.TrimPrefix(s.fileName, "fd://")) + if err != nil { + return err + } + + file = os.NewFile(uintptr(fd), "events") + } else { + file, err = os.OpenFile(s.fileName, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0644) + if err != nil { + return err + } + } + + s.file = file + return nil +} + type fd struct { prj *types.Program env []string