Skip to content

Add fan-out support for multiple NGINX+ Edge Load-Balancers #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 38 additions & 2 deletions cmd/nginx-k8s-edge-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ package main
import (
"context"
"fmt"
"github.com/nginxinc/kubernetes-nginx-ingress/internal/config"
"github.com/nginxinc/kubernetes-nginx-ingress/internal/observation"
"github.com/nginxinc/kubernetes-nginx-ingress/internal/synchronization"
"github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

func main() {
Expand All @@ -23,7 +26,22 @@ func run() error {
ctx := context.Background()
var err error

synchronizer, err := synchronization.NewSynchronizer()
k8sClient, err := buildKubernetesClient()
if err != nil {
return fmt.Errorf(`error building a Kubernetes client: %w`, err)
}

settings, err := config.NewSettings(ctx, k8sClient)
if err != nil {
return fmt.Errorf(`error occurred creating settings: %w`, err)
}

err = settings.Initialize()
if err != nil {
return fmt.Errorf(`error occurred initializing settings: %w`, err)
}

synchronizer, err := synchronization.NewSynchronizer(settings)
if err != nil {
return fmt.Errorf(`error initializing synchronizer: %w`, err)
}
Expand All @@ -36,7 +54,7 @@ func run() error {
handler := observation.NewHandler(synchronizer)
handler.Initialize()

watcher, err := observation.NewWatcher(ctx, handler)
watcher, err := observation.NewWatcher(ctx, handler, k8sClient)
if err != nil {
return fmt.Errorf(`error occurred creating a watcher: %w`, err)
}
Expand All @@ -46,6 +64,7 @@ func run() error {
return fmt.Errorf(`error occurred initializing the watcher: %w`, err)
}

go settings.Run()
go handler.Run(ctx.Done())
go synchronizer.Run(ctx.Done())

Expand All @@ -57,3 +76,20 @@ func run() error {
<-ctx.Done()
return nil
}

func buildKubernetesClient() (*kubernetes.Clientset, error) {
logrus.Debug("Watcher::buildKubernetesClient")
k8sConfig, err := rest.InClusterConfig()
if err == rest.ErrNotInCluster {
return nil, fmt.Errorf(`not running in a Cluster: %w`, err)
} else if err != nil {
return nil, fmt.Errorf(`error occurred getting the Cluster config: %w`, err)
}

client, err := kubernetes.NewForConfig(k8sConfig)
if err != nil {
return nil, fmt.Errorf(`error occurred creating a client: %w`, err)
}

return client, nil
}
8 changes: 8 additions & 0 deletions deployment/nkl-configmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: v1
kind: ConfigMap
data:
nginx-hosts:
"http://10.1.1.4:9000/api,http://10.1.1.5:9000/api"
metadata:
name: nkl-config
namespace: nkl
8 changes: 4 additions & 4 deletions deployment/nkl-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@ kind: Deployment
metadata:
name: nkl-deployment
labels:
app: nec
app: nkl
spec:
replicas: 1
selector:
matchLabels:
app: nec
app: nkl
template:
metadata:
labels:
app: nec
app: nkl
spec:
containers:
- name: nginx-k8s-edge-controller
env:
- name: NGINX_PLUS_HOST
value: "http://192.168.1.109:9000/api"
value: "http://10.1.1.4:9000/api"
image: ciroque/nginx-k8s-edge-controller:latest
imagePullPolicy: Always
serviceAccountName: nginx-k8s-edge-controller
6 changes: 6 additions & 0 deletions deployment/nkl-namespace.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
apiVersion: v1
kind: Namespace
metadata:
name: nkl
labels:
name: nkl
126 changes: 118 additions & 8 deletions internal/config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,131 @@
package config

import (
"errors"
"os"
"context"
"fmt"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"strings"
)

const (
ConfigMapsNamespace = "nkl"
ResyncPeriod = 0
)

type Settings struct {
NginxPlusHost string
ctx context.Context
NginxPlusHosts []string
k8sClient *kubernetes.Clientset
informer cache.SharedInformer
eventHandlerRegistration cache.ResourceEventHandlerRegistration
}

func NewSettings() (*Settings, error) {
func NewSettings(ctx context.Context, k8sClient *kubernetes.Clientset) (*Settings, error) {
config := new(Settings)

config.NginxPlusHost = os.Getenv("NGINX_PLUS_HOST")
if config.NginxPlusHost == "" {
return nil, errors.New("the NGINX_PLUS_HOST variable is not defined. This is required")
}
config.k8sClient = k8sClient
config.ctx = ctx

return config, nil
}

func (s *Settings) Initialize() error {
logrus.Info("Settings::Initialize")

var err error

informer, err := s.buildInformer()
if err != nil {
return fmt.Errorf(`error occurred building ConfigMap informer: %w`, err)
}

s.informer = informer

err = s.initializeEventListeners()
if err != nil {
return fmt.Errorf(`error occurred initializing event listeners: %w`, err)
}

return nil
}

func (s *Settings) Run() {
logrus.Debug("Settings::Run")

defer utilruntime.HandleCrash()

go s.informer.Run(s.ctx.Done())

<-s.ctx.Done()
}

func (s *Settings) buildInformer() (cache.SharedInformer, error) {
options := informers.WithNamespace(ConfigMapsNamespace)
factory := informers.NewSharedInformerFactoryWithOptions(s.k8sClient, ResyncPeriod, options)
informer := factory.Core().V1().ConfigMaps().Informer()

return informer, nil
}

func (s *Settings) initializeEventListeners() error {
logrus.Debug("Settings::initializeEventListeners")

var err error

handlers := cache.ResourceEventHandlerFuncs{
AddFunc: s.handleAddEvent,
UpdateFunc: s.handleUpdateEvent,
DeleteFunc: s.handleDeleteEvent,
}

s.eventHandlerRegistration, err = s.informer.AddEventHandler(handlers)
if err != nil {
return fmt.Errorf(`error occurred registering event handlers: %w`, err)
}

return nil
}

func (s *Settings) handleAddEvent(obj interface{}) {
logrus.Debug("Settings::handleAddEvent")

s.handleUpdateEvent(obj, nil)
}

func (s *Settings) handleDeleteEvent(_ interface{}) {
logrus.Debug("Settings::handleDeleteEvent")

s.updateHosts([]string{})
}

func (s *Settings) handleUpdateEvent(obj interface{}, _ interface{}) {
logrus.Debug("Settings::handleUpdateEvent")

configMap, ok := obj.(*corev1.ConfigMap)
if !ok {
logrus.Errorf("Settings::handleUpdateEvent: could not convert obj to ConfigMap")
return
}

hosts, found := configMap.Data["nginx-hosts"]
if !found {
logrus.Errorf("Settings::handleUpdateEvent: nginx-hosts key not found in ConfigMap")
return
}

newHosts := s.parseHosts(hosts)
s.updateHosts(newHosts)
}

func (s *Settings) parseHosts(hosts string) []string {
return strings.Split(hosts, ",")
}

func (s *Settings) updateHosts(hosts []string) {
s.NginxPlusHosts = hosts
}
12 changes: 12 additions & 0 deletions internal/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type Event struct {
}

type ServerUpdateEvent struct {
Id string
NginxHost string
Type EventType
UpstreamName string
Servers []nginxClient.StreamUpstreamServer
Expand All @@ -45,6 +47,16 @@ func NewServerUpdateEvent(eventType EventType, upstreamName string, servers []ng
}
}

func ServerUpdateEventWithIdAndHost(event *ServerUpdateEvent, id string, nginxHost string) *ServerUpdateEvent {
return &ServerUpdateEvent{
Id: id,
NginxHost: nginxHost,
Type: event.Type,
UpstreamName: event.UpstreamName,
Servers: event.Servers,
}
}

func (e *ServerUpdateEvent) TypeName() string {
switch e.Type {
case Created:
Expand Down
29 changes: 4 additions & 25 deletions internal/observation/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"time"
)
Expand All @@ -30,9 +29,10 @@ type Watcher struct {
informer cache.SharedIndexInformer
}

func NewWatcher(ctx context.Context, handler *Handler) (*Watcher, error) {
func NewWatcher(ctx context.Context, handler *Handler, k8sClient *kubernetes.Clientset) (*Watcher, error) {
return &Watcher{
ctx: ctx,
client: k8sClient,
handler: handler,
}, nil
}
Expand All @@ -41,11 +41,6 @@ func (w *Watcher) Initialize() error {
logrus.Debug("Watcher::Initialize")
var err error

w.client, err = w.buildKubernetesClient()
if err != nil {
return fmt.Errorf(`initalization error: %w`, err)
}

w.informer, err = w.buildInformer()
if err != nil {
return fmt.Errorf(`initialization error: %w`, err)
Expand Down Expand Up @@ -129,23 +124,6 @@ func (w *Watcher) buildInformer() (cache.SharedIndexInformer, error) {
return informer, nil
}

func (w *Watcher) buildKubernetesClient() (*kubernetes.Clientset, error) {
logrus.Debug("Watcher::buildKubernetesClient")
k8sConfig, err := rest.InClusterConfig()
if err == rest.ErrNotInCluster {
return nil, fmt.Errorf(`not running in a Cluster: %w`, err)
} else if err != nil {
return nil, fmt.Errorf(`error occurred getting the Cluster config: %w`, err)
}

client, err := kubernetes.NewForConfig(k8sConfig)
if err != nil {
return nil, fmt.Errorf(`error occurred creating a client: %w`, err)
}

return client, nil
}

func (w *Watcher) initializeEventListeners() error {
logrus.Debug("Watcher::initializeEventListeners")
var err error
Expand Down Expand Up @@ -186,7 +164,8 @@ func (w *Watcher) retrieveNodeIps() ([]string, error) {
}
}

logrus.Infof("Watcher::retrieveNodeIps duration: %d", time.Since(started).Nanoseconds())
logrus.Debugf("Watcher::retrieveNodeIps duration: %d", time.Since(started).Nanoseconds())

return nodeIps, nil
}

Expand Down
31 changes: 31 additions & 0 deletions internal/synchronization/rand.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2023 f5 Inc. All rights reserved.
// Use of this source code is governed by the Apache
// license that can be found in the LICENSE file.

package synchronization

import (
"math/rand"
"time"
)

var charset = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
var number = []byte("0123456789")
var alphaNumeric = append(charset, number...)

// RandomString where n is the length of random string we want to generate
func RandomString(n int) string {
b := make([]byte, n)
for i := range b {
// randomly select 1 character from given charset
b[i] = alphaNumeric[rand.Intn(len(alphaNumeric))]
}
return string(b)
}

func RandomMilliseconds(min, max int) time.Duration {
randomizer := rand.New(rand.NewSource(time.Now().UnixNano()))
random := randomizer.Intn(max-min) + min

return time.Millisecond * time.Duration(random)
}
Loading