Fix flakiness in unit tests

Michael authored 2026-01-29 11:42:06 +01:00, committed by GitHub
parent 72ba481bbc
commit f32d58c577
5 changed files with 153 additions and 94 deletions
+56 -23
@@ -466,42 +466,52 @@ func TestServiceTCPHealthChecker_Launch(t *testing.T) {
},
}
// Create load balancer with event channel for synchronization.
lb := &testLoadBalancer{
RWMutex: &sync.RWMutex{},
eventCh: make(chan struct{}, len(test.server.StatusSequence)+5),
}
serviceInfo := &truntime.TCPServiceInfo{}
service := NewServiceTCPHealthChecker(ctx, test.config, lb, serviceInfo, targets, "serviceName")
go service.Launch(ctx)
// Timeout for each event - TLS handshake can take longer.
eventTimeout := 500 * time.Millisecond
if test.server.TLS {
eventTimeout = 2 * time.Second
}
// Wait for health check events using channel synchronization.
// Iterate over StatusSequence to release each connection via Next().
for i := range test.server.StatusSequence {
test.server.Next()
select {
case <-lb.eventCh:
// Event received.
// On the last iteration, stop the health checker immediately
// to prevent extra checks from modifying the counters.
if i == len(test.server.StatusSequence)-1 {
test.server.Close()
cancel()
}
case <-time.After(eventTimeout):
t.Fatalf("timeout waiting for health check event %d/%d", i+1, len(test.server.StatusSequence))
}
}
// Small delay to let goroutines clean up.
time.Sleep(10 * time.Millisecond)
lb.RLock()
removedServers := lb.numRemovedServers
upsertedServers := lb.numUpsertedServers
lb.RUnlock()
assert.Equal(t, test.expNumRemovedServers, removedServers, "removed servers")
assert.Equal(t, test.expNumUpsertedServers, upsertedServers, "upserted servers")
assert.Equal(t, map[string]string{test.server.Addr.String(): test.targetStatus}, serviceInfo.GetAllStatus())
})
}
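The loop above waits for exactly one event per status change, with a per-event timeout instead of a single global deadline. A minimal, self-contained sketch of the same consumer pattern (waitForEvents and the timings are illustrative, not from this patch):

package main

import (
	"fmt"
	"time"
)

// waitForEvents blocks until n events arrive on ch, failing fast if any
// single event takes longer than perEventTimeout. It returns an error
// instead of calling t.Fatalf so it can be reused outside tests.
func waitForEvents(ch <-chan struct{}, n int, perEventTimeout time.Duration) error {
	for i := 0; i < n; i++ {
		select {
		case <-ch:
			// Event i+1 observed; keep waiting for the rest.
		case <-time.After(perEventTimeout):
			return fmt.Errorf("timeout waiting for event %d/%d", i+1, n)
		}
	}
	return nil
}

func main() {
	ch := make(chan struct{}, 3)
	// Simulate three asynchronous status changes.
	go func() {
		for range 3 {
			time.Sleep(10 * time.Millisecond)
			ch <- struct{}{}
		}
	}()
	if err := waitForEvents(ch, 3, time.Second); err != nil {
		panic(err)
	}
	fmt.Println("all events observed")
}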
@@ -597,6 +607,8 @@ type sequencedTCPServer struct {
StatusSequence []tcpMockSequence
TLS bool
release chan struct{}
mu sync.Mutex
listener net.Listener
}
func newTCPServer(t *testing.T, tlsEnabled bool, statusSequence ...tcpMockSequence) *sequencedTCPServer {
@@ -624,17 +636,28 @@ func (s *sequencedTCPServer) Next() {
s.release <- struct{}{}
}
func (s *sequencedTCPServer) Close() {
s.mu.Lock()
defer s.mu.Unlock()
if s.listener != nil {
s.listener.Close()
s.listener = nil
}
}
func (s *sequencedTCPServer) Start(t *testing.T) {
t.Helper()
go func() {
for _, seq := range s.StatusSequence {
<-s.release
s.mu.Lock()
if s.listener != nil {
s.listener.Close()
s.listener = nil
}
s.mu.Unlock()
if !seq.accept {
continue
@@ -643,7 +666,7 @@ func (s *sequencedTCPServer) Start(t *testing.T) {
lis, err := net.ListenTCP("tcp", s.Addr)
require.NoError(t, err)
var listener net.Listener = lis
if s.TLS {
cert, err := tls.X509KeyPair(localhostCert, localhostKey)
@@ -670,8 +693,18 @@ func (s *sequencedTCPServer) Start(t *testing.T) {
)
}
s.mu.Lock()
s.listener = listener
s.mu.Unlock()
conn, err := listener.Accept()
if err != nil {
// Listener was closed during shutdown - this is expected behavior.
if strings.Contains(err.Error(), "use of closed network connection") {
return
}
require.NoError(t, err)
}
t.Cleanup(func() {
_ = conn.Close()
})
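The Accept error above is matched by substring. That works, but since Go 1.16 the standard library exposes the sentinel net.ErrClosed, so the same shutdown case can be detected structurally rather than by string comparison. A hedged alternative sketch, not part of this patch:

package main

import (
	"errors"
	"fmt"
	"net"
)

func main() {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	ln.Close() // Close before Accept to force the shutdown error.

	_, err = ln.Accept()
	// errors.Is unwraps the *net.OpError returned by Accept and matches
	// net.ErrClosed, so the check survives error-message wording changes.
	if errors.Is(err, net.ErrClosed) {
		fmt.Println("listener closed during shutdown - expected")
		return
	}
	panic(err)
}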
+25 -8
@@ -418,7 +418,12 @@ func TestServiceHealthChecker_Launch(t *testing.T) {
targetURL, timeout := test.server.Start(t, cancel)
// Create load balancer with event channel for synchronization.
expectedEvents := test.expNumRemovedServers + test.expNumUpsertedServers
lb := &testLoadBalancer{
RWMutex: &sync.RWMutex{},
eventCh: make(chan struct{}, expectedEvents+5),
}
config := &dynamic.ServerHealthCheck{
Mode: test.mode,
@@ -441,18 +446,30 @@ func TestServiceHealthChecker_Launch(t *testing.T) {
wg.Done()
}()
// Wait for expected health check events using channel synchronization.
for i := range expectedEvents {
select {
case <-lb.eventCh:
// Event received.
// On the last event, cancel to prevent extra health checks.
if i == expectedEvents-1 {
cancel()
}
case <-time.After(timeout):
t.Fatalf("timeout waiting for health check event %d/%d", i+1, expectedEvents)
}
}
// Wait for the health checker goroutine to exit before making assertions.
wg.Wait()
lb.RLock()
removedServers := lb.numRemovedServers
upsertedServers := lb.numUpsertedServers
lb.RUnlock()
assert.Equal(t, test.expNumRemovedServers, removedServers, "removed servers")
assert.Equal(t, test.expNumUpsertedServers, upsertedServers, "upserted servers")
assert.InDelta(t, test.expGaugeValue, gauge.GaugeValue, delta, "ServerUp Gauge")
assert.Equal(t, []string{"service", "foobar", "url", targetURL.String()}, gauge.LastLabelValues)
assert.Equal(t, map[string]string{targetURL.String(): test.targetStatus}, serviceInfo.GetAllStatus())
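This hunk also fixes an ordering hazard: the assertions must not read the counters while the health-check goroutine may still write them. Cancelling the context, waiting for the goroutine to exit, and only then reading under the lock removes the race. A minimal sketch of that shutdown sequence (the ticker worker below is illustrative):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	var mu sync.RWMutex
	checks := 0

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		ticker := time.NewTicker(5 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				mu.Lock()
				checks++
				mu.Unlock()
			}
		}
	}()

	time.Sleep(20 * time.Millisecond)
	cancel()  // 1. Stop producing new events.
	wg.Wait() // 2. Ensure the writer goroutine has exited.

	// 3. Only now is it safe to assert; the lock is kept for consistency.
	mu.RLock()
	fmt.Println("checks observed:", checks)
	mu.RUnlock()
}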
+15
@@ -168,14 +168,29 @@ type testLoadBalancer struct {
numRemovedServers int
numUpsertedServers int
// eventCh is used to signal when a status change occurs, allowing tests
// to synchronize with health check events deterministically.
eventCh chan struct{}
}
func (lb *testLoadBalancer) SetStatus(ctx context.Context, childName string, up bool) {
lb.Lock()
if up {
lb.numUpsertedServers++
} else {
lb.numRemovedServers++
}
lb.Unlock()
// Signal the event if a listener is registered.
if lb.eventCh != nil {
select {
case lb.eventCh <- struct{}{}:
default:
// Don't block if channel is full or no listener.
}
}
}
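On the producer side, SetStatus signals with a non-blocking send: a select with a default case drops the signal when the channel is full or nil, so code paths without a test listener never stall. A standalone demonstration of that idiom (signal is illustrative):

package main

import "fmt"

// signal performs a non-blocking send: it never blocks the caller,
// even if ch is nil (a nil channel is never ready, so default fires)
// or the buffer is full.
func signal(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(signal(nil)) // false: nil channel, default fires.

	ch := make(chan struct{}, 1)
	fmt.Println(signal(ch)) // true: buffer has room.
	fmt.Println(signal(ch)) // false: buffer full, signal dropped.
}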
type MetricsMock struct {
@@ -2357,7 +2357,7 @@ func TestIngressEndpointPublishedService(t *testing.T) {
ingress, err := kubeClient.NetworkingV1().Ingresses(metav1.NamespaceDefault).Get(t.Context(), "foo", metav1.GetOptions{})
require.NoError(t, err)
assert.ElementsMatch(t, test.expected, ingress.Status.LoadBalancer.Ingress)
})
}
}
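assert.Equal on the ingress status compared the slices including order, which is not guaranteed for load-balancer entries; assert.ElementsMatch only requires that both slices contain the same elements. A small example of the difference:

package main

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestElementsMatchIgnoresOrder(t *testing.T) {
	got := []string{"10.0.0.2", "10.0.0.1"}
	want := []string{"10.0.0.1", "10.0.0.2"}

	// assert.Equal(t, want, got) would fail: slice order differs.

	// Passes: same elements, any order.
	assert.ElementsMatch(t, want, got)
}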
@@ -800,53 +800,47 @@ func TestScoreCalculationWithWeights(t *testing.T) {
}
// TestScoreCalculationWithInflight tests that inflight requests are considered in score calculation.
// Uses direct manipulation of response times and nextServer() for deterministic results.
func TestScoreCalculationWithInflight(t *testing.T) {
balancer := New(nil, false)
// Add two servers with dummy handlers (we test selection logic directly).
balancer.Add("server1", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
time.Sleep(10 * time.Millisecond)
rw.Header().Set("server", "server1")
rw.WriteHeader(http.StatusOK)
httptrace.ContextClientTrace(req.Context()).GotFirstResponseByte()
}), pointer(1), false)
balancer.Add("server2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
time.Sleep(10 * time.Millisecond)
rw.Header().Set("server", "server2")
rw.WriteHeader(http.StatusOK)
httptrace.ContextClientTrace(req.Context()).GotFirstResponseByte()
}), pointer(1), false)
// Pre-fill response times directly (10ms average for both servers).
for _, h := range balancer.handlers {
for i := range sampleSize {
h.responseTimes[i] = 10.0
}
h.responseTimeSum = 10.0 * sampleSize
h.sampleCount = sampleSize
}
// Set server1 to have high inflight count.
balancer.handlers[0].inflightCount.Store(5)
// Selections should prefer server2 because:
// Score for server1: (10 × (1 + 5)) / 1 = 60
// Score for server2: (10 × (1 + 0)) / 1 = 10
counts := map[string]int{"server1": 0, "server2": 0}
for range 5 {
server, err := balancer.nextServer()
assert.NoError(t, err)
counts[server.name]++
// Simulate ServeHTTP incrementing inflight count.
server.inflightCount.Add(1)
}
// Server2 should get all requests since its score (10-50) is always less than server1's (60).
// After each selection, server2's inflight grows: scores are 10, 20, 30, 40, 50 vs 60.
assert.Equal(t, 5, counts["server2"])
assert.Zero(t, counts["server1"])
}
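The score comments above follow the least-time pattern score = avgResponseTime × (1 + inflight) / weight. A standalone check of that arithmetic (the score helper is illustrative; the balancer's internals may differ):

package main

import "fmt"

// score implements the formula assumed by the test comments: a server
// is cheaper when it responds faster, has fewer requests in flight,
// and carries a higher weight.
func score(avgResponseTimeMs float64, inflight int64, weight float64) float64 {
	return avgResponseTimeMs * float64(1+inflight) / weight
}

func main() {
	// Matches the test: both servers average 10ms at weight 1.
	fmt.Println(score(10, 5, 1)) // server1: (10 × (1 + 5)) / 1 = 60
	fmt.Println(score(10, 0, 1)) // server2: (10 × (1 + 0)) / 1 = 10
}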
// TestScoreCalculationColdStart tests that new servers (0ms avg) get fair selection
@@ -930,28 +924,20 @@ func TestFastServerGetsMoreTraffic(t *testing.T) {
// TestTrafficShiftsWhenPerformanceDegrades verifies that the load balancer
// adapts to changing server performance by shifting traffic away from degraded servers.
// This tests the adaptive behavior - the core value proposition of least-time load balancing.
// Uses nextServer() directly to avoid timing variations and ensure deterministic results.
func TestTrafficShiftsWhenPerformanceDegrades(t *testing.T) {
balancer := New(nil, false)
// Add two servers with dummy handlers (we'll test selection logic directly).
balancer.Add("server1", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
time.Sleep(time.Duration(server1Delay.Load()) * time.Millisecond)
rw.Header().Set("server", "server1")
rw.WriteHeader(http.StatusOK)
httptrace.ContextClientTrace(req.Context()).GotFirstResponseByte()
}), pointer(1), false)
balancer.Add("server2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
time.Sleep(5 * time.Millisecond) // Static 5ms
rw.Header().Set("server", "server2")
rw.WriteHeader(http.StatusOK)
httptrace.ContextClientTrace(req.Context()).GotFirstResponseByte()
}), pointer(1), false)
// Pre-fill ring buffers with equal response times (5ms each).
for _, h := range balancer.handlers {
for i := range sampleSize {
h.responseTimes[i] = 5.0
@@ -960,35 +946,43 @@ func TestTrafficShiftsWhenPerformanceDegrades(t *testing.T) {
h.sampleCount = sampleSize
}
// Phase 1: Both servers have equal performance (5ms each).
// With WRR tie-breaking, traffic should be distributed evenly.
counts := map[string]int{"server1": 0, "server2": 0}
for range 50 {
server, err := balancer.nextServer()
assert.NoError(t, err)
counts[server.name]++
}
total := counts["server1"] + counts["server2"]
assert.Equal(t, 50, total)
assert.InDelta(t, 25, counts["server1"], 1) // Deterministic WRR: 25 ± 1
assert.InDelta(t, 25, counts["server2"], 1) // Deterministic WRR: 25 ± 1
// Phase 2: Simulate server1 degradation by directly updating its ring buffer.
// Set server1's average response time to 50ms (10x slower than server2's 5ms).
for _, h := range balancer.handlers {
if h.name == "server1" {
for i := range sampleSize {
h.responseTimes[i] = 50.0
}
h.responseTimeSum = 50.0 * sampleSize
}
}
// With 10x performance difference, server2 should get significantly more traffic.
counts2 := map[string]int{"server1": 0, "server2": 0}
for range 60 {
server, err := balancer.nextServer()
assert.NoError(t, err)
counts2[server.name]++
}
total2 := counts2["server1"] + counts2["server2"]
assert.Equal(t, 60, total2)
assert.Greater(t, counts2["server2"], 35) // At least ~60% (35/60)
assert.Less(t, counts2["server1"], 25) // At most ~40% (25/60)
}
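Both tests seed state by writing responseTimes, responseTimeSum, and sampleCount directly, which suggests a fixed-size ring buffer whose sum is maintained incrementally. A sketch under that assumption; only the field and constant names mirror the tests, the update logic is illustrative:

package main

import "fmt"

const sampleSize = 100 // Matches the pre-fill loops in the tests.

// ringAverage keeps a moving average over the last sampleSize samples.
type ringAverage struct {
	responseTimes   [sampleSize]float64
	responseTimeSum float64
	sampleCount     int // Grows until the buffer is full.
	next            int // Index to overwrite on the next observation.
}

// observe records one response time, evicting the oldest sample once
// the buffer is full so the sum covers at most sampleSize values.
func (r *ringAverage) observe(ms float64) {
	r.responseTimeSum -= r.responseTimes[r.next] // Zero until slot reused.
	r.responseTimes[r.next] = ms
	r.responseTimeSum += ms
	r.next = (r.next + 1) % sampleSize
	if r.sampleCount < sampleSize {
		r.sampleCount++
	}
}

func (r *ringAverage) average() float64 {
	if r.sampleCount == 0 {
		return 0
	}
	return r.responseTimeSum / float64(r.sampleCount)
}

func main() {
	var r ringAverage
	for range sampleSize {
		r.observe(5.0) // Phase 1: steady 5ms.
	}
	fmt.Println(r.average()) // 5
	for range 60 {
		r.observe(50.0) // Degradation shifts the moving average upward.
	}
	fmt.Println(r.average()) // (40×5 + 60×50) / 100 = 32
}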
// TestMultipleServersWithSameScore tests WRR tie-breaking when multiple servers have identical scores.