Skip to content

Implement logic supporting Delete events #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions internal/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Event struct {
}

type ServerUpdateEvent struct {
id string
Type EventType
UpstreamName string
Servers []nginxClient.StreamUpstreamServer
}
Expand All @@ -37,9 +37,23 @@ func NewEvent(eventType EventType, service *v1.Service, previousService *v1.Serv
}
}

func NewServerUpdateEvent(upstreamName string, servers []nginxClient.StreamUpstreamServer) *ServerUpdateEvent {
func NewServerUpdateEvent(eventType EventType, upstreamName string, servers []nginxClient.StreamUpstreamServer) *ServerUpdateEvent {
return &ServerUpdateEvent{
Type: eventType,
UpstreamName: upstreamName,
Servers: servers,
}
}

func (e *ServerUpdateEvent) TypeName() string {
switch e.Type {
case Created:
return "Created"
case Updated:
return "Updated"
case Deleted:
return "Deleted"
default:
return "Unknown"
}
}
25 changes: 20 additions & 5 deletions internal/synchronization/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ func NewSynchronizer() (*Synchronizer, error) {
}

func (s *Synchronizer) AddEvents(events core.ServerUpdateEvents) {
logrus.Debug(`Synchronizer::AddEvents adding %d events`, len(events))
logrus.Debugf(`Synchronizer::AddEvents adding %d events`, len(events))

// TODO: Add fan-out for multiple NginxClients
for _, event := range events {
s.AddEvent(event)
}
Expand Down Expand Up @@ -86,12 +88,25 @@ func (s *Synchronizer) ShutDown() {
func (s *Synchronizer) handleEvent(serverUpdateEvent *core.ServerUpdateEvent) error {
logrus.Debugf(`Synchronizer::handleEvent: %#v`, serverUpdateEvent)

_, _, _, err := s.NginxPlusClient.UpdateStreamServers(serverUpdateEvent.UpstreamName, serverUpdateEvent.Servers)
if err != nil {
return fmt.Errorf(`error occurred updating the nginx+ host: %w`, err)
switch serverUpdateEvent.Type {
case core.Created:
fallthrough
case core.Updated:
_, _, _, err := s.NginxPlusClient.UpdateStreamServers(serverUpdateEvent.UpstreamName, serverUpdateEvent.Servers)
if err != nil {
return fmt.Errorf(`error occurred updating the nginx+ upstream servers: %w`, err)
}
case core.Deleted:
// NOTE: Deleted events include a single server in the array
err := s.NginxPlusClient.DeleteStreamServer(serverUpdateEvent.UpstreamName, serverUpdateEvent.Servers[0].Server)
if err != nil {
return fmt.Errorf(`error occurred deleting the nginx+ upstream server: %w`, err)
}
default:
logrus.Warnf(`Synchronizer::handleEvent: unknown event type: %d`, serverUpdateEvent.Type)
}

logrus.Infof(`Synchronizer::handleEvent: successfully updated the nginx+ hosts for Ingress: "%s"`, serverUpdateEvent.UpstreamName)
logrus.Infof(`Synchronizer::handleEvent: successfully %s the nginx+ hosts for Ingress: "%s"`, serverUpdateEvent.TypeName(), serverUpdateEvent.UpstreamName)

return nil
}
Expand Down
29 changes: 22 additions & 7 deletions internal/translation/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func Translate(event *core.Event) (core.ServerUpdateEvents, error) {

portsOfInterest := filterPorts(event.Service.Spec.Ports)

return buildServerUpdateEvents(portsOfInterest, event.NodeIps)
return buildServerUpdateEvents(portsOfInterest, event)
}

func filterPorts(ports []v1.ServicePort) []v1.ServicePort {
Expand All @@ -35,19 +35,34 @@ func filterPorts(ports []v1.ServicePort) []v1.ServicePort {
return portsOfInterest
}

// TODO: Get the list of Node IPs from the Kubernetes API and fan out over the port
func buildServerUpdateEvents(ports []v1.ServicePort, nodeIps []string) (core.ServerUpdateEvents, error) {
// buildServerUpdateEvents builds a list of ServerUpdateEvents based on the event type
// The NGINX+ Client uses a list of servers for Created and Updated events; the client performs reconciliation between
// the list of servers in the NGINX+ Client call and the list of servers in NGINX+.
// The NGINX+ Client uses a single server for Deleted events; so the list of servers is broken up into individual events.
func buildServerUpdateEvents(ports []v1.ServicePort, event *core.Event) (core.ServerUpdateEvents, error) {
logrus.Debugf("Translate::buildServerUpdateEvents(ports=%#v)", ports)

upstreams := core.ServerUpdateEvents{}
updateEvents := core.ServerUpdateEvents{}
for _, port := range ports {
ingressName := fixIngressName(port.Name)
servers, _ := buildServers(nodeIps, port)
servers, _ := buildServers(event.NodeIps, port)

switch event.Type {
case core.Created:
fallthrough
case core.Updated:
updateEvents = append(updateEvents, core.NewServerUpdateEvent(event.Type, ingressName, servers))
case core.Deleted:
for _, server := range servers {
updateEvents = append(updateEvents, core.NewServerUpdateEvent(event.Type, ingressName, []nginxClient.StreamUpstreamServer{server}))
}
default:
logrus.Warnf(`Translator::buildServerUpdateEvents: unknown event type: %d`, event.Type)
}

upstreams = append(upstreams, core.NewServerUpdateEvent(ingressName, servers))
}

return upstreams, nil
return updateEvents, nil
}

func buildServers(nodeIps []string, port v1.ServicePort) ([]nginxClient.StreamUpstreamServer, error) {
Expand Down