Skip to content

feat: add event streaming #266

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/cli/gptscript.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type GPTScript struct {
Debug bool `usage:"Enable debug logging"`
Quiet *bool `usage:"No output logging (set --quiet=false to force on even when there is no TTY)" short:"q"`
Output string `usage:"Save output to a file, or - for stdout" short:"o"`
EventsStreamTo string `usage:"Stream events to this location, could be a file descriptor/handle (e.g. fd://2), filename, or named pipe (e.g. \\\\.\\pipe\\my-pipe)" name:"events-stream-to"`
Input string `usage:"Read input from a file (\"-\" for stdin)" short:"f"`
SubTool string `usage:"Use tool of this name, not the first tool in file" local:"true"`
Assemble bool `usage:"Assemble tool to a single artifact, saved to --output" hidden:"true" local:"true"`
Expand Down Expand Up @@ -137,6 +138,15 @@ func (r *GPTScript) NewGPTScriptOpts() (gptscript.Options, error) {

opts.Runner.CredentialOverride = r.CredentialOverride

if r.EventsStreamTo != "" {
mf, err := monitor.NewFileFactory(r.EventsStreamTo)
if err != nil {
return gptscript.Options{}, err
}

opts.Runner.MonitorFactory = mf
}

return opts, nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/monitor/display.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ func (d *display) Event(event runner.Event) {
log := log.Fields(
"id", currentCall.ID,
"parentID", currentCall.ParentID,
"toolID", currentCall.ToolID)
"toolID", currentCall.ToolID,
)

_, ok := d.callIDMap[currentCall.ID]
if !ok {
Expand Down
130 changes: 130 additions & 0 deletions pkg/monitor/fd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package monitor

import (
"context"
"encoding/json"
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/gptscript-ai/gptscript/pkg/runner"
"github.com/gptscript-ai/gptscript/pkg/types"
)

type Event struct {
runner.Event `json:",inline"`
Program *types.Program `json:"program,omitempty"`
Input string `json:"input,omitempty"`
Output string `json:"output,omitempty"`
Err string `json:"err,omitempty"`
}

type fileFactory struct {
file *os.File
}

// NewFileFactory creates a new monitor factory that writes events to the location specified.
// The location can be one of three things:
// 1. a file descriptor/handle in the form "fd://2"
// 2. a file name
// 3. a named pipe in the form "\\.\pipe\my-pipe"
func NewFileFactory(loc string) (runner.MonitorFactory, error) {
var (
file *os.File
err error
)

if strings.HasPrefix(loc, "fd://") {
fd, err := strconv.Atoi(strings.TrimPrefix(loc, "fd://"))
if err != nil {
return nil, err
}

file = os.NewFile(uintptr(fd), "events")
} else {
file, err = os.OpenFile(loc, os.O_WRONLY|os.O_CREATE, 0)
if err != nil {
return nil, err
}
}

return &fileFactory{
file: file,
}, nil
}

func (s fileFactory) Start(_ context.Context, prg *types.Program, env []string, input string) (runner.Monitor, error) {
fd := &fd{
prj: prg,
env: env,
input: input,
file: s.file,
}

fd.event(Event{
Event: runner.Event{
Time: time.Now(),
Type: "runStart",
},
Program: prg,
})

return fd, nil
}

type fd struct {
prj *types.Program
env []string
input string
file *os.File
runLock sync.Mutex
}

func (f *fd) Event(event runner.Event) {
f.event(Event{
Event: event,
Input: f.input,
})
}

func (f *fd) event(event Event) {
f.runLock.Lock()
defer f.runLock.Unlock()
b, err := json.Marshal(event)
if err != nil {
log.Errorf("Failed to marshal event: %v", err)
return
}

if _, err = f.file.Write(append(b, '\n', '\n')); err != nil {
log.Errorf("Failed to write event to file: %v", err)
}
}

func (f *fd) Stop(output string, err error) {
e := Event{
Event: runner.Event{
Time: time.Now(),
Type: "runFinish",
},
Input: f.input,
Output: output,
}
if err != nil {
e.Err = err.Error()
}

f.event(e)
if err = f.file.Close(); err != nil {
log.Errorf("Failed to close file: %v", err)
}
}

func (f *fd) Pause() func() {
f.runLock.Lock()
return func() {
f.runLock.Unlock()
}
}