Add configuration transformer mechanism to the ConfigurationWatcher

This commit is contained in:
Simon Delicata
2026-01-28 11:44:05 +01:00
committed by GitHub
parent 2c47d71666
commit f4f129a279
2 changed files with 111 additions and 18 deletions
+27 -18
View File
@@ -28,6 +28,8 @@ type ConfigurationWatcher struct {
requiredProvider string requiredProvider string
configurationListeners []func(dynamic.Configuration) configurationListeners []func(dynamic.Configuration)
configurationTransformers []func(context.Context, dynamic.Configurations) dynamic.Configurations
routinesPool *safe.Pool routinesPool *safe.Pool
} }
@@ -63,12 +65,14 @@ func (c *ConfigurationWatcher) Stop() {
// AddListener adds a new listener function used when new configuration is provided. // AddListener adds a new listener function used when new configuration is provided.
func (c *ConfigurationWatcher) AddListener(listener func(dynamic.Configuration)) { func (c *ConfigurationWatcher) AddListener(listener func(dynamic.Configuration)) {
if c.configurationListeners == nil {
c.configurationListeners = make([]func(dynamic.Configuration), 0)
}
c.configurationListeners = append(c.configurationListeners, listener) c.configurationListeners = append(c.configurationListeners, listener)
} }
// AddTransformer registers a function to modify configurations before they are applied.
func (c *ConfigurationWatcher) AddTransformer(transformer func(context.Context, dynamic.Configurations) dynamic.Configurations) {
c.configurationTransformers = append(c.configurationTransformers, transformer)
}
func (c *ConfigurationWatcher) startProviderAggregator() { func (c *ConfigurationWatcher) startProviderAggregator() {
log.Info().Msgf("Starting provider aggregator %T", c.providerAggregator) log.Info().Msgf("Starting provider aggregator %T", c.providerAggregator)
@@ -81,22 +85,24 @@ func (c *ConfigurationWatcher) startProviderAggregator() {
} }
// receiveConfigurations receives configuration changes from the providers. // receiveConfigurations receives configuration changes from the providers.
// The configuration message then gets passed along a series of check, notably // The configuration message then gets passed along a series of checks, notably
// to verify that, for a given provider, the configuration that was just received // to verify that, for a given provider, the configuration that was just received
// is at least different from the previously received one. // is at least different from the previously received one.
// The full set of configurations is then sent to the throttling goroutine, // The full set of configurations is then sent to applyConfigurations
// (throttleAndApplyConfigurations) via a RingChannel, which ensures that we can // via a channel in a non-blocking manner, ensuring the latest global state
// constantly send in a non-blocking way to the throttling goroutine the last // is always available for processing.
// global state we are aware of.
func (c *ConfigurationWatcher) receiveConfigurations(ctx context.Context) { func (c *ConfigurationWatcher) receiveConfigurations(ctx context.Context) {
newConfigurations := make(dynamic.Configurations) newConfigurations := make(dynamic.Configurations)
transformedConfigurations := make(dynamic.Configurations)
var output chan dynamic.Configurations var output chan dynamic.Configurations
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
// DeepCopy is necessary because newConfigurations gets modified later by the consumer of c.newConfigs // DeepCopy is necessary because transformedConfigurations gets modified later by the consumer of c.newConfigs.
case output <- newConfigurations.DeepCopy(): case output <- transformedConfigurations.DeepCopy():
output = nil output = nil
default: default:
@@ -123,28 +129,31 @@ func (c *ConfigurationWatcher) receiveConfigurations(ctx context.Context) {
logConfiguration(logger, configMsg) logConfiguration(logger, configMsg)
if reflect.DeepEqual(newConfigurations[configMsg.ProviderName], configMsg.Configuration) { if reflect.DeepEqual(newConfigurations[configMsg.ProviderName], configMsg.Configuration) {
// no change, do nothing // no change, do nothing.
logger.Debug().Msg("Skipping unchanged configuration") logger.Debug().Msg("Skipping unchanged configuration")
continue continue
} }
newConfigurations[configMsg.ProviderName] = configMsg.Configuration.DeepCopy() newConfigurations[configMsg.ProviderName] = configMsg.Configuration.DeepCopy()
transformedConfigurations = newConfigurations
for _, transform := range c.configurationTransformers {
transformedConfigurations = transform(logger.WithContext(ctx), transformedConfigurations.DeepCopy())
}
output = c.newConfigs output = c.newConfigs
// DeepCopy is necessary because newConfigurations gets modified later by the consumer of c.newConfigs // DeepCopy is necessary because newConfigurations gets modified later by the consumer of c.newConfigs.
case output <- newConfigurations.DeepCopy(): case output <- transformedConfigurations.DeepCopy():
output = nil output = nil
} }
} }
} }
} }
// applyConfigurations blocks on a RingChannel that receives the new // applyConfigurations receives the full set of configurations from
// set of configurations that is compiled and sent by receiveConfigurations as soon // receiveConfigurations and applies them if they differ from the previous set.
// as a provider change occurs. If the new set is different from the previous set // It waits for the required provider's configuration before applying any configs.
// that had been applied, the new set is applied, and we sleep for a while before
// listening on the channel again.
func (c *ConfigurationWatcher) applyConfigurations(ctx context.Context) { func (c *ConfigurationWatcher) applyConfigurations(ctx context.Context) {
var lastConfigurations dynamic.Configurations var lastConfigurations dynamic.Configurations
for { for {
+84
View File
@@ -908,3 +908,87 @@ func TestPublishConfigUpdatedByConfigWatcherListener(t *testing.T) {
assert.Equal(t, 1, publishedConfigCount) assert.Equal(t, 1, publishedConfigCount)
} }
func TestConfigurationWatcher_MultipleTransformers(t *testing.T) {
routinesPool := safe.NewPool(t.Context())
t.Cleanup(routinesPool.Stop)
pvd := &mockProvider{
messages: []dynamic.Message{{
ProviderName: "mock",
Configuration: &dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(
th.WithRouter("original",
th.WithEntryPoints("e"),
th.WithServiceName("scv"))),
),
},
}},
}
watcher := NewConfigurationWatcher(routinesPool, pvd, []string{}, "")
var callOrder []string
var callCount1, callCount2 int
watcher.AddTransformer(func(_ context.Context, configs dynamic.Configurations) dynamic.Configurations {
callCount1++
callOrder = append(callOrder, "transformer1")
for _, config := range configs {
if config != nil && config.HTTP != nil {
config.HTTP.Routers["from-transformer1"] = &dynamic.Router{
EntryPoints: []string{"e"},
Service: "svc1",
}
}
}
return configs
})
watcher.AddTransformer(func(_ context.Context, configs dynamic.Configurations) dynamic.Configurations {
callCount2++
callOrder = append(callOrder, "transformer2")
// Verify that transformer1's changes are visible.
for _, config := range configs {
if config != nil && config.HTTP != nil {
assert.Contains(t, config.HTTP.Routers, "from-transformer1")
config.HTTP.Routers["from-transformer2"] = &dynamic.Router{
EntryPoints: []string{"e"},
Service: "svc2",
}
}
}
return configs
})
run := make(chan struct{})
watcher.AddListener(func(conf dynamic.Configuration) {
assert.NotNil(t, conf.HTTP)
assert.Contains(t, conf.HTTP.Routers, "original@mock")
assert.Contains(t, conf.HTTP.Routers, "from-transformer1@mock")
assert.Contains(t, conf.HTTP.Routers, "from-transformer2@mock")
close(run)
})
watcher.Start()
t.Cleanup(watcher.Stop)
select {
case <-run:
case <-time.After(5 * time.Second):
t.Fatal("Timeout waiting for configuration")
}
assert.Equal(t, []string{"transformer1", "transformer2"}, callOrder)
assert.Equal(t, 1, callCount1)
assert.Equal(t, 1, callCount2)
}