diff --git a/internal/core/events.go b/internal/core/events.go index b83dc09..0da44e8 100644 --- a/internal/core/events.go +++ b/internal/core/events.go @@ -21,7 +21,7 @@ type Event struct { } type ServerUpdateEvent struct { - id string + Type EventType UpstreamName string Servers []nginxClient.StreamUpstreamServer } @@ -37,9 +37,23 @@ func NewEvent(eventType EventType, service *v1.Service, previousService *v1.Serv } } -func NewServerUpdateEvent(upstreamName string, servers []nginxClient.StreamUpstreamServer) *ServerUpdateEvent { +func NewServerUpdateEvent(eventType EventType, upstreamName string, servers []nginxClient.StreamUpstreamServer) *ServerUpdateEvent { return &ServerUpdateEvent{ + Type: eventType, UpstreamName: upstreamName, Servers: servers, } } + +func (e *ServerUpdateEvent) TypeName() string { + switch e.Type { + case Created: + return "Created" + case Updated: + return "Updated" + case Deleted: + return "Deleted" + default: + return "Unknown" + } +} diff --git a/internal/synchronization/synchronizer.go b/internal/synchronization/synchronizer.go index 825149b..b059ac5 100644 --- a/internal/synchronization/synchronizer.go +++ b/internal/synchronization/synchronizer.go @@ -34,7 +34,9 @@ func NewSynchronizer() (*Synchronizer, error) { } func (s *Synchronizer) AddEvents(events core.ServerUpdateEvents) { - logrus.Debug(`Synchronizer::AddEvents adding %d events`, len(events)) + logrus.Debugf(`Synchronizer::AddEvents adding %d events`, len(events)) + + // TODO: Add fan-out for multiple NginxClients for _, event := range events { s.AddEvent(event) } @@ -86,12 +88,25 @@ func (s *Synchronizer) ShutDown() { func (s *Synchronizer) handleEvent(serverUpdateEvent *core.ServerUpdateEvent) error { logrus.Debugf(`Synchronizer::handleEvent: %#v`, serverUpdateEvent) - _, _, _, err := s.NginxPlusClient.UpdateStreamServers(serverUpdateEvent.UpstreamName, serverUpdateEvent.Servers) - if err != nil { - return fmt.Errorf(`error occurred updating the nginx+ host: %w`, err) + switch serverUpdateEvent.Type { + case core.Created: + fallthrough + case core.Updated: + _, _, _, err := s.NginxPlusClient.UpdateStreamServers(serverUpdateEvent.UpstreamName, serverUpdateEvent.Servers) + if err != nil { + return fmt.Errorf(`error occurred updating the nginx+ upstream servers: %w`, err) + } + case core.Deleted: + // NOTE: Deleted events include a single server in the array + err := s.NginxPlusClient.DeleteStreamServer(serverUpdateEvent.UpstreamName, serverUpdateEvent.Servers[0].Server) + if err != nil { + return fmt.Errorf(`error occurred deleting the nginx+ upstream server: %w`, err) + } + default: + logrus.Warnf(`Synchronizer::handleEvent: unknown event type: %d`, serverUpdateEvent.Type) } - logrus.Infof(`Synchronizer::handleEvent: successfully updated the nginx+ hosts for Ingress: "%s"`, serverUpdateEvent.UpstreamName) + logrus.Infof(`Synchronizer::handleEvent: successfully %s the nginx+ hosts for Ingress: "%s"`, serverUpdateEvent.TypeName(), serverUpdateEvent.UpstreamName) return nil } diff --git a/internal/translation/translator.go b/internal/translation/translator.go index 937eada..02b5df5 100644 --- a/internal/translation/translator.go +++ b/internal/translation/translator.go @@ -20,7 +20,7 @@ func Translate(event *core.Event) (core.ServerUpdateEvents, error) { portsOfInterest := filterPorts(event.Service.Spec.Ports) - return buildServerUpdateEvents(portsOfInterest, event.NodeIps) + return buildServerUpdateEvents(portsOfInterest, event) } func filterPorts(ports []v1.ServicePort) []v1.ServicePort { @@ -35,19 +35,34 @@ func filterPorts(ports []v1.ServicePort) []v1.ServicePort { return portsOfInterest } -// TODO: Get the list of Node IPs from the Kubernetes API and fan out over the port -func buildServerUpdateEvents(ports []v1.ServicePort, nodeIps []string) (core.ServerUpdateEvents, error) { +// buildServerUpdateEvents builds a list of ServerUpdateEvents based on the event type +// The NGINX+ Client uses a list of servers for Created and Updated events; the client performs reconciliation between +// the list of servers in the NGINX+ Client call and the list of servers in NGINX+. +// The NGINX+ Client uses a single server for Deleted events; so the list of servers is broken up into individual events. +func buildServerUpdateEvents(ports []v1.ServicePort, event *core.Event) (core.ServerUpdateEvents, error) { logrus.Debugf("Translate::buildServerUpdateEvents(ports=%#v)", ports) - upstreams := core.ServerUpdateEvents{} + updateEvents := core.ServerUpdateEvents{} for _, port := range ports { ingressName := fixIngressName(port.Name) - servers, _ := buildServers(nodeIps, port) + servers, _ := buildServers(event.NodeIps, port) + + switch event.Type { + case core.Created: + fallthrough + case core.Updated: + updateEvents = append(updateEvents, core.NewServerUpdateEvent(event.Type, ingressName, servers)) + case core.Deleted: + for _, server := range servers { + updateEvents = append(updateEvents, core.NewServerUpdateEvent(event.Type, ingressName, []nginxClient.StreamUpstreamServer{server})) + } + default: + logrus.Warnf(`Translator::buildServerUpdateEvents: unknown event type: %d`, event.Type) + } - upstreams = append(upstreams, core.NewServerUpdateEvent(ingressName, servers)) } - return upstreams, nil + return updateEvents, nil } func buildServers(nodeIps []string, port v1.ServicePort) ([]nginxClient.StreamUpstreamServer, error) {