From e0220f8f766f842c5ad1c1efc2fbbd0c12445e4c Mon Sep 17 00:00:00 2001 From: David Chung Date: Thu, 7 Sep 2017 00:19:37 -0700 Subject: [PATCH 01/12] initial enrollment controller Signed-off-by: David Chung --- cmd/infrakit/main.go | 1 + pkg/controller/enrollment/controller.go | 73 ++++++ pkg/controller/enrollment/enroller.go | 170 ++++++++++++++ pkg/controller/enrollment/set.go | 75 ++++++ pkg/controller/enrollment/sync.go | 215 ++++++++++++++++++ pkg/controller/enrollment/types/types.go | 140 ++++++++++++ pkg/controller/enrollment/types/types_test.go | 117 ++++++++++ pkg/run/v0/enrollment/enrollment.go | 82 +++++++ pkg/spi/instance/description.go | 39 ++++ 9 files changed, 912 insertions(+) create mode 100644 pkg/controller/enrollment/controller.go create mode 100644 pkg/controller/enrollment/enroller.go create mode 100644 pkg/controller/enrollment/set.go create mode 100644 pkg/controller/enrollment/sync.go create mode 100644 pkg/controller/enrollment/types/types.go create mode 100644 pkg/controller/enrollment/types/types_test.go create mode 100644 pkg/run/v0/enrollment/enrollment.go create mode 100644 pkg/spi/instance/description.go diff --git a/cmd/infrakit/main.go b/cmd/infrakit/main.go index 4904aff56..7460276c4 100644 --- a/cmd/infrakit/main.go +++ b/cmd/infrakit/main.go @@ -41,6 +41,7 @@ import ( // Supported "kinds" _ "github.com/docker/infrakit/pkg/run/v0/aws" + _ "github.com/docker/infrakit/pkg/run/v0/enrollment" _ "github.com/docker/infrakit/pkg/run/v0/file" _ "github.com/docker/infrakit/pkg/run/v0/group" _ "github.com/docker/infrakit/pkg/run/v0/hyperkit" diff --git a/pkg/controller/enrollment/controller.go b/pkg/controller/enrollment/controller.go new file mode 100644 index 000000000..37c037c3e --- /dev/null +++ b/pkg/controller/enrollment/controller.go @@ -0,0 +1,73 @@ +package enrollment + +import ( + "github.com/docker/infrakit/pkg/controller" + enrollment "github.com/docker/infrakit/pkg/controller/enrollment/types" + "github.com/docker/infrakit/pkg/controller/internal" + "github.com/docker/infrakit/pkg/discovery" + logutil "github.com/docker/infrakit/pkg/log" + "github.com/docker/infrakit/pkg/manager" + "github.com/docker/infrakit/pkg/types" + "golang.org/x/net/context" +) + +var log = logutil.New("module", "controller/enrollment") + +// NewController returns a controller implementation +func NewController(plugins func() discovery.Plugins, leader manager.Leadership, + options enrollment.Options) controller.Controller { + return internal.NewController( + leader, + // the constructor + func(spec types.Spec) (internal.Managed, error) { + return newEnroller(plugins, leader, options), nil + }, + // the key function + func(metadata types.Metadata) string { + return metadata.Name + }, + ) +} + +// NewTypedControllers return typed controllers +func NewTypedControllers(plugins func() discovery.Plugins, leader manager.Leadership, + options enrollment.Options) func() (map[string]controller.Controller, error) { + + return (internal.NewController( + leader, + // the constructor + func(spec types.Spec) (internal.Managed, error) { + return newEnroller(plugins, leader, options), nil + }, + // the key function + func(metadata types.Metadata) string { + return metadata.Name + }, + )).ManagedObjects +} + +func (l *enroller) start() { + l.lock.Lock() + defer l.lock.Unlock() + + if l.poller != nil { + go l.poller.Run(context.Background()) + l.running = true + } +} + +func (l *enroller) started() bool { + l.lock.RLock() + defer l.lock.RUnlock() + + return l.running +} + +func (l *enroller) stop() { + l.lock.Lock() + defer l.lock.Unlock() + + if l.poller != nil { + l.poller.Stop() + } +} diff --git a/pkg/controller/enrollment/enroller.go b/pkg/controller/enrollment/enroller.go new file mode 100644 index 000000000..2e4e44de5 --- /dev/null +++ b/pkg/controller/enrollment/enroller.go @@ -0,0 +1,170 @@ +package enrollment + +import ( + "fmt" + "sync" + "time" + + "github.com/docker/infrakit/pkg/controller" + enrollment "github.com/docker/infrakit/pkg/controller/enrollment/types" + "github.com/docker/infrakit/pkg/discovery" + "github.com/docker/infrakit/pkg/manager" + "github.com/docker/infrakit/pkg/spi/group" + "github.com/docker/infrakit/pkg/spi/instance" + "github.com/docker/infrakit/pkg/types" +) + +// enroller implements the internal.Managed interface. +// When constructed, it takes a list of instance id, or logical +// names, or a source that can provide this data, and makes sure +// a downstream instance plugin properly reflects this list. +// When there are new entries in the source, the sink's Provision +// will be called. When source entries disappear, the sink's +// Destroy will be called. +// At the moment, destroy will not invoke an flavor plugin to +// execute some kind of drain. That functionality, instead, +// could be implemented as a proxied instance plugin (using the +// interceptor pattern). +type enroller struct { + manager.Leadership + + spec types.Spec + properties enrollment.Properties + options enrollment.Options + + plugins func() discovery.Plugins + + poller *controller.Poller + ticker <-chan time.Time + lock sync.RWMutex + + groupPlugin group.Plugin // source -- where members are to be enrolled + instancePlugin instance.Plugin // sink -- where enrollments are made + running bool +} + +func newEnroller(plugins func() discovery.Plugins, + leader manager.Leadership, options enrollment.Options) *enroller { + l := &enroller{ + Leadership: leader, + plugins: plugins, + options: options, + } + + interval := l.options.SyncInterval + if interval == 0 { + interval = enrollment.DefaultSyncInterval + } + l.ticker = time.Tick(interval) + + l.poller = controller.Poll( + // This determines if the action should be taken when time is up + func() bool { + if mustTrue(l.IsLeader()) { + return true + } + return false + }, + // This does the work + func() (err error) { + return l.sync() + }, + l.ticker) + + return l +} + +func mustTrue(v bool, e error) bool { + if e != nil { + return false + } + return v +} + +// object returns the state +func (l enroller) object() (*types.Object, error) { + object := types.Object{ + Spec: l.spec, + } + // TODO build the current state + return &object, nil +} + +// Plan implements internal.Managed.Plan +func (l *enroller) Plan(operation controller.Operation, spec types.Spec) (*types.Object, *controller.Plan, error) { + + if operation == controller.Destroy { + o, _ := l.object() + return o, nil, nil + } + + if spec.Properties == nil { + return nil, nil, fmt.Errorf("missing properties") + } + properties := enrollment.Properties{} + err := spec.Properties.Decode(&properties) + if err != nil { + return nil, nil, err + } + + // TODO - build a plan + return &types.Object{ + Spec: spec, + }, &controller.Plan{}, nil + +} + +// Enforce implements internal.Managed.Enforce +func (l *enroller) Enforce(types.Spec) (*types.Object, error) { + l.lock.Lock() + defer l.lock.Unlock() + + l.start() + return l.object() +} + +// Inspect implements internal.Managed.Inspect +func (l *enroller) Inspect() (*types.Object, error) { + return l.object() +} + +// Free implements internal.Managed.Free +func (l *enroller) Free() (*types.Object, error) { + return l.Pause() +} + +// Pause implements internal.Managed.Pause +func (l *enroller) Pause() (*types.Object, error) { + l.lock.Lock() + defer l.lock.Unlock() + + if l.started() { + l.stop() + } + return l.Inspect() +} + +// Terminate implements internal.Managed.Terminate +func (l *enroller) Terminate() (*types.Object, error) { + l.lock.Lock() + defer l.lock.Unlock() + + o, err := l.object() + if err != nil { + return nil, err + } + + if l.started() { + l.stop() + } + + if l.options.DestroyOnTerminate { + if err := l.destroy(); err != nil { + // TODO - how do we handle rollback? + // For now let's not try to restore deleted entries, because + // there are no guarantees that the restore operations will succeed. + return o, err + } + } + return o, nil +} diff --git a/pkg/controller/enrollment/set.go b/pkg/controller/enrollment/set.go new file mode 100644 index 000000000..b88fa3bcf --- /dev/null +++ b/pkg/controller/enrollment/set.go @@ -0,0 +1,75 @@ +package enrollment + +import ( + "sort" + + "github.com/deckarep/golang-set" + "github.com/docker/infrakit/pkg/spi/instance" +) + +// keyFunc is a function that extracts the key from the description +type keyFunc func(instance.Description) string + +// Descriptions is a slice of descriptions +type Descriptions []instance.Description + +func (list Descriptions) index(getKey keyFunc) (map[string]instance.Description, mapset.Set) { + index := map[string]instance.Description{} + this := mapset.NewSet() + for _, n := range list { + key := getKey(n) + this.Add(key) + index[key] = n + } + return index, this +} + +// Difference returns a list of specs that is not in the receiver. +func Difference(list Descriptions, listKeyFunc keyFunc, + other Descriptions, otherKeyFunc keyFunc) Descriptions { + this, thisSet := list.index(listKeyFunc) + _, thatSet := other.index(otherKeyFunc) + return toDescriptions(listKeyFunc, thisSet.Difference(thatSet), this) +} + +func toDescriptions(keyFunc keyFunc, set mapset.Set, index map[string]instance.Description) Descriptions { + out := Descriptions{} + for n := range set.Iter() { + out = append(out, index[n.(string)]) + } + return out +} + +// Delta computes the changes necessary to make the receiver match the input: +// 1. the add Descriptions are entries to add to receiver +// 2. the remove Descriptions are entries to remove from receiver +// 3. changes are a slice of Descriptions where changes[x][0] is the original, and changes[x][1] is new +func Delta(list Descriptions, listKeyFunc keyFunc, other Descriptions, + otherKeyFunc keyFunc) (add Descriptions, remove Descriptions, changes [][2]instance.Description) { + + sort.Sort(instance.Descriptions(list)) + sort.Sort(instance.Descriptions(other)) + + this, thisSet := list.index(listKeyFunc) + that, thatSet := other.index(otherKeyFunc) + + removeSet := thisSet.Difference(thatSet) + remove = toDescriptions(listKeyFunc, removeSet, this) + + addSet := thatSet.Difference(thisSet) + add = toDescriptions(otherKeyFunc, addSet, that) + + changeSet := thisSet.Difference(removeSet) + + sort.Sort(instance.Descriptions(add)) + sort.Sort(instance.Descriptions(remove)) + + changes = [][2]instance.Description{} + for n := range changeSet.Iter() { + key := n.(string) + if this[key].Fingerprint() != that[key].Fingerprint() { + changes = append(changes, [2]instance.Description{this[key], that[key]}) + } + } + return +} diff --git a/pkg/controller/enrollment/sync.go b/pkg/controller/enrollment/sync.go new file mode 100644 index 000000000..72a3e2cdd --- /dev/null +++ b/pkg/controller/enrollment/sync.go @@ -0,0 +1,215 @@ +package enrollment + +import ( + "fmt" + + "github.com/docker/infrakit/pkg/plugin" + group_rpc "github.com/docker/infrakit/pkg/rpc/group" + instance_rpc "github.com/docker/infrakit/pkg/rpc/instance" + "github.com/docker/infrakit/pkg/spi/group" + "github.com/docker/infrakit/pkg/spi/instance" + "github.com/docker/infrakit/pkg/template" + "github.com/docker/infrakit/pkg/types" +) + +func (l *enroller) getSourceInstances() ([]instance.Description, error) { + list, err := l.properties.List.InstanceDescriptions() + if err != nil { + + pn, err := l.properties.List.GroupPlugin() + if err != nil { + return nil, fmt.Errorf("No list source specified.") + } + + // we have a plugin name. -- eg. us-east/workers + lookup, gid := pn.GetLookupAndType() + + gp, err := l.getGroupPlugin(plugin.Name(lookup)) + if err != nil { + return nil, fmt.Errorf("Cannot connect to group %v", pn) + } + + desc, err := gp.DescribeGroup(group.ID(gid)) + if err != nil { + return nil, err + } + + list = desc.Instances + } + return list, err +} + +func (l *enroller) getEnrolledInstances() ([]instance.Description, error) { + instancePlugin, err := l.getInstancePlugin(l.properties.Instance.Plugin) + if err != nil { + return nil, err + } + + return instancePlugin.DescribeInstances(l.properties.Instance.Labels, true) +} + +// run one synchronization round +func (l *enroller) sync() error { + + instancePlugin, err := l.getInstancePlugin(l.properties.Instance.Plugin) + if err != nil { + return err + } + + source, err := l.getSourceInstances() + if err != nil { + log.Error("Error getting sources. No action", "err", err) + return nil + } + + enrolled, err := l.getEnrolledInstances() + if err != nil { + log.Error("Error getting enrollment. No action", "err", err) + return nil + } + + // We need to compute a projection for each one of the vectors and compare + // them. This is because instance IDs from the respective lists are likely + // to be different. Instead there's a join key / common attribute somewhere + // embedded in the Description.Properties. + sourceKeyFunc := func(d instance.Description) string { + // TODO render a template + return string(d.ID) + } + enrolledKeyFunc := func(d instance.Description) string { + // TODO render a template + if d.LogicalID != nil { + return string(*d.LogicalID) + } + return string(d.ID) + } + + add, remove, _ := Delta(Descriptions(enrolled), enrolledKeyFunc, Descriptions(source), sourceKeyFunc) + + log.Debug("Computed delta", "add", add, "remove", remove) + + for _, n := range add { + + props, err := l.buildProperties(n) + if err != nil { + log.Error("Cannot bulid properties to enroll", "err", err, "description", n) + continue + } + + logicalID := instance.LogicalID(string(n.ID)) + spec := instance.Spec{ + LogicalID: &logicalID, + // TODO - render a template using the value n as context? + Properties: props, + Tags: l.labels(n), + } + _, err = instancePlugin.Provision(spec) + if err != nil { + log.Error("Failed to create enrollment", "err", err, "spec", spec) + } + } + + for _, n := range remove { + err = instancePlugin.Destroy(n.ID, instance.Termination) + if err != nil { + log.Error("Failed to remove enrollment", "err", err, "id", n.ID) + } + } + return nil +} + +// buildProperties for calling enrollment / Provision +func (l *enroller) buildProperties(d instance.Description) (props *types.Any, err error) { + props = l.properties.Instance.Properties + + if props == nil { + return + } + + t, e := template.NewTemplate(props.String(), template.Options{MultiPass: false}) + if e != nil { + err = e + return + } + + view, e := t.Render(d) + if e != nil { + err = e + return + } + return types.AnyString(view), nil +} + +func (l *enroller) labels(n instance.Description) map[string]string { + labels := l.properties.Instance.Labels + if labels == nil { + labels = map[string]string{} + } + labels["infrakit.enrollment.sourceID"] = string(n.ID) + labels["infrakit.enrollment.name"] = l.spec.Metadata.Name + return labels +} + +// destroy all the instances in the enrolled instance plugin +func (l *enroller) destroy() error { + + instancePlugin, err := l.getInstancePlugin(l.properties.Instance.Plugin) + if err != nil { + return err + } + + // TODO -- add retry loop here to let Terminate block until everything is cleaned up. + { + l.lock.Lock() + + enrolled, err := l.getEnrolledInstances() + if err != nil { + return err + } + + for _, n := range enrolled { + err = instancePlugin.Destroy(n.ID, instance.Termination) + if err != nil { + log.Error("failed to destroy instance. retry next cycle.", "id", n.ID) + } + } + + defer l.lock.Unlock() + } + + return nil +} + +func (l *enroller) getGroupPlugin(name plugin.Name) (group.Plugin, error) { + if l.groupPlugin != nil { + return l.groupPlugin, nil + } + return l.connectGroupPlugin(name) +} + +func (l *enroller) connectGroupPlugin(name plugin.Name) (group.Plugin, error) { + l.lock.Lock() + defer l.lock.Unlock() + endpoint, err := l.plugins().Find(name) + if err != nil { + return nil, err + } + return group_rpc.NewClient(endpoint.Address) +} + +func (l *enroller) getInstancePlugin(name plugin.Name) (instance.Plugin, error) { + if l.instancePlugin != nil { + return l.instancePlugin, nil + } + return l.connectInstancePlugin(name) +} + +func (l *enroller) connectInstancePlugin(name plugin.Name) (instance.Plugin, error) { + l.lock.Lock() + defer l.lock.Unlock() + endpoint, err := l.plugins().Find(name) + if err != nil { + return nil, err + } + return instance_rpc.NewClient(name, endpoint.Address) +} diff --git a/pkg/controller/enrollment/types/types.go b/pkg/controller/enrollment/types/types.go new file mode 100644 index 000000000..b237d739c --- /dev/null +++ b/pkg/controller/enrollment/types/types.go @@ -0,0 +1,140 @@ +package types + +import ( + "time" + + "github.com/docker/infrakit/pkg/controller" + "github.com/docker/infrakit/pkg/plugin" + "github.com/docker/infrakit/pkg/run/depends" + "github.com/docker/infrakit/pkg/spi/instance" + "github.com/docker/infrakit/pkg/types" +) + +const ( + // DefaultSyncInterval is the default interval for syncing enrollments + DefaultSyncInterval = 5 * time.Second +) + +func init() { + depends.Register("enroll", types.InterfaceSpec(controller.InterfaceSpec), ResolveDependencies) +} + +// ResolveDependencies returns a list of dependencies by parsing the opaque Properties blob. +func ResolveDependencies(spec types.Spec) ([]plugin.Name, error) { + if spec.Properties == nil { + return nil, nil + } + + properties := Properties{} + err := spec.Properties.Decode(&properties) + if err != nil { + return nil, err + } + + return []plugin.Name{properties.Instance.Plugin}, nil +} + +// // ListSourceUnion is a union type of possible values: +// // a list of []intsance.Description +// // a group plugin name +// type ListSourceUnion struct { +// *types.Any `json:",inline" yaml:",inline"` +// } + +// // InstanceDescriptions tries to 'cast' the union as list of descriptions +// func (u ListSourceUnion) InstanceDescriptions() ([]instance.Description, error) { +// list := []instance.Description{} +// err := u.Any.Decode(&list) +// return list, err +// } + +// // GroupPluginName tries to 'cast' the union value as a group plugin name +// func (u ListSourceUnion) GroupPlugin() (plugin.Name, error) { +// p := plugin.Name("") +// err := u.Any.Decode(&p) +// return p, err +// } + +// // UnmarshalJSON implements json.Unmarshaler +// func (u *ListSourceUnion) UnmarshalJSON(buff []byte) error { +// u.Any = types.AnyBytes(buff) +// return nil +// } + +// // MarshalJSON implements json.Marshaler +// func (u *ListSourceUnion) MarshalJSON() ([]byte, error) { +// if u.Any != nil { +// return u.Any.MarshalJSON() +// } +// return []byte{}, nil +// } + +// ListSourceUnion is a union type of possible values: +// a list of []intsance.Description +// a group plugin name +type ListSourceUnion types.Any + +// InstanceDescriptions tries to 'cast' the union as list of descriptions +func (u *ListSourceUnion) InstanceDescriptions() ([]instance.Description, error) { + list := []instance.Description{} + err := (*types.Any)(u).Decode(&list) + return list, err +} + +// GroupPluginName tries to 'cast' the union value as a group plugin name +func (u *ListSourceUnion) GroupPlugin() (plugin.Name, error) { + p := plugin.Name("") + err := (*types.Any)(u).Decode(&p) + return p, err +} + +// UnmarshalJSON implements json.Unmarshaler +func (u *ListSourceUnion) UnmarshalJSON(buff []byte) error { + *u = ListSourceUnion(*types.AnyBytes(buff)) + return nil +} + +// MarshalJSON implements json.Marshaler +func (u *ListSourceUnion) MarshalJSON() ([]byte, error) { + if u != nil { + return (*types.Any)(u).MarshalJSON() + } + return []byte{}, nil +} + +// PluginSpec has information about the plugin +type PluginSpec struct { + // Plugin is the name of the instance plugin + Plugin plugin.Name + + // Labels are the labels to use when querying for instances. This is the namespace. + Labels map[string]string + + // Properties is the properties to configure the instance with. + Properties *types.Any `json:",omitempty" yaml:",omitempty"` +} + +// Properties is the schema of the configuration in the types.Spec.Properties +type Properties struct { + + // List is a list of instance descriptions to sync + List *ListSourceUnion `json:",omitempty" yaml:",omitempty"` + + // Instance is the name of the instance plugin which will receive the + // synchronization messages of provision / destroy based on the + // changes in the List + Instance PluginSpec +} + +// Options is the controller options +type Options struct { + + // SyncInterval is the time interval between reconciliation + SyncInterval time.Duration + + // DestroyOnTerminiate tells the controller to call instace.Destroy + // for each member it is maintaining. This is a matter of ownership + // depending on use cases the controller may not *own* the data in the + // downstream instance. The controller merely reconciles it. + DestroyOnTerminate bool +} diff --git a/pkg/controller/enrollment/types/types_test.go b/pkg/controller/enrollment/types/types_test.go new file mode 100644 index 000000000..fd4c73b31 --- /dev/null +++ b/pkg/controller/enrollment/types/types_test.go @@ -0,0 +1,117 @@ +package types + +import ( + "testing" + + "github.com/docker/infrakit/pkg/plugin" + "github.com/docker/infrakit/pkg/spi/instance" + "github.com/docker/infrakit/pkg/types" + "github.com/stretchr/testify/require" +) + +func mustSpec(s types.Spec, err error) types.Spec { + if err != nil { + panic(err) + } + return s +} + +func specFromString(s string) (types.Spec, error) { + v, err := types.AnyYAML([]byte(s)) + if err != nil { + return types.Spec{}, err + } + spec := types.Spec{} + err = v.Decode(&spec) + return spec, err +} + +func TestWriteProperties(t *testing.T) { + p := Properties{ + List: (*ListSourceUnion)(types.AnyValueMust([]instance.Description{ + {ID: instance.ID("host1")}, + {ID: instance.ID("host2")}, + })), + Instance: PluginSpec{ + Plugin: plugin.Name("simulator/compute"), + Properties: types.AnyValueMust("test"), + }, + } + + buff, err := types.AnyValueMust(p).MarshalYAML() + require.NoError(t, err) + + p2 := Properties{} + err = types.AnyYAMLMust(buff).Decode(&p2) + require.NoError(t, err) + + list1, err := p.List.InstanceDescriptions() + require.NoError(t, err) + + list2, err := p2.List.InstanceDescriptions() + require.NoError(t, err) + + require.EqualValues(t, list2, list1) +} + +func TestParseProperties(t *testing.T) { + + spec := mustSpec(specFromString(` +kind: enrollment +metadata: + name: nfs +properties: + List: + - ID: host1 + - ID: host2 + - ID: host3 + - ID: host4 + Instance: + Plugin: us-east/nfs-authorizer + Properties: + Id: \{\{ .ID \}\} +`)) + + p := Properties{} + err := spec.Properties.Decode(&p) + require.NoError(t, err) + + list, err := p.List.InstanceDescriptions() + require.NoError(t, err) + + _, err = p.List.GroupPlugin() + require.Error(t, err) + + require.EqualValues(t, []instance.Description{ + {ID: instance.ID("host1")}, + {ID: instance.ID("host2")}, + {ID: instance.ID("host3")}, + {ID: instance.ID("host4")}, + }, list) +} + +func TestParsePropertiesWithGroup(t *testing.T) { + + spec := mustSpec(specFromString(` +kind: enrollment +metadata: + name: nfs +properties: + List: us-east/workers + Instance: + Plugin: us-east/nfs-authorizer + Properties: + Id: \{\{ .ID \}\} +`)) + + p := Properties{} + err := spec.Properties.Decode(&p) + require.NoError(t, err) + + _, err = p.List.InstanceDescriptions() + require.Error(t, err) + + g, err := p.List.GroupPlugin() + require.NoError(t, err) + require.Equal(t, plugin.Name("us-east/workers"), g) +} diff --git a/pkg/run/v0/enrollment/enrollment.go b/pkg/run/v0/enrollment/enrollment.go new file mode 100644 index 000000000..14ad152b2 --- /dev/null +++ b/pkg/run/v0/enrollment/enrollment.go @@ -0,0 +1,82 @@ +package enrollment + +import ( + "time" + + enrollment_controller "github.com/docker/infrakit/pkg/controller/enrollment" + enrollment "github.com/docker/infrakit/pkg/controller/enrollment/types" + "github.com/docker/infrakit/pkg/discovery" + "github.com/docker/infrakit/pkg/launch/inproc" + logutil "github.com/docker/infrakit/pkg/log" + "github.com/docker/infrakit/pkg/manager" + "github.com/docker/infrakit/pkg/plugin" + "github.com/docker/infrakit/pkg/rpc/client" + manager_rpc "github.com/docker/infrakit/pkg/rpc/manager" + "github.com/docker/infrakit/pkg/run" + "github.com/docker/infrakit/pkg/types" +) + +const ( + // Kind is the canonical name of the plugin for starting up, etc. + Kind = "enrollment" +) + +var ( + log = logutil.New("module", "run/v0/enrollment") +) + +func init() { + inproc.Register(Kind, Run, DefaultOptions) +} + +// DefaultOptions return an Options with default values filled in. +var DefaultOptions = enrollment.Options{ + SyncInterval: 5 * time.Second, + DestroyOnTerminate: false, +} + +func leadership(plugins func() discovery.Plugins) (manager.Leadership, error) { + // Scan for a manager + pm, err := plugins().List() + if err != nil { + return nil, err + } + + for _, endpoint := range pm { + rpcClient, err := client.New(endpoint.Address, manager.InterfaceSpec) + if err == nil { + return manager_rpc.Adapt(rpcClient), nil + } + } + return nil, nil +} + +// Run runs the plugin, blocking the current thread. Error is returned immediately +// if the plugin cannot be started. +func Run(plugins func() discovery.Plugins, name plugin.Name, + config *types.Any) (transport plugin.Transport, impls map[run.PluginCode]interface{}, onStop func(), err error) { + + if plugins == nil { + panic("no plugins()") + } + + options := enrollment.Options{} + err = config.Decode(&options) + if err != nil { + return + } + + log.Info("Decoded input", "config", options) + + leader, err := leadership(plugins) + if err != nil { + return + } + + transport.Name = name + impls = map[run.PluginCode]interface{}{ + run.Controller: enrollment_controller.NewTypedControllers(plugins, leader, options), + } + + return +} diff --git a/pkg/spi/instance/description.go b/pkg/spi/instance/description.go new file mode 100644 index 000000000..e505f4b23 --- /dev/null +++ b/pkg/spi/instance/description.go @@ -0,0 +1,39 @@ +package instance + +import ( + "github.com/docker/infrakit/pkg/types" +) + +// Fingerprint returns the fingerprint of the spec +func (s Description) Fingerprint() string { + return types.Fingerprint(types.AnyValueMust(s)) +} + +// Compare compares the two descriptions by ID +func (d Description) Compare(other Description) int { + if d.ID < other.ID { + return -1 + } + if d.ID > other.ID { + return 1 + } + return 0 +} + +// Descriptions is a collection of descriptions +type Descriptions []Description + +// Len is part of sort.Interface. +func (list Descriptions) Len() int { + return len(list) +} + +// Swap is part of sort.Interface. +func (list Descriptions) Swap(i, j int) { + list[i], list[j] = list[j], list[i] +} + +// Less is part of sort.Interface. It is implemented by calling the "by" closure in the sorter. +func (list Descriptions) Less(i, j int) bool { + return list[i].Compare(list[j]) < 0 +} From dff07eb5bf7d06748fc5bee09d092231b68dd638 Mon Sep 17 00:00:00 2001 From: David Chung Date: Thu, 7 Sep 2017 09:52:01 -0700 Subject: [PATCH 02/12] fix lint / vet; merge upstream Signed-off-by: David Chung --- pkg/controller/enrollment/enroller.go | 2 +- pkg/controller/enrollment/sync.go | 4 ++-- pkg/controller/enrollment/types/types.go | 2 +- pkg/spi/instance/description.go | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/controller/enrollment/enroller.go b/pkg/controller/enrollment/enroller.go index 2e4e44de5..529ab6f8c 100644 --- a/pkg/controller/enrollment/enroller.go +++ b/pkg/controller/enrollment/enroller.go @@ -82,7 +82,7 @@ func mustTrue(v bool, e error) bool { } // object returns the state -func (l enroller) object() (*types.Object, error) { +func (l *enroller) object() (*types.Object, error) { object := types.Object{ Spec: l.spec, } diff --git a/pkg/controller/enrollment/sync.go b/pkg/controller/enrollment/sync.go index 72a3e2cdd..98666f3ea 100644 --- a/pkg/controller/enrollment/sync.go +++ b/pkg/controller/enrollment/sync.go @@ -18,7 +18,7 @@ func (l *enroller) getSourceInstances() ([]instance.Description, error) { pn, err := l.properties.List.GroupPlugin() if err != nil { - return nil, fmt.Errorf("No list source specified.") + return nil, fmt.Errorf("no list source specified") } // we have a plugin name. -- eg. us-east/workers @@ -26,7 +26,7 @@ func (l *enroller) getSourceInstances() ([]instance.Description, error) { gp, err := l.getGroupPlugin(plugin.Name(lookup)) if err != nil { - return nil, fmt.Errorf("Cannot connect to group %v", pn) + return nil, fmt.Errorf("cannot connect to group %v", pn) } desc, err := gp.DescribeGroup(group.ID(gid)) diff --git a/pkg/controller/enrollment/types/types.go b/pkg/controller/enrollment/types/types.go index b237d739c..ce7835e04 100644 --- a/pkg/controller/enrollment/types/types.go +++ b/pkg/controller/enrollment/types/types.go @@ -81,7 +81,7 @@ func (u *ListSourceUnion) InstanceDescriptions() ([]instance.Description, error) return list, err } -// GroupPluginName tries to 'cast' the union value as a group plugin name +// GroupPlugin tries to 'cast' the union value as a group plugin name func (u *ListSourceUnion) GroupPlugin() (plugin.Name, error) { p := plugin.Name("") err := (*types.Any)(u).Decode(&p) diff --git a/pkg/spi/instance/description.go b/pkg/spi/instance/description.go index e505f4b23..6b3c98bed 100644 --- a/pkg/spi/instance/description.go +++ b/pkg/spi/instance/description.go @@ -5,8 +5,8 @@ import ( ) // Fingerprint returns the fingerprint of the spec -func (s Description) Fingerprint() string { - return types.Fingerprint(types.AnyValueMust(s)) +func (d Description) Fingerprint() string { + return types.Fingerprint(types.AnyValueMust(d)) } // Compare compares the two descriptions by ID From 991288ed8d24dd74e10b65266372c8b976f4887f Mon Sep 17 00:00:00 2001 From: David Chung Date: Sat, 9 Sep 2017 10:15:00 -0700 Subject: [PATCH 03/12] tests Signed-off-by: David Chung --- pkg/controller/enrollment/controller.go | 20 ----- pkg/controller/enrollment/enroller.go | 38 +++++++-- pkg/controller/enrollment/enroller_test.go | 45 +++++++++++ pkg/controller/enrollment/set.go | 42 +++++----- pkg/controller/enrollment/set_test.go | 89 ++++++++++++++++++++++ pkg/controller/enrollment/sync.go | 13 ++-- 6 files changed, 194 insertions(+), 53 deletions(-) create mode 100644 pkg/controller/enrollment/enroller_test.go create mode 100644 pkg/controller/enrollment/set_test.go diff --git a/pkg/controller/enrollment/controller.go b/pkg/controller/enrollment/controller.go index 37c037c3e..661977c28 100644 --- a/pkg/controller/enrollment/controller.go +++ b/pkg/controller/enrollment/controller.go @@ -8,7 +8,6 @@ import ( logutil "github.com/docker/infrakit/pkg/log" "github.com/docker/infrakit/pkg/manager" "github.com/docker/infrakit/pkg/types" - "golang.org/x/net/context" ) var log = logutil.New("module", "controller/enrollment") @@ -46,28 +45,9 @@ func NewTypedControllers(plugins func() discovery.Plugins, leader manager.Leader )).ManagedObjects } -func (l *enroller) start() { - l.lock.Lock() - defer l.lock.Unlock() - - if l.poller != nil { - go l.poller.Run(context.Background()) - l.running = true - } -} - func (l *enroller) started() bool { l.lock.RLock() defer l.lock.RUnlock() return l.running } - -func (l *enroller) stop() { - l.lock.Lock() - defer l.lock.Unlock() - - if l.poller != nil { - l.poller.Stop() - } -} diff --git a/pkg/controller/enrollment/enroller.go b/pkg/controller/enrollment/enroller.go index 529ab6f8c..06454896a 100644 --- a/pkg/controller/enrollment/enroller.go +++ b/pkg/controller/enrollment/enroller.go @@ -12,6 +12,7 @@ import ( "github.com/docker/infrakit/pkg/spi/group" "github.com/docker/infrakit/pkg/spi/instance" "github.com/docker/infrakit/pkg/types" + "golang.org/x/net/context" ) // enroller implements the internal.Managed interface. @@ -119,7 +120,7 @@ func (l *enroller) Enforce(types.Spec) (*types.Object, error) { l.lock.Lock() defer l.lock.Unlock() - l.start() + l.Start() return l.object() } @@ -138,8 +139,8 @@ func (l *enroller) Pause() (*types.Object, error) { l.lock.Lock() defer l.lock.Unlock() - if l.started() { - l.stop() + if l.Running() { + l.Stop() } return l.Inspect() } @@ -154,8 +155,8 @@ func (l *enroller) Terminate() (*types.Object, error) { return nil, err } - if l.started() { - l.stop() + if l.Running() { + l.Stop() } if l.options.DestroyOnTerminate { @@ -168,3 +169,30 @@ func (l *enroller) Terminate() (*types.Object, error) { } return o, nil } + +// Start implements internal/ControlLoop.Start +func (l *enroller) Start() { + l.lock.Lock() + defer l.lock.Unlock() + + if l.poller != nil { + go l.poller.Run(context.Background()) + l.running = true + } +} + +// Start implements internal/ControlLoop.Stop +func (l *enroller) Stop() error { + l.lock.Lock() + defer l.lock.Unlock() + + if l.poller != nil { + l.poller.Stop() + } + return nil +} + +// Running implements internal/ControlLoop.Running +func (l *enroller) Running() bool { + return l.started() +} diff --git a/pkg/controller/enrollment/enroller_test.go b/pkg/controller/enrollment/enroller_test.go new file mode 100644 index 000000000..6e0ef65f4 --- /dev/null +++ b/pkg/controller/enrollment/enroller_test.go @@ -0,0 +1,45 @@ +package enrollment + +import ( + "fmt" + "testing" + + enrollment "github.com/docker/infrakit/pkg/controller/enrollment/types" + "github.com/docker/infrakit/pkg/discovery" + "github.com/docker/infrakit/pkg/plugin" + "github.com/stretchr/testify/require" +) + +type fakeLeader func() (bool, error) + +func (f fakeLeader) IsLeader() (bool, error) { + return f() +} + +type fakePlugins map[string]*plugin.Endpoint + +func (f fakePlugins) Find(name plugin.Name) (*plugin.Endpoint, error) { + lookup, _ := name.GetLookupAndType() + if v, has := f[lookup]; has { + return v, nil + } + return nil, fmt.Errorf("not found") +} + +func (f fakePlugins) List() (map[string]*plugin.Endpoint, error) { + return (map[string]*plugin.Endpoint)(f), nil +} + +func TestEnroller(t *testing.T) { + + enroller := newEnroller( + func() discovery.Plugins { + return fakePlugins{ + "test": &plugin.Endpoint{}, + } + }, + fakeLeader(func() (bool, error) { return false, nil }), + enrollment.Options{}) + + require.False(t, enroller.Running()) +} diff --git a/pkg/controller/enrollment/set.go b/pkg/controller/enrollment/set.go index b88fa3bcf..3e68875d3 100644 --- a/pkg/controller/enrollment/set.go +++ b/pkg/controller/enrollment/set.go @@ -10,10 +10,7 @@ import ( // keyFunc is a function that extracts the key from the description type keyFunc func(instance.Description) string -// Descriptions is a slice of descriptions -type Descriptions []instance.Description - -func (list Descriptions) index(getKey keyFunc) (map[string]instance.Description, mapset.Set) { +func index(list instance.Descriptions, getKey keyFunc) (map[string]instance.Description, mapset.Set) { index := map[string]instance.Description{} this := mapset.NewSet() for _, n := range list { @@ -25,41 +22,42 @@ func (list Descriptions) index(getKey keyFunc) (map[string]instance.Description, } // Difference returns a list of specs that is not in the receiver. -func Difference(list Descriptions, listKeyFunc keyFunc, - other Descriptions, otherKeyFunc keyFunc) Descriptions { - this, thisSet := list.index(listKeyFunc) - _, thatSet := other.index(otherKeyFunc) +func Difference(list instance.Descriptions, listKeyFunc keyFunc, + other instance.Descriptions, otherKeyFunc keyFunc) instance.Descriptions { + this, thisSet := index(list, listKeyFunc) + _, thatSet := index(other, otherKeyFunc) return toDescriptions(listKeyFunc, thisSet.Difference(thatSet), this) } -func toDescriptions(keyFunc keyFunc, set mapset.Set, index map[string]instance.Description) Descriptions { - out := Descriptions{} +func toDescriptions(keyFunc keyFunc, set mapset.Set, index map[string]instance.Description) instance.Descriptions { + out := instance.Descriptions{} for n := range set.Iter() { out = append(out, index[n.(string)]) } + sort.Sort(out) return out } -// Delta computes the changes necessary to make the receiver match the input: -// 1. the add Descriptions are entries to add to receiver -// 2. the remove Descriptions are entries to remove from receiver +// Delta computes the changes necessary to make the list match other: +// 1. the add Descriptions are entries to add to other +// 2. the remove Descriptions are entries to remove from other // 3. changes are a slice of Descriptions where changes[x][0] is the original, and changes[x][1] is new -func Delta(list Descriptions, listKeyFunc keyFunc, other Descriptions, - otherKeyFunc keyFunc) (add Descriptions, remove Descriptions, changes [][2]instance.Description) { +func Delta(list instance.Descriptions, listKeyFunc keyFunc, other instance.Descriptions, + otherKeyFunc keyFunc) (add instance.Descriptions, remove instance.Descriptions, changes [][2]instance.Description) { sort.Sort(instance.Descriptions(list)) sort.Sort(instance.Descriptions(other)) - this, thisSet := list.index(listKeyFunc) - that, thatSet := other.index(otherKeyFunc) + this, thisSet := index(list, listKeyFunc) + that, thatSet := index(other, otherKeyFunc) - removeSet := thisSet.Difference(thatSet) - remove = toDescriptions(listKeyFunc, removeSet, this) + removeSet := thatSet.Difference(thisSet) + remove = toDescriptions(otherKeyFunc, removeSet, that) - addSet := thatSet.Difference(thisSet) - add = toDescriptions(otherKeyFunc, addSet, that) + addSet := thisSet.Difference(thatSet) + add = toDescriptions(listKeyFunc, addSet, this) - changeSet := thisSet.Difference(removeSet) + changeSet := thatSet.Difference(removeSet) sort.Sort(instance.Descriptions(add)) sort.Sort(instance.Descriptions(remove)) diff --git a/pkg/controller/enrollment/set_test.go b/pkg/controller/enrollment/set_test.go new file mode 100644 index 000000000..4d0592934 --- /dev/null +++ b/pkg/controller/enrollment/set_test.go @@ -0,0 +1,89 @@ +package enrollment + +import ( + "testing" + + "github.com/docker/infrakit/pkg/spi/instance" + "github.com/stretchr/testify/require" +) + +func TestSet(t *testing.T) { + + a := instance.Descriptions{ + {ID: instance.ID("1")}, + {ID: instance.ID("2")}, + {ID: instance.ID("3")}, + {ID: instance.ID("4")}, + {ID: instance.ID("5")}, + } + + b := instance.Descriptions{ + {ID: instance.ID("2")}, + {ID: instance.ID("3")}, + {ID: instance.ID("5")}, + {ID: instance.ID("6")}, + } + + keyFunc := func(i instance.Description) string { + return string(i.ID) + } + + diff := Difference(a, keyFunc, b, keyFunc) + require.Equal(t, instance.Descriptions{ + {ID: instance.ID("1")}, + {ID: instance.ID("4")}, + }, diff) + + diff2 := Difference(b, keyFunc, a, keyFunc) + require.Equal(t, instance.Descriptions{ + {ID: instance.ID("6")}, + }, diff2) + + add, remove, _ := Delta(instance.Descriptions(a), keyFunc, + instance.Descriptions(b), keyFunc) + require.Equal(t, instance.Descriptions{a[0], a[3]}, add) + require.Equal(t, instance.Descriptions{b[3]}, remove) +} + +func logicalID(s string) *instance.LogicalID { + id := instance.LogicalID(s) + return &id +} + +func TestSetKeyFuncs(t *testing.T) { + + a := instance.Descriptions{ + {ID: instance.ID("0x"), LogicalID: logicalID("0")}, + {ID: instance.ID("1x"), LogicalID: logicalID("1")}, + {ID: instance.ID("2x"), LogicalID: logicalID("2")}, + {ID: instance.ID("3x"), LogicalID: logicalID("3")}, + {ID: instance.ID("4x"), LogicalID: logicalID("4")}, + } + + b := instance.Descriptions{ + {ID: instance.ID("0")}, + {ID: instance.ID("2")}, + {ID: instance.ID("3")}, + {ID: instance.ID("5")}, + {ID: instance.ID("6")}, + } + + aKeyFunc := func(i instance.Description) string { + return string(*i.LogicalID) + } + bKeyFunc := func(i instance.Description) string { + return string(i.ID) + } + + diff := Difference(a, aKeyFunc, b, bKeyFunc) + require.Equal(t, instance.Descriptions{a[1], a[4]}, diff) + + diff2 := Difference(b, bKeyFunc, a, aKeyFunc) + require.Equal(t, instance.Descriptions{b[3], b[4]}, diff2) + + add, remove, change := Delta(instance.Descriptions(a), aKeyFunc, + instance.Descriptions(b), bKeyFunc) + require.Equal(t, instance.Descriptions{a[1], a[4]}, add) + require.Equal(t, instance.Descriptions{b[3], b[4]}, remove) + require.Equal(t, 3, len(change)) +} diff --git a/pkg/controller/enrollment/sync.go b/pkg/controller/enrollment/sync.go index 98666f3ea..58051999d 100644 --- a/pkg/controller/enrollment/sync.go +++ b/pkg/controller/enrollment/sync.go @@ -51,11 +51,6 @@ func (l *enroller) getEnrolledInstances() ([]instance.Description, error) { // run one synchronization round func (l *enroller) sync() error { - instancePlugin, err := l.getInstancePlugin(l.properties.Instance.Plugin) - if err != nil { - return err - } - source, err := l.getSourceInstances() if err != nil { log.Error("Error getting sources. No action", "err", err) @@ -84,10 +79,16 @@ func (l *enroller) sync() error { return string(d.ID) } - add, remove, _ := Delta(Descriptions(enrolled), enrolledKeyFunc, Descriptions(source), sourceKeyFunc) + add, remove, _ := Delta(instance.Descriptions(enrolled), enrolledKeyFunc, instance.Descriptions(source), sourceKeyFunc) log.Debug("Computed delta", "add", add, "remove", remove) + instancePlugin, err := l.getInstancePlugin(l.properties.Instance.Plugin) + if err != nil { + log.Error("cannot get instance plugin", "err", err) + return err + } + for _, n := range add { props, err := l.buildProperties(n) From c0b4022ecb72d518d27a424742cca0d0243fc3e9 Mon Sep 17 00:00:00 2001 From: David Chung Date: Sat, 9 Sep 2017 12:57:28 -0700 Subject: [PATCH 04/12] add tests Signed-off-by: David Chung --- pkg/controller/enrollment/enroller.go | 33 +++++- pkg/controller/enrollment/enroller_test.go | 96 ++++++++++++++++ pkg/controller/enrollment/set.go | 8 +- pkg/controller/enrollment/set_test.go | 12 +- pkg/controller/enrollment/sync.go | 122 +++++++++++++++------ pkg/controller/enrollment/types/types.go | 6 + 6 files changed, 237 insertions(+), 40 deletions(-) diff --git a/pkg/controller/enrollment/enroller.go b/pkg/controller/enrollment/enroller.go index 06454896a..803754a7d 100644 --- a/pkg/controller/enrollment/enroller.go +++ b/pkg/controller/enrollment/enroller.go @@ -11,6 +11,7 @@ import ( "github.com/docker/infrakit/pkg/manager" "github.com/docker/infrakit/pkg/spi/group" "github.com/docker/infrakit/pkg/spi/instance" + "github.com/docker/infrakit/pkg/template" "github.com/docker/infrakit/pkg/types" "golang.org/x/net/context" ) @@ -42,6 +43,11 @@ type enroller struct { groupPlugin group.Plugin // source -- where members are to be enrolled instancePlugin instance.Plugin // sink -- where enrollments are made running bool + + // template that we use to render with a source instance.Description to get the link Key + sourceKeySelectorTemplate *template.Template + // template used to render the enrollment's Provision propertiesx + enrollmentPropertiesTemplate *template.Template } func newEnroller(plugins func() discovery.Plugins, @@ -115,11 +121,36 @@ func (l *enroller) Plan(operation controller.Operation, spec types.Spec) (*types } +func (l *enroller) updateSpec(spec types.Spec) error { + if spec.Options != nil { + options := enrollment.Options{} + if err := spec.Options.Decode(&options); err != nil { + return err + } + l.options = options + } + + if spec.Properties != nil { + properties := enrollment.Properties{} + if err := spec.Properties.Decode(&properties); err != nil { + return err + } + l.properties = properties + } + + l.spec = spec + return nil +} + // Enforce implements internal.Managed.Enforce -func (l *enroller) Enforce(types.Spec) (*types.Object, error) { +func (l *enroller) Enforce(spec types.Spec) (*types.Object, error) { l.lock.Lock() defer l.lock.Unlock() + if err := l.updateSpec(spec); err != nil { + return nil, err + } + l.Start() return l.object() } diff --git a/pkg/controller/enrollment/enroller_test.go b/pkg/controller/enrollment/enroller_test.go index 6e0ef65f4..d70d31341 100644 --- a/pkg/controller/enrollment/enroller_test.go +++ b/pkg/controller/enrollment/enroller_test.go @@ -7,6 +7,11 @@ import ( enrollment "github.com/docker/infrakit/pkg/controller/enrollment/types" "github.com/docker/infrakit/pkg/discovery" "github.com/docker/infrakit/pkg/plugin" + "github.com/docker/infrakit/pkg/spi/group" + "github.com/docker/infrakit/pkg/spi/instance" + group_test "github.com/docker/infrakit/pkg/testing/group" + instance_test "github.com/docker/infrakit/pkg/testing/instance" + "github.com/docker/infrakit/pkg/types" "github.com/stretchr/testify/require" ) @@ -32,6 +37,20 @@ func (f fakePlugins) List() (map[string]*plugin.Endpoint, error) { func TestEnroller(t *testing.T) { + source := []instance.Description{ + {ID: instance.ID("h1")}, + {ID: instance.ID("h2")}, + {ID: instance.ID("h3")}, + } + + enrolled := []instance.Description{ + {ID: instance.ID("nfs1"), Tags: map[string]string{"infrakit.enrollment.sourceID": "h1"}}, + {ID: instance.ID("nfs2"), Tags: map[string]string{"infrakit.enrollment.sourceID": "h2"}}, + {ID: instance.ID("nfs5"), Tags: map[string]string{"infrakit.enrollment.sourceID": "h5"}}, + } + + seen := make(chan []interface{}, 10) + enroller := newEnroller( func() discovery.Plugins { return fakePlugins{ @@ -40,6 +59,83 @@ func TestEnroller(t *testing.T) { }, fakeLeader(func() (bool, error) { return false, nil }), enrollment.Options{}) + enroller.groupPlugin = &group_test.Plugin{ + DoDescribeGroup: func(gid group.ID) (group.Description, error) { + result := group.Description{Instances: source} + return result, nil + }, + } + enroller.instancePlugin = &instance_test.Plugin{ + DoDescribeInstances: func(t map[string]string, p bool) ([]instance.Description, error) { + return enrolled, nil + }, + DoProvision: func(spec instance.Spec) (*instance.ID, error) { + + seen <- []interface{}{spec, "Provision"} + return nil, nil + }, + DoDestroy: func(id instance.ID, ctx instance.Context) error { + + seen <- []interface{}{id, ctx, "Destroy"} + return nil + }, + } require.False(t, enroller.Running()) + + spec := types.Spec{} + require.NoError(t, types.AnyYAMLMust([]byte(` +kind: enrollment +metadata: + name: nfs +properties: + List: group/workers + Instance: + Plugin: nfs/authorization + Properties: + host: \{\{.ID\}\} + iops: 10 +options: + SourceKeySelector: \{\{.ID\}\} + +`)).Decode(&spec)) + + require.NoError(t, enroller.updateSpec(spec)) + + st, err := enroller.getSourceKeySelectorTemplate() + require.NoError(t, err) + require.NotNil(t, st) + + et, err := enroller.getEnrollmentPropertiesTemplate() + require.NoError(t, err) + require.NotNil(t, et) + + require.NoError(t, err) + + s, err := enroller.getSourceInstances() + require.NoError(t, err) + require.Equal(t, source, s) + + found, err := enroller.getEnrolledInstances() + require.NoError(t, err) + require.Equal(t, enrolled, found) + + require.NoError(t, enroller.sync()) + + // check the provision and destroy calls + require.Equal(t, []interface{}{ + instance.Spec{ + Properties: types.AnyString(`{"host":"h3","iops":10}`), + Tags: map[string]string{ + "infrakit.enrollment.sourceID": "h3", + "infrakit.enrollment.name": "nfs", + }, + }, + "Provision", + }, <-seen) + require.Equal(t, []interface{}{ + instance.ID("nfs5"), + instance.Termination, + "Destroy", + }, <-seen) } diff --git a/pkg/controller/enrollment/set.go b/pkg/controller/enrollment/set.go index 3e68875d3..81fb7b91d 100644 --- a/pkg/controller/enrollment/set.go +++ b/pkg/controller/enrollment/set.go @@ -8,13 +8,17 @@ import ( ) // keyFunc is a function that extracts the key from the description -type keyFunc func(instance.Description) string +type keyFunc func(instance.Description) (string, error) func index(list instance.Descriptions, getKey keyFunc) (map[string]instance.Description, mapset.Set) { index := map[string]instance.Description{} this := mapset.NewSet() for _, n := range list { - key := getKey(n) + key, err := getKey(n) + if err != nil { + log.Error("cannot index entry", "instane.Description", n) + continue + } this.Add(key) index[key] = n } diff --git a/pkg/controller/enrollment/set_test.go b/pkg/controller/enrollment/set_test.go index 4d0592934..c5e3dc284 100644 --- a/pkg/controller/enrollment/set_test.go +++ b/pkg/controller/enrollment/set_test.go @@ -24,8 +24,8 @@ func TestSet(t *testing.T) { {ID: instance.ID("6")}, } - keyFunc := func(i instance.Description) string { - return string(i.ID) + keyFunc := func(i instance.Description) (string, error) { + return string(i.ID), nil } diff := Difference(a, keyFunc, b, keyFunc) @@ -68,11 +68,11 @@ func TestSetKeyFuncs(t *testing.T) { {ID: instance.ID("6")}, } - aKeyFunc := func(i instance.Description) string { - return string(*i.LogicalID) + aKeyFunc := func(i instance.Description) (string, error) { + return string(*i.LogicalID), nil } - bKeyFunc := func(i instance.Description) string { - return string(i.ID) + bKeyFunc := func(i instance.Description) (string, error) { + return string(i.ID), nil } diff := Difference(a, aKeyFunc, b, bKeyFunc) diff --git a/pkg/controller/enrollment/sync.go b/pkg/controller/enrollment/sync.go index 58051999d..84a733f2f 100644 --- a/pkg/controller/enrollment/sync.go +++ b/pkg/controller/enrollment/sync.go @@ -1,6 +1,7 @@ package enrollment import ( + "bytes" "fmt" "github.com/docker/infrakit/pkg/plugin" @@ -34,7 +35,7 @@ func (l *enroller) getSourceInstances() ([]instance.Description, error) { return nil, err } - list = desc.Instances + return desc.Instances, nil } return list, err } @@ -48,6 +49,56 @@ func (l *enroller) getEnrolledInstances() ([]instance.Description, error) { return instancePlugin.DescribeInstances(l.properties.Instance.Labels, true) } +func buildTemplate(source []byte) (*template.Template, error) { + if source == nil { + return nil, nil + } + // Apply the template but we need escape the \{\{ if any + buff := source + buff = bytes.Replace(buff, []byte("\\{\\{"), []byte("{{"), -1) + buff = bytes.Replace(buff, []byte("\\}\\}"), []byte("}}"), -1) + + // YAML will escape the escapes... so twice + buff = bytes.Replace(buff, []byte("\\\\{\\\\{"), []byte("{{"), -1) + buff = bytes.Replace(buff, []byte("\\\\}\\\\}"), []byte("}}"), -1) + + return template.NewTemplate("str://"+string(buff), template.Options{MultiPass: false}) +} + +func (l *enroller) getSourceKeySelectorTemplate() (*template.Template, error) { + l.lock.Lock() + defer l.lock.Unlock() + + if l.options.SourceKeySelector != "" { + if l.sourceKeySelectorTemplate == nil { + t, err := buildTemplate([]byte(l.options.SourceKeySelector)) + if err != nil { + return nil, err + } + l.sourceKeySelectorTemplate = t + } + } + + return l.sourceKeySelectorTemplate, nil +} + +func (l *enroller) getEnrollmentPropertiesTemplate() (*template.Template, error) { + l.lock.Lock() + defer l.lock.Unlock() + + if l.properties.Instance.Properties != nil { + if l.enrollmentPropertiesTemplate == nil { + t, err := buildTemplate(l.properties.Instance.Properties.Bytes()) + if err != nil { + return nil, err + } + l.enrollmentPropertiesTemplate = t + } + } + + return l.enrollmentPropertiesTemplate, nil +} + // run one synchronization round func (l *enroller) sync() error { @@ -67,21 +118,39 @@ func (l *enroller) sync() error { // them. This is because instance IDs from the respective lists are likely // to be different. Instead there's a join key / common attribute somewhere // embedded in the Description.Properties. - sourceKeyFunc := func(d instance.Description) string { - // TODO render a template - return string(d.ID) - } - enrolledKeyFunc := func(d instance.Description) string { - // TODO render a template - if d.LogicalID != nil { - return string(*d.LogicalID) + sourceKeyFunc := func(d instance.Description) (string, error) { + + t, err := l.getSourceKeySelectorTemplate() + if err != nil { + return "", err } - return string(d.ID) + if t != nil { + view, err := t.Render(d) + if err != nil { + return "", err + } + return view, nil + } + + return string(d.ID), nil } - add, remove, _ := Delta(instance.Descriptions(enrolled), enrolledKeyFunc, instance.Descriptions(source), sourceKeyFunc) + // As long as the downstream enrollment records are labeled correctly we + // can even support 'importing' out-of-band created enrollment records + enrolledKeyFunc := func(d instance.Description) (string, error) { + if v, has := d.Tags["infrakit.enrollment.sourceID"]; has { + return v, nil + } + return "", fmt.Errorf("not-matched:%v", d.ID) + } + + // compute the delta required to make enrolled look like source + add, remove, _ := Delta( + instance.Descriptions(source), sourceKeyFunc, + instance.Descriptions(enrolled), enrolledKeyFunc, + ) - log.Debug("Computed delta", "add", add, "remove", remove) + log.Info("Computed delta", "add", add, "remove", remove) instancePlugin, err := l.getInstancePlugin(l.properties.Instance.Plugin) if err != nil { @@ -96,11 +165,7 @@ func (l *enroller) sync() error { log.Error("Cannot bulid properties to enroll", "err", err, "description", n) continue } - - logicalID := instance.LogicalID(string(n.ID)) spec := instance.Spec{ - LogicalID: &logicalID, - // TODO - render a template using the value n as context? Properties: props, Tags: l.labels(n), } @@ -114,29 +179,24 @@ func (l *enroller) sync() error { err = instancePlugin.Destroy(n.ID, instance.Termination) if err != nil { log.Error("Failed to remove enrollment", "err", err, "id", n.ID) + continue // get them next time... } } return nil } // buildProperties for calling enrollment / Provision -func (l *enroller) buildProperties(d instance.Description) (props *types.Any, err error) { - props = l.properties.Instance.Properties - - if props == nil { - return +func (l *enroller) buildProperties(d instance.Description) (*types.Any, error) { + t, err := l.getEnrollmentPropertiesTemplate() + if err != nil { + return nil, err } - - t, e := template.NewTemplate(props.String(), template.Options{MultiPass: false}) - if e != nil { - err = e - return + if t == nil { + return types.AnyValue(d) } - - view, e := t.Render(d) - if e != nil { - err = e - return + view, err := t.Render(d) + if err != nil { + return nil, err } return types.AnyString(view), nil } diff --git a/pkg/controller/enrollment/types/types.go b/pkg/controller/enrollment/types/types.go index ce7835e04..71b625ba2 100644 --- a/pkg/controller/enrollment/types/types.go +++ b/pkg/controller/enrollment/types/types.go @@ -129,6 +129,12 @@ type Properties struct { // Options is the controller options type Options struct { + // SourceKeySelector is a string template for selecting the join key from + // a source instance.Description. This selector template should use escapes + // so that the template {{ and }} are preserved. For example, + // SourceKeySelector: \{\{ .ID \}\} # selects the ID field. + SourceKeySelector string + // SyncInterval is the time interval between reconciliation SyncInterval time.Duration From 26d2195bbfc712f86f9c305f61f01da077a05481 Mon Sep 17 00:00:00 2001 From: David Chung Date: Sat, 9 Sep 2017 17:14:15 -0700 Subject: [PATCH 05/12] fix docs check Signed-off-by: David Chung --- docs/plugins/instance.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/plugins/instance.md b/docs/plugins/instance.md index 176f74efe..cb9714994 100644 --- a/docs/plugins/instance.md +++ b/docs/plugins/instance.md @@ -1,6 +1,6 @@ # Instance plugin API - + ## API From 0fda83d3c43e95aa31e03c551aaaaf820dd28369 Mon Sep 17 00:00:00 2001 From: David Chung Date: Sun, 10 Sep 2017 01:34:20 -0700 Subject: [PATCH 06/12] more tests; move common template functions to template pkg Signed-off-by: David Chung --- pkg/controller/enrollment/enroller.go | 2 +- pkg/controller/enrollment/sync.go | 22 +-- pkg/controller/enrollment/types/types.go | 48 ++---- pkg/controller/enrollment/types/types_test.go | 49 ++++++ pkg/run/v0/enrollment/enrollment.go | 4 +- pkg/template/funcs.go | 159 ++++++++++++++---- pkg/template/funcs_test.go | 14 ++ 7 files changed, 200 insertions(+), 98 deletions(-) diff --git a/pkg/controller/enrollment/enroller.go b/pkg/controller/enrollment/enroller.go index 803754a7d..17e9179cd 100644 --- a/pkg/controller/enrollment/enroller.go +++ b/pkg/controller/enrollment/enroller.go @@ -58,7 +58,7 @@ func newEnroller(plugins func() discovery.Plugins, options: options, } - interval := l.options.SyncInterval + interval := l.options.SyncInterval.Duration() if interval == 0 { interval = enrollment.DefaultSyncInterval } diff --git a/pkg/controller/enrollment/sync.go b/pkg/controller/enrollment/sync.go index 84a733f2f..98bfd979c 100644 --- a/pkg/controller/enrollment/sync.go +++ b/pkg/controller/enrollment/sync.go @@ -1,7 +1,6 @@ package enrollment import ( - "bytes" "fmt" "github.com/docker/infrakit/pkg/plugin" @@ -9,7 +8,6 @@ import ( instance_rpc "github.com/docker/infrakit/pkg/rpc/instance" "github.com/docker/infrakit/pkg/spi/group" "github.com/docker/infrakit/pkg/spi/instance" - "github.com/docker/infrakit/pkg/template" "github.com/docker/infrakit/pkg/types" ) @@ -49,29 +47,13 @@ func (l *enroller) getEnrolledInstances() ([]instance.Description, error) { return instancePlugin.DescribeInstances(l.properties.Instance.Labels, true) } -func buildTemplate(source []byte) (*template.Template, error) { - if source == nil { - return nil, nil - } - // Apply the template but we need escape the \{\{ if any - buff := source - buff = bytes.Replace(buff, []byte("\\{\\{"), []byte("{{"), -1) - buff = bytes.Replace(buff, []byte("\\}\\}"), []byte("}}"), -1) - - // YAML will escape the escapes... so twice - buff = bytes.Replace(buff, []byte("\\\\{\\\\{"), []byte("{{"), -1) - buff = bytes.Replace(buff, []byte("\\\\}\\\\}"), []byte("}}"), -1) - - return template.NewTemplate("str://"+string(buff), template.Options{MultiPass: false}) -} - func (l *enroller) getSourceKeySelectorTemplate() (*template.Template, error) { l.lock.Lock() defer l.lock.Unlock() if l.options.SourceKeySelector != "" { if l.sourceKeySelectorTemplate == nil { - t, err := buildTemplate([]byte(l.options.SourceKeySelector)) + t, err := enrollment.TemplateFrom([]byte(l.options.SourceKeySelector)) if err != nil { return nil, err } @@ -88,7 +70,7 @@ func (l *enroller) getEnrollmentPropertiesTemplate() (*template.Template, error) if l.properties.Instance.Properties != nil { if l.enrollmentPropertiesTemplate == nil { - t, err := buildTemplate(l.properties.Instance.Properties.Bytes()) + t, err := enrollment.TemplateFrom(l.properties.Instance.Properties.Bytes()) if err != nil { return nil, err } diff --git a/pkg/controller/enrollment/types/types.go b/pkg/controller/enrollment/types/types.go index 71b625ba2..3d1a6db42 100644 --- a/pkg/controller/enrollment/types/types.go +++ b/pkg/controller/enrollment/types/types.go @@ -1,12 +1,14 @@ package types import ( + "bytes" "time" "github.com/docker/infrakit/pkg/controller" "github.com/docker/infrakit/pkg/plugin" "github.com/docker/infrakit/pkg/run/depends" "github.com/docker/infrakit/pkg/spi/instance" + "github.com/docker/infrakit/pkg/template" "github.com/docker/infrakit/pkg/types" ) @@ -34,41 +36,6 @@ func ResolveDependencies(spec types.Spec) ([]plugin.Name, error) { return []plugin.Name{properties.Instance.Plugin}, nil } -// // ListSourceUnion is a union type of possible values: -// // a list of []intsance.Description -// // a group plugin name -// type ListSourceUnion struct { -// *types.Any `json:",inline" yaml:",inline"` -// } - -// // InstanceDescriptions tries to 'cast' the union as list of descriptions -// func (u ListSourceUnion) InstanceDescriptions() ([]instance.Description, error) { -// list := []instance.Description{} -// err := u.Any.Decode(&list) -// return list, err -// } - -// // GroupPluginName tries to 'cast' the union value as a group plugin name -// func (u ListSourceUnion) GroupPlugin() (plugin.Name, error) { -// p := plugin.Name("") -// err := u.Any.Decode(&p) -// return p, err -// } - -// // UnmarshalJSON implements json.Unmarshaler -// func (u *ListSourceUnion) UnmarshalJSON(buff []byte) error { -// u.Any = types.AnyBytes(buff) -// return nil -// } - -// // MarshalJSON implements json.Marshaler -// func (u *ListSourceUnion) MarshalJSON() ([]byte, error) { -// if u.Any != nil { -// return u.Any.MarshalJSON() -// } -// return []byte{}, nil -// } - // ListSourceUnion is a union type of possible values: // a list of []intsance.Description // a group plugin name @@ -135,8 +102,9 @@ type Options struct { // SourceKeySelector: \{\{ .ID \}\} # selects the ID field. SourceKeySelector string - // SyncInterval is the time interval between reconciliation - SyncInterval time.Duration + // SyncInterval is the time interval between reconciliation. Syntax + // is go's time.Duration string representation (e.g. 1m, 30s) + SyncInterval types.Duration // DestroyOnTerminiate tells the controller to call instace.Destroy // for each member it is maintaining. This is a matter of ownership @@ -144,3 +112,9 @@ type Options struct { // downstream instance. The controller merely reconciles it. DestroyOnTerminate bool } + +// TemplateFrom returns a template after it has un-escapes any escape sequences +func TemplateFrom(source []byte) (*template.Template, error) { + buff := template.Unescape(source) + return template.NewTemplate("str://"+string(buff), template.Options{MultiPass: false}) +} diff --git a/pkg/controller/enrollment/types/types_test.go b/pkg/controller/enrollment/types/types_test.go index fd4c73b31..6eea1d08e 100644 --- a/pkg/controller/enrollment/types/types_test.go +++ b/pkg/controller/enrollment/types/types_test.go @@ -114,4 +114,53 @@ properties: g, err := p.List.GroupPlugin() require.NoError(t, err) require.Equal(t, plugin.Name("us-east/workers"), g) + + spec2 := types.Spec{} + require.NoError(t, types.AnyString(` +{ + "kind": "enrollment", + "metadata" : { + "name" : "nfs" + }, + "options" : { + "SourceKeySelector" : "\\{\\{.ID\\}\\}" + }, + "properties" : { + "List" : [ + { "ID" : "h1" }, + { "ID" : "h2" } + ], + "Instance" : { + "Plugin" : "us-east/nfs-authorizer", + "Properties" : { + "ID" : "\\{\\{.ID\\}\\}" + } + } + } +} +`).Decode(&spec2)) + + tt, err := TemplateFrom(spec2.Options.Bytes()) + require.NoError(t, err) + + obj := map[string]string{"ID": "hello"} + view, err := tt.Render(obj) + require.NoError(t, err) + require.Equal(t, "{\n \"SourceKeySelector\" : \"hello\"\n }", view) + + type properties struct { + Instance struct { + Properties struct { + ID string + } + } + } + + pp := properties{} + tt, err = TemplateFrom(spec2.Properties.Bytes()) + require.NoError(t, err) + view, err = tt.Render(obj) + require.NoError(t, err) + require.NoError(t, types.AnyString(view).Decode(&pp)) + require.Equal(t, "hello", pp.Instance.Properties.ID) } diff --git a/pkg/run/v0/enrollment/enrollment.go b/pkg/run/v0/enrollment/enrollment.go index 14ad152b2..ceb928613 100644 --- a/pkg/run/v0/enrollment/enrollment.go +++ b/pkg/run/v0/enrollment/enrollment.go @@ -1,8 +1,6 @@ package enrollment import ( - "time" - enrollment_controller "github.com/docker/infrakit/pkg/controller/enrollment" enrollment "github.com/docker/infrakit/pkg/controller/enrollment/types" "github.com/docker/infrakit/pkg/discovery" @@ -31,7 +29,7 @@ func init() { // DefaultOptions return an Options with default values filled in. var DefaultOptions = enrollment.Options{ - SyncInterval: 5 * time.Second, + SyncInterval: types.Duration(enrollment.DefaultSyncInterval), DestroyOnTerminate: false, } diff --git a/pkg/template/funcs.go b/pkg/template/funcs.go index 08df2a44d..64b7a2038 100644 --- a/pkg/template/funcs.go +++ b/pkg/template/funcs.go @@ -17,6 +17,33 @@ import ( "github.com/vaughan0/go-ini" ) +// Unescape replaces all the \{\{ and \}\} back to the normal unescaped {{ and }}. +func Unescape(source []byte) []byte { + if source == nil { + return source + } + // Apply the template but we need escape the \{\{ if any + buff := source + buff = bytes.Replace(buff, []byte("\\{\\{"), []byte("{{"), -1) + buff = bytes.Replace(buff, []byte("\\}\\}"), []byte("}}"), -1) + + // YAML will escape the escapes... so twice + buff = bytes.Replace(buff, []byte("\\\\{\\\\{"), []byte("{{"), -1) + buff = bytes.Replace(buff, []byte("\\\\}\\\\}"), []byte("}}"), -1) + return buff +} + +// Escape replaces all the {{ and }} with \{\{ and \}\} to escape template content. +func Escape(source []byte) []byte { + if source == nil { + return source + } + buff := source + buff = bytes.Replace(buff, []byte("{{"), []byte("\\{\\{"), -1) + buff = bytes.Replace(buff, []byte("}}"), []byte("\\}\\}"), -1) + return buff +} + // DeepCopyObject makes a deep copy of the argument, using encoding/gob encode/decode. func DeepCopyObject(from interface{}) (interface{}, error) { var mod bytes.Buffer @@ -261,24 +288,26 @@ func (t *Template) Fetch(p string, opt ...interface{}) (string, error) { // Source 'sources' the input file at url, also inherits all the variables. func (t *Template) Source(p string, opt ...interface{}) (string, error) { - headers, context := headersAndContext(opt...) - loc := p - if strings.Index(loc, "str://") == -1 { - u, err := GetURL(t.url, p) - if err != nil { - return "", err - } - loc = u.String() - } - - prev := t.options.CustomizeFetch - t.options.CustomizeFetch = func(req *http.Request) { - setHeaders(req, headers) - if prev != nil { - prev(req) - } - } - sourced, err := NewTemplate(loc, t.options) + // headers, context := headersAndContext(opt...) + // loc := p + // if strings.Index(loc, "str://") == -1 { + // u, err := GetURL(t.url, p) + // if err != nil { + // return "", err + // } + // loc = u.String() + // } + + // prev := t.options.CustomizeFetch + // t.options.CustomizeFetch = func(req *http.Request) { + // setHeaders(req, headers) + // if prev != nil { + // prev(req) + // } + // } + // sourced, err := NewTemplate(loc, t.options) + + _, context, sourced, err := t.raw(p, opt...) if err != nil { return "", err } @@ -296,16 +325,63 @@ func (t *Template) Source(p string, opt ...interface{}) (string, error) { // Include includes the template at the url inline. func (t *Template) Include(p string, opt ...interface{}) (string, error) { + // headers, context := headersAndContext(opt...) + // loc := p + // if strings.Index(loc, "str://") == -1 { + // u, err := GetURL(t.url, p) + // if err != nil { + // return "", err + // } + // loc = u.String() + // } + + // prev := t.options.CustomizeFetch + // t.options.CustomizeFetch = func(req *http.Request) { + // setHeaders(req, headers) + // if prev != nil { + // prev(req) + // } + // } + + // included, err := NewTemplate(loc, t.options) + + _, context, included, err := t.raw(p, opt...) + if err != nil { + return "", err + } + dotCopy, err := included.forkFrom(t) + if err != nil { + return "", err + } + included.context = dotCopy + + if context == nil { + context = included.context + } + + return included.Render(context) +} + +// Raw includes the raw bytes fetched from the url inline. +func (t *Template) Raw(p string, opt ...interface{}) ([]byte, error) { + _, _, included, err := t.raw(p, opt...) + if err != nil { + return []byte{}, err + } + return included.body, nil +} + +func (t *Template) raw(p string, opt ...interface{}) (map[string][]string, interface{}, *Template, error) { + headers, context := headersAndContext(opt...) loc := p if strings.Index(loc, "str://") == -1 { u, err := GetURL(t.url, p) if err != nil { - return "", err + return nil, nil, nil, err } loc = u.String() } - prev := t.options.CustomizeFetch t.options.CustomizeFetch = func(req *http.Request) { setHeaders(req, headers) @@ -313,22 +389,8 @@ func (t *Template) Include(p string, opt ...interface{}) (string, error) { prev(req) } } - - included, err := NewTemplate(loc, t.options) - if err != nil { - return "", err - } - dotCopy, err := included.forkFrom(t) - if err != nil { - return "", err - } - included.context = dotCopy - - if context == nil { - context = included.context - } - - return included.Render(context) + tt, err := NewTemplate(loc, t.options) + return headers, context, tt, err } // DefaultFuncs returns a list of default functions for binding in the template @@ -338,7 +400,14 @@ func (t *Template) DefaultFuncs() []Function { { Name: "fetch", Description: []string{ - "Fetches a resource without evaluation as template", + "Fetches a resource without evaluation as template. The results are cached locally by the url fetched from.", + }, + Func: t.Fetch, + }, + { + Name: "raw", + Description: []string{ + "Get a resource and returns the raw bytes. The results are not cached.", }, Func: t.Fetch, }, @@ -490,6 +559,22 @@ func (t *Template) DefaultFuncs() []Function { }, Func: FromHCL, }, + { + Name: "escape", + Description: []string{ + "Escapes the content as template so it can be inlined another template", + "and not be evaluated all at the same time.", + }, + Func: Escape, + }, + { + Name: "unescape", + Description: []string{ + "Unescapes the content back into a template so that the content", + "can be evaluated.", + }, + Func: Escape, + }, { Name: "echo", Description: []string{ diff --git a/pkg/template/funcs_test.go b/pkg/template/funcs_test.go index 1b4eef567..77654dd7b 100644 --- a/pkg/template/funcs_test.go +++ b/pkg/template/funcs_test.go @@ -413,3 +413,17 @@ func TestYAML(t *testing.T) { require.Equal(t, v, v2) } + +func TestEscapeUnescape(t *testing.T) { + + text := `"\{\{.ID\}\}"` + buff := Unescape([]byte(text)) + text2 := Escape(buff) + require.Equal(t, string(text), string(text2)) + + tt, err := NewTemplateFromBytes(buff, "", Options{}) + require.NoError(t, err) + v, err := tt.Render(map[string]string{"ID": "hello"}) + require.NoError(t, err) + require.Equal(t, `"hello"`, v) +} From bb9947db3611341011f567ac4bca1360b54ffef1 Mon Sep 17 00:00:00 2001 From: David Chung Date: Sun, 10 Sep 2017 02:11:42 -0700 Subject: [PATCH 07/12] fix check docs Signed-off-by: David Chung --- docs/plugins/instance.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/plugins/instance.md b/docs/plugins/instance.md index cb9714994..c2c678bb7 100644 --- a/docs/plugins/instance.md +++ b/docs/plugins/instance.md @@ -1,6 +1,7 @@ # Instance plugin API - + + ## API From 1590d90d869fa2ed4bf4c945ae312f6ed808c526 Mon Sep 17 00:00:00 2001 From: David Chung Date: Sun, 10 Sep 2017 02:31:08 -0700 Subject: [PATCH 08/12] fix broken build Signed-off-by: David Chung --- pkg/controller/enrollment/types/types.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/controller/enrollment/types/types.go b/pkg/controller/enrollment/types/types.go index 3d1a6db42..b80e505a5 100644 --- a/pkg/controller/enrollment/types/types.go +++ b/pkg/controller/enrollment/types/types.go @@ -1,7 +1,6 @@ package types import ( - "bytes" "time" "github.com/docker/infrakit/pkg/controller" From bae1e576cef42a60e686b5b138b0ec40ab45e3c8 Mon Sep 17 00:00:00 2001 From: David Chung Date: Sun, 10 Sep 2017 02:39:28 -0700 Subject: [PATCH 09/12] fix broken build Signed-off-by: David Chung --- pkg/controller/enrollment/sync.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/controller/enrollment/sync.go b/pkg/controller/enrollment/sync.go index 98bfd979c..523057268 100644 --- a/pkg/controller/enrollment/sync.go +++ b/pkg/controller/enrollment/sync.go @@ -3,11 +3,13 @@ package enrollment import ( "fmt" + enrollment "github.com/docker/infrakit/pkg/controller/enrollment/types" "github.com/docker/infrakit/pkg/plugin" group_rpc "github.com/docker/infrakit/pkg/rpc/group" instance_rpc "github.com/docker/infrakit/pkg/rpc/instance" "github.com/docker/infrakit/pkg/spi/group" "github.com/docker/infrakit/pkg/spi/instance" + "github.com/docker/infrakit/pkg/template" "github.com/docker/infrakit/pkg/types" ) From 810373755e55d4f7ea141f379a7f05e223cd3b19 Mon Sep 17 00:00:00 2001 From: David Chung Date: Sun, 10 Sep 2017 08:06:38 -0700 Subject: [PATCH 10/12] added documentation / example yml Signed-off-by: David Chung --- docs/controller/enrollment/example.yml | 71 +++++++++++++++++++++++++ docs/controller/enrollment/group.yml | 30 +++++++++++ pkg/controller/enrollment/controller.go | 6 ++- pkg/controller/enrollment/enroller.go | 17 +++--- pkg/controller/enrollment/sync.go | 1 - pkg/controller/internal/controller.go | 20 +++++++ 6 files changed, 135 insertions(+), 10 deletions(-) create mode 100644 docs/controller/enrollment/example.yml create mode 100644 docs/controller/enrollment/group.yml diff --git a/docs/controller/enrollment/example.yml b/docs/controller/enrollment/example.yml new file mode 100644 index 000000000..9dfbcaf86 --- /dev/null +++ b/docs/controller/enrollment/example.yml @@ -0,0 +1,71 @@ +# +# +# Example for enrollment controller +# +# Enrollment controller makes sure the instances in the source group (specified +# by the List field) are in sync with entries in the downstream instance plugin +# specified by Instance field. +# +# This example uses swarm as backend. So be sure your swarm on the same host +# is a leader via `docker swarm init`. +# +# +# 1. Start up all the plugins: +# INFRAKIT_MANAGER_BACKEND=swarm build/infrakit plugin start manager group \ +# enrollment:nfs \ +# simulator:nfs-auth simulator:simulator vanilla +# +# This starts up the group at 'us-east', one simulator at 'nfs' and another at 'simulator' +# +# 2. Commit the group to create the groups -- see the group.yml included. +# +# infrakit group controller commit -y docs/controller/enrollment/group.yml +# +# Verify the group has workers: +# +# infrakit group describe workers +# +# 3. Commit this file to create the enrollments +# +# infrakit nfs controller commit -y docs/controller/enrollment/example.yml +# +# 4. Verify that entries are in sync: check the group describe and nfs-auth/disk (that's +# the Plugin specified in the Instance section of this config. +# +# infrakit group describe workers # returns the list of nodes in the group +# infrakit infrakit nfs-auth/disk describe # returns the list of corresponding enrollments +# +# For each member of the group workers you should see a corresponding entry in the enrollments. +# Note that the instance ID of an enrollment (in nfs-auth/disk) will be different from the instance +# ID of a member in group/workers; however, the ID of a member of group/workers should show up as +# a label in the instance of nfs-auth/disk (as infrakit.enrollment.sourceID). +# +# +kind: enrollment +metadata: + name: nfs/workers # socket file = nfs and the name of control loop is 'workers' +properties: + List: group/workers # socket file = group and group id is 'workers' + Instance: + + # the name of a plugin that has disk as subtype. + Plugin: nfs-auth/disk + + # the entire Properties block here will be rendered and included as the downstream + # instance plugin's instance.Spec when Provision() is called. + Properties: + + # You can include template expressions in this block; however, you need to + # escape the braces. + host: \{\{.ID\}\} + iops: 10 +options: + # This expression is a template used to select the key from each source entry + # of instance.Description. Note here we escape the template tags + # so that template expressions don't get clobbered by infrakit template when + # using that as a preprocessor prior to committing. + SourceKeySelector: \{\{.ID\}\} + + # How often to run the sync. The string value here is in the format of Go's time.Duration. + # For example, 1m means 1 minute. + SyncInterval: 5s # seconds \ No newline at end of file diff --git a/docs/controller/enrollment/group.yml b/docs/controller/enrollment/group.yml new file mode 100644 index 000000000..72dcb40cb --- /dev/null +++ b/docs/controller/enrollment/group.yml @@ -0,0 +1,30 @@ +# +# A group of workers +# +# Start up -- plugin start should include manager, vanilla, simulator, and group +# Then commit +# +# infrakit group controller commit -y docs/controller/enrollment/group.yml +# +kind: group +metadata: + name: workers +properties: + Allocation: + Size: 5 + Flavor: + Plugin: vanilla + Properties: + Attachments: + - ID: attachid + Type: attachtype + Init: + - docker pull nginx:alpine + - docker run -d -p 80:80 nginx-alpine + Tags: + project: infrakit + tier: web + Instance: + Plugin: simulator/compute + Properties: + Note: custom field diff --git a/pkg/controller/enrollment/controller.go b/pkg/controller/enrollment/controller.go index 661977c28..cc30a2de0 100644 --- a/pkg/controller/enrollment/controller.go +++ b/pkg/controller/enrollment/controller.go @@ -10,7 +10,10 @@ import ( "github.com/docker/infrakit/pkg/types" ) -var log = logutil.New("module", "controller/enrollment") +var ( + log = logutil.New("module", "controller/enrollment") + debugV = logutil.V(200) +) // NewController returns a controller implementation func NewController(plugins func() discovery.Plugins, leader manager.Leadership, @@ -36,6 +39,7 @@ func NewTypedControllers(plugins func() discovery.Plugins, leader manager.Leader leader, // the constructor func(spec types.Spec) (internal.Managed, error) { + log.Debug("Creating managed object", "spec", spec) return newEnroller(plugins, leader, options), nil }, // the key function diff --git a/pkg/controller/enrollment/enroller.go b/pkg/controller/enrollment/enroller.go index 17e9179cd..fc479a78b 100644 --- a/pkg/controller/enrollment/enroller.go +++ b/pkg/controller/enrollment/enroller.go @@ -122,6 +122,10 @@ func (l *enroller) Plan(operation controller.Operation, spec types.Spec) (*types } func (l *enroller) updateSpec(spec types.Spec) error { + + l.lock.Lock() + defer l.lock.Unlock() + if spec.Options != nil { options := enrollment.Options{} if err := spec.Options.Decode(&options); err != nil { @@ -139,13 +143,16 @@ func (l *enroller) updateSpec(spec types.Spec) error { } l.spec = spec + // set identity + l.spec.Metadata.Identity = &types.Identity{ + ID: l.spec.Metadata.Name, + } return nil } // Enforce implements internal.Managed.Enforce func (l *enroller) Enforce(spec types.Spec) (*types.Object, error) { - l.lock.Lock() - defer l.lock.Unlock() + log.Debug("Enforce", "spec", spec, "V", debugV) if err := l.updateSpec(spec); err != nil { return nil, err @@ -167,9 +174,6 @@ func (l *enroller) Free() (*types.Object, error) { // Pause implements internal.Managed.Pause func (l *enroller) Pause() (*types.Object, error) { - l.lock.Lock() - defer l.lock.Unlock() - if l.Running() { l.Stop() } @@ -178,9 +182,6 @@ func (l *enroller) Pause() (*types.Object, error) { // Terminate implements internal.Managed.Terminate func (l *enroller) Terminate() (*types.Object, error) { - l.lock.Lock() - defer l.lock.Unlock() - o, err := l.object() if err != nil { return nil, err diff --git a/pkg/controller/enrollment/sync.go b/pkg/controller/enrollment/sync.go index 523057268..216e1b2aa 100644 --- a/pkg/controller/enrollment/sync.go +++ b/pkg/controller/enrollment/sync.go @@ -197,7 +197,6 @@ func (l *enroller) labels(n instance.Description) map[string]string { // destroy all the instances in the enrolled instance plugin func (l *enroller) destroy() error { - instancePlugin, err := l.getInstancePlugin(l.properties.Instance.Plugin) if err != nil { return err diff --git a/pkg/controller/internal/controller.go b/pkg/controller/internal/controller.go index 964ecb354..de37f596c 100644 --- a/pkg/controller/internal/controller.go +++ b/pkg/controller/internal/controller.go @@ -144,6 +144,8 @@ func (c *Controller) Commit(operation controller.Operation, spec types.Spec) (ob c.lock.Lock() defer c.lock.Unlock() + log.Debug("committing", "operation", operation, "spec", spec) + m := []**Managed{} copy := spec m, err = c.getManaged(&spec.Metadata, ©) @@ -151,6 +153,12 @@ func (c *Controller) Commit(operation controller.Operation, spec types.Spec) (ob return } + log.Debug("got managed", "operation", operation, "spec", spec, "m", m) + + if len(m) == 0 { + return types.Object{}, fmt.Errorf("no managed object found %v", spec.Metadata.Name) + } + // In the future maybe will consider wildcard commits... but this is highly discouraged at this time. if len(m) != 1 { err = fmt.Errorf("duplicate objects: %v", m) @@ -162,6 +170,8 @@ func (c *Controller) Commit(operation controller.Operation, spec types.Spec) (ob managed := *(m[0]) if (*managed).Running() { + log.Debug("creating new object to replace running instance.") + // Create a new object newManaged, err := c.alloc(spec) if err != nil { @@ -180,6 +190,7 @@ func (c *Controller) Commit(operation controller.Operation, spec types.Spec) (ob log.Debug("Swapped running managed object", "managed", m[0]) } + log.Debug("calling enforce", "spec", spec, "m", managed) o, e := (*managed).Enforce(spec) if o != nil { object = *o @@ -216,6 +227,15 @@ func (c *Controller) Describe(search *types.Metadata) (objects []types.Object, e if err != nil { return } + + if len(m) == 0 { + ss := fmt.Sprintf("%v", search) + if search != nil { + ss = fmt.Sprintf("%v", *search) + } + return nil, fmt.Errorf("no managed object found %v", ss) + } + objects = []types.Object{} for _, s := range m { o, err := (**s).Inspect() From 180399b6a906c5a51bfe68894ca602c6c0d632b6 Mon Sep 17 00:00:00 2001 From: David Chung Date: Sun, 10 Sep 2017 08:31:43 -0700 Subject: [PATCH 11/12] change to use scale command instead of set-size Signed-off-by: David Chung --- docs/controller/enrollment/example.yml | 12 +++- pkg/cli/v0/group/cmd.go | 22 +++---- pkg/cli/v0/group/scale.go | 91 ++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 14 deletions(-) create mode 100644 pkg/cli/v0/group/scale.go diff --git a/docs/controller/enrollment/example.yml b/docs/controller/enrollment/example.yml index 9dfbcaf86..485d25d02 100644 --- a/docs/controller/enrollment/example.yml +++ b/docs/controller/enrollment/example.yml @@ -23,7 +23,7 @@ # # Verify the group has workers: # -# infrakit group describe workers +# infrakit group/workers group describe # # 3. Commit this file to create the enrollments # @@ -32,7 +32,7 @@ # 4. Verify that entries are in sync: check the group describe and nfs-auth/disk (that's # the Plugin specified in the Instance section of this config. # -# infrakit group describe workers # returns the list of nodes in the group +# infrakit group/workers group describe # returns the list of nodes in the group # infrakit infrakit nfs-auth/disk describe # returns the list of corresponding enrollments # # For each member of the group workers you should see a corresponding entry in the enrollments. @@ -40,7 +40,15 @@ # ID of a member in group/workers; however, the ID of a member of group/workers should show up as # a label in the instance of nfs-auth/disk (as infrakit.enrollment.sourceID). # +# 5. Try scale up the workers group # +# infrakit group/workers group scale 10 +# +# After a while, verify enrollment: +# +# infrakit nfs-auth/disk describe + + kind: enrollment metadata: name: nfs/workers # socket file = nfs and the name of control loop is 'workers' diff --git a/pkg/cli/v0/group/cmd.go b/pkg/cli/v0/group/cmd.go index 8a20e3a92..bd70aa172 100644 --- a/pkg/cli/v0/group/cmd.go +++ b/pkg/cli/v0/group/cmd.go @@ -15,16 +15,15 @@ var log = logutil.New("module", "cli/v1/group") func init() { cli.Register(group.InterfaceSpec, []cli.CmdBuilder{ - //Group, - Ls, - Inspect, - Describe, - Commit, - Free, - Destroy, - Size, - SetSize, - DestroyInstances, + Group, + // Ls, + // Inspect, + // Describe, + // Commit, + // Free, + // Destroy, + // Scale, + // DestroyInstances, }) } @@ -42,8 +41,7 @@ func Group(name string, services *cli.Services) *cobra.Command { Commit(name, services), Free(name, services), Destroy(name, services), - Size(name, services), - SetSize(name, services), + Scale(name, services), DestroyInstances(name, services), ) diff --git a/pkg/cli/v0/group/scale.go b/pkg/cli/v0/group/scale.go new file mode 100644 index 000000000..c9c9abcb4 --- /dev/null +++ b/pkg/cli/v0/group/scale.go @@ -0,0 +1,91 @@ +package group + +import ( + "fmt" + "os" + "strconv" + + "github.com/docker/infrakit/pkg/cli" + "github.com/docker/infrakit/pkg/plugin" + "github.com/docker/infrakit/pkg/spi/group" + "github.com/spf13/cobra" +) + +// Scale returns the scale command +func Scale(name string, services *cli.Services) *cobra.Command { + + scale := &cobra.Command{ + Use: "scale [new-target]", + Short: "Returns size of the group if no args provided. Otherwise set the target size.", + RunE: func(cmd *cobra.Command, args []string) error { + + pluginName := plugin.Name(name) + _, gid := pluginName.GetLookupAndType() + + size := -1 + + if gid == "" { + // if gid is not known, then we need it to be provided + switch len(args) { + + case 0: + cmd.Usage() + os.Exit(1) + case 1: + gid = args[0] + case 2: + gid = args[0] + sz, err := strconv.Atoi(args[1]) + if err != nil { + return err + } + size = sz + default: + cmd.Usage() + os.Exit(1) + } + } else { + // if gid is not known, then we need it to be provided + switch len(args) { + + case 0: + size = -1 + case 1: + sz, err := strconv.Atoi(args[0]) + if err != nil { + return err + } + size = sz + default: + cmd.Usage() + os.Exit(1) + } + } + + groupPlugin, err := LoadPlugin(services.Plugins(), name) + if err != nil { + return nil + } + cli.MustNotNil(groupPlugin, "group plugin not found", "name", name) + + groupID := group.ID(gid) + target, err := groupPlugin.Size(groupID) + if err != nil { + return err + } + fmt.Printf("Group %v at %d instances", groupID, target) + + if size > -1 { + err = groupPlugin.SetSize(groupID, size) + if err != nil { + return err + } + fmt.Printf(", scale to %d", size) + } + + fmt.Println() + return nil + }, + } + return scale +} From 8a5377026386bc9d43e6172fc4ad419014d964a6 Mon Sep 17 00:00:00 2001 From: David Chung Date: Sun, 10 Sep 2017 09:17:26 -0700 Subject: [PATCH 12/12] use non-qualified group commands for simpler ux Signed-off-by: David Chung --- docs/controller/enrollment/example.yml | 6 +++--- pkg/cli/v0/group/cmd.go | 18 +++++++++--------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/docs/controller/enrollment/example.yml b/docs/controller/enrollment/example.yml index 485d25d02..b228bb40a 100644 --- a/docs/controller/enrollment/example.yml +++ b/docs/controller/enrollment/example.yml @@ -23,7 +23,7 @@ # # Verify the group has workers: # -# infrakit group/workers group describe +# infrakit group/workers describe # # 3. Commit this file to create the enrollments # @@ -32,7 +32,7 @@ # 4. Verify that entries are in sync: check the group describe and nfs-auth/disk (that's # the Plugin specified in the Instance section of this config. # -# infrakit group/workers group describe # returns the list of nodes in the group +# infrakit group/workers describe # returns the list of nodes in the group # infrakit infrakit nfs-auth/disk describe # returns the list of corresponding enrollments # # For each member of the group workers you should see a corresponding entry in the enrollments. @@ -42,7 +42,7 @@ # # 5. Try scale up the workers group # -# infrakit group/workers group scale 10 +# infrakit group/workers scale 10 # # After a while, verify enrollment: # diff --git a/pkg/cli/v0/group/cmd.go b/pkg/cli/v0/group/cmd.go index bd70aa172..d680756c5 100644 --- a/pkg/cli/v0/group/cmd.go +++ b/pkg/cli/v0/group/cmd.go @@ -15,15 +15,15 @@ var log = logutil.New("module", "cli/v1/group") func init() { cli.Register(group.InterfaceSpec, []cli.CmdBuilder{ - Group, - // Ls, - // Inspect, - // Describe, - // Commit, - // Free, - // Destroy, - // Scale, - // DestroyInstances, + // Group, + Ls, + Inspect, + Describe, + Commit, + Free, + Destroy, + Scale, + DestroyInstances, }) }