diff --git a/cmd/infrakit/main.go b/cmd/infrakit/main.go index 4904aff56..7460276c4 100644 --- a/cmd/infrakit/main.go +++ b/cmd/infrakit/main.go @@ -41,6 +41,7 @@ import ( // Supported "kinds" _ "github.com/docker/infrakit/pkg/run/v0/aws" + _ "github.com/docker/infrakit/pkg/run/v0/enrollment" _ "github.com/docker/infrakit/pkg/run/v0/file" _ "github.com/docker/infrakit/pkg/run/v0/group" _ "github.com/docker/infrakit/pkg/run/v0/hyperkit" diff --git a/docs/controller/enrollment/example.yml b/docs/controller/enrollment/example.yml new file mode 100644 index 000000000..b228bb40a --- /dev/null +++ b/docs/controller/enrollment/example.yml @@ -0,0 +1,79 @@ +# +# +# Example for enrollment controller +# +# Enrollment controller makes sure the instances in the source group (specified +# by the List field) are in sync with entries in the downstream instance plugin +# specified by Instance field. +# +# This example uses swarm as backend. So be sure your swarm on the same host +# is a leader via `docker swarm init`. +# +# +# 1. Start up all the plugins: +# INFRAKIT_MANAGER_BACKEND=swarm build/infrakit plugin start manager group \ +# enrollment:nfs \ +# simulator:nfs-auth simulator:simulator vanilla +# +# This starts up the group at 'us-east', one simulator at 'nfs' and another at 'simulator' +# +# 2. Commit the group to create the groups -- see the group.yml included. +# +# infrakit group controller commit -y docs/controller/enrollment/group.yml +# +# Verify the group has workers: +# +# infrakit group/workers describe +# +# 3. Commit this file to create the enrollments +# +# infrakit nfs controller commit -y docs/controller/enrollment/example.yml +# +# 4. Verify that entries are in sync: check the group describe and nfs-auth/disk (that's +# the Plugin specified in the Instance section of this config. +# +# infrakit group/workers describe # returns the list of nodes in the group +# infrakit infrakit nfs-auth/disk describe # returns the list of corresponding enrollments +# +# For each member of the group workers you should see a corresponding entry in the enrollments. +# Note that the instance ID of an enrollment (in nfs-auth/disk) will be different from the instance +# ID of a member in group/workers; however, the ID of a member of group/workers should show up as +# a label in the instance of nfs-auth/disk (as infrakit.enrollment.sourceID). +# +# 5. Try scale up the workers group +# +# infrakit group/workers scale 10 +# +# After a while, verify enrollment: +# +# infrakit nfs-auth/disk describe + + +kind: enrollment +metadata: + name: nfs/workers # socket file = nfs and the name of control loop is 'workers' +properties: + List: group/workers # socket file = group and group id is 'workers' + Instance: + + # the name of a plugin that has disk as subtype. + Plugin: nfs-auth/disk + + # the entire Properties block here will be rendered and included as the downstream + # instance plugin's instance.Spec when Provision() is called. + Properties: + + # You can include template expressions in this block; however, you need to + # escape the braces. + host: \{\{.ID\}\} + iops: 10 +options: + # This expression is a template used to select the key from each source entry + # of instance.Description. Note here we escape the template tags + # so that template expressions don't get clobbered by infrakit template when + # using that as a preprocessor prior to committing. + SourceKeySelector: \{\{.ID\}\} + + # How often to run the sync. The string value here is in the format of Go's time.Duration. + # For example, 1m means 1 minute. + SyncInterval: 5s # seconds \ No newline at end of file diff --git a/docs/controller/enrollment/group.yml b/docs/controller/enrollment/group.yml new file mode 100644 index 000000000..72dcb40cb --- /dev/null +++ b/docs/controller/enrollment/group.yml @@ -0,0 +1,30 @@ +# +# A group of workers +# +# Start up -- plugin start should include manager, vanilla, simulator, and group +# Then commit +# +# infrakit group controller commit -y docs/controller/enrollment/group.yml +# +kind: group +metadata: + name: workers +properties: + Allocation: + Size: 5 + Flavor: + Plugin: vanilla + Properties: + Attachments: + - ID: attachid + Type: attachtype + Init: + - docker pull nginx:alpine + - docker run -d -p 80:80 nginx-alpine + Tags: + project: infrakit + tier: web + Instance: + Plugin: simulator/compute + Properties: + Note: custom field diff --git a/docs/plugins/instance.md b/docs/plugins/instance.md index 176f74efe..c2c678bb7 100644 --- a/docs/plugins/instance.md +++ b/docs/plugins/instance.md @@ -1,6 +1,7 @@ # Instance plugin API - + + ## API diff --git a/pkg/cli/v0/group/cmd.go b/pkg/cli/v0/group/cmd.go index 8a20e3a92..d680756c5 100644 --- a/pkg/cli/v0/group/cmd.go +++ b/pkg/cli/v0/group/cmd.go @@ -15,15 +15,14 @@ var log = logutil.New("module", "cli/v1/group") func init() { cli.Register(group.InterfaceSpec, []cli.CmdBuilder{ - //Group, + // Group, Ls, Inspect, Describe, Commit, Free, Destroy, - Size, - SetSize, + Scale, DestroyInstances, }) } @@ -42,8 +41,7 @@ func Group(name string, services *cli.Services) *cobra.Command { Commit(name, services), Free(name, services), Destroy(name, services), - Size(name, services), - SetSize(name, services), + Scale(name, services), DestroyInstances(name, services), ) diff --git a/pkg/cli/v0/group/scale.go b/pkg/cli/v0/group/scale.go new file mode 100644 index 000000000..c9c9abcb4 --- /dev/null +++ b/pkg/cli/v0/group/scale.go @@ -0,0 +1,91 @@ +package group + +import ( + "fmt" + "os" + "strconv" + + "github.com/docker/infrakit/pkg/cli" + "github.com/docker/infrakit/pkg/plugin" + "github.com/docker/infrakit/pkg/spi/group" + "github.com/spf13/cobra" +) + +// Scale returns the scale command +func Scale(name string, services *cli.Services) *cobra.Command { + + scale := &cobra.Command{ + Use: "scale [new-target]", + Short: "Returns size of the group if no args provided. Otherwise set the target size.", + RunE: func(cmd *cobra.Command, args []string) error { + + pluginName := plugin.Name(name) + _, gid := pluginName.GetLookupAndType() + + size := -1 + + if gid == "" { + // if gid is not known, then we need it to be provided + switch len(args) { + + case 0: + cmd.Usage() + os.Exit(1) + case 1: + gid = args[0] + case 2: + gid = args[0] + sz, err := strconv.Atoi(args[1]) + if err != nil { + return err + } + size = sz + default: + cmd.Usage() + os.Exit(1) + } + } else { + // if gid is not known, then we need it to be provided + switch len(args) { + + case 0: + size = -1 + case 1: + sz, err := strconv.Atoi(args[0]) + if err != nil { + return err + } + size = sz + default: + cmd.Usage() + os.Exit(1) + } + } + + groupPlugin, err := LoadPlugin(services.Plugins(), name) + if err != nil { + return nil + } + cli.MustNotNil(groupPlugin, "group plugin not found", "name", name) + + groupID := group.ID(gid) + target, err := groupPlugin.Size(groupID) + if err != nil { + return err + } + fmt.Printf("Group %v at %d instances", groupID, target) + + if size > -1 { + err = groupPlugin.SetSize(groupID, size) + if err != nil { + return err + } + fmt.Printf(", scale to %d", size) + } + + fmt.Println() + return nil + }, + } + return scale +} diff --git a/pkg/controller/enrollment/controller.go b/pkg/controller/enrollment/controller.go new file mode 100644 index 000000000..cc30a2de0 --- /dev/null +++ b/pkg/controller/enrollment/controller.go @@ -0,0 +1,57 @@ +package enrollment + +import ( + "github.com/docker/infrakit/pkg/controller" + enrollment "github.com/docker/infrakit/pkg/controller/enrollment/types" + "github.com/docker/infrakit/pkg/controller/internal" + "github.com/docker/infrakit/pkg/discovery" + logutil "github.com/docker/infrakit/pkg/log" + "github.com/docker/infrakit/pkg/manager" + "github.com/docker/infrakit/pkg/types" +) + +var ( + log = logutil.New("module", "controller/enrollment") + debugV = logutil.V(200) +) + +// NewController returns a controller implementation +func NewController(plugins func() discovery.Plugins, leader manager.Leadership, + options enrollment.Options) controller.Controller { + return internal.NewController( + leader, + // the constructor + func(spec types.Spec) (internal.Managed, error) { + return newEnroller(plugins, leader, options), nil + }, + // the key function + func(metadata types.Metadata) string { + return metadata.Name + }, + ) +} + +// NewTypedControllers return typed controllers +func NewTypedControllers(plugins func() discovery.Plugins, leader manager.Leadership, + options enrollment.Options) func() (map[string]controller.Controller, error) { + + return (internal.NewController( + leader, + // the constructor + func(spec types.Spec) (internal.Managed, error) { + log.Debug("Creating managed object", "spec", spec) + return newEnroller(plugins, leader, options), nil + }, + // the key function + func(metadata types.Metadata) string { + return metadata.Name + }, + )).ManagedObjects +} + +func (l *enroller) started() bool { + l.lock.RLock() + defer l.lock.RUnlock() + + return l.running +} diff --git a/pkg/controller/enrollment/enroller.go b/pkg/controller/enrollment/enroller.go new file mode 100644 index 000000000..fc479a78b --- /dev/null +++ b/pkg/controller/enrollment/enroller.go @@ -0,0 +1,230 @@ +package enrollment + +import ( + "fmt" + "sync" + "time" + + "github.com/docker/infrakit/pkg/controller" + enrollment "github.com/docker/infrakit/pkg/controller/enrollment/types" + "github.com/docker/infrakit/pkg/discovery" + "github.com/docker/infrakit/pkg/manager" + "github.com/docker/infrakit/pkg/spi/group" + "github.com/docker/infrakit/pkg/spi/instance" + "github.com/docker/infrakit/pkg/template" + "github.com/docker/infrakit/pkg/types" + "golang.org/x/net/context" +) + +// enroller implements the internal.Managed interface. +// When constructed, it takes a list of instance id, or logical +// names, or a source that can provide this data, and makes sure +// a downstream instance plugin properly reflects this list. +// When there are new entries in the source, the sink's Provision +// will be called. When source entries disappear, the sink's +// Destroy will be called. +// At the moment, destroy will not invoke an flavor plugin to +// execute some kind of drain. That functionality, instead, +// could be implemented as a proxied instance plugin (using the +// interceptor pattern). +type enroller struct { + manager.Leadership + + spec types.Spec + properties enrollment.Properties + options enrollment.Options + + plugins func() discovery.Plugins + + poller *controller.Poller + ticker <-chan time.Time + lock sync.RWMutex + + groupPlugin group.Plugin // source -- where members are to be enrolled + instancePlugin instance.Plugin // sink -- where enrollments are made + running bool + + // template that we use to render with a source instance.Description to get the link Key + sourceKeySelectorTemplate *template.Template + // template used to render the enrollment's Provision propertiesx + enrollmentPropertiesTemplate *template.Template +} + +func newEnroller(plugins func() discovery.Plugins, + leader manager.Leadership, options enrollment.Options) *enroller { + l := &enroller{ + Leadership: leader, + plugins: plugins, + options: options, + } + + interval := l.options.SyncInterval.Duration() + if interval == 0 { + interval = enrollment.DefaultSyncInterval + } + l.ticker = time.Tick(interval) + + l.poller = controller.Poll( + // This determines if the action should be taken when time is up + func() bool { + if mustTrue(l.IsLeader()) { + return true + } + return false + }, + // This does the work + func() (err error) { + return l.sync() + }, + l.ticker) + + return l +} + +func mustTrue(v bool, e error) bool { + if e != nil { + return false + } + return v +} + +// object returns the state +func (l *enroller) object() (*types.Object, error) { + object := types.Object{ + Spec: l.spec, + } + // TODO build the current state + return &object, nil +} + +// Plan implements internal.Managed.Plan +func (l *enroller) Plan(operation controller.Operation, spec types.Spec) (*types.Object, *controller.Plan, error) { + + if operation == controller.Destroy { + o, _ := l.object() + return o, nil, nil + } + + if spec.Properties == nil { + return nil, nil, fmt.Errorf("missing properties") + } + properties := enrollment.Properties{} + err := spec.Properties.Decode(&properties) + if err != nil { + return nil, nil, err + } + + // TODO - build a plan + return &types.Object{ + Spec: spec, + }, &controller.Plan{}, nil + +} + +func (l *enroller) updateSpec(spec types.Spec) error { + + l.lock.Lock() + defer l.lock.Unlock() + + if spec.Options != nil { + options := enrollment.Options{} + if err := spec.Options.Decode(&options); err != nil { + return err + } + l.options = options + } + + if spec.Properties != nil { + properties := enrollment.Properties{} + if err := spec.Properties.Decode(&properties); err != nil { + return err + } + l.properties = properties + } + + l.spec = spec + // set identity + l.spec.Metadata.Identity = &types.Identity{ + ID: l.spec.Metadata.Name, + } + return nil +} + +// Enforce implements internal.Managed.Enforce +func (l *enroller) Enforce(spec types.Spec) (*types.Object, error) { + log.Debug("Enforce", "spec", spec, "V", debugV) + + if err := l.updateSpec(spec); err != nil { + return nil, err + } + + l.Start() + return l.object() +} + +// Inspect implements internal.Managed.Inspect +func (l *enroller) Inspect() (*types.Object, error) { + return l.object() +} + +// Free implements internal.Managed.Free +func (l *enroller) Free() (*types.Object, error) { + return l.Pause() +} + +// Pause implements internal.Managed.Pause +func (l *enroller) Pause() (*types.Object, error) { + if l.Running() { + l.Stop() + } + return l.Inspect() +} + +// Terminate implements internal.Managed.Terminate +func (l *enroller) Terminate() (*types.Object, error) { + o, err := l.object() + if err != nil { + return nil, err + } + + if l.Running() { + l.Stop() + } + + if l.options.DestroyOnTerminate { + if err := l.destroy(); err != nil { + // TODO - how do we handle rollback? + // For now let's not try to restore deleted entries, because + // there are no guarantees that the restore operations will succeed. + return o, err + } + } + return o, nil +} + +// Start implements internal/ControlLoop.Start +func (l *enroller) Start() { + l.lock.Lock() + defer l.lock.Unlock() + + if l.poller != nil { + go l.poller.Run(context.Background()) + l.running = true + } +} + +// Start implements internal/ControlLoop.Stop +func (l *enroller) Stop() error { + l.lock.Lock() + defer l.lock.Unlock() + + if l.poller != nil { + l.poller.Stop() + } + return nil +} + +// Running implements internal/ControlLoop.Running +func (l *enroller) Running() bool { + return l.started() +} diff --git a/pkg/controller/enrollment/enroller_test.go b/pkg/controller/enrollment/enroller_test.go new file mode 100644 index 000000000..d70d31341 --- /dev/null +++ b/pkg/controller/enrollment/enroller_test.go @@ -0,0 +1,141 @@ +package enrollment + +import ( + "fmt" + "testing" + + enrollment "github.com/docker/infrakit/pkg/controller/enrollment/types" + "github.com/docker/infrakit/pkg/discovery" + "github.com/docker/infrakit/pkg/plugin" + "github.com/docker/infrakit/pkg/spi/group" + "github.com/docker/infrakit/pkg/spi/instance" + group_test "github.com/docker/infrakit/pkg/testing/group" + instance_test "github.com/docker/infrakit/pkg/testing/instance" + "github.com/docker/infrakit/pkg/types" + "github.com/stretchr/testify/require" +) + +type fakeLeader func() (bool, error) + +func (f fakeLeader) IsLeader() (bool, error) { + return f() +} + +type fakePlugins map[string]*plugin.Endpoint + +func (f fakePlugins) Find(name plugin.Name) (*plugin.Endpoint, error) { + lookup, _ := name.GetLookupAndType() + if v, has := f[lookup]; has { + return v, nil + } + return nil, fmt.Errorf("not found") +} + +func (f fakePlugins) List() (map[string]*plugin.Endpoint, error) { + return (map[string]*plugin.Endpoint)(f), nil +} + +func TestEnroller(t *testing.T) { + + source := []instance.Description{ + {ID: instance.ID("h1")}, + {ID: instance.ID("h2")}, + {ID: instance.ID("h3")}, + } + + enrolled := []instance.Description{ + {ID: instance.ID("nfs1"), Tags: map[string]string{"infrakit.enrollment.sourceID": "h1"}}, + {ID: instance.ID("nfs2"), Tags: map[string]string{"infrakit.enrollment.sourceID": "h2"}}, + {ID: instance.ID("nfs5"), Tags: map[string]string{"infrakit.enrollment.sourceID": "h5"}}, + } + + seen := make(chan []interface{}, 10) + + enroller := newEnroller( + func() discovery.Plugins { + return fakePlugins{ + "test": &plugin.Endpoint{}, + } + }, + fakeLeader(func() (bool, error) { return false, nil }), + enrollment.Options{}) + enroller.groupPlugin = &group_test.Plugin{ + DoDescribeGroup: func(gid group.ID) (group.Description, error) { + result := group.Description{Instances: source} + return result, nil + }, + } + enroller.instancePlugin = &instance_test.Plugin{ + DoDescribeInstances: func(t map[string]string, p bool) ([]instance.Description, error) { + return enrolled, nil + }, + DoProvision: func(spec instance.Spec) (*instance.ID, error) { + + seen <- []interface{}{spec, "Provision"} + return nil, nil + }, + DoDestroy: func(id instance.ID, ctx instance.Context) error { + + seen <- []interface{}{id, ctx, "Destroy"} + return nil + }, + } + + require.False(t, enroller.Running()) + + spec := types.Spec{} + require.NoError(t, types.AnyYAMLMust([]byte(` +kind: enrollment +metadata: + name: nfs +properties: + List: group/workers + Instance: + Plugin: nfs/authorization + Properties: + host: \{\{.ID\}\} + iops: 10 +options: + SourceKeySelector: \{\{.ID\}\} + +`)).Decode(&spec)) + + require.NoError(t, enroller.updateSpec(spec)) + + st, err := enroller.getSourceKeySelectorTemplate() + require.NoError(t, err) + require.NotNil(t, st) + + et, err := enroller.getEnrollmentPropertiesTemplate() + require.NoError(t, err) + require.NotNil(t, et) + + require.NoError(t, err) + + s, err := enroller.getSourceInstances() + require.NoError(t, err) + require.Equal(t, source, s) + + found, err := enroller.getEnrolledInstances() + require.NoError(t, err) + require.Equal(t, enrolled, found) + + require.NoError(t, enroller.sync()) + + // check the provision and destroy calls + require.Equal(t, []interface{}{ + instance.Spec{ + Properties: types.AnyString(`{"host":"h3","iops":10}`), + Tags: map[string]string{ + "infrakit.enrollment.sourceID": "h3", + "infrakit.enrollment.name": "nfs", + }, + }, + "Provision", + }, <-seen) + require.Equal(t, []interface{}{ + instance.ID("nfs5"), + instance.Termination, + "Destroy", + }, <-seen) +} diff --git a/pkg/controller/enrollment/set.go b/pkg/controller/enrollment/set.go new file mode 100644 index 000000000..81fb7b91d --- /dev/null +++ b/pkg/controller/enrollment/set.go @@ -0,0 +1,77 @@ +package enrollment + +import ( + "sort" + + "github.com/deckarep/golang-set" + "github.com/docker/infrakit/pkg/spi/instance" +) + +// keyFunc is a function that extracts the key from the description +type keyFunc func(instance.Description) (string, error) + +func index(list instance.Descriptions, getKey keyFunc) (map[string]instance.Description, mapset.Set) { + index := map[string]instance.Description{} + this := mapset.NewSet() + for _, n := range list { + key, err := getKey(n) + if err != nil { + log.Error("cannot index entry", "instane.Description", n) + continue + } + this.Add(key) + index[key] = n + } + return index, this +} + +// Difference returns a list of specs that is not in the receiver. +func Difference(list instance.Descriptions, listKeyFunc keyFunc, + other instance.Descriptions, otherKeyFunc keyFunc) instance.Descriptions { + this, thisSet := index(list, listKeyFunc) + _, thatSet := index(other, otherKeyFunc) + return toDescriptions(listKeyFunc, thisSet.Difference(thatSet), this) +} + +func toDescriptions(keyFunc keyFunc, set mapset.Set, index map[string]instance.Description) instance.Descriptions { + out := instance.Descriptions{} + for n := range set.Iter() { + out = append(out, index[n.(string)]) + } + sort.Sort(out) + return out +} + +// Delta computes the changes necessary to make the list match other: +// 1. the add Descriptions are entries to add to other +// 2. the remove Descriptions are entries to remove from other +// 3. changes are a slice of Descriptions where changes[x][0] is the original, and changes[x][1] is new +func Delta(list instance.Descriptions, listKeyFunc keyFunc, other instance.Descriptions, + otherKeyFunc keyFunc) (add instance.Descriptions, remove instance.Descriptions, changes [][2]instance.Description) { + + sort.Sort(instance.Descriptions(list)) + sort.Sort(instance.Descriptions(other)) + + this, thisSet := index(list, listKeyFunc) + that, thatSet := index(other, otherKeyFunc) + + removeSet := thatSet.Difference(thisSet) + remove = toDescriptions(otherKeyFunc, removeSet, that) + + addSet := thisSet.Difference(thatSet) + add = toDescriptions(listKeyFunc, addSet, this) + + changeSet := thatSet.Difference(removeSet) + + sort.Sort(instance.Descriptions(add)) + sort.Sort(instance.Descriptions(remove)) + + changes = [][2]instance.Description{} + for n := range changeSet.Iter() { + key := n.(string) + if this[key].Fingerprint() != that[key].Fingerprint() { + changes = append(changes, [2]instance.Description{this[key], that[key]}) + } + } + return +} diff --git a/pkg/controller/enrollment/set_test.go b/pkg/controller/enrollment/set_test.go new file mode 100644 index 000000000..c5e3dc284 --- /dev/null +++ b/pkg/controller/enrollment/set_test.go @@ -0,0 +1,89 @@ +package enrollment + +import ( + "testing" + + "github.com/docker/infrakit/pkg/spi/instance" + "github.com/stretchr/testify/require" +) + +func TestSet(t *testing.T) { + + a := instance.Descriptions{ + {ID: instance.ID("1")}, + {ID: instance.ID("2")}, + {ID: instance.ID("3")}, + {ID: instance.ID("4")}, + {ID: instance.ID("5")}, + } + + b := instance.Descriptions{ + {ID: instance.ID("2")}, + {ID: instance.ID("3")}, + {ID: instance.ID("5")}, + {ID: instance.ID("6")}, + } + + keyFunc := func(i instance.Description) (string, error) { + return string(i.ID), nil + } + + diff := Difference(a, keyFunc, b, keyFunc) + require.Equal(t, instance.Descriptions{ + {ID: instance.ID("1")}, + {ID: instance.ID("4")}, + }, diff) + + diff2 := Difference(b, keyFunc, a, keyFunc) + require.Equal(t, instance.Descriptions{ + {ID: instance.ID("6")}, + }, diff2) + + add, remove, _ := Delta(instance.Descriptions(a), keyFunc, + instance.Descriptions(b), keyFunc) + require.Equal(t, instance.Descriptions{a[0], a[3]}, add) + require.Equal(t, instance.Descriptions{b[3]}, remove) +} + +func logicalID(s string) *instance.LogicalID { + id := instance.LogicalID(s) + return &id +} + +func TestSetKeyFuncs(t *testing.T) { + + a := instance.Descriptions{ + {ID: instance.ID("0x"), LogicalID: logicalID("0")}, + {ID: instance.ID("1x"), LogicalID: logicalID("1")}, + {ID: instance.ID("2x"), LogicalID: logicalID("2")}, + {ID: instance.ID("3x"), LogicalID: logicalID("3")}, + {ID: instance.ID("4x"), LogicalID: logicalID("4")}, + } + + b := instance.Descriptions{ + {ID: instance.ID("0")}, + {ID: instance.ID("2")}, + {ID: instance.ID("3")}, + {ID: instance.ID("5")}, + {ID: instance.ID("6")}, + } + + aKeyFunc := func(i instance.Description) (string, error) { + return string(*i.LogicalID), nil + } + bKeyFunc := func(i instance.Description) (string, error) { + return string(i.ID), nil + } + + diff := Difference(a, aKeyFunc, b, bKeyFunc) + require.Equal(t, instance.Descriptions{a[1], a[4]}, diff) + + diff2 := Difference(b, bKeyFunc, a, aKeyFunc) + require.Equal(t, instance.Descriptions{b[3], b[4]}, diff2) + + add, remove, change := Delta(instance.Descriptions(a), aKeyFunc, + instance.Descriptions(b), bKeyFunc) + require.Equal(t, instance.Descriptions{a[1], a[4]}, add) + require.Equal(t, instance.Descriptions{b[3], b[4]}, remove) + require.Equal(t, 3, len(change)) +} diff --git a/pkg/controller/enrollment/sync.go b/pkg/controller/enrollment/sync.go new file mode 100644 index 000000000..216e1b2aa --- /dev/null +++ b/pkg/controller/enrollment/sync.go @@ -0,0 +1,259 @@ +package enrollment + +import ( + "fmt" + + enrollment "github.com/docker/infrakit/pkg/controller/enrollment/types" + "github.com/docker/infrakit/pkg/plugin" + group_rpc "github.com/docker/infrakit/pkg/rpc/group" + instance_rpc "github.com/docker/infrakit/pkg/rpc/instance" + "github.com/docker/infrakit/pkg/spi/group" + "github.com/docker/infrakit/pkg/spi/instance" + "github.com/docker/infrakit/pkg/template" + "github.com/docker/infrakit/pkg/types" +) + +func (l *enroller) getSourceInstances() ([]instance.Description, error) { + list, err := l.properties.List.InstanceDescriptions() + if err != nil { + + pn, err := l.properties.List.GroupPlugin() + if err != nil { + return nil, fmt.Errorf("no list source specified") + } + + // we have a plugin name. -- eg. us-east/workers + lookup, gid := pn.GetLookupAndType() + + gp, err := l.getGroupPlugin(plugin.Name(lookup)) + if err != nil { + return nil, fmt.Errorf("cannot connect to group %v", pn) + } + + desc, err := gp.DescribeGroup(group.ID(gid)) + if err != nil { + return nil, err + } + + return desc.Instances, nil + } + return list, err +} + +func (l *enroller) getEnrolledInstances() ([]instance.Description, error) { + instancePlugin, err := l.getInstancePlugin(l.properties.Instance.Plugin) + if err != nil { + return nil, err + } + + return instancePlugin.DescribeInstances(l.properties.Instance.Labels, true) +} + +func (l *enroller) getSourceKeySelectorTemplate() (*template.Template, error) { + l.lock.Lock() + defer l.lock.Unlock() + + if l.options.SourceKeySelector != "" { + if l.sourceKeySelectorTemplate == nil { + t, err := enrollment.TemplateFrom([]byte(l.options.SourceKeySelector)) + if err != nil { + return nil, err + } + l.sourceKeySelectorTemplate = t + } + } + + return l.sourceKeySelectorTemplate, nil +} + +func (l *enroller) getEnrollmentPropertiesTemplate() (*template.Template, error) { + l.lock.Lock() + defer l.lock.Unlock() + + if l.properties.Instance.Properties != nil { + if l.enrollmentPropertiesTemplate == nil { + t, err := enrollment.TemplateFrom(l.properties.Instance.Properties.Bytes()) + if err != nil { + return nil, err + } + l.enrollmentPropertiesTemplate = t + } + } + + return l.enrollmentPropertiesTemplate, nil +} + +// run one synchronization round +func (l *enroller) sync() error { + + source, err := l.getSourceInstances() + if err != nil { + log.Error("Error getting sources. No action", "err", err) + return nil + } + + enrolled, err := l.getEnrolledInstances() + if err != nil { + log.Error("Error getting enrollment. No action", "err", err) + return nil + } + + // We need to compute a projection for each one of the vectors and compare + // them. This is because instance IDs from the respective lists are likely + // to be different. Instead there's a join key / common attribute somewhere + // embedded in the Description.Properties. + sourceKeyFunc := func(d instance.Description) (string, error) { + + t, err := l.getSourceKeySelectorTemplate() + if err != nil { + return "", err + } + if t != nil { + view, err := t.Render(d) + if err != nil { + return "", err + } + return view, nil + } + + return string(d.ID), nil + } + + // As long as the downstream enrollment records are labeled correctly we + // can even support 'importing' out-of-band created enrollment records + enrolledKeyFunc := func(d instance.Description) (string, error) { + if v, has := d.Tags["infrakit.enrollment.sourceID"]; has { + return v, nil + } + return "", fmt.Errorf("not-matched:%v", d.ID) + } + + // compute the delta required to make enrolled look like source + add, remove, _ := Delta( + instance.Descriptions(source), sourceKeyFunc, + instance.Descriptions(enrolled), enrolledKeyFunc, + ) + + log.Info("Computed delta", "add", add, "remove", remove) + + instancePlugin, err := l.getInstancePlugin(l.properties.Instance.Plugin) + if err != nil { + log.Error("cannot get instance plugin", "err", err) + return err + } + + for _, n := range add { + + props, err := l.buildProperties(n) + if err != nil { + log.Error("Cannot bulid properties to enroll", "err", err, "description", n) + continue + } + spec := instance.Spec{ + Properties: props, + Tags: l.labels(n), + } + _, err = instancePlugin.Provision(spec) + if err != nil { + log.Error("Failed to create enrollment", "err", err, "spec", spec) + } + } + + for _, n := range remove { + err = instancePlugin.Destroy(n.ID, instance.Termination) + if err != nil { + log.Error("Failed to remove enrollment", "err", err, "id", n.ID) + continue // get them next time... + } + } + return nil +} + +// buildProperties for calling enrollment / Provision +func (l *enroller) buildProperties(d instance.Description) (*types.Any, error) { + t, err := l.getEnrollmentPropertiesTemplate() + if err != nil { + return nil, err + } + if t == nil { + return types.AnyValue(d) + } + view, err := t.Render(d) + if err != nil { + return nil, err + } + return types.AnyString(view), nil +} + +func (l *enroller) labels(n instance.Description) map[string]string { + labels := l.properties.Instance.Labels + if labels == nil { + labels = map[string]string{} + } + labels["infrakit.enrollment.sourceID"] = string(n.ID) + labels["infrakit.enrollment.name"] = l.spec.Metadata.Name + return labels +} + +// destroy all the instances in the enrolled instance plugin +func (l *enroller) destroy() error { + instancePlugin, err := l.getInstancePlugin(l.properties.Instance.Plugin) + if err != nil { + return err + } + + // TODO -- add retry loop here to let Terminate block until everything is cleaned up. + { + l.lock.Lock() + + enrolled, err := l.getEnrolledInstances() + if err != nil { + return err + } + + for _, n := range enrolled { + err = instancePlugin.Destroy(n.ID, instance.Termination) + if err != nil { + log.Error("failed to destroy instance. retry next cycle.", "id", n.ID) + } + } + + defer l.lock.Unlock() + } + + return nil +} + +func (l *enroller) getGroupPlugin(name plugin.Name) (group.Plugin, error) { + if l.groupPlugin != nil { + return l.groupPlugin, nil + } + return l.connectGroupPlugin(name) +} + +func (l *enroller) connectGroupPlugin(name plugin.Name) (group.Plugin, error) { + l.lock.Lock() + defer l.lock.Unlock() + endpoint, err := l.plugins().Find(name) + if err != nil { + return nil, err + } + return group_rpc.NewClient(endpoint.Address) +} + +func (l *enroller) getInstancePlugin(name plugin.Name) (instance.Plugin, error) { + if l.instancePlugin != nil { + return l.instancePlugin, nil + } + return l.connectInstancePlugin(name) +} + +func (l *enroller) connectInstancePlugin(name plugin.Name) (instance.Plugin, error) { + l.lock.Lock() + defer l.lock.Unlock() + endpoint, err := l.plugins().Find(name) + if err != nil { + return nil, err + } + return instance_rpc.NewClient(name, endpoint.Address) +} diff --git a/pkg/controller/enrollment/types/types.go b/pkg/controller/enrollment/types/types.go new file mode 100644 index 000000000..b80e505a5 --- /dev/null +++ b/pkg/controller/enrollment/types/types.go @@ -0,0 +1,119 @@ +package types + +import ( + "time" + + "github.com/docker/infrakit/pkg/controller" + "github.com/docker/infrakit/pkg/plugin" + "github.com/docker/infrakit/pkg/run/depends" + "github.com/docker/infrakit/pkg/spi/instance" + "github.com/docker/infrakit/pkg/template" + "github.com/docker/infrakit/pkg/types" +) + +const ( + // DefaultSyncInterval is the default interval for syncing enrollments + DefaultSyncInterval = 5 * time.Second +) + +func init() { + depends.Register("enroll", types.InterfaceSpec(controller.InterfaceSpec), ResolveDependencies) +} + +// ResolveDependencies returns a list of dependencies by parsing the opaque Properties blob. +func ResolveDependencies(spec types.Spec) ([]plugin.Name, error) { + if spec.Properties == nil { + return nil, nil + } + + properties := Properties{} + err := spec.Properties.Decode(&properties) + if err != nil { + return nil, err + } + + return []plugin.Name{properties.Instance.Plugin}, nil +} + +// ListSourceUnion is a union type of possible values: +// a list of []intsance.Description +// a group plugin name +type ListSourceUnion types.Any + +// InstanceDescriptions tries to 'cast' the union as list of descriptions +func (u *ListSourceUnion) InstanceDescriptions() ([]instance.Description, error) { + list := []instance.Description{} + err := (*types.Any)(u).Decode(&list) + return list, err +} + +// GroupPlugin tries to 'cast' the union value as a group plugin name +func (u *ListSourceUnion) GroupPlugin() (plugin.Name, error) { + p := plugin.Name("") + err := (*types.Any)(u).Decode(&p) + return p, err +} + +// UnmarshalJSON implements json.Unmarshaler +func (u *ListSourceUnion) UnmarshalJSON(buff []byte) error { + *u = ListSourceUnion(*types.AnyBytes(buff)) + return nil +} + +// MarshalJSON implements json.Marshaler +func (u *ListSourceUnion) MarshalJSON() ([]byte, error) { + if u != nil { + return (*types.Any)(u).MarshalJSON() + } + return []byte{}, nil +} + +// PluginSpec has information about the plugin +type PluginSpec struct { + // Plugin is the name of the instance plugin + Plugin plugin.Name + + // Labels are the labels to use when querying for instances. This is the namespace. + Labels map[string]string + + // Properties is the properties to configure the instance with. + Properties *types.Any `json:",omitempty" yaml:",omitempty"` +} + +// Properties is the schema of the configuration in the types.Spec.Properties +type Properties struct { + + // List is a list of instance descriptions to sync + List *ListSourceUnion `json:",omitempty" yaml:",omitempty"` + + // Instance is the name of the instance plugin which will receive the + // synchronization messages of provision / destroy based on the + // changes in the List + Instance PluginSpec +} + +// Options is the controller options +type Options struct { + + // SourceKeySelector is a string template for selecting the join key from + // a source instance.Description. This selector template should use escapes + // so that the template {{ and }} are preserved. For example, + // SourceKeySelector: \{\{ .ID \}\} # selects the ID field. + SourceKeySelector string + + // SyncInterval is the time interval between reconciliation. Syntax + // is go's time.Duration string representation (e.g. 1m, 30s) + SyncInterval types.Duration + + // DestroyOnTerminiate tells the controller to call instace.Destroy + // for each member it is maintaining. This is a matter of ownership + // depending on use cases the controller may not *own* the data in the + // downstream instance. The controller merely reconciles it. + DestroyOnTerminate bool +} + +// TemplateFrom returns a template after it has un-escapes any escape sequences +func TemplateFrom(source []byte) (*template.Template, error) { + buff := template.Unescape(source) + return template.NewTemplate("str://"+string(buff), template.Options{MultiPass: false}) +} diff --git a/pkg/controller/enrollment/types/types_test.go b/pkg/controller/enrollment/types/types_test.go new file mode 100644 index 000000000..6eea1d08e --- /dev/null +++ b/pkg/controller/enrollment/types/types_test.go @@ -0,0 +1,166 @@ +package types + +import ( + "testing" + + "github.com/docker/infrakit/pkg/plugin" + "github.com/docker/infrakit/pkg/spi/instance" + "github.com/docker/infrakit/pkg/types" + "github.com/stretchr/testify/require" +) + +func mustSpec(s types.Spec, err error) types.Spec { + if err != nil { + panic(err) + } + return s +} + +func specFromString(s string) (types.Spec, error) { + v, err := types.AnyYAML([]byte(s)) + if err != nil { + return types.Spec{}, err + } + spec := types.Spec{} + err = v.Decode(&spec) + return spec, err +} + +func TestWriteProperties(t *testing.T) { + p := Properties{ + List: (*ListSourceUnion)(types.AnyValueMust([]instance.Description{ + {ID: instance.ID("host1")}, + {ID: instance.ID("host2")}, + })), + Instance: PluginSpec{ + Plugin: plugin.Name("simulator/compute"), + Properties: types.AnyValueMust("test"), + }, + } + + buff, err := types.AnyValueMust(p).MarshalYAML() + require.NoError(t, err) + + p2 := Properties{} + err = types.AnyYAMLMust(buff).Decode(&p2) + require.NoError(t, err) + + list1, err := p.List.InstanceDescriptions() + require.NoError(t, err) + + list2, err := p2.List.InstanceDescriptions() + require.NoError(t, err) + + require.EqualValues(t, list2, list1) +} + +func TestParseProperties(t *testing.T) { + + spec := mustSpec(specFromString(` +kind: enrollment +metadata: + name: nfs +properties: + List: + - ID: host1 + - ID: host2 + - ID: host3 + - ID: host4 + Instance: + Plugin: us-east/nfs-authorizer + Properties: + Id: \{\{ .ID \}\} +`)) + + p := Properties{} + err := spec.Properties.Decode(&p) + require.NoError(t, err) + + list, err := p.List.InstanceDescriptions() + require.NoError(t, err) + + _, err = p.List.GroupPlugin() + require.Error(t, err) + + require.EqualValues(t, []instance.Description{ + {ID: instance.ID("host1")}, + {ID: instance.ID("host2")}, + {ID: instance.ID("host3")}, + {ID: instance.ID("host4")}, + }, list) +} + +func TestParsePropertiesWithGroup(t *testing.T) { + + spec := mustSpec(specFromString(` +kind: enrollment +metadata: + name: nfs +properties: + List: us-east/workers + Instance: + Plugin: us-east/nfs-authorizer + Properties: + Id: \{\{ .ID \}\} +`)) + + p := Properties{} + err := spec.Properties.Decode(&p) + require.NoError(t, err) + + _, err = p.List.InstanceDescriptions() + require.Error(t, err) + + g, err := p.List.GroupPlugin() + require.NoError(t, err) + require.Equal(t, plugin.Name("us-east/workers"), g) + + spec2 := types.Spec{} + require.NoError(t, types.AnyString(` +{ + "kind": "enrollment", + "metadata" : { + "name" : "nfs" + }, + "options" : { + "SourceKeySelector" : "\\{\\{.ID\\}\\}" + }, + "properties" : { + "List" : [ + { "ID" : "h1" }, + { "ID" : "h2" } + ], + "Instance" : { + "Plugin" : "us-east/nfs-authorizer", + "Properties" : { + "ID" : "\\{\\{.ID\\}\\}" + } + } + } +} +`).Decode(&spec2)) + + tt, err := TemplateFrom(spec2.Options.Bytes()) + require.NoError(t, err) + + obj := map[string]string{"ID": "hello"} + view, err := tt.Render(obj) + require.NoError(t, err) + require.Equal(t, "{\n \"SourceKeySelector\" : \"hello\"\n }", view) + + type properties struct { + Instance struct { + Properties struct { + ID string + } + } + } + + pp := properties{} + tt, err = TemplateFrom(spec2.Properties.Bytes()) + require.NoError(t, err) + view, err = tt.Render(obj) + require.NoError(t, err) + require.NoError(t, types.AnyString(view).Decode(&pp)) + require.Equal(t, "hello", pp.Instance.Properties.ID) +} diff --git a/pkg/controller/internal/controller.go b/pkg/controller/internal/controller.go index 964ecb354..de37f596c 100644 --- a/pkg/controller/internal/controller.go +++ b/pkg/controller/internal/controller.go @@ -144,6 +144,8 @@ func (c *Controller) Commit(operation controller.Operation, spec types.Spec) (ob c.lock.Lock() defer c.lock.Unlock() + log.Debug("committing", "operation", operation, "spec", spec) + m := []**Managed{} copy := spec m, err = c.getManaged(&spec.Metadata, ©) @@ -151,6 +153,12 @@ func (c *Controller) Commit(operation controller.Operation, spec types.Spec) (ob return } + log.Debug("got managed", "operation", operation, "spec", spec, "m", m) + + if len(m) == 0 { + return types.Object{}, fmt.Errorf("no managed object found %v", spec.Metadata.Name) + } + // In the future maybe will consider wildcard commits... but this is highly discouraged at this time. if len(m) != 1 { err = fmt.Errorf("duplicate objects: %v", m) @@ -162,6 +170,8 @@ func (c *Controller) Commit(operation controller.Operation, spec types.Spec) (ob managed := *(m[0]) if (*managed).Running() { + log.Debug("creating new object to replace running instance.") + // Create a new object newManaged, err := c.alloc(spec) if err != nil { @@ -180,6 +190,7 @@ func (c *Controller) Commit(operation controller.Operation, spec types.Spec) (ob log.Debug("Swapped running managed object", "managed", m[0]) } + log.Debug("calling enforce", "spec", spec, "m", managed) o, e := (*managed).Enforce(spec) if o != nil { object = *o @@ -216,6 +227,15 @@ func (c *Controller) Describe(search *types.Metadata) (objects []types.Object, e if err != nil { return } + + if len(m) == 0 { + ss := fmt.Sprintf("%v", search) + if search != nil { + ss = fmt.Sprintf("%v", *search) + } + return nil, fmt.Errorf("no managed object found %v", ss) + } + objects = []types.Object{} for _, s := range m { o, err := (**s).Inspect() diff --git a/pkg/run/v0/enrollment/enrollment.go b/pkg/run/v0/enrollment/enrollment.go new file mode 100644 index 000000000..ceb928613 --- /dev/null +++ b/pkg/run/v0/enrollment/enrollment.go @@ -0,0 +1,80 @@ +package enrollment + +import ( + enrollment_controller "github.com/docker/infrakit/pkg/controller/enrollment" + enrollment "github.com/docker/infrakit/pkg/controller/enrollment/types" + "github.com/docker/infrakit/pkg/discovery" + "github.com/docker/infrakit/pkg/launch/inproc" + logutil "github.com/docker/infrakit/pkg/log" + "github.com/docker/infrakit/pkg/manager" + "github.com/docker/infrakit/pkg/plugin" + "github.com/docker/infrakit/pkg/rpc/client" + manager_rpc "github.com/docker/infrakit/pkg/rpc/manager" + "github.com/docker/infrakit/pkg/run" + "github.com/docker/infrakit/pkg/types" +) + +const ( + // Kind is the canonical name of the plugin for starting up, etc. + Kind = "enrollment" +) + +var ( + log = logutil.New("module", "run/v0/enrollment") +) + +func init() { + inproc.Register(Kind, Run, DefaultOptions) +} + +// DefaultOptions return an Options with default values filled in. +var DefaultOptions = enrollment.Options{ + SyncInterval: types.Duration(enrollment.DefaultSyncInterval), + DestroyOnTerminate: false, +} + +func leadership(plugins func() discovery.Plugins) (manager.Leadership, error) { + // Scan for a manager + pm, err := plugins().List() + if err != nil { + return nil, err + } + + for _, endpoint := range pm { + rpcClient, err := client.New(endpoint.Address, manager.InterfaceSpec) + if err == nil { + return manager_rpc.Adapt(rpcClient), nil + } + } + return nil, nil +} + +// Run runs the plugin, blocking the current thread. Error is returned immediately +// if the plugin cannot be started. +func Run(plugins func() discovery.Plugins, name plugin.Name, + config *types.Any) (transport plugin.Transport, impls map[run.PluginCode]interface{}, onStop func(), err error) { + + if plugins == nil { + panic("no plugins()") + } + + options := enrollment.Options{} + err = config.Decode(&options) + if err != nil { + return + } + + log.Info("Decoded input", "config", options) + + leader, err := leadership(plugins) + if err != nil { + return + } + + transport.Name = name + impls = map[run.PluginCode]interface{}{ + run.Controller: enrollment_controller.NewTypedControllers(plugins, leader, options), + } + + return +} diff --git a/pkg/spi/instance/description.go b/pkg/spi/instance/description.go new file mode 100644 index 000000000..6b3c98bed --- /dev/null +++ b/pkg/spi/instance/description.go @@ -0,0 +1,39 @@ +package instance + +import ( + "github.com/docker/infrakit/pkg/types" +) + +// Fingerprint returns the fingerprint of the spec +func (d Description) Fingerprint() string { + return types.Fingerprint(types.AnyValueMust(d)) +} + +// Compare compares the two descriptions by ID +func (d Description) Compare(other Description) int { + if d.ID < other.ID { + return -1 + } + if d.ID > other.ID { + return 1 + } + return 0 +} + +// Descriptions is a collection of descriptions +type Descriptions []Description + +// Len is part of sort.Interface. +func (list Descriptions) Len() int { + return len(list) +} + +// Swap is part of sort.Interface. +func (list Descriptions) Swap(i, j int) { + list[i], list[j] = list[j], list[i] +} + +// Less is part of sort.Interface. It is implemented by calling the "by" closure in the sorter. +func (list Descriptions) Less(i, j int) bool { + return list[i].Compare(list[j]) < 0 +} diff --git a/pkg/template/funcs.go b/pkg/template/funcs.go index 08df2a44d..64b7a2038 100644 --- a/pkg/template/funcs.go +++ b/pkg/template/funcs.go @@ -17,6 +17,33 @@ import ( "github.com/vaughan0/go-ini" ) +// Unescape replaces all the \{\{ and \}\} back to the normal unescaped {{ and }}. +func Unescape(source []byte) []byte { + if source == nil { + return source + } + // Apply the template but we need escape the \{\{ if any + buff := source + buff = bytes.Replace(buff, []byte("\\{\\{"), []byte("{{"), -1) + buff = bytes.Replace(buff, []byte("\\}\\}"), []byte("}}"), -1) + + // YAML will escape the escapes... so twice + buff = bytes.Replace(buff, []byte("\\\\{\\\\{"), []byte("{{"), -1) + buff = bytes.Replace(buff, []byte("\\\\}\\\\}"), []byte("}}"), -1) + return buff +} + +// Escape replaces all the {{ and }} with \{\{ and \}\} to escape template content. +func Escape(source []byte) []byte { + if source == nil { + return source + } + buff := source + buff = bytes.Replace(buff, []byte("{{"), []byte("\\{\\{"), -1) + buff = bytes.Replace(buff, []byte("}}"), []byte("\\}\\}"), -1) + return buff +} + // DeepCopyObject makes a deep copy of the argument, using encoding/gob encode/decode. func DeepCopyObject(from interface{}) (interface{}, error) { var mod bytes.Buffer @@ -261,24 +288,26 @@ func (t *Template) Fetch(p string, opt ...interface{}) (string, error) { // Source 'sources' the input file at url, also inherits all the variables. func (t *Template) Source(p string, opt ...interface{}) (string, error) { - headers, context := headersAndContext(opt...) - loc := p - if strings.Index(loc, "str://") == -1 { - u, err := GetURL(t.url, p) - if err != nil { - return "", err - } - loc = u.String() - } - - prev := t.options.CustomizeFetch - t.options.CustomizeFetch = func(req *http.Request) { - setHeaders(req, headers) - if prev != nil { - prev(req) - } - } - sourced, err := NewTemplate(loc, t.options) + // headers, context := headersAndContext(opt...) + // loc := p + // if strings.Index(loc, "str://") == -1 { + // u, err := GetURL(t.url, p) + // if err != nil { + // return "", err + // } + // loc = u.String() + // } + + // prev := t.options.CustomizeFetch + // t.options.CustomizeFetch = func(req *http.Request) { + // setHeaders(req, headers) + // if prev != nil { + // prev(req) + // } + // } + // sourced, err := NewTemplate(loc, t.options) + + _, context, sourced, err := t.raw(p, opt...) if err != nil { return "", err } @@ -296,16 +325,63 @@ func (t *Template) Source(p string, opt ...interface{}) (string, error) { // Include includes the template at the url inline. func (t *Template) Include(p string, opt ...interface{}) (string, error) { + // headers, context := headersAndContext(opt...) + // loc := p + // if strings.Index(loc, "str://") == -1 { + // u, err := GetURL(t.url, p) + // if err != nil { + // return "", err + // } + // loc = u.String() + // } + + // prev := t.options.CustomizeFetch + // t.options.CustomizeFetch = func(req *http.Request) { + // setHeaders(req, headers) + // if prev != nil { + // prev(req) + // } + // } + + // included, err := NewTemplate(loc, t.options) + + _, context, included, err := t.raw(p, opt...) + if err != nil { + return "", err + } + dotCopy, err := included.forkFrom(t) + if err != nil { + return "", err + } + included.context = dotCopy + + if context == nil { + context = included.context + } + + return included.Render(context) +} + +// Raw includes the raw bytes fetched from the url inline. +func (t *Template) Raw(p string, opt ...interface{}) ([]byte, error) { + _, _, included, err := t.raw(p, opt...) + if err != nil { + return []byte{}, err + } + return included.body, nil +} + +func (t *Template) raw(p string, opt ...interface{}) (map[string][]string, interface{}, *Template, error) { + headers, context := headersAndContext(opt...) loc := p if strings.Index(loc, "str://") == -1 { u, err := GetURL(t.url, p) if err != nil { - return "", err + return nil, nil, nil, err } loc = u.String() } - prev := t.options.CustomizeFetch t.options.CustomizeFetch = func(req *http.Request) { setHeaders(req, headers) @@ -313,22 +389,8 @@ func (t *Template) Include(p string, opt ...interface{}) (string, error) { prev(req) } } - - included, err := NewTemplate(loc, t.options) - if err != nil { - return "", err - } - dotCopy, err := included.forkFrom(t) - if err != nil { - return "", err - } - included.context = dotCopy - - if context == nil { - context = included.context - } - - return included.Render(context) + tt, err := NewTemplate(loc, t.options) + return headers, context, tt, err } // DefaultFuncs returns a list of default functions for binding in the template @@ -338,7 +400,14 @@ func (t *Template) DefaultFuncs() []Function { { Name: "fetch", Description: []string{ - "Fetches a resource without evaluation as template", + "Fetches a resource without evaluation as template. The results are cached locally by the url fetched from.", + }, + Func: t.Fetch, + }, + { + Name: "raw", + Description: []string{ + "Get a resource and returns the raw bytes. The results are not cached.", }, Func: t.Fetch, }, @@ -490,6 +559,22 @@ func (t *Template) DefaultFuncs() []Function { }, Func: FromHCL, }, + { + Name: "escape", + Description: []string{ + "Escapes the content as template so it can be inlined another template", + "and not be evaluated all at the same time.", + }, + Func: Escape, + }, + { + Name: "unescape", + Description: []string{ + "Unescapes the content back into a template so that the content", + "can be evaluated.", + }, + Func: Escape, + }, { Name: "echo", Description: []string{ diff --git a/pkg/template/funcs_test.go b/pkg/template/funcs_test.go index 1b4eef567..77654dd7b 100644 --- a/pkg/template/funcs_test.go +++ b/pkg/template/funcs_test.go @@ -413,3 +413,17 @@ func TestYAML(t *testing.T) { require.Equal(t, v, v2) } + +func TestEscapeUnescape(t *testing.T) { + + text := `"\{\{.ID\}\}"` + buff := Unescape([]byte(text)) + text2 := Escape(buff) + require.Equal(t, string(text), string(text2)) + + tt, err := NewTemplateFromBytes(buff, "", Options{}) + require.NoError(t, err) + v, err := tt.Render(map[string]string{"ID": "hello"}) + require.NoError(t, err) + require.Equal(t, `"hello"`, v) +}