Skip to content

fix/target-allocator: check CRDs availability in the cluster #4118

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Jul 10, 2025
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 121 additions & 28 deletions cmd/otel-allocator/internal/watcher/promOperator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"

Expand Down Expand Up @@ -57,7 +59,7 @@ func NewPrometheusCRWatcher(ctx context.Context, logger logr.Logger, cfg allocat

factory := informers.NewMonitoringInformerFactories(allowList, denyList, mClient, allocatorconfig.DefaultResyncTime, nil)

monitoringInformers, err := getInformers(factory)
monitoringInformers, err := getInformers(factory, cfg.ClusterConfig, promLogger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -188,34 +190,101 @@ func getNamespaceInformer(ctx context.Context, allowList, denyList map[string]st

}

// checkCRDAvailability checks if a specific CRD is available in the cluster
func checkCRDAvailability(dcl discovery.DiscoveryInterface, groupVersion string, resourceName string) (bool, error) {
apiList, err := dcl.ServerGroups()
if err != nil {
return false, err
}

apiGroups := apiList.Groups
for _, group := range apiGroups {
if group.Name == groupVersion {
for _, version := range group.Versions {
resources, err := dcl.ServerResourcesForGroupVersion(version.GroupVersion)
if err != nil {
return false, err
}

for _, resource := range resources.APIResources {
if resource.Name == resourceName {
return true, nil
}
}
}
}
}

return false, nil
}

// getInformers returns a map of informers for the given resources.
func getInformers(factory informers.FactoriesForNamespaces) (map[string]*informers.ForResource, error) {
serviceMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ServiceMonitorName))
func getInformers(factory informers.FactoriesForNamespaces, clusterConfig *rest.Config, logger *slog.Logger) (map[string]*informers.ForResource, error) {
informersMap := make(map[string]*informers.ForResource)

// Get the discovery client
dcl, err := discovery.NewDiscoveryClientForConfig(clusterConfig)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create discovery client: %w", err)
}

podMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PodMonitorName))
// Check for ServiceMonitor availability
serviceMonitorAvailable, err := checkCRDAvailability(dcl, "monitoring.coreos.com", "servicemonitors")
if err != nil {
return nil, err
logger.Warn("Failed to check ServiceMonitor availability", "error", err)
} else if serviceMonitorAvailable {
serviceMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ServiceMonitorName))
if err != nil {
return nil, err
}
informersMap[monitoringv1.ServiceMonitorName] = serviceMonitorInformers
} else {
logger.Warn("ServiceMonitor CRD not available, skipping informer")
}

probeInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ProbeName))
// Check for PodMonitor availability
podMonitorAvailable, err := checkCRDAvailability(dcl, "monitoring.coreos.com", "podmonitors")
if err != nil {
return nil, err
logger.Warn("Failed to check PodMonitor availability", "error", err)
} else if podMonitorAvailable {
podMonitorInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.PodMonitorName))
if err != nil {
return nil, err
}
informersMap[monitoringv1.PodMonitorName] = podMonitorInformers
} else {
logger.Warn("PodMonitor CRD not available, skipping informer")
}

scrapeConfigInformers, err := informers.NewInformersForResource(factory, promv1alpha1.SchemeGroupVersion.WithResource(promv1alpha1.ScrapeConfigName))
// Check for Probe availability
probeAvailable, err := checkCRDAvailability(dcl, "monitoring.coreos.com", "probes")
if err != nil {
return nil, err
logger.Warn("Failed to check Probe availability", "error", err)
} else if probeAvailable {
probeInformers, err := informers.NewInformersForResource(factory, monitoringv1.SchemeGroupVersion.WithResource(monitoringv1.ProbeName))
if err != nil {
return nil, err
}
informersMap[monitoringv1.ProbeName] = probeInformers
} else {
logger.Warn("Probe CRD not available, skipping informer")
}

return map[string]*informers.ForResource{
monitoringv1.ServiceMonitorName: serviceMonitorInformers,
monitoringv1.PodMonitorName: podMonitorInformers,
monitoringv1.ProbeName: probeInformers,
promv1alpha1.ScrapeConfigName: scrapeConfigInformers,
}, nil
// Check for ScrapeConfig availability
scrapeConfigAvailable, err := checkCRDAvailability(dcl, "monitoring.coreos.com", "scrapeconfigs")
if err != nil {
logger.Warn("Failed to check ScrapeConfig availability", "error", err)
} else if scrapeConfigAvailable {
scrapeConfigInformers, err := informers.NewInformersForResource(factory, promv1alpha1.SchemeGroupVersion.WithResource(promv1alpha1.ScrapeConfigName))
if err != nil {
return nil, err
}
informersMap[promv1alpha1.ScrapeConfigName] = scrapeConfigInformers
} else {
logger.Warn("ScrapeConfig CRD not available, skipping informer")
}

return informersMap, nil
}

// Watch wrapped informers and wait for an initial sync.
Expand Down Expand Up @@ -267,7 +336,13 @@ func (w *PrometheusCRWatcher) Watch(upstreamEvents chan Event, upstreamErrors ch
w.logger.Info("Unable to watch namespaces since namespace informer is nil")
}

// Only attempt to sync informers that were actually created
for name, resource := range w.informers {
if resource == nil {
w.logger.Info("Skipping nil informer", "informer", name)
continue
}

resource.Start(w.stopChannel)

if ok := w.WaitForNamedCacheSync(name, resource.HasSynced); !ok {
Expand Down Expand Up @@ -351,24 +426,42 @@ func (w *PrometheusCRWatcher) LoadConfig(ctx context.Context) (*promconfig.Confi
promCfg := &promconfig.Config{}

if w.resourceSelector != nil {
serviceMonitorInstances, err := w.resourceSelector.SelectServiceMonitors(ctx, w.informers[monitoringv1.ServiceMonitorName].ListAllByNamespace)
if err != nil {
return nil, err
var serviceMonitorInstances map[string]*monitoringv1.ServiceMonitor
var podMonitorInstances map[string]*monitoringv1.PodMonitor
var probeInstances map[string]*monitoringv1.Probe
var scrapeConfigInstances map[string]*promv1alpha1.ScrapeConfig
var err error

// Only try to get ServiceMonitors if the informer exists
if informer, ok := w.informers[monitoringv1.ServiceMonitorName]; ok {
serviceMonitorInstances, err = w.resourceSelector.SelectServiceMonitors(ctx, informer.ListAllByNamespace)
if err != nil {
return nil, err
}
}

podMonitorInstances, err := w.resourceSelector.SelectPodMonitors(ctx, w.informers[monitoringv1.PodMonitorName].ListAllByNamespace)
if err != nil {
return nil, err
// Only try to get PodMonitors if the informer exists
if informer, ok := w.informers[monitoringv1.PodMonitorName]; ok {
podMonitorInstances, err = w.resourceSelector.SelectPodMonitors(ctx, informer.ListAllByNamespace)
if err != nil {
return nil, err
}
}

probeInstances, err := w.resourceSelector.SelectProbes(ctx, w.informers[monitoringv1.ProbeName].ListAllByNamespace)
if err != nil {
return nil, err
// Only try to get Probes if the informer exists
if informer, ok := w.informers[monitoringv1.ProbeName]; ok {
probeInstances, err = w.resourceSelector.SelectProbes(ctx, informer.ListAllByNamespace)
if err != nil {
return nil, err
}
}

scrapeConfigInstances, err := w.resourceSelector.SelectScrapeConfigs(ctx, w.informers[promv1alpha1.ScrapeConfigName].ListAllByNamespace)
if err != nil {
return nil, err
// Only try to get ScrapeConfigs if the informer exists
if informer, ok := w.informers[promv1alpha1.ScrapeConfigName]; ok {
scrapeConfigInstances, err = w.resourceSelector.SelectScrapeConfigs(ctx, informer.ListAllByNamespace)
if err != nil {
return nil, err
}
}

generatedConfig, err := w.configGenerator.GenerateServerConfiguration(
Expand Down
Loading