diff --git a/pkg/cli/gptscript.go b/pkg/cli/gptscript.go index 8b6b2ffe..2a210f26 100644 --- a/pkg/cli/gptscript.go +++ b/pkg/cli/gptscript.go @@ -44,6 +44,7 @@ type GPTScript struct { Debug bool `usage:"Enable debug logging"` Quiet *bool `usage:"No output logging (set --quiet=false to force on even when there is no TTY)" short:"q"` Output string `usage:"Save output to a file, or - for stdout" short:"o"` + EventsStreamTo string `usage:"Stream events to this location, could be a file descriptor/handle (e.g. fd://2), filename, or named pipe (e.g. \\\\.\\pipe\\my-pipe)" name:"events-stream-to"` Input string `usage:"Read input from a file (\"-\" for stdin)" short:"f"` SubTool string `usage:"Use tool of this name, not the first tool in file" local:"true"` Assemble bool `usage:"Assemble tool to a single artifact, saved to --output" hidden:"true" local:"true"` @@ -137,6 +138,15 @@ func (r *GPTScript) NewGPTScriptOpts() (gptscript.Options, error) { opts.Runner.CredentialOverride = r.CredentialOverride + if r.EventsStreamTo != "" { + mf, err := monitor.NewFileFactory(r.EventsStreamTo) + if err != nil { + return gptscript.Options{}, err + } + + opts.Runner.MonitorFactory = mf + } + return opts, nil } diff --git a/pkg/monitor/display.go b/pkg/monitor/display.go index 55cd8bcb..429b60d2 100644 --- a/pkg/monitor/display.go +++ b/pkg/monitor/display.go @@ -203,7 +203,8 @@ func (d *display) Event(event runner.Event) { log := log.Fields( "id", currentCall.ID, "parentID", currentCall.ParentID, - "toolID", currentCall.ToolID) + "toolID", currentCall.ToolID, + ) _, ok := d.callIDMap[currentCall.ID] if !ok { diff --git a/pkg/monitor/fd.go b/pkg/monitor/fd.go new file mode 100644 index 00000000..6ec4d38c --- /dev/null +++ b/pkg/monitor/fd.go @@ -0,0 +1,130 @@ +package monitor + +import ( + "context" + "encoding/json" + "os" + "strconv" + "strings" + "sync" + "time" + + "github.com/gptscript-ai/gptscript/pkg/runner" + "github.com/gptscript-ai/gptscript/pkg/types" +) + +type Event struct { + runner.Event `json:",inline"` + Program *types.Program `json:"program,omitempty"` + Input string `json:"input,omitempty"` + Output string `json:"output,omitempty"` + Err string `json:"err,omitempty"` +} + +type fileFactory struct { + file *os.File +} + +// NewFileFactory creates a new monitor factory that writes events to the location specified. +// The location can be one of three things: +// 1. a file descriptor/handle in the form "fd://2" +// 2. a file name +// 3. a named pipe in the form "\\.\pipe\my-pipe" +func NewFileFactory(loc string) (runner.MonitorFactory, error) { + var ( + file *os.File + err error + ) + + if strings.HasPrefix(loc, "fd://") { + fd, err := strconv.Atoi(strings.TrimPrefix(loc, "fd://")) + if err != nil { + return nil, err + } + + file = os.NewFile(uintptr(fd), "events") + } else { + file, err = os.OpenFile(loc, os.O_WRONLY|os.O_CREATE, 0) + if err != nil { + return nil, err + } + } + + return &fileFactory{ + file: file, + }, nil +} + +func (s fileFactory) Start(_ context.Context, prg *types.Program, env []string, input string) (runner.Monitor, error) { + fd := &fd{ + prj: prg, + env: env, + input: input, + file: s.file, + } + + fd.event(Event{ + Event: runner.Event{ + Time: time.Now(), + Type: "runStart", + }, + Program: prg, + }) + + return fd, nil +} + +type fd struct { + prj *types.Program + env []string + input string + file *os.File + runLock sync.Mutex +} + +func (f *fd) Event(event runner.Event) { + f.event(Event{ + Event: event, + Input: f.input, + }) +} + +func (f *fd) event(event Event) { + f.runLock.Lock() + defer f.runLock.Unlock() + b, err := json.Marshal(event) + if err != nil { + log.Errorf("Failed to marshal event: %v", err) + return + } + + if _, err = f.file.Write(append(b, '\n', '\n')); err != nil { + log.Errorf("Failed to write event to file: %v", err) + } +} + +func (f *fd) Stop(output string, err error) { + e := Event{ + Event: runner.Event{ + Time: time.Now(), + Type: "runFinish", + }, + Input: f.input, + Output: output, + } + if err != nil { + e.Err = err.Error() + } + + f.event(e) + if err = f.file.Close(); err != nil { + log.Errorf("Failed to close file: %v", err) + } +} + +func (f *fd) Pause() func() { + f.runLock.Lock() + return func() { + f.runLock.Unlock() + } +}