Merge current v2.11 into v3.6

This commit is contained in:
mmatur
2026-01-15 11:26:40 +01:00
parent 27b27e9b1f
commit 3315a9fbec
195 changed files with 1748 additions and 1852 deletions
+58 -58
View File
@@ -70,62 +70,6 @@ func (p *Provider) SetRouterTransform(routerTransform k8s.RouterTransform) {
p.routerTransform = routerTransform
}
func (p *Provider) applyRouterTransform(ctx context.Context, rt *dynamic.Router, ingress *netv1.Ingress) {
if p.routerTransform == nil {
return
}
err := p.routerTransform.Apply(ctx, rt, ingress)
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("Apply router transform")
}
}
// EndpointIngress holds the endpoint information for the Kubernetes provider.
type EndpointIngress struct {
IP string `description:"IP used for Kubernetes Ingress endpoints." json:"ip,omitempty" toml:"ip,omitempty" yaml:"ip,omitempty"`
Hostname string `description:"Hostname used for Kubernetes Ingress endpoints." json:"hostname,omitempty" toml:"hostname,omitempty" yaml:"hostname,omitempty"`
PublishedService string `description:"Published Kubernetes Service to copy status from." json:"publishedService,omitempty" toml:"publishedService,omitempty" yaml:"publishedService,omitempty"`
}
func (p *Provider) newK8sClient(ctx context.Context) (*clientWrapper, error) {
_, err := labels.Parse(p.LabelSelector)
if err != nil {
return nil, fmt.Errorf("invalid ingress label selector: %q", p.LabelSelector)
}
logger := log.Ctx(ctx)
logger.Info().Msgf("ingress label selector is: %q", p.LabelSelector)
withEndpoint := ""
if p.Endpoint != "" {
withEndpoint = fmt.Sprintf(" with endpoint %v", p.Endpoint)
}
var cl *clientWrapper
switch {
case os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "":
logger.Info().Msgf("Creating in-cluster Provider client%s", withEndpoint)
cl, err = newInClusterClient(p.Endpoint)
case os.Getenv("KUBECONFIG") != "":
logger.Info().Msgf("Creating cluster-external Provider client from KUBECONFIG %s", os.Getenv("KUBECONFIG"))
cl, err = newExternalClusterClientFromFile(os.Getenv("KUBECONFIG"))
default:
logger.Info().Msgf("Creating cluster-external Provider client%s", withEndpoint)
cl, err = newExternalClusterClient(p.Endpoint, p.CertAuthFilePath, p.Token)
}
if err != nil {
return nil, err
}
cl.ingressLabelSelector = p.LabelSelector
cl.disableIngressClassInformer = p.DisableIngressClassLookup || p.DisableClusterScopeResources
cl.disableClusterScopeInformer = p.DisableClusterScopeResources
return cl, nil
}
// Init the provider.
func (p *Provider) Init() error {
return nil
@@ -213,6 +157,62 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
return nil
}
func (p *Provider) applyRouterTransform(ctx context.Context, rt *dynamic.Router, ingress *netv1.Ingress) {
if p.routerTransform == nil {
return
}
err := p.routerTransform.Apply(ctx, rt, ingress)
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("Apply router transform")
}
}
// EndpointIngress holds the endpoint information for the Kubernetes provider.
type EndpointIngress struct {
IP string `description:"IP used for Kubernetes Ingress endpoints." json:"ip,omitempty" toml:"ip,omitempty" yaml:"ip,omitempty"`
Hostname string `description:"Hostname used for Kubernetes Ingress endpoints." json:"hostname,omitempty" toml:"hostname,omitempty" yaml:"hostname,omitempty"`
PublishedService string `description:"Published Kubernetes Service to copy status from." json:"publishedService,omitempty" toml:"publishedService,omitempty" yaml:"publishedService,omitempty"`
}
func (p *Provider) newK8sClient(ctx context.Context) (*clientWrapper, error) {
_, err := labels.Parse(p.LabelSelector)
if err != nil {
return nil, fmt.Errorf("invalid ingress label selector: %q", p.LabelSelector)
}
logger := log.Ctx(ctx)
logger.Info().Msgf("ingress label selector is: %q", p.LabelSelector)
withEndpoint := ""
if p.Endpoint != "" {
withEndpoint = fmt.Sprintf(" with endpoint %v", p.Endpoint)
}
var cl *clientWrapper
switch {
case os.Getenv("KUBERNETES_SERVICE_HOST") != "" && os.Getenv("KUBERNETES_SERVICE_PORT") != "":
logger.Info().Msgf("Creating in-cluster Provider client%s", withEndpoint)
cl, err = newInClusterClient(p.Endpoint)
case os.Getenv("KUBECONFIG") != "":
logger.Info().Msgf("Creating cluster-external Provider client from KUBECONFIG %s", os.Getenv("KUBECONFIG"))
cl, err = newExternalClusterClientFromFile(os.Getenv("KUBECONFIG"))
default:
logger.Info().Msgf("Creating cluster-external Provider client%s", withEndpoint)
cl, err = newExternalClusterClient(p.Endpoint, p.CertAuthFilePath, p.Token)
}
if err != nil {
return nil, err
}
cl.ingressLabelSelector = p.LabelSelector
cl.disableIngressClassInformer = p.DisableIngressClassLookup || p.DisableClusterScopeResources
cl.disableClusterScopeInformer = p.DisableClusterScopeResources
return cl, nil
}
func (p *Provider) loadConfigurationFromIngresses(ctx context.Context, client Client) *dynamic.Configuration {
conf := &dynamic.Configuration{
HTTP: &dynamic.HTTPConfiguration{
@@ -901,13 +901,13 @@ func buildStrictPrefixMatchingRule(path string) string {
return fmt.Sprintf("(Path(`%[1]s`) || PathPrefix(`%[1]s/`))", path)
}
func throttleEvents(ctx context.Context, throttleDuration time.Duration, pool *safe.Pool, eventsChan <-chan interface{}) chan interface{} {
func throttleEvents(ctx context.Context, throttleDuration time.Duration, pool *safe.Pool, eventsChan <-chan any) chan any {
if throttleDuration == 0 {
return nil
}
// Create a buffered channel to hold the pending event (if we're delaying processing the event due to throttling).
eventsChanBuffered := make(chan interface{}, 1)
eventsChanBuffered := make(chan any, 1)
// Run a goroutine that reads events from eventChan and does a
// non-blocking write to pendingEvent. This guarantees that writing to