diff --git a/cmd/nginx-k8s-edge-controller/main.go b/cmd/nginx-k8s-edge-controller/main.go index 7e82e4a..f8ef3eb 100644 --- a/cmd/nginx-k8s-edge-controller/main.go +++ b/cmd/nginx-k8s-edge-controller/main.go @@ -14,6 +14,7 @@ import ( "github.com/sirupsen/logrus" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/client-go/util/workqueue" ) func main() { @@ -42,20 +43,24 @@ func run() error { return fmt.Errorf(`error occurred initializing settings: %w`, err) } - synchronizer, err := synchronization.NewSynchronizer(settings) + synchronizerWorkqueue, err := buildWorkQueue(settings.Synchronizer.WorkQueueSettings) if err != nil { - return fmt.Errorf(`error initializing synchronizer: %w`, err) + return fmt.Errorf(`error occurred building a workqueue: %w`, err) } - err = synchronizer.Initialize() + synchronizer, err := synchronization.NewSynchronizer(settings, synchronizerWorkqueue) if err != nil { return fmt.Errorf(`error initializing synchronizer: %w`, err) } - handler := observation.NewHandler(synchronizer) - handler.Initialize() + handlerWorkqueue, err := buildWorkQueue(settings.Handler.WorkQueueSettings) + if err != nil { + return fmt.Errorf(`error occurred building a workqueue: %w`, err) + } + + handler := observation.NewHandler(settings, synchronizer, handlerWorkqueue) - watcher, err := observation.NewWatcher(ctx, handler, k8sClient) + watcher, err := observation.NewWatcher(settings, handler) if err != nil { return fmt.Errorf(`error occurred creating a watcher: %w`, err) } @@ -97,3 +102,10 @@ func buildKubernetesClient() (*kubernetes.Clientset, error) { return client, nil } + +func buildWorkQueue(settings configuration.WorkQueueSettings) (workqueue.RateLimitingInterface, error) { + logrus.Debug("main::buildWorkQueue") + + rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(settings.RateLimiterBase, settings.RateLimiterMax) + return workqueue.NewNamedRateLimitingQueue(rateLimiter, settings.Name), nil +} diff --git a/internal/configuration/settings.go b/internal/configuration/settings.go index dbf77ab..aa5b614 100644 --- a/internal/configuration/settings.go +++ b/internal/configuration/settings.go @@ -14,6 +14,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "strings" + "time" ) const ( @@ -21,21 +22,74 @@ const ( ResyncPeriod = 0 ) +type WorkQueueSettings struct { + Name string + RateLimiterBase time.Duration + RateLimiterMax time.Duration +} + +type HandlerSettings struct { + RetryCount int + Threads int + WorkQueueSettings WorkQueueSettings +} + +type WatcherSettings struct { + NginxIngressNamespace string + ResyncPeriod time.Duration +} + +type SynchronizerSettings struct { + MaxMillisecondsJitter int + MinMillisecondsJitter int + RetryCount int + Threads int + WorkQueueSettings WorkQueueSettings +} + type Settings struct { - ctx context.Context + Context context.Context NginxPlusHosts []string - k8sClient *kubernetes.Clientset + K8sClient *kubernetes.Clientset informer cache.SharedInformer eventHandlerRegistration cache.ResourceEventHandlerRegistration + + Handler HandlerSettings + Synchronizer SynchronizerSettings + Watcher WatcherSettings } func NewSettings(ctx context.Context, k8sClient *kubernetes.Clientset) (*Settings, error) { - config := new(Settings) - - config.k8sClient = k8sClient - config.ctx = ctx + settings := &Settings{ + Context: ctx, + K8sClient: k8sClient, + Handler: HandlerSettings{ + RetryCount: 5, + Threads: 1, + WorkQueueSettings: WorkQueueSettings{ + RateLimiterBase: time.Second * 2, + 
RateLimiterMax: time.Second * 60, + Name: "nkl-handler", + }, + }, + Synchronizer: SynchronizerSettings{ + MaxMillisecondsJitter: 750, + MinMillisecondsJitter: 250, + RetryCount: 5, + Threads: 1, + WorkQueueSettings: WorkQueueSettings{ + RateLimiterBase: time.Second * 2, + RateLimiterMax: time.Second * 60, + Name: "nkl-synchronizer", + }, + }, + Watcher: WatcherSettings{ + NginxIngressNamespace: "nginx-ingress", + ResyncPeriod: 0, + }, + } - return config, nil + return settings, nil } func (s *Settings) Initialize() error { @@ -63,14 +117,14 @@ func (s *Settings) Run() { defer utilruntime.HandleCrash() - go s.informer.Run(s.ctx.Done()) + go s.informer.Run(s.Context.Done()) - <-s.ctx.Done() + <-s.Context.Done() } func (s *Settings) buildInformer() (cache.SharedInformer, error) { options := informers.WithNamespace(ConfigMapsNamespace) - factory := informers.NewSharedInformerFactoryWithOptions(s.k8sClient, ResyncPeriod, options) + factory := informers.NewSharedInformerFactoryWithOptions(s.K8sClient, ResyncPeriod, options) informer := factory.Core().V1().ConfigMaps().Informer() return informer, nil diff --git a/internal/core/events_test.go b/internal/core/events_test.go new file mode 100644 index 0000000..5c85b0b --- /dev/null +++ b/internal/core/events_test.go @@ -0,0 +1,60 @@ +package core + +import ( + nginxClient "github.com/nginxinc/nginx-plus-go-client/client" + "testing" +) + +func TestServerUpdateEventWithIdAndHost(t *testing.T) { + event := NewServerUpdateEvent(Created, "upstream", []nginxClient.StreamUpstreamServer{}) + + if event.Id != "" { + t.Errorf("expected empty Id, got %s", event.Id) + } + + if event.NginxHost != "" { + t.Errorf("expected empty NginxHost, got %s", event.NginxHost) + } + + eventWithIdAndHost := ServerUpdateEventWithIdAndHost(event, "id", "host") + + if eventWithIdAndHost.Id != "id" { + t.Errorf("expected Id to be 'id', got %s", eventWithIdAndHost.Id) + } + + if eventWithIdAndHost.NginxHost != "host" { + t.Errorf("expected NginxHost to be 'host', got %s", eventWithIdAndHost.NginxHost) + } +} + +func TestTypeNameCreated(t *testing.T) { + event := NewServerUpdateEvent(Created, "upstream", []nginxClient.StreamUpstreamServer{}) + + if event.TypeName() != "Created" { + t.Errorf("expected 'Created', got %s", event.TypeName()) + } +} + +func TestTypeNameUpdated(t *testing.T) { + event := NewServerUpdateEvent(Updated, "upstream", []nginxClient.StreamUpstreamServer{}) + + if event.TypeName() != "Updated" { + t.Errorf("expected 'Updated', got %s", event.TypeName()) + } +} + +func TestTypeNameDeleted(t *testing.T) { + event := NewServerUpdateEvent(Deleted, "upstream", []nginxClient.StreamUpstreamServer{}) + + if event.TypeName() != "Deleted" { + t.Errorf("expected 'Deleted', got %s", event.TypeName()) + } +} + +func TestTypeNameUnknown(t *testing.T) { + event := NewServerUpdateEvent(EventType(100), "upstream", []nginxClient.StreamUpstreamServer{}) + + if event.TypeName() != "Unknown" { + t.Errorf("expected 'Unknown', got %s", event.TypeName()) + } +} diff --git a/internal/observation/handler.go b/internal/observation/handler.go index 412a438..53cbf69 100644 --- a/internal/observation/handler.go +++ b/internal/observation/handler.go @@ -6,28 +6,31 @@ package observation import ( "fmt" + "github.com/nginxinc/kubernetes-nginx-ingress/internal/configuration" "github.com/nginxinc/kubernetes-nginx-ingress/internal/core" "github.com/nginxinc/kubernetes-nginx-ingress/internal/synchronization" "github.com/nginxinc/kubernetes-nginx-ingress/internal/translation" 
"github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" - "time" ) -const RateLimiterBase = time.Second * 2 -const RateLimiterMax = time.Second * 60 -const RetryCount = 5 -const Threads = 1 -const WatcherQueueName = `nkl-handler` +type HandlerInterface interface { + AddRateLimitedEvent(event *core.Event) + Run(stopCh <-chan struct{}) + ShutDown() +} type Handler struct { eventQueue workqueue.RateLimitingInterface - synchronizer *synchronization.Synchronizer + settings *configuration.Settings + synchronizer synchronization.Interface } -func NewHandler(synchronizer *synchronization.Synchronizer) *Handler { +func NewHandler(settings *configuration.Settings, synchronizer synchronization.Interface, eventQueue workqueue.RateLimitingInterface) *Handler { return &Handler{ + eventQueue: eventQueue, + settings: settings, synchronizer: synchronizer, } } @@ -37,15 +40,10 @@ func (h *Handler) AddRateLimitedEvent(event *core.Event) { h.eventQueue.AddRateLimited(event) } -func (h *Handler) Initialize() { - rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(RateLimiterBase, RateLimiterMax) - h.eventQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, WatcherQueueName) -} - func (h *Handler) Run(stopCh <-chan struct{}) { logrus.Debug("Handler::Run") - for i := 0; i < Threads; i++ { + for i := 0; i < h.settings.Handler.Threads; i++ { go wait.Until(h.worker, 0, stopCh) } @@ -97,7 +95,7 @@ func (h *Handler) withRetry(err error, event *core.Event) { logrus.Debug("Handler::withRetry") if err != nil { // TODO: Add Telemetry - if h.eventQueue.NumRequeues(event) < RetryCount { // TODO: Make this configurable + if h.eventQueue.NumRequeues(event) < h.settings.Handler.RetryCount { h.eventQueue.AddRateLimited(event) logrus.Infof(`Handler::withRetry: requeued event: %#v; error: %v`, event, err) } else { diff --git a/internal/observation/handler_test.go b/internal/observation/handler_test.go new file mode 100644 index 0000000..e2e4f15 --- /dev/null +++ b/internal/observation/handler_test.go @@ -0,0 +1,54 @@ +package observation + +import ( + "context" + "fmt" + "github.com/nginxinc/kubernetes-nginx-ingress/internal/configuration" + "github.com/nginxinc/kubernetes-nginx-ingress/internal/core" + "github.com/nginxinc/kubernetes-nginx-ingress/test/mocks" + v1 "k8s.io/api/core/v1" + "k8s.io/client-go/util/workqueue" + "testing" +) + +func TestHandler_AddsEventToSynchronizer(t *testing.T) { + _, _, synchronizer, handler, err := buildHandler() + if err != nil { + t.Errorf(`should have been no error, %v`, err) + } + + event := &core.Event{ + Type: core.Created, + Service: &v1.Service{ + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: "nkl-back", + }, + }, + }, + }, + } + + handler.AddRateLimitedEvent(event) + + handler.handleNextEvent() + + if len(synchronizer.Events) != 1 { + t.Errorf(`handler.AddRateLimitedEvent did not add the event to the queue`) + } +} + +func buildHandler() (*configuration.Settings, workqueue.RateLimitingInterface, *mocks.MockSynchronizer, *Handler, error) { + settings, err := configuration.NewSettings(context.Background(), nil) + if err != nil { + return nil, nil, nil, nil, fmt.Errorf(`should have been no error, %v`, err) + } + + eventQueue := &mocks.MockRateLimiter{} + synchronizer := &mocks.MockSynchronizer{} + + handler := NewHandler(settings, synchronizer, eventQueue) + + return settings, eventQueue, synchronizer, handler, nil +} diff --git a/internal/observation/watcher.go b/internal/observation/watcher.go index 
c9019ee..876f43a 100644 --- a/internal/observation/watcher.go +++ b/internal/observation/watcher.go @@ -5,35 +5,30 @@ package observation import ( - "context" + "errors" "fmt" + "github.com/nginxinc/kubernetes-nginx-ingress/internal/configuration" "github.com/nginxinc/kubernetes-nginx-ingress/internal/core" "github.com/sirupsen/logrus" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "time" ) -const NginxIngressNamespace = "nginx-ingress" -const ResyncPeriod = 0 - type Watcher struct { - ctx context.Context - client *kubernetes.Clientset eventHandlerRegistration interface{} - handler *Handler + handler HandlerInterface informer cache.SharedIndexInformer + settings *configuration.Settings } -func NewWatcher(ctx context.Context, handler *Handler, k8sClient *kubernetes.Clientset) (*Watcher, error) { +func NewWatcher(settings *configuration.Settings, handler HandlerInterface) (*Watcher, error) { return &Watcher{ - ctx: ctx, - client: k8sClient, - handler: handler, + handler: handler, + settings: settings, }, nil } @@ -56,16 +51,21 @@ func (w *Watcher) Initialize() error { func (w *Watcher) Watch() error { logrus.Debug("Watcher::Watch") + + if w.informer == nil { + return errors.New("error: Initialize must be called before Watch") + } + defer utilruntime.HandleCrash() defer w.handler.ShutDown() - go w.informer.Run(w.ctx.Done()) + go w.informer.Run(w.settings.Context.Done()) - if !cache.WaitForNamedCacheSync(WatcherQueueName, w.ctx.Done(), w.informer.HasSynced) { + if !cache.WaitForNamedCacheSync(w.settings.Handler.WorkQueueSettings.Name, w.settings.Context.Done(), w.informer.HasSynced) { return fmt.Errorf(`error occurred waiting for the cache to sync`) } - <-w.ctx.Done() + <-w.settings.Context.Done() return nil } @@ -117,8 +117,8 @@ func (w *Watcher) buildEventHandlerForUpdate() func(interface{}, interface{}) { func (w *Watcher) buildInformer() (cache.SharedIndexInformer, error) { logrus.Debug("Watcher::buildInformer") - options := informers.WithNamespace(NginxIngressNamespace) - factory := informers.NewSharedInformerFactoryWithOptions(w.client, ResyncPeriod, options) + options := informers.WithNamespace(w.settings.Watcher.NginxIngressNamespace) + factory := informers.NewSharedInformerFactoryWithOptions(w.settings.K8sClient, w.settings.Watcher.ResyncPeriod, options) informer := factory.Core().V1().Services().Informer() return informer, nil @@ -148,7 +148,7 @@ func (w *Watcher) retrieveNodeIps() ([]string, error) { var nodeIps []string - nodes, err := w.client.CoreV1().Nodes().List(w.ctx, metav1.ListOptions{}) + nodes, err := w.settings.K8sClient.CoreV1().Nodes().List(w.settings.Context, metav1.ListOptions{}) if err != nil { logrus.Errorf(`error occurred retrieving the list of nodes: %v`, err) return nil, err diff --git a/internal/observation/watcher_test.go b/internal/observation/watcher_test.go index 6a69c7d..a3d810e 100644 --- a/internal/observation/watcher_test.go +++ b/internal/observation/watcher_test.go @@ -6,34 +6,23 @@ package observation import ( "context" - "github.com/nginxinc/kubernetes-nginx-ingress/internal/synchronization" - "os" + "github.com/nginxinc/kubernetes-nginx-ingress/internal/configuration" + "github.com/nginxinc/kubernetes-nginx-ingress/test/mocks" + "k8s.io/client-go/kubernetes" "testing" ) -func TestNewWatcher(t *testing.T) { - const EnvVarName = "NGINX_PLUS_HOST" - const 
ExpectedValue = "https://demo.nginx.com/api" - - synchronizer, err := synchronization.NewSynchronizer() - if err != nil { - t.Fatalf(`should have been no error, %v`, err) +func TestWatcher_MustInitialize(t *testing.T) { + watcher, _ := buildWatcher() + if err := watcher.Watch(); err == nil { + t.Errorf("Expected error, got %s", err) } +} - defer os.Unsetenv(EnvVarName) - os.Setenv(EnvVarName, ExpectedValue) - - ctx := context.Background() - - handler := NewHandler(synchronizer) - handler.Initialize() - - watcher, err := NewWatcher(ctx, handler) - if err != nil { - t.Fatalf(`failed to create watcher: %v`, err) - } +func buildWatcher() (*Watcher, error) { + k8sClient := &kubernetes.Clientset{} + settings, _ := configuration.NewSettings(context.Background(), k8sClient) + handler := &mocks.MockHandler{} - if watcher == nil { - t.Fatal("watcher should not be nil") - } + return NewWatcher(settings, handler) } diff --git a/internal/probation/check_test.go b/internal/probation/check_test.go new file mode 100644 index 0000000..59eb961 --- /dev/null +++ b/internal/probation/check_test.go @@ -0,0 +1,28 @@ +// Copyright 2023 f5 Inc. All rights reserved. +// Use of this source code is governed by the Apache +// license that can be found in the LICENSE file. + +package probation + +import "testing" + +func TestCheck_LiveCheck(t *testing.T) { + check := LiveCheck{} + if !check.Check() { + t.Errorf("LiveCheck should return true") + } +} + +func TestCheck_ReadyCheck(t *testing.T) { + check := ReadyCheck{} + if !check.Check() { + t.Errorf("ReadyCheck should return true") + } +} + +func TestCheck_StartupCheck(t *testing.T) { + check := StartupCheck{} + if !check.Check() { + t.Errorf("StartupCheck should return true") + } +} diff --git a/internal/probation/server.go b/internal/probation/server.go index 78a1c75..6c26fea 100644 --- a/internal/probation/server.go +++ b/internal/probation/server.go @@ -58,18 +58,18 @@ func (hs *HealthServer) Stop() { } func (hs *HealthServer) HandleLive(writer http.ResponseWriter, request *http.Request) { - handleProbe(writer, request, &hs.LiveCheck) + hs.handleProbe(writer, request, &hs.LiveCheck) } func (hs *HealthServer) HandleReady(writer http.ResponseWriter, request *http.Request) { - handleProbe(writer, request, &hs.ReadyCheck) + hs.handleProbe(writer, request, &hs.ReadyCheck) } func (hs *HealthServer) HandleStartup(writer http.ResponseWriter, request *http.Request) { - handleProbe(writer, request, &hs.StartupCheck) + hs.handleProbe(writer, request, &hs.StartupCheck) } -func handleProbe(writer http.ResponseWriter, _ *http.Request, check Check) { +func (hs *HealthServer) handleProbe(writer http.ResponseWriter, _ *http.Request, check Check) { if check.Check() { writer.WriteHeader(http.StatusOK) diff --git a/internal/probation/server_test.go b/internal/probation/server_test.go new file mode 100644 index 0000000..77762fa --- /dev/null +++ b/internal/probation/server_test.go @@ -0,0 +1,52 @@ +// Copyright 2023 f5 Inc. All rights reserved. +// Use of this source code is governed by the Apache +// license that can be found in the LICENSE file. 
+ +package probation + +import ( + "github.com/nginxinc/kubernetes-nginx-ingress/test/mocks" + "testing" +) + +func TestHealthServer_HandleLive(t *testing.T) { + server := NewHealthServer() + writer := mocks.NewMockResponseWriter() + server.HandleLive(writer, nil) + + if string(writer.Body()) != Ok { + t.Errorf("HandleLive should return %s", Ok) + } +} + +func TestHealthServer_HandleReady(t *testing.T) { + server := NewHealthServer() + writer := mocks.NewMockResponseWriter() + server.HandleReady(writer, nil) + + if string(writer.Body()) != Ok { + t.Errorf("HandleReady should return %s", Ok) + } +} + +func TestHealthServer_HandleStartup(t *testing.T) { + server := NewHealthServer() + writer := mocks.NewMockResponseWriter() + server.HandleStartup(writer, nil) + + if string(writer.Body()) != Ok { + t.Errorf("HandleStartup should return %s", Ok) + } +} + +func TestHealthServer_HandleFailCheck(t *testing.T) { + failCheck := mocks.NewMockCheck(false) + server := NewHealthServer() + writer := mocks.NewMockResponseWriter() + server.handleProbe(writer, nil, failCheck) + + body := string(writer.Body()) + if body != "Service Not Available" { + t.Errorf("Expected 'Service Not Available', got %v", body) + } +} diff --git a/internal/synchronization/synchronizer.go b/internal/synchronization/synchronizer.go index 30ec658..edf61d5 100644 --- a/internal/synchronization/synchronizer.go +++ b/internal/synchronization/synchronizer.go @@ -13,30 +13,26 @@ import ( "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" - "time" ) -const ( - // MaxMillisecondsJitter and MinMillisecondsJitter are used to randomize the rate limiter, - // creating headroom for calls to the NGINX edge hosts. - MaxMillisecondsJitter = 750 - MinMillisecondsJitter = 250 - - RateLimiterBase = time.Second * 2 - RateLimiterMax = time.Second * 60 - RetryCount = 5 - Threads = 1 - SynchronizerQueueName = `nkl-synchronizer` -) +type Interface interface { + AddEvents(events core.ServerUpdateEvents) + AddEvent(event *core.ServerUpdateEvent) + Run(stopCh <-chan struct{}) + ShutDown() +} type Synchronizer struct { eventQueue workqueue.RateLimitingInterface settings *configuration.Settings } -func NewSynchronizer(settings *configuration.Settings) (*Synchronizer, error) { - synchronizer := Synchronizer{} - synchronizer.settings = settings +func NewSynchronizer(settings *configuration.Settings, eventQueue workqueue.RateLimitingInterface) (*Synchronizer, error) { + synchronizer := Synchronizer{ + eventQueue: eventQueue, + settings: settings, + } + return &synchronizer, nil } @@ -63,22 +59,14 @@ func (s *Synchronizer) AddEvent(event *core.ServerUpdateEvent) { return } - s.eventQueue.AddAfter(event, RandomMilliseconds(MinMillisecondsJitter, MaxMillisecondsJitter)) -} - -func (s *Synchronizer) Initialize() error { - logrus.Debug(`Synchronizer::Initialize`) - - rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(RateLimiterBase, RateLimiterMax) - s.eventQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, SynchronizerQueueName) - - return nil + after := RandomMilliseconds(s.settings.Synchronizer.MinMillisecondsJitter, s.settings.Synchronizer.MaxMillisecondsJitter) + s.eventQueue.AddAfter(event, after) } func (s *Synchronizer) Run(stopCh <-chan struct{}) { logrus.Debug(`Synchronizer::Run`) - for i := 0; i < Threads; i++ { + for i := 0; i < s.settings.Synchronizer.Threads; i++ { go wait.Until(s.worker, 0, stopCh) } @@ -213,7 +201,7 @@ func (s *Synchronizer) withRetry(err error, event 
*core.ServerUpdateEvent) { logrus.Debug("Synchronizer::withRetry") if err != nil { // TODO: Add Telemetry - if s.eventQueue.NumRequeues(event) < RetryCount { // TODO: Make this configurable + if s.eventQueue.NumRequeues(event) < s.settings.Synchronizer.RetryCount { s.eventQueue.AddRateLimited(event) logrus.Infof(`Synchronizer::withRetry: requeued event: %s; error: %v`, event.Id, err) } else { diff --git a/internal/synchronization/synchronizer_test.go b/internal/synchronization/synchronizer_test.go index e0c1a8c..b59c66f 100644 --- a/internal/synchronization/synchronizer_test.go +++ b/internal/synchronization/synchronizer_test.go @@ -5,11 +5,169 @@ package synchronization import ( + "context" + "fmt" + "github.com/nginxinc/kubernetes-nginx-ingress/internal/configuration" + "github.com/nginxinc/kubernetes-nginx-ingress/internal/core" + "github.com/nginxinc/kubernetes-nginx-ingress/test/mocks" "testing" ) -func TestNewNginxPlusSynchronizer(t *testing.T) { - synchronizer, err := NewSynchronizer() +func TestSynchronizer_NewSynchronizer(t *testing.T) { + settings, err := configuration.NewSettings(context.Background(), nil) + + rateLimiter := &mocks.MockRateLimiter{} + + synchronizer, err := NewSynchronizer(settings, rateLimiter) + if err != nil { + t.Fatalf(`should have been no error, %v`, err) + } + + if synchronizer == nil { + t.Fatal("should have an Synchronizer instance") + } +} + +func TestSynchronizer_AddEventNoHosts(t *testing.T) { + const expectedEventCount = 0 + event := &core.ServerUpdateEvent{ + Id: "", + NginxHost: "", + Type: 0, + UpstreamName: "", + Servers: nil, + } + settings, err := configuration.NewSettings(context.Background(), nil) + rateLimiter := &mocks.MockRateLimiter{} + + synchronizer, err := NewSynchronizer(settings, rateLimiter) + if err != nil { + t.Fatalf(`should have been no error, %v`, err) + } + + if synchronizer == nil { + t.Fatal("should have an Synchronizer instance") + } + + // NOTE: Ideally we have a custom logger that can be mocked to capture the log message + // and assert a warning was logged that the NGINX Plus host was not specified. 
+ synchronizer.AddEvent(event) + actualEventCount := rateLimiter.Len() + if actualEventCount != expectedEventCount { + t.Fatalf(`expected %v events, got %v`, expectedEventCount, actualEventCount) + } +} + +func TestSynchronizer_AddEventOneHost(t *testing.T) { + const expectedEventCount = 1 + events := buildEvents(1) + settings, err := configuration.NewSettings(context.Background(), nil) + settings.NginxPlusHosts = []string{"https://localhost:8080"} + rateLimiter := &mocks.MockRateLimiter{} + + synchronizer, err := NewSynchronizer(settings, rateLimiter) + if err != nil { + t.Fatalf(`should have been no error, %v`, err) + } + + if synchronizer == nil { + t.Fatal("should have an Synchronizer instance") + } + + synchronizer.AddEvent(events[0]) + actualEventCount := rateLimiter.Len() + if actualEventCount != expectedEventCount { + t.Fatalf(`expected %v events, got %v`, expectedEventCount, actualEventCount) + } +} + +func TestSynchronizer_AddEventManyHosts(t *testing.T) { + const expectedEventCount = 1 + events := buildEvents(1) + settings, err := configuration.NewSettings(context.Background(), nil) + settings.NginxPlusHosts = []string{ + "https://localhost:8080", + "https://localhost:8081", + "https://localhost:8082", + } + rateLimiter := &mocks.MockRateLimiter{} + + synchronizer, err := NewSynchronizer(settings, rateLimiter) + if err != nil { + t.Fatalf(`should have been no error, %v`, err) + } + + if synchronizer == nil { + t.Fatal("should have an Synchronizer instance") + } + + synchronizer.AddEvent(events[0]) + actualEventCount := rateLimiter.Len() + if actualEventCount != expectedEventCount { + t.Fatalf(`expected %v events, got %v`, expectedEventCount, actualEventCount) + } +} + +func TestSynchronizer_AddEventsNoHosts(t *testing.T) { + const expectedEventCount = 0 + events := buildEvents(4) + settings, err := configuration.NewSettings(context.Background(), nil) + rateLimiter := &mocks.MockRateLimiter{} + + synchronizer, err := NewSynchronizer(settings, rateLimiter) + if err != nil { + t.Fatalf(`should have been no error, %v`, err) + } + + if synchronizer == nil { + t.Fatal("should have an Synchronizer instance") + } + + // NOTE: Ideally we have a custom logger that can be mocked to capture the log message + // and assert a warning was logged that the NGINX Plus host was not specified. 
+ synchronizer.AddEvents(events) + actualEventCount := rateLimiter.Len() + if actualEventCount != expectedEventCount { + t.Fatalf(`expected %v events, got %v`, expectedEventCount, actualEventCount) + } +} + +func TestSynchronizer_AddEventsOneHost(t *testing.T) { + const expectedEventCount = 4 + events := buildEvents(4) + settings, err := configuration.NewSettings(context.Background(), nil) + settings.NginxPlusHosts = []string{"https://localhost:8080"} + rateLimiter := &mocks.MockRateLimiter{} + + synchronizer, err := NewSynchronizer(settings, rateLimiter) + if err != nil { + t.Fatalf(`should have been no error, %v`, err) + } + + if synchronizer == nil { + t.Fatal("should have an Synchronizer instance") + } + + synchronizer.AddEvents(events) + actualEventCount := rateLimiter.Len() + if actualEventCount != expectedEventCount { + t.Fatalf(`expected %v events, got %v`, expectedEventCount, actualEventCount) + } +} + +func TestSynchronizer_AddEventsManyHosts(t *testing.T) { + const eventCount = 4 + events := buildEvents(eventCount) + rateLimiter := &mocks.MockRateLimiter{} + settings, err := configuration.NewSettings(context.Background(), nil) + settings.NginxPlusHosts = []string{ + "https://localhost:8080", + "https://localhost:8081", + "https://localhost:8082", + } + expectedEventCount := eventCount * len(settings.NginxPlusHosts) + + synchronizer, err := NewSynchronizer(settings, rateLimiter) if err != nil { t.Fatalf(`should have been no error, %v`, err) } @@ -17,4 +175,24 @@ func TestNewNginxPlusSynchronizer(t *testing.T) { if synchronizer == nil { t.Fatal("should have an Synchronizer instance") } + + synchronizer.AddEvents(events) + actualEventCount := rateLimiter.Len() + if actualEventCount != expectedEventCount { + t.Fatalf(`expected %v events, got %v`, expectedEventCount, actualEventCount) + } +} + +func buildEvents(count int) core.ServerUpdateEvents { + events := make(core.ServerUpdateEvents, count) + for i := 0; i < count; i++ { + events[i] = &core.ServerUpdateEvent{ + Id: fmt.Sprintf("id-%v", i), + NginxHost: "https://localhost:8080", + Type: 0, + UpstreamName: "", + Servers: nil, + } + } + return events } diff --git a/internal/translation/translation_test.go b/internal/translation/translation_test.go index 1e75ca7..a37163e 100644 --- a/internal/translation/translation_test.go +++ b/internal/translation/translation_test.go @@ -4,183 +4,673 @@ package translation -// -//import ( -// "github.com/nginxinc/kubernetes-nginx-ingress/internal/core" -// v1 "k8s.io/api/networking/v1" -// "testing" -//) -// -//func TestTranslateNoAddresses(t *testing.T) { -// const expectedUpstreams = 0 -// -// ingress := &v1.Ingress{} -// previousIngress := &v1.Ingress{} -// -// event := core.NewEvent(core.Created, ingress, previousIngress) -// updatedEvent, err := Translate(&event) -// if err != nil { -// t.Fatalf("Translate() error = %v", err) -// } -// -// actualUpstreams := len(updatedEvent.NginxUpstreams) -// if actualUpstreams != expectedUpstreams { -// t.Fatalf("expected %v upstreams, got %v", expectedUpstreams, actualUpstreams) -// } -//} -// -//func TestTranslateOneIp(t *testing.T) { -// const expectedUpstreams = 1 -// -// lbIngress := v1.IngressLoadBalancerIngress{ -// IP: "127.0.0.1", -// } -// -// ingress := &v1.Ingress{ -// Status: v1.IngressStatus{ -// LoadBalancer: v1.IngressLoadBalancerStatus{ -// Ingress: []v1.IngressLoadBalancerIngress{ -// lbIngress, -// }, -// }, -// }, -// } -// -// previousIngress := &v1.Ingress{} -// -// event := core.NewEvent(core.Created, ingress, previousIngress) 
-// updatedEvent, err := Translate(&event) -// if err != nil { -// t.Fatalf("Translate() error = %v", err) -// } -// -// actualUpstreams := len(updatedEvent.NginxUpstreams) -// if actualUpstreams != expectedUpstreams { -// t.Fatalf("expected %v upstreams, got %v", expectedUpstreams, actualUpstreams) -// } -//} -// -//func TestTranslateOneHost(t *testing.T) { -// const expectedUpstreams = 1 -// -// lbIngress := v1.IngressLoadBalancerIngress{ -// Hostname: "www.example.com", -// } -// -// ingress := &v1.Ingress{ -// Status: v1.IngressStatus{ -// LoadBalancer: v1.IngressLoadBalancerStatus{ -// Ingress: []v1.IngressLoadBalancerIngress{ -// lbIngress, -// }, -// }, -// }, -// } -// -// previousIngress := &v1.Ingress{} -// -// event := core.NewEvent(core.Created, ingress, previousIngress) -// updatedEvent, err := Translate(&event) -// if err != nil { -// t.Fatalf("Translate() error = %v", err) -// } -// -// actualUpstreams := len(updatedEvent.NginxUpstreams) -// if actualUpstreams != expectedUpstreams { -// t.Fatalf("expected %v upstreams, got %v", expectedUpstreams, actualUpstreams) -// } -//} -// -//func TestTranslateOneHostAndOneIP(t *testing.T) { -// const expectedUpstreams = 2 -// -// nodeIps := []string{"192.168.1.1"} -// -// lbHostnameIngress := v1.IngressLoadBalancerIngress{ -// Hostname: "www.example.com", -// } -// -// lbIPIngress := v1.IngressLoadBalancerIngress{ -// IP: "127.0.0.1", -// } -// -// ingress := &v1.Ingress{ -// Status: v1.IngressStatus{ -// LoadBalancer: v1.IngressLoadBalancerStatus{ -// Ingress: []v1.IngressLoadBalancerIngress{ -// lbHostnameIngress, -// lbIPIngress, -// }, -// }, -// }, -// } -// -// previousIngress := &v1.Ingress{} -// -// event := core.NewEvent(core.Created, ingress, previousIngress, nodeIps) -// updatedEvent, err := Translate(&event) -// if err != nil { -// t.Fatalf("Translate() error = %v", err) -// } -// -// actualUpstreams := len(updatedEvent.NginxUpstreams) -// if actualUpstreams != expectedUpstreams { -// t.Fatalf("expected %v upstreams, got %v", expectedUpstreams, actualUpstreams) -// } -//} -// -//func TestTranslateMulitpleRoutes(t *testing.T) { -// const expectedUpstreams = 6 -// -// lbHostnameIngress := v1.IngressLoadBalancerIngress{ -// Hostname: "www.example.com", -// } -// -// lbHostnameIngress1 := v1.IngressLoadBalancerIngress{ -// Hostname: "www.example.net", -// } -// -// lbHostnameIngress2 := v1.IngressLoadBalancerIngress{ -// Hostname: "www.example.org", -// } -// -// lbHostnameIngress3 := v1.IngressLoadBalancerIngress{ -// Hostname: "www.acme.com", -// } -// -// lbIPIngress := v1.IngressLoadBalancerIngress{ -// IP: "127.0.0.1", -// } -// -// lbIPIngress1 := v1.IngressLoadBalancerIngress{ -// IP: "192.168.0.1", -// } -// -// ingress := &v1.Ingress{ -// Status: v1.IngressStatus{ -// LoadBalancer: v1.IngressLoadBalancerStatus{ -// Ingress: []v1.IngressLoadBalancerIngress{ -// lbHostnameIngress, -// lbHostnameIngress1, -// lbHostnameIngress2, -// lbHostnameIngress3, -// lbIPIngress, -// lbIPIngress1, -// }, -// }, -// }, -// } -// -// previousIngress := &v1.Ingress{} -// -// event := core.NewEvent(core.Created, ingress, previousIngress) -// updatedEvent, err := Translate(&event) -// if err != nil { -// t.Fatalf("Translate() error = %v", err) -// } -// -// actualUpstreams := len(updatedEvent.NginxUpstreams) -// if actualUpstreams != expectedUpstreams { -// t.Fatalf("expected %v upstreams, got %v", expectedUpstreams, actualUpstreams) -// } -//} +import ( + "fmt" + "github.com/nginxinc/kubernetes-nginx-ingress/internal/core" + v1 
"k8s.io/api/core/v1" + "math/rand" + "testing" + "time" +) + +const ( + AssertionFailureFormat = "expected %v events, got %v" + ManyNodes = 7 + NoNodes = 0 + OneNode = 1 + TranslateErrorFormat = "Translate() error = %v" +) + +/* + * Created Event Tests + */ + +func TestCreatedTranslateNoPorts(t *testing.T) { + const expectedEventCount = 0 + + service := defaultService() + event := buildCreatedEvent(service, OneNode) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } +} + +func TestCreatedTranslateNoInterestingPorts(t *testing.T) { + const expectedEventCount = 0 + const portCount = 1 + + ports := generateUpdatablePorts(portCount, 0) + service := serviceWithPorts(ports) + event := buildCreatedEvent(service, OneNode) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } +} + +func TestCreatedTranslateOneInterestingPort(t *testing.T) { + const expectedEventCount = 1 + const portCount = 1 + + ports := generatePorts(portCount) + service := serviceWithPorts(ports) + event := buildCreatedEvent(service, OneNode) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, OneNode, translatedEvents) +} + +func TestCreatedTranslateManyInterestingPorts(t *testing.T) { + const expectedEventCount = 4 + const portCount = 4 + + ports := generatePorts(portCount) + service := serviceWithPorts(ports) + event := buildCreatedEvent(service, OneNode) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, OneNode, translatedEvents) +} + +func TestCreatedTranslateManyMixedPorts(t *testing.T) { + const expectedEventCount = 2 + const portCount = 6 + const updatablePortCount = 2 + + ports := generateUpdatablePorts(portCount, updatablePortCount) + service := serviceWithPorts(ports) + event := buildCreatedEvent(service, OneNode) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, OneNode, translatedEvents) +} + +func TestCreatedTranslateManyMixedPortsAndManyNodes(t *testing.T) { + const expectedEventCount = 2 + const portCount = 6 + const updatablePortCount = 2 + + ports := generateUpdatablePorts(portCount, updatablePortCount) + service := serviceWithPorts(ports) + event := buildCreatedEvent(service, ManyNodes) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + 
t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, ManyNodes, translatedEvents) +} + +/* + * Updated Event Tests + */ + +func TestUpdatedTranslateNoPorts(t *testing.T) { + const expectedEventCount = 0 + + service := defaultService() + event := buildUpdatedEvent(service, OneNode) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } +} + +func TestUpdatedTranslateNoInterestingPorts(t *testing.T) { + const expectedEventCount = 0 + const portCount = 1 + + ports := generateUpdatablePorts(portCount, 0) + service := serviceWithPorts(ports) + event := buildUpdatedEvent(service, OneNode) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } +} + +func TestUpdatedTranslateOneInterestingPort(t *testing.T) { + const expectedEventCount = 1 + const portCount = 1 + + ports := generatePorts(portCount) + service := serviceWithPorts(ports) + event := buildUpdatedEvent(service, OneNode) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, OneNode, translatedEvents) +} + +func TestUpdatedTranslateManyInterestingPorts(t *testing.T) { + const expectedEventCount = 4 + const portCount = 4 + + ports := generatePorts(portCount) + service := serviceWithPorts(ports) + event := buildUpdatedEvent(service, OneNode) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, OneNode, translatedEvents) +} + +func TestUpdatedTranslateManyMixedPorts(t *testing.T) { + const expectedEventCount = 2 + const portCount = 6 + const updatablePortCount = 2 + + ports := generateUpdatablePorts(portCount, updatablePortCount) + service := serviceWithPorts(ports) + event := buildUpdatedEvent(service, OneNode) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, OneNode, translatedEvents) +} + +func TestUpdatedTranslateManyMixedPortsAndManyNodes(t *testing.T) { + const expectedEventCount = 2 + const portCount = 6 + const updatablePortCount = 2 + + ports := generateUpdatablePorts(portCount, updatablePortCount) + service := serviceWithPorts(ports) + event := buildUpdatedEvent(service, ManyNodes) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, 
ManyNodes, translatedEvents) +} + +/* + * Deleted Event Tests + */ + +func TestDeletedTranslateNoPortsAndNoNodes(t *testing.T) { + const expectedEventCount = 0 + + service := defaultService() + event := buildDeletedEvent(service, NoNodes) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, ManyNodes, translatedEvents) +} + +func TestDeletedTranslateNoInterestingPortsAndNoNodes(t *testing.T) { + const expectedEventCount = 0 + const portCount = 1 + + ports := generateUpdatablePorts(portCount, 0) + service := serviceWithPorts(ports) + event := buildDeletedEvent(service, NoNodes) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, ManyNodes, translatedEvents) +} + +func TestDeletedTranslateOneInterestingPortAndNoNodes(t *testing.T) { + const expectedEventCount = 0 + const portCount = 1 + + ports := generatePorts(portCount) + service := serviceWithPorts(ports) + event := buildDeletedEvent(service, NoNodes) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, ManyNodes, translatedEvents) +} + +func TestDeletedTranslateManyInterestingPortsAndNoNodes(t *testing.T) { + const expectedEventCount = 0 + const portCount = 4 + + ports := generatePorts(portCount) + service := serviceWithPorts(ports) + event := buildDeletedEvent(service, NoNodes) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, ManyNodes, translatedEvents) +} + +func TestDeletedTranslateManyMixedPortsAndNoNodes(t *testing.T) { + const expectedEventCount = 0 + const portCount = 6 + const updatablePortCount = 2 + + ports := generateUpdatablePorts(portCount, updatablePortCount) + service := serviceWithPorts(ports) + event := buildDeletedEvent(service, NoNodes) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, ManyNodes, translatedEvents) +} + +func TestDeletedTranslateNoPortsAndOneNode(t *testing.T) { + const expectedEventCount = 0 + + service := defaultService() + event := buildDeletedEvent(service, OneNode) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, ManyNodes, translatedEvents) +} + +func 
TestDeletedTranslateNoInterestingPortsAndOneNode(t *testing.T) { + const expectedEventCount = 0 + const portCount = 1 + + ports := generateUpdatablePorts(portCount, 0) + service := serviceWithPorts(ports) + event := buildDeletedEvent(service, OneNode) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, ManyNodes, translatedEvents) +} + +func TestDeletedTranslateOneInterestingPortAndOneNode(t *testing.T) { + const expectedEventCount = 1 + const portCount = 1 + + ports := generatePorts(portCount) + service := serviceWithPorts(ports) + event := buildDeletedEvent(service, OneNode) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, OneNode, translatedEvents) +} + +func TestDeletedTranslateManyInterestingPortsAndOneNode(t *testing.T) { + const expectedEventCount = 4 + const portCount = 4 + + ports := generatePorts(portCount) + service := serviceWithPorts(ports) + event := buildDeletedEvent(service, OneNode) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, OneNode, translatedEvents) +} + +func TestDeletedTranslateManyMixedPortsAndOneNode(t *testing.T) { + const expectedEventCount = 2 + const portCount = 6 + const updatablePortCount = 2 + + ports := generateUpdatablePorts(portCount, updatablePortCount) + service := serviceWithPorts(ports) + event := buildDeletedEvent(service, OneNode) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, OneNode, translatedEvents) +} + +func TestDeletedTranslateNoPortsAndManyNodes(t *testing.T) { + const expectedEventCount = 0 + + service := defaultService() + event := buildDeletedEvent(service, ManyNodes) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, ManyNodes, translatedEvents) +} + +func TestDeletedTranslateNoInterestingPortsAndManyNodes(t *testing.T) { + const portCount = 1 + const updatablePortCount = 0 + const expectedEventCount = updatablePortCount * ManyNodes + + ports := generateUpdatablePorts(portCount, updatablePortCount) + service := serviceWithPorts(ports) + event := buildDeletedEvent(service, ManyNodes) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + 
assertExpectedServerCount(t, ManyNodes, translatedEvents) +} + +func TestDeletedTranslateOneInterestingPortAndManyNodes(t *testing.T) { + const portCount = 1 + const expectedEventCount = portCount * ManyNodes + + ports := generatePorts(portCount) + service := serviceWithPorts(ports) + event := buildDeletedEvent(service, ManyNodes) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, OneNode, translatedEvents) +} + +func TestDeletedTranslateManyInterestingPortsAndManyNodes(t *testing.T) { + const portCount = 4 + const expectedEventCount = portCount * ManyNodes + + ports := generatePorts(portCount) + service := serviceWithPorts(ports) + event := buildDeletedEvent(service, ManyNodes) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, OneNode, translatedEvents) +} + +func TestDeletedTranslateManyMixedPortsAndManyNodes(t *testing.T) { + const portCount = 6 + const updatablePortCount = 2 + const expectedEventCount = updatablePortCount * ManyNodes + + ports := generateUpdatablePorts(portCount, updatablePortCount) + service := serviceWithPorts(ports) + event := buildDeletedEvent(service, ManyNodes) + + translatedEvents, err := Translate(&event) + if err != nil { + t.Fatalf(TranslateErrorFormat, err) + } + + actualEventCount := len(translatedEvents) + if actualEventCount != expectedEventCount { + t.Fatalf(AssertionFailureFormat, expectedEventCount, actualEventCount) + } + + assertExpectedServerCount(t, OneNode, translatedEvents) +} + +func assertExpectedServerCount(t *testing.T, expectedCount int, events core.ServerUpdateEvents) { + for _, translatedEvent := range events { + serverCount := len(translatedEvent.Servers) + if serverCount != expectedCount { + t.Fatalf("expected %d servers, got %d", expectedCount, serverCount) + } + } +} + +func defaultService() *v1.Service { + return &v1.Service{} +} + +func serviceWithPorts(ports []v1.ServicePort) *v1.Service { + return &v1.Service{ + Spec: v1.ServiceSpec{ + Ports: ports, + }, + } +} + +func buildCreatedEvent(service *v1.Service, nodeCount int) core.Event { + return buildEvent(core.Created, service, nodeCount) +} + +func buildDeletedEvent(service *v1.Service, nodeCount int) core.Event { + return buildEvent(core.Deleted, service, nodeCount) +} + +func buildUpdatedEvent(service *v1.Service, nodeCount int) core.Event { + return buildEvent(core.Updated, service, nodeCount) +} + +func buildEvent(eventType core.EventType, service *v1.Service, nodeCount int) core.Event { + previousService := defaultService() + + nodeIps := generateNodeIps(nodeCount) + + return core.NewEvent(eventType, service, previousService, nodeIps) +} + +func generateNodeIps(count int) []string { + var nodeIps []string + + for i := 0; i < count; i++ { + nodeIps = append(nodeIps, fmt.Sprintf("10.0.0.%v", i)) + } + + return nodeIps +} + +func generatePorts(portCount int) []v1.ServicePort { + return generateUpdatablePorts(portCount, portCount) +} + +// This is probably A Little Bit of Too Much™, but helps to ensure ordering is not a factor. 
+func generateUpdatablePorts(portCount int, updatableCount int) []v1.ServicePort { + var ports []v1.ServicePort + + updatable := make([]string, updatableCount) + nonupdatable := make([]string, portCount-updatableCount) + + for i := range updatable { + updatable[i] = NklPrefix + } + + for j := range nonupdatable { + nonupdatable[j] = "olm-" + } + + prefixes := append(updatable, nonupdatable...) + + source := rand.NewSource(time.Now().UnixNano()) + random := rand.New(source) + random.Shuffle(len(prefixes), func(i, j int) { prefixes[i], prefixes[j] = prefixes[j], prefixes[i] }) + + for i, prefix := range prefixes { + ports = append(ports, v1.ServicePort{ + Name: fmt.Sprintf("%sport-%d", prefix, i), + }) + } + + return ports +} diff --git a/internal/translation/translator.go b/internal/translation/translator.go index 02b5df5..d37b161 100644 --- a/internal/translation/translator.go +++ b/internal/translation/translator.go @@ -42,7 +42,7 @@ func filterPorts(ports []v1.ServicePort) []v1.ServicePort { func buildServerUpdateEvents(ports []v1.ServicePort, event *core.Event) (core.ServerUpdateEvents, error) { logrus.Debugf("Translate::buildServerUpdateEvents(ports=%#v)", ports) - updateEvents := core.ServerUpdateEvents{} + events := core.ServerUpdateEvents{} for _, port := range ports { ingressName := fixIngressName(port.Name) servers, _ := buildServers(event.NodeIps, port) @@ -51,10 +51,10 @@ func buildServerUpdateEvents(ports []v1.ServicePort, event *core.Event) (core.Se case core.Created: fallthrough case core.Updated: - updateEvents = append(updateEvents, core.NewServerUpdateEvent(event.Type, ingressName, servers)) + events = append(events, core.NewServerUpdateEvent(event.Type, ingressName, servers)) case core.Deleted: for _, server := range servers { - updateEvents = append(updateEvents, core.NewServerUpdateEvent(event.Type, ingressName, []nginxClient.StreamUpstreamServer{server})) + events = append(events, core.NewServerUpdateEvent(event.Type, ingressName, []nginxClient.StreamUpstreamServer{server})) } default: logrus.Warnf(`Translator::buildServerUpdateEvents: unknown event type: %d`, event.Type) @@ -62,7 +62,7 @@ func buildServerUpdateEvents(ports []v1.ServicePort, event *core.Event) (core.Se } - return updateEvents, nil + return events, nil } func buildServers(nodeIps []string, port v1.ServicePort) ([]nginxClient.StreamUpstreamServer, error) { diff --git a/test/mocks/mock_check.go b/test/mocks/mock_check.go new file mode 100644 index 0000000..17a4104 --- /dev/null +++ b/test/mocks/mock_check.go @@ -0,0 +1,17 @@ +// Copyright 2023 f5 Inc. All rights reserved. +// Use of this source code is governed by the Apache +// license that can be found in the LICENSE file. + +package mocks + +type MockCheck struct { + result bool +} + +func NewMockCheck(result bool) *MockCheck { + return &MockCheck{result: result} +} + +func (m *MockCheck) Check() bool { + return m.result +} diff --git a/test/mocks/mock_handler.go b/test/mocks/mock_handler.go new file mode 100644 index 0000000..b57b410 --- /dev/null +++ b/test/mocks/mock_handler.go @@ -0,0 +1,26 @@ +// Copyright 2023 f5 Inc. All rights reserved. +// Use of this source code is governed by the Apache +// license that can be found in the LICENSE file. 
+ +package mocks + +import "github.com/nginxinc/kubernetes-nginx-ingress/internal/core" + +type MockHandler struct { +} + +func (h *MockHandler) AddRateLimitedEvent(_ *core.Event) { + +} + +func (h *MockHandler) Initialize() { + +} + +func (h *MockHandler) Run(_ <-chan struct{}) { + +} + +func (h *MockHandler) ShutDown() { + +} diff --git a/test/mocks/mock_ratelimitinginterface.go b/test/mocks/mock_ratelimitinginterface.go new file mode 100644 index 0000000..0a15163 --- /dev/null +++ b/test/mocks/mock_ratelimitinginterface.go @@ -0,0 +1,56 @@ +// Copyright 2023 f5 Inc. All rights reserved. +// Use of this source code is governed by the Apache +// license that can be found in the LICENSE file. + +package mocks + +import "time" + +type MockRateLimiter struct { + items []interface{} +} + +func (m *MockRateLimiter) Add(_ interface{}) { +} + +func (m *MockRateLimiter) Len() int { + return len(m.items) +} + +func (m *MockRateLimiter) Get() (item interface{}, shutdown bool) { + if len(m.items) > 0 { + item = m.items[0] + m.items = m.items[1:] + return item, false + } + return nil, false +} + +func (m *MockRateLimiter) Done(_ interface{}) { +} + +func (m *MockRateLimiter) ShutDown() { +} + +func (m *MockRateLimiter) ShutDownWithDrain() { +} + +func (m *MockRateLimiter) ShuttingDown() bool { + return true +} + +func (m *MockRateLimiter) AddAfter(item interface{}, _ time.Duration) { + m.items = append(m.items, item) +} + +func (m *MockRateLimiter) AddRateLimited(item interface{}) { + m.items = append(m.items, item) +} + +func (m *MockRateLimiter) Forget(_ interface{}) { + +} + +func (m *MockRateLimiter) NumRequeues(_ interface{}) int { + return 0 +} diff --git a/test/mocks/mock_responsewriter.go b/test/mocks/mock_responsewriter.go new file mode 100644 index 0000000..f0876e1 --- /dev/null +++ b/test/mocks/mock_responsewriter.go @@ -0,0 +1,32 @@ +// Copyright 2023 f5 Inc. All rights reserved. +// Use of this source code is governed by the Apache +// license that can be found in the LICENSE file. + +package mocks + +import "net/http" + +type MockResponseWriter struct { + body []byte +} + +func NewMockResponseWriter() *MockResponseWriter { + return &MockResponseWriter{} +} + +func (m *MockResponseWriter) Header() http.Header { + return nil +} + +func (m *MockResponseWriter) Write(body []byte) (int, error) { + m.body = append(m.body, body...) + return len(m.body), nil +} + +func (m *MockResponseWriter) WriteHeader(int) { + +} + +func (m *MockResponseWriter) Body() []byte { + return m.body +} diff --git a/test/mocks/mock_synchronizer.go b/test/mocks/mock_synchronizer.go new file mode 100644 index 0000000..6785837 --- /dev/null +++ b/test/mocks/mock_synchronizer.go @@ -0,0 +1,33 @@ +// Copyright 2023 f5 Inc. All rights reserved. +// Use of this source code is governed by the Apache +// license that can be found in the LICENSE file. + +package mocks + +import "github.com/nginxinc/kubernetes-nginx-ingress/internal/core" + +type MockSynchronizer struct { + Events []core.ServerUpdateEvent +} + +func (s *MockSynchronizer) AddEvents(events core.ServerUpdateEvents) { + for _, event := range events { + s.Events = append(s.Events, *event) + } +} + +func (s *MockSynchronizer) AddEvent(event *core.ServerUpdateEvent) { + s.Events = append(s.Events, *event) +} + +func (s *MockSynchronizer) Initialize() error { + return nil +} + +func (s *MockSynchronizer) Run(stopCh <-chan struct{}) { + <-stopCh +} + +func (s *MockSynchronizer) ShutDown() { + +}
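
Note for reviewers: this change replaces the Initialize() calls with constructor injection, so the whole pipeline can be wired (or stubbed) from the outside. Below is a minimal sketch of that wiring, mirroring the new cmd/nginx-k8s-edge-controller/main.go; it is illustrative only, and the helper names buildQueue and wire are not part of this change.

package main

import (
	"context"

	"github.com/nginxinc/kubernetes-nginx-ingress/internal/configuration"
	"github.com/nginxinc/kubernetes-nginx-ingress/internal/observation"
	"github.com/nginxinc/kubernetes-nginx-ingress/internal/synchronization"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/util/workqueue"
)

// buildQueue mirrors buildWorkQueue in main.go: one exponential-failure rate
// limiter per component, configured and named from that component's settings.
func buildQueue(s configuration.WorkQueueSettings) workqueue.RateLimitingInterface {
	rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(s.RateLimiterBase, s.RateLimiterMax)
	return workqueue.NewNamedRateLimitingQueue(rateLimiter, s.Name)
}

// wire composes the injected pieces: settings -> queues -> synchronizer -> handler -> watcher.
func wire(ctx context.Context, client *kubernetes.Clientset) (*observation.Watcher, error) {
	settings, err := configuration.NewSettings(ctx, client)
	if err != nil {
		return nil, err
	}

	// Each component receives its own work queue, built from its own settings.
	synchronizer, err := synchronization.NewSynchronizer(settings, buildQueue(settings.Synchronizer.WorkQueueSettings))
	if err != nil {
		return nil, err
	}

	handler := observation.NewHandler(settings, synchronizer, buildQueue(settings.Handler.WorkQueueSettings))

	return observation.NewWatcher(settings, handler)
}

The same seams are what let the tests above substitute mocks.MockRateLimiter and mocks.MockSynchronizer for the real queue and synchronizer without any environment setup.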