Merge branch v3.6 into master
@@ -466,42 +466,52 @@ func TestServiceTCPHealthChecker_Launch(t *testing.T) {
 				},
 			}
 
-			lb := &testLoadBalancer{}
+			// Create load balancer with event channel for synchronization.
+			lb := &testLoadBalancer{
+				RWMutex: &sync.RWMutex{},
+				eventCh: make(chan struct{}, len(test.server.StatusSequence)+5),
+			}
 			serviceInfo := &truntime.TCPServiceInfo{}
 
 			service := NewServiceTCPHealthChecker(ctx, test.config, lb, serviceInfo, targets, "serviceName")
 
 			go service.Launch(ctx)
 
-			// How much time to wait for the health check to actually complete.
-			deadline := time.Now().Add(200 * time.Millisecond)
-			// TLS handshake can take much longer.
+			// Timeout for each event - TLS handshake can take longer.
+			eventTimeout := 500 * time.Millisecond
 			if test.server.TLS {
-				deadline = time.Now().Add(1000 * time.Millisecond)
+				eventTimeout = 2 * time.Second
 			}
 
-			// Wait for all health checks to complete deterministically
+			// Wait for health check events using channel synchronization.
+			// Iterate over StatusSequence to release each connection via Next().
 			for i := range test.server.StatusSequence {
				test.server.Next()
 
-				initialUpserted := lb.numUpsertedServers
-				initialRemoved := lb.numRemovedServers
-
-				for time.Now().Before(deadline) {
-					time.Sleep(5 * time.Millisecond)
-					if lb.numUpsertedServers > initialUpserted || lb.numRemovedServers > initialRemoved {
-						// Stop the health checker immediately after the last expected sequence completes
-						// to prevent extra health checks from firing and modifying the counters.
-						if i == len(test.server.StatusSequence)-1 {
-							cancel()
-						}
-						break
-					}
-				}
+				select {
+				case <-lb.eventCh:
+					// Event received
+					// On the last iteration, stop the health checker immediately
+					// to prevent extra checks from modifying the counters.
+					if i == len(test.server.StatusSequence)-1 {
+						test.server.Close()
+						cancel()
+					}
+				case <-time.After(eventTimeout):
+					t.Fatalf("timeout waiting for health check event %d/%d", i+1, len(test.server.StatusSequence))
+				}
 			}
 
-			assert.Equal(t, test.expNumRemovedServers, lb.numRemovedServers, "removed servers")
-			assert.Equal(t, test.expNumUpsertedServers, lb.numUpsertedServers, "upserted servers")
+			// Small delay to let goroutines clean up.
+			time.Sleep(10 * time.Millisecond)
+
+			lb.RLock()
+			removedServers := lb.numRemovedServers
+			upsertedServers := lb.numUpsertedServers
+			lb.RUnlock()
+
+			assert.Equal(t, test.expNumRemovedServers, removedServers, "removed servers")
+			assert.Equal(t, test.expNumUpsertedServers, upsertedServers, "upserted servers")
 			assert.Equal(t, map[string]string{test.server.Addr.String(): test.targetStatus}, serviceInfo.GetAllStatus())
 		})
 	}
@@ -597,6 +607,8 @@ type sequencedTCPServer struct {
 	StatusSequence []tcpMockSequence
 	TLS            bool
 	release        chan struct{}
+	mu             sync.Mutex
+	listener       net.Listener
 }
 
 func newTCPServer(t *testing.T, tlsEnabled bool, statusSequence ...tcpMockSequence) *sequencedTCPServer {
@@ -624,17 +636,28 @@ func (s *sequencedTCPServer) Next() {
 	s.release <- struct{}{}
 }
 
+func (s *sequencedTCPServer) Close() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.listener != nil {
+		s.listener.Close()
+		s.listener = nil
+	}
+}
+
 func (s *sequencedTCPServer) Start(t *testing.T) {
 	t.Helper()
 
 	go func() {
-		var listener net.Listener
-
 		for _, seq := range s.StatusSequence {
 			<-s.release
-			if listener != nil {
-				listener.Close()
-			}
+
+			s.mu.Lock()
+			if s.listener != nil {
+				s.listener.Close()
+				s.listener = nil
+			}
+			s.mu.Unlock()
 
 			if !seq.accept {
 				continue
@@ -643,7 +666,7 @@ func (s *sequencedTCPServer) Start(t *testing.T) {
 			lis, err := net.ListenTCP("tcp", s.Addr)
 			require.NoError(t, err)
 
-			listener = lis
+			var listener net.Listener = lis
 
 			if s.TLS {
 				cert, err := tls.X509KeyPair(localhostCert, localhostKey)
@@ -670,8 +693,18 @@ func (s *sequencedTCPServer) Start(t *testing.T) {
 				)
 			}
 
+			s.mu.Lock()
+			s.listener = listener
+			s.mu.Unlock()
+
 			conn, err := listener.Accept()
-			require.NoError(t, err)
+			if err != nil {
+				// Listener was closed during shutdown - this is expected behavior.
+				if strings.Contains(err.Error(), "use of closed network connection") {
+					return
+				}
+				require.NoError(t, err)
+			}
 			t.Cleanup(func() {
 				_ = conn.Close()
 			})

@@ -418,7 +418,12 @@ func TestServiceHealthChecker_Launch(t *testing.T) {
 
 			targetURL, timeout := test.server.Start(t, cancel)
 
-			lb := &testLoadBalancer{RWMutex: &sync.RWMutex{}}
+			// Create load balancer with event channel for synchronization.
+			expectedEvents := test.expNumRemovedServers + test.expNumUpsertedServers
+			lb := &testLoadBalancer{
+				RWMutex: &sync.RWMutex{},
+				eventCh: make(chan struct{}, expectedEvents+5),
+			}
 
 			config := &dynamic.ServerHealthCheck{
 				Mode: test.mode,
@@ -441,18 +446,30 @@ func TestServiceHealthChecker_Launch(t *testing.T) {
 				wg.Done()
 			}()
 
-			select {
-			case <-time.After(timeout):
-				t.Fatal("test did not complete in time")
-			case <-ctx.Done():
-				wg.Wait()
+			// Wait for expected health check events using channel synchronization.
+			for i := range expectedEvents {
+				select {
+				case <-lb.eventCh:
+					// Event received.
+					// On the last event, cancel to prevent extra health checks.
+					if i == expectedEvents-1 {
+						cancel()
+					}
+				case <-time.After(timeout):
+					t.Fatalf("timeout waiting for health check event %d/%d", i+1, expectedEvents)
+				}
 			}
 
-			lb.Lock()
-			defer lb.Unlock()
+			// Wait for the health checker goroutine to exit before making assertions.
+			wg.Wait()
 
-			assert.Equal(t, test.expNumRemovedServers, lb.numRemovedServers, "removed servers")
-			assert.Equal(t, test.expNumUpsertedServers, lb.numUpsertedServers, "upserted servers")
+			lb.RLock()
+			removedServers := lb.numRemovedServers
+			upsertedServers := lb.numUpsertedServers
+			lb.RUnlock()
+
+			assert.Equal(t, test.expNumRemovedServers, removedServers, "removed servers")
+			assert.Equal(t, test.expNumUpsertedServers, upsertedServers, "upserted servers")
 			assert.InDelta(t, test.expGaugeValue, gauge.GaugeValue, delta, "ServerUp Gauge")
 			assert.Equal(t, []string{"service", "foobar", "url", targetURL.String()}, gauge.LastLabelValues)
 			assert.Equal(t, map[string]string{targetURL.String(): test.targetStatus}, serviceInfo.GetAllStatus())

@@ -168,14 +168,29 @@ type testLoadBalancer struct {
 
 	numRemovedServers  int
 	numUpsertedServers int
+
+	// eventCh is used to signal when a status change occurs, allowing tests
+	// to synchronize with health check events deterministically.
+	eventCh chan struct{}
 }
 
 func (lb *testLoadBalancer) SetStatus(ctx context.Context, childName string, up bool) {
 	lb.Lock()
 	if up {
 		lb.numUpsertedServers++
 	} else {
 		lb.numRemovedServers++
 	}
 	lb.Unlock()
+
+	// Signal the event if a listener is registered.
+	if lb.eventCh != nil {
+		select {
+		case lb.eventCh <- struct{}{}:
+		default:
+			// Don't block if channel is full or no listener.
+		}
+	}
 }
 
 type MetricsMock struct {

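Note on the pattern above: the eventCh handoff is what makes these tests deterministic, replacing the old sleep-and-poll loops. A minimal, self-contained sketch of the same producer/consumer handshake (illustrative names, not Traefik code):

package main

import (
	"fmt"
	"time"
)

// signal performs the non-blocking send used by SetStatus above:
// if the buffer is full and nobody is waiting, the event is dropped
// rather than blocking the component under test.
func signal(eventCh chan struct{}) {
	select {
	case eventCh <- struct{}{}:
	default:
	}
}

func main() {
	eventCh := make(chan struct{}, 1)

	go func() {
		time.Sleep(10 * time.Millisecond)
		signal(eventCh) // simulates a status change
	}()

	// The test side waits with a timeout instead of polling a counter.
	select {
	case <-eventCh:
		fmt.Println("event received")
	case <-time.After(time.Second):
		fmt.Println("timed out waiting for event")
	}
}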
@@ -10,8 +10,9 @@ import (
 
 // connectCert holds our certificates as a client of the Consul Connect protocol.
 type connectCert struct {
-	root []string
-	leaf keyPair
+	trustDomain string
+	root        []string
+	leaf        keyPair
 }
 
 func (c *connectCert) getRoot() []types.FileOrContent {
@@ -52,7 +53,8 @@ func (c *connectCert) equals(other *connectCert) bool {
 }
 
 func (c *connectCert) serversTransport(item itemData) *dynamic.ServersTransport {
-	spiffeID := fmt.Sprintf("spiffe:///ns/%s/dc/%s/svc/%s",
+	spiffeID := fmt.Sprintf("spiffe://%s/ns/%s/dc/%s/svc/%s",
+		c.trustDomain,
 		item.Namespace,
 		item.Datacenter,
 		item.Name,
@@ -72,7 +74,8 @@ func (c *connectCert) serversTransport(item itemData) *dynamic.ServersTransport
 }
 
 func (c *connectCert) tcpServersTransport(item itemData) *dynamic.TCPServersTransport {
-	spiffeID := fmt.Sprintf("spiffe:///ns/%s/dc/%s/svc/%s",
+	spiffeID := fmt.Sprintf("spiffe://%s/ns/%s/dc/%s/svc/%s",
+		c.trustDomain,
 		item.Namespace,
 		item.Datacenter,
 		item.Name,

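For context on the one-character-looking fix above: a SPIFFE ID is a URI whose authority component is the trust domain, so `spiffe:///ns/...` produced IDs with the trust domain missing. A quick sketch with made-up values (trust domain, namespace, datacenter, and service name are hypothetical):

package main

import "fmt"

func main() {
	// Hypothetical values for illustration only.
	trustDomain := "example.consul"
	namespace, datacenter, name := "default", "dc1", "web"

	// Old format: empty URI authority, trust domain lost.
	old := fmt.Sprintf("spiffe:///ns/%s/dc/%s/svc/%s", namespace, datacenter, name)
	// Fixed format: trust domain as the URI authority.
	fixed := fmt.Sprintf("spiffe://%s/ns/%s/dc/%s/svc/%s", trustDomain, namespace, datacenter, name)

	fmt.Println(old)   // spiffe:///ns/default/dc/dc1/svc/web
	fmt.Println(fixed) // spiffe://example.consul/ns/default/dc/dc1/svc/web
}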
@@ -465,7 +465,7 @@ func (p *Provider) watchConnectTLS(ctx context.Context) error {
 	}
 	leafWatcher.HybridHandler = leafWatcherHandler(ctx, leafChan)
 
-	rootsChan := make(chan []string)
+	rootsChan := make(chan caRootList)
 	rootsWatcher, err := watch.Parse(map[string]any{
 		"type": "connect_roots",
 	})
@@ -497,9 +497,9 @@ func (p *Provider) watchConnectTLS(ctx context.Context) error {
 	}()
 
 	var (
-		certInfo  *connectCert
-		leafCerts keyPair
-		rootCerts []string
+		certInfo *connectCert
+		leafCert keyPair
+		caRoots  caRootList
 	)
 
 	for {
@@ -510,13 +510,14 @@ func (p *Provider) watchConnectTLS(ctx context.Context) error {
 		case err := <-errChan:
 			return fmt.Errorf("leaf or roots watcher terminated: %w", err)
 
-		case rootCerts = <-rootsChan:
-		case leafCerts = <-leafChan:
+		case caRoots = <-rootsChan:
+		case leafCert = <-leafChan:
 		}
 
 		newCertInfo := &connectCert{
-			root: rootCerts,
-			leaf: leafCerts,
+			trustDomain: caRoots.trustDomain,
+			root:        caRoots.roots,
+			leaf:        leafCert,
 		}
 		if newCertInfo.isReady() && !newCertInfo.equals(certInfo) {
 			log.Ctx(ctx).Debug().Msgf("Updating connect certs for service %s", p.ServiceName)
@@ -546,7 +547,12 @@ func (p *Provider) includesHealthStatus(status string) bool {
 	return false
 }
 
-func rootsWatchHandler(ctx context.Context, dest chan<- []string) func(watch.BlockingParamVal, any) {
+type caRootList struct {
+	trustDomain string
+	roots       []string
+}
+
+func rootsWatchHandler(ctx context.Context, dest chan<- caRootList) func(watch.BlockingParamVal, any) {
 	return func(_ watch.BlockingParamVal, raw any) {
 		if raw == nil {
 			log.Ctx(ctx).Error().Msg("Root certificate watcher called with nil")
@@ -566,7 +572,7 @@ func rootsWatchHandler(ctx context.Context, dest chan<- []string) func(watch.Blo
 
 		select {
 		case <-ctx.Done():
-		case dest <- roots:
+		case dest <- caRootList{trustDomain: v.TrustDomain, roots: roots}:
 		}
 	}
 }

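A design note on the hunk above: bundling the trust domain with the root certificates in one struct means a single channel receive updates both, so a fresh root set can never be paired with a stale trust domain. A generic sketch of the pattern (the struct mirrors the one added above; values are hypothetical):

package main

import "fmt"

// caRootList mirrors the type introduced in the diff: the trust domain
// travels with the roots so one receive delivers a consistent pair.
type caRootList struct {
	trustDomain string
	roots       []string
}

func main() {
	rootsChan := make(chan caRootList, 1)

	// Watcher side: one send carries both values atomically.
	rootsChan <- caRootList{
		trustDomain: "example.consul", // hypothetical
		roots:       []string{"(root cert PEM)"},
	}

	// Consumer side: no window where roots and trust domain disagree.
	update := <-rootsChan
	fmt.Println(update.trustDomain, len(update.roots))
}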
@@ -34,7 +34,7 @@ func TestLoadIngresses(t *testing.T) {
 			desc: "Empty, no IngressClass",
 			paths: []string{
 				"services.yml",
-				"ingresses/01-ingress-with-basicauth.yml",
+				"ingresses/ingress-with-basicauth.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -57,7 +57,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/11-ingress-with-custom-headers.yml",
+				"ingresses/ingress-with-custom-headers.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -116,7 +116,7 @@ func TestLoadIngresses(t *testing.T) {
 		{
 			desc: "No annotation",
 			paths: []string{
-				"ingresses/00-ingress-with-no-annotation.yml",
+				"ingresses/ingress-with-no-annotation.yml",
 				"ingressclasses.yml",
 				"services.yml",
 				"secrets.yml",
@@ -196,7 +196,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/01-ingress-with-basicauth.yml",
+				"ingresses/ingress-with-basicauth.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -260,7 +260,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/02-ingress-with-forwardauth.yml",
+				"ingresses/ingress-with-forwardauth.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -324,7 +324,7 @@ func TestLoadIngresses(t *testing.T) {
 				"services.yml",
 				"secrets.yml",
 				"ingressclasses.yml",
-				"ingresses/03-ingress-with-ssl-redirect.yml",
+				"ingresses/ingress-with-ssl-redirect.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -472,7 +472,7 @@ func TestLoadIngresses(t *testing.T) {
 				"services.yml",
 				"secrets.yml",
 				"ingressclasses.yml",
-				"ingresses/04-ingress-with-ssl-passthrough.yml",
+				"ingresses/ingress-with-ssl-passthrough.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -518,7 +518,7 @@ func TestLoadIngresses(t *testing.T) {
 				"services.yml",
 				"secrets.yml",
 				"ingressclasses.yml",
-				"ingresses/06-ingress-with-sticky.yml",
+				"ingresses/ingress-with-sticky.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -585,7 +585,7 @@ func TestLoadIngresses(t *testing.T) {
 				"services.yml",
 				"secrets.yml",
 				"ingressclasses.yml",
-				"ingresses/07-ingress-with-proxy-ssl.yml",
+				"ingresses/ingress-with-proxy-ssl.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -642,7 +642,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/08-ingress-with-cors.yml",
+				"ingresses/ingress-with-cors.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -708,7 +708,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/09-ingress-with-service-upstream.yml",
+				"ingresses/ingress-with-service-upstream.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -759,7 +759,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/10-ingress-with-upstream-vhost.yml",
+				"ingresses/ingress-with-upstream-vhost.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -820,7 +820,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/10-ingress-with-use-regex.yml",
+				"ingresses/ingress-with-use-regex.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -874,7 +874,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/11-ingress-with-rewrite-target.yml",
+				"ingresses/ingress-with-rewrite-target.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -936,7 +936,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/18-ingress-with-app-root.yml",
+				"ingresses/ingress-with-app-root.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -998,7 +998,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/18-ingress-with-app-root-wrong.yml",
+				"ingresses/ingress-with-app-root-wrong.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -1107,7 +1107,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/10-ingress-with-whitelist-single-ip.yml",
+				"ingresses/ingress-with-whitelist-single-ip.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -1168,7 +1168,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/11-ingress-with-whitelist-single-cidr.yml",
+				"ingresses/ingress-with-whitelist-single-cidr.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -1229,7 +1229,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/12-ingress-with-whitelist-multiple-ip-and-cidr.yml",
+				"ingresses/ingress-with-whitelist-multiple-ip-and-cidr.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -1290,7 +1290,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/13-ingress-with-whitelist-empty.yml",
+				"ingresses/ingress-with-whitelist-empty.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -1345,7 +1345,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/14-ingress-with-permanent-redirect.yml",
+				"ingresses/ingress-with-permanent-redirect.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -1408,7 +1408,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/14-ingress-with-permanent-redirect-code-wrong-code.yml",
+				"ingresses/ingress-with-permanent-redirect-code-wrong-code.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -1471,7 +1471,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/14-ingress-with-permanent-redirect-code-correct-code.yml",
+				"ingresses/ingress-with-permanent-redirect-code-correct-code.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -1534,7 +1534,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/16-ingress-with-temporal-and-permanent-redirect.yml",
+				"ingresses/ingress-with-temporal-and-permanent-redirect.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -1597,7 +1597,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/15-ingress-with-temporal-redirect.yml",
+				"ingresses/ingress-with-temporal-redirect.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -1660,7 +1660,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/17-ingress-with-temporal-redirect-code-wrong-code.yml",
+				"ingresses/ingress-with-temporal-redirect-code-wrong-code.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -1723,7 +1723,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/17-ingress-with-temporal-redirect-code-correct-code.yml",
+				"ingresses/ingress-with-temporal-redirect-code-correct-code.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -1786,7 +1786,7 @@ func TestLoadIngresses(t *testing.T) {
 			paths: []string{
 				"services.yml",
 				"ingressclasses.yml",
-				"ingresses/19-ingress-with-proxy-timeout.yml",
+				"ingresses/ingress-with-proxy-timeout.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -1837,7 +1837,7 @@ func TestLoadIngresses(t *testing.T) {
 				"services.yml",
 				"secrets.yml",
 				"ingressclasses.yml",
-				"ingresses/20-ingress-with-auth-tls-secret.yml",
+				"ingresses/ingress-with-auth-tls-secret.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
@@ -1941,7 +1941,7 @@ func TestLoadIngresses(t *testing.T) {
 				"services.yml",
 				"secrets.yml",
 				"ingressclasses.yml",
-				"ingresses/21-ingress-with-auth-tls-verify-client.yml",
+				"ingresses/ingress-with-auth-tls-verify-client.yml",
 			},
 			expected: &dynamic.Configuration{
 				TCP: &dynamic.TCPConfiguration{
 
@@ -2357,7 +2357,7 @@ func TestIngressEndpointPublishedService(t *testing.T) {
 			ingress, err := kubeClient.NetworkingV1().Ingresses(metav1.NamespaceDefault).Get(t.Context(), "foo", metav1.GetOptions{})
 			require.NoError(t, err)
 
-			assert.Equal(t, test.expected, ingress.Status.LoadBalancer.Ingress)
+			assert.ElementsMatch(t, test.expected, ingress.Status.LoadBalancer.Ingress)
 		})
 	}
 }

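The assert swap in the last hunk deserves a note: testify's ElementsMatch asserts that two lists contain the same elements regardless of order, which makes the test robust to the API returning load-balancer ingress entries in nondeterministic order. A minimal illustration (package name and values are hypothetical):

package ingress_test

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestOrderInsensitiveComparison(t *testing.T) {
	got := []string{"10.0.0.2", "10.0.0.1"}
	want := []string{"10.0.0.1", "10.0.0.2"}

	// assert.Equal(t, want, got) would fail here: slice order differs.
	// ElementsMatch passes: same elements, any order.
	assert.ElementsMatch(t, want, got)
}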
@@ -800,53 +800,47 @@ func TestScoreCalculationWithWeights(t *testing.T) {
 }
 
 // TestScoreCalculationWithInflight tests that inflight requests are considered in score calculation.
+// Uses direct manipulation of response times and nextServer() for deterministic results.
 func TestScoreCalculationWithInflight(t *testing.T) {
 	balancer := New(nil, false)
 
-	// We'll manually control the inflight counters to test the score calculation.
-	// Add two servers with same response time.
+	// Add two servers with dummy handlers (we test selection logic directly).
 	balancer.Add("server1", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
-		time.Sleep(10 * time.Millisecond)
-		rw.Header().Set("server", "server1")
 		rw.WriteHeader(http.StatusOK)
-		httptrace.ContextClientTrace(req.Context()).GotFirstResponseByte()
 	}), pointer(1), false)
 
 	balancer.Add("server2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
-		time.Sleep(10 * time.Millisecond)
-		rw.Header().Set("server", "server2")
 		rw.WriteHeader(http.StatusOK)
-		httptrace.ContextClientTrace(req.Context()).GotFirstResponseByte()
 	}), pointer(1), false)
 
-	// Build up response time averages for both servers.
-	for range 2 {
-		recorder := httptest.NewRecorder()
-		balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
+	// Pre-fill response times directly (10ms average for both servers).
+	for _, h := range balancer.handlers {
+		for i := range sampleSize {
+			h.responseTimes[i] = 10.0
+		}
+		h.responseTimeSum = 10.0 * sampleSize
+		h.sampleCount = sampleSize
 	}
 
-	// Now manually set server1 to have high inflight count.
+	// Set server1 to have high inflight count.
 	balancer.handlers[0].inflightCount.Store(5)
 
 	// Make requests - they should prefer server2 because:
 	// Score for server1: (10 × (1 + 5)) / 1 = 60
 	// Score for server2: (10 × (1 + 0)) / 1 = 10
-	recorder := &responseRecorder{save: map[string]int{}}
+	counts := map[string]int{"server1": 0, "server2": 0}
 	for range 5 {
-		// Manually increment to simulate the ServeHTTP behavior.
-		server, _ := balancer.nextServer()
+		server, err := balancer.nextServer()
+		assert.NoError(t, err)
+		counts[server.name]++
+		// Simulate ServeHTTP incrementing inflight count.
 		server.inflightCount.Add(1)
-
-		if server.name == "server1" {
-			recorder.save["server1"]++
-		} else {
-			recorder.save["server2"]++
-		}
 	}
 
-	// Server2 should get all requests
-	assert.Equal(t, 5, recorder.save["server2"])
-	assert.Zero(t, recorder.save["server1"])
+	// Server2 should get all requests since its score (10-50) is always less than server1's (60).
+	// After each selection, server2's inflight grows: scores are 10, 20, 30, 40, 50 vs 60.
+	assert.Equal(t, 5, counts["server2"])
+	assert.Zero(t, counts["server1"])
 }
 
 // TestScoreCalculationColdStart tests that new servers (0ms avg) get fair selection
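The arithmetic quoted in the test comments above follows the least-time selection rule these tests exercise: score = average response time × (1 + inflight requests) / weight, lowest score wins. A standalone sketch of just that formula (function and names are illustrative, not the balancer's internals):

package main

import "fmt"

// score mirrors the formula from the test comments:
// (average response time × (1 + inflight requests)) / weight.
func score(avgResponseTime float64, inflight int64, weight float64) float64 {
	return avgResponseTime * (1 + float64(inflight)) / weight
}

func main() {
	fmt.Println(score(10, 5, 1)) // server1: 60
	fmt.Println(score(10, 0, 1)) // server2: 10, so server2 is selected
}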
@@ -930,28 +924,20 @@ func TestFastServerGetsMoreTraffic(t *testing.T) {
 // TestTrafficShiftsWhenPerformanceDegrades verifies that the load balancer
 // adapts to changing server performance by shifting traffic away from degraded servers.
 // This tests the adaptive behavior - the core value proposition of least-time load balancing.
+// Uses nextServer() directly to avoid timing variations and ensure deterministic results.
 func TestTrafficShiftsWhenPerformanceDegrades(t *testing.T) {
 	balancer := New(nil, false)
 
-	// Use atomic to dynamically control server1's response time.
-	server1Delay := atomic.Int64{}
-	server1Delay.Store(5) // Start with 5ms
-
+	// Add two servers with dummy handlers (we'll test selection logic directly).
 	balancer.Add("server1", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
-		time.Sleep(time.Duration(server1Delay.Load()) * time.Millisecond)
-		rw.Header().Set("server", "server1")
 		rw.WriteHeader(http.StatusOK)
-		httptrace.ContextClientTrace(req.Context()).GotFirstResponseByte()
 	}), pointer(1), false)
 
 	balancer.Add("server2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
-		time.Sleep(5 * time.Millisecond) // Static 5ms
-		rw.Header().Set("server", "server2")
 		rw.WriteHeader(http.StatusOK)
-		httptrace.ContextClientTrace(req.Context()).GotFirstResponseByte()
 	}), pointer(1), false)
 
-	// Pre-fill ring buffers to eliminate cold start effects and ensure deterministic equal performance state.
+	// Pre-fill ring buffers with equal response times (5ms each).
 	for _, h := range balancer.handlers {
 		for i := range sampleSize {
 			h.responseTimes[i] = 5.0
@@ -960,35 +946,43 @@ func TestTrafficShiftsWhenPerformanceDegrades(t *testing.T) {
 		h.sampleCount = sampleSize
 	}
 
-	// Phase 1: Both servers perform equally (5ms each).
-	recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
+	// Phase 1: Both servers have equal performance (5ms each).
+	// With WRR tie-breaking, traffic should be distributed evenly.
+	counts := map[string]int{"server1": 0, "server2": 0}
 	for range 50 {
-		balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
+		server, err := balancer.nextServer()
+		assert.NoError(t, err)
+		counts[server.name]++
 	}
 
-	// With equal performance and pre-filled buffers, distribution should be balanced via WRR tie-breaking.
-	total := recorder.save["server1"] + recorder.save["server2"]
+	total := counts["server1"] + counts["server2"]
 	assert.Equal(t, 50, total)
-	assert.InDelta(t, 25, recorder.save["server1"], 10) // 25 ± 10 requests
-	assert.InDelta(t, 25, recorder.save["server2"], 10) // 25 ± 10 requests
+	assert.InDelta(t, 25, counts["server1"], 1) // Deterministic WRR: 25 ± 1
+	assert.InDelta(t, 25, counts["server2"], 1) // Deterministic WRR: 25 ± 1
 
-	// Phase 2: server1 degrades (simulating GC pause, CPU spike, or network latency).
-	server1Delay.Store(50) // Now 50ms (10x slower) - dramatic degradation for reliable detection
-
-	// Make more requests to shift the moving average.
-	// Ring buffer has 100 samples, need significant new samples to shift average.
-	// server1's average will climb from ~5ms toward 50ms.
-	recorder2 := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
-	for range 60 {
-		balancer.ServeHTTP(recorder2, httptest.NewRequest(http.MethodGet, "/", nil))
+	// Phase 2: Simulate server1 degradation by directly updating its ring buffer.
+	// Set server1's average response time to 50ms (10x slower than server2's 5ms).
+	for _, h := range balancer.handlers {
+		if h.name == "server1" {
+			for i := range sampleSize {
+				h.responseTimes[i] = 50.0
+			}
+			h.responseTimeSum = 50.0 * sampleSize
+		}
 	}
 
-	// server2 should get significantly more traffic
-	// With 10x performance difference, server2 should dominate.
-	total2 := recorder2.save["server1"] + recorder2.save["server2"]
+	// With 10x performance difference, server2 should get significantly more traffic.
+	counts2 := map[string]int{"server1": 0, "server2": 0}
+	for range 60 {
+		server, err := balancer.nextServer()
+		assert.NoError(t, err)
+		counts2[server.name]++
+	}
+
+	total2 := counts2["server1"] + counts2["server2"]
 	assert.Equal(t, 60, total2)
-	assert.Greater(t, recorder2.save["server2"], 35) // At least ~60% (35/60)
-	assert.Less(t, recorder2.save["server1"], 25)    // At most ~40% (25/60)
+	assert.Greater(t, counts2["server2"], 35) // At least ~60% (35/60)
+	assert.Less(t, counts2["server1"], 25)    // At most ~40% (25/60)
 }
 
 // TestMultipleServersWithSameScore tests WRR tie-breaking when multiple servers have identical scores.

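Both tests above bypass real timing by writing straight into the handlers' ring buffers, keeping the running sum consistent with the samples. A tiny self-contained sketch of that bookkeeping under the same assumptions (fixed-size window, running sum for O(1) averages; names are illustrative):

package main

import "fmt"

const sampleSize = 100 // the fixed window size the tests assume

// ringBuffer keeps the last sampleSize response times plus a running sum,
// so the moving average costs O(1) per observation.
type ringBuffer struct {
	samples [sampleSize]float64
	sum     float64
	next    int
	count   int
}

func (r *ringBuffer) observe(ms float64) {
	r.sum -= r.samples[r.next] // evict the oldest sample from the sum
	r.samples[r.next] = ms
	r.sum += ms
	r.next = (r.next + 1) % sampleSize
	if r.count < sampleSize {
		r.count++
	}
}

func (r *ringBuffer) average() float64 {
	if r.count == 0 {
		return 0
	}
	return r.sum / float64(r.count)
}

func main() {
	var r ringBuffer
	for range sampleSize {
		r.observe(5.0) // pre-fill at 5ms, like phase 1 of the test
	}
	fmt.Println(r.average()) // 5

	for range sampleSize {
		r.observe(50.0) // degrade to 50ms, like phase 2
	}
	fmt.Println(r.average()) // 50
}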
@@ -5,7 +5,6 @@ import (
 	"crypto/x509"
 	"errors"
 	"fmt"
-	"net/url"
 	"os"
 	"strings"
 
@@ -160,37 +159,19 @@ func VerifyPeerCertificate(uri string, cfg *tls.Config, rawCerts [][]byte) error
 	return nil
 }
 
-// verifyServerCertMatchesURI is used on tls connections dialed to a server
-// to ensure that the certificate it presented has the correct URI.
+// verifyServerCertMatchesURI verifies that the given certificate contains the specified URI in its SANs.
 func verifyServerCertMatchesURI(uri string, cert *x509.Certificate) error {
 	if cert == nil {
 		return errors.New("peer certificate mismatch: no peer certificate presented")
 	}
 
-	// Our certs will only ever have a single URI for now so only check that
-	if len(cert.URIs) < 1 {
-		return errors.New("peer certificate mismatch: peer certificate invalid")
+	for _, certURI := range cert.URIs {
+		if strings.EqualFold(certURI.String(), uri) {
+			return nil
+		}
 	}
 
-	gotURI := cert.URIs[0]
-
-	// Override the hostname since we rely on x509 constraints to limit ability to spoof the trust domain if needed
-	// (i.e. because a root is shared with other PKI or Consul clusters).
-	// This allows for seamless migrations between trust domains.
-
-	expectURI := &url.URL{}
-	id, err := url.Parse(uri)
-	if err != nil {
-		return fmt.Errorf("%q is not a valid URI", uri)
-	}
-	*expectURI = *id
-	expectURI.Host = gotURI.Host
-
-	if strings.EqualFold(gotURI.String(), expectURI.String()) {
-		return nil
-	}
-
-	return fmt.Errorf("peer certificate mismatch got %s, want %s", gotURI, uri)
+	return fmt.Errorf("peer certificate mismatch: no SAN URI in peer certificate matches %s", uri)
 }
 
 // verifyChain performs standard TLS verification without enforcing remote hostname matching.

@@ -0,0 +1,64 @@
+package tls
+
+import (
+	"crypto/x509"
+	"net/url"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func Test_verifyServerCertMatchesURI(t *testing.T) {
+	tests := []struct {
+		desc   string
+		uri    string
+		cert   *x509.Certificate
+		expErr require.ErrorAssertionFunc
+	}{
+		{
+			desc:   "returns error when certificate is nil",
+			uri:    "spiffe://foo.com",
+			expErr: require.Error,
+		},
+		{
+			desc:   "returns error when certificate has no URIs",
+			uri:    "spiffe://foo.com",
+			cert:   &x509.Certificate{URIs: nil},
+			expErr: require.Error,
+		},
+		{
+			desc: "returns error when no URI matches",
+			uri:  "spiffe://foo.com",
+			cert: &x509.Certificate{URIs: []*url.URL{
+				{Scheme: "spiffe", Host: "other.org"},
+			}},
+			expErr: require.Error,
+		},
+		{
+			desc: "returns nil when URI matches",
+			uri:  "spiffe://foo.com",
+			cert: &x509.Certificate{URIs: []*url.URL{
+				{Scheme: "spiffe", Host: "foo.com"},
+			}},
+			expErr: require.NoError,
+		},
+		{
+			desc: "returns nil when one of the URI matches",
+			uri:  "spiffe://foo.com",
+			cert: &x509.Certificate{URIs: []*url.URL{
+				{Scheme: "spiffe", Host: "example.org"},
+				{Scheme: "spiffe", Host: "foo.com"},
+			}},
+			expErr: require.NoError,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.desc, func(t *testing.T) {
+			t.Parallel()
+
+			err := verifyServerCertMatchesURI(test.uri, test.cert)
+			test.expErr(t, err)
+		})
+	}
+}