1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@
* [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
* [FEATURE] Ingester: Add experimental active series queried metric. #7173
* [ENHANCEMENT] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203
* [ENHANCEMENT] HATracker: Add a local cache warmup on startup to reduce KV store CAS operations. #7213
* [ENHANCEMENT] Ingester: Add support for ingesting Native Histogram with Custom Buckets. #7191
* [ENHANCEMENT] Ingester: Optimize labels out-of-order (ooo) check by allowing the iteration to terminate immediately upon finding the first unsorted label. #7186
* [ENHANCEMENT] Distributor: Skip attaching `__unit__` and `__type__` labels when `-distributor.enable-type-and-unit-labels` is enabled, as these are appended from metadata. #7145
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -3095,6 +3095,12 @@ ha_tracker:
# CLI flag: -distributor.ha-tracker.failover-timeout
[ha_tracker_failover_timeout: <duration> | default = 30s]

# If enabled, fetch all tracked keys on startup to populate the local cache.
# This reduces CAS operations for existing replicas but causes a spike in GET
# requests during initialization.
# CLI flag: -distributor.ha-tracker.enable-startup-sync
[enable_startup_sync: <boolean> | default = false]

# Backend storage to use for the ring. Please be aware that memberlist is not
# supported by the HA tracker since gossip propagation is too slow for HA
# purposes.
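For reference, here is a minimal sketch of how the new option could be set in a distributor configuration file. Only `enable_startup_sync` comes from this PR; the surrounding field names (`enable_ha_tracker`, `kvstore`, `store`) are assumed from the existing Cortex config reference and may need adjusting for your deployment:

```yaml
# Hypothetical excerpt of a Cortex config; only HA-tracker-related fields shown.
distributor:
  ha_tracker:
    enable_ha_tracker: true
    # Warm the local cache from the KV store at startup: fewer CAS operations
    # for replicas that already exist, at the cost of a burst of GET requests
    # while the distributor initializes.
    enable_startup_sync: true
    kvstore:
      store: consul
```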
7 changes: 7 additions & 0 deletions pkg/distributor/distributor_ha_tracker.go
@@ -21,6 +21,11 @@ type HATrackerConfig struct {
// between the stored timestamp and the time we received a sample is
// more than this duration
FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout"`
// EnableStartupSync controls whether to fetch all tracked keys from the KV store
// on startup to populate the local cache.
// Enabling this reduces CAS operations for existing replicas after startup,
// but causes a spike in GET requests during initialization.
EnableStartupSync bool `yaml:"enable_startup_sync"`

KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Please be aware that memberlist is not supported by the HA tracker since gossip propagation is too slow for HA purposes."`
}
@@ -31,6 +36,7 @@ func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.UpdateTimeout, "distributor.ha-tracker.update-timeout", 15*time.Second, "Update the timestamp in the KV store for a given cluster/replica only after this amount of time has passed since the current stored timestamp.")
f.DurationVar(&cfg.UpdateTimeoutJitterMax, "distributor.ha-tracker.update-timeout-jitter-max", 5*time.Second, "Maximum jitter applied to the update timeout, in order to spread the HA heartbeats over time.")
f.DurationVar(&cfg.FailoverTimeout, "distributor.ha-tracker.failover-timeout", 30*time.Second, "If we don't receive any samples from the accepted replica for a cluster in this amount of time we will failover to the next replica we receive a sample from. This value must be greater than the update timeout")
f.BoolVar(&cfg.EnableStartupSync, "distributor.ha-tracker.enable-startup-sync", false, "If enabled, fetch all tracked keys on startup to populate the local cache. This reduces CAS operations for existing replicas but causes a spike in GET during initialization.")

// We want the ability to use different Consul instances for the ring and
// for HA cluster tracking. We also customize the default keys prefix, in
@@ -48,6 +54,7 @@ func (cfg *HATrackerConfig) ToHATrackerConfig() ha.HATrackerConfig {
haCfg.UpdateTimeoutJitterMax = cfg.UpdateTimeoutJitterMax
haCfg.FailoverTimeout = cfg.FailoverTimeout
haCfg.KVStore = cfg.KVStore
haCfg.EnableStartupSync = cfg.EnableStartupSync

return haCfg
}
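Equivalently, a sketch of enabling the behaviour through the CLI flag registered above. Only `-distributor.ha-tracker.enable-startup-sync` is introduced by this PR; the other flags are assumed from the existing HA tracker flag set:

```sh
# Hypothetical distributor invocation; adjust the KV store flags to your setup.
cortex -target=distributor \
  -distributor.ha-tracker.enable=true \
  -distributor.ha-tracker.enable-startup-sync=true \
  -distributor.ha-tracker.store=consul
```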
87 changes: 86 additions & 1 deletion pkg/ha/ha_tracker.go
@@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
"maps"
"math/rand"
"slices"
"strings"
@@ -64,6 +65,11 @@ type HATrackerConfig struct {
// between the stored timestamp and the time we received a sample is
// more than this duration
FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout"`
// EnableStartupSync controls whether to fetch all tracked keys from the KV store
// on startup to populate the local cache.
// Enabling this reduces CAS operations for existing replicas after startup,
// but causes a spike in GET requests during initialization.
EnableStartupSync bool `yaml:"enable_startup_sync"`

KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Please be aware that memberlist is not supported by the HA tracker since gossip propagation is too slow for HA purposes."`
}
@@ -89,6 +95,7 @@ func (cfg *HATrackerConfig) RegisterFlagsWithPrefix(flagPrefix string, kvPrefix
f.DurationVar(&cfg.UpdateTimeout, finalFlagPrefix+"ha-tracker.update-timeout", 15*time.Second, "Update the timestamp in the KV store for a given cluster/replicaGroup only after this amount of time has passed since the current stored timestamp.")
f.DurationVar(&cfg.UpdateTimeoutJitterMax, finalFlagPrefix+"ha-tracker.update-timeout-jitter-max", 5*time.Second, "Maximum jitter applied to the update timeout, in order to spread the HA heartbeats over time.")
f.DurationVar(&cfg.FailoverTimeout, finalFlagPrefix+"ha-tracker.failover-timeout", 30*time.Second, "If we don't receive any data from the accepted replica for a cluster/replicaGroup in this amount of time we will failover to the next replica we receive a sample from. This value must be greater than the update timeout")
f.BoolVar(&cfg.EnableStartupSync, finalFlagPrefix+"ha-tracker.enable-startup-sync", false, "If enabled, fetch all tracked keys on startup to populate the local cache. This reduces CAS operations for existing replicas but causes a spike in GET requests during initialization.")

// We want the ability to use different Consul instances for the ring and
// for HA cluster tracking. We also customize the default keys prefix, in
@@ -222,10 +229,88 @@ func NewHATracker(cfg HATrackerConfig, limits HATrackerLimits, trackerStatusConf
t.client = client
}

- t.Service = services.NewBasicService(nil, t.loop, nil)
+ t.Service = services.NewBasicService(t.syncKVStoreToLocalMap, t.loop, nil)
return t, nil
}

// syncKVStoreToLocalMap warms up the local cache by fetching all active entries from the KV store.
func (c *HATracker) syncKVStoreToLocalMap(ctx context.Context) error {
if !c.cfg.EnableHATracker {
return nil
}

if !c.cfg.EnableStartupSync {
return nil
}

start := time.Now()
level.Info(c.logger).Log("msg", "starting HA tracker cache warmup")

keys, err := c.client.List(ctx, "")
if err != nil {
level.Error(c.logger).Log("msg", "failed to list keys during HA tracker cache warmup", "err", err)
return err
}

if len(keys) == 0 {
level.Info(c.logger).Log("msg", "HA tracker cache warmup finished", "reason", "no keys found in KV store")
return nil
}

// Create temporary maps to hold the fetched entries.
tempElected := make(map[string]ReplicaDesc, len(keys))
tempReplicaGroups := make(map[string]map[string]struct{})
successCount := 0

for _, key := range keys {
if ctx.Err() != nil {
return ctx.Err()
}

val, err := c.client.Get(ctx, key)
if err != nil {
level.Warn(c.logger).Log("msg", "failed to fetch key during cache warmup", "key", key, "err", err)
continue
}

desc, ok := val.(*ReplicaDesc)
if !ok || desc == nil || desc.DeletedAt > 0 {
continue
}

user, cluster, keyHasSeparator := strings.Cut(key, "/")
if !keyHasSeparator {
continue
}

tempElected[key] = *desc
if tempReplicaGroups[user] == nil {
tempReplicaGroups[user] = make(map[string]struct{})
}
tempReplicaGroups[user][cluster] = struct{}{}
successCount++
}

c.electedLock.Lock()

// Update local map
maps.Copy(c.elected, tempElected)
for user, clusters := range tempReplicaGroups {
if c.replicaGroups[user] == nil {
c.replicaGroups[user] = make(map[string]struct{})
}
for cluster := range clusters {
c.replicaGroups[user][cluster] = struct{}{}
}
}
c.electedLock.Unlock()

c.updateUserReplicaGroupCount()

level.Info(c.logger).Log("msg", "HA tracker cache warmup completed", "duration", time.Since(start), "synced keys", successCount)
return nil
}

// Follows pattern used by ring for WatchKey.
func (c *HATracker) loop(ctx context.Context) error {
if !c.cfg.EnableHATracker {
146 changes: 146 additions & 0 deletions pkg/ha/ha_tracker_test.go
@@ -758,6 +758,152 @@ func TestCheckReplicaCleanup(t *testing.T) {
))
}

func BenchmarkHATracker_syncKVStoreToLocalMap(b *testing.B) {
keyCounts := []int{100, 1000, 10000}

for _, count := range keyCounts {
b.Run(fmt.Sprintf("keys=%d", count), func(b *testing.B) {
ctx := context.Background()

codec := GetReplicaDescCodec()
kvStore, closer := consul.NewInMemoryClient(codec, log.NewNopLogger(), nil)
b.Cleanup(func() { assert.NoError(b, closer.Close()) })

mockKV := kv.PrefixClient(kvStore, "prefix")

for i := range count {
key := fmt.Sprintf("user-%d/cluster-%d", i%100, i)
desc := &ReplicaDesc{
Replica: fmt.Sprintf("replica-%d", i),
ReceivedAt: timestamp.FromTime(time.Now()),
}
err := mockKV.CAS(ctx, key, func(_ any) (any, bool, error) {
return desc, true, nil
})
require.NoError(b, err)
}

cfg := HATrackerConfig{
EnableHATracker: true,
EnableStartupSync: true,
KVStore: kv.Config{Mock: mockKV},
}
tracker, err := NewHATracker(cfg, trackerLimits{}, haTrackerStatusConfig, nil, "bench", log.NewNopLogger())
require.NoError(b, err)

b.ReportAllocs()
for b.Loop() {
err := tracker.syncKVStoreToLocalMap(ctx)
if err != nil {
b.Fatal(err)
}
}
})
}
}

func TestHATracker_CacheWarmupOnStart(t *testing.T) {
t.Parallel()
ctx := context.Background()
reg := prometheus.NewPedanticRegistry()

codec := GetReplicaDescCodec()
kvStore, closer := consul.NewInMemoryClient(codec, log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

mockKV := kv.PrefixClient(kvStore, "prefix")

// CAS valid entry
user1 := "user1"
clusterUser1 := "clusterUser1"
key1 := fmt.Sprintf("%s/%s", user1, clusterUser1)
desc1 := &ReplicaDesc{
Replica: "replica-0",
ReceivedAt: timestamp.FromTime(time.Now()),
}

err := mockKV.CAS(ctx, key1, func(_ any) (any, bool, error) {
return desc1, true, nil
})
require.NoError(t, err)

user2 := "user2"
clusterUser2 := "clusterUser2"
key2 := fmt.Sprintf("%s/%s", user2, clusterUser2)
desc2 := &ReplicaDesc{
Replica: "replica-0",
ReceivedAt: timestamp.FromTime(time.Now()),
}
err = mockKV.CAS(ctx, key2, func(_ any) (any, bool, error) {
return desc2, true, nil
})
require.NoError(t, err)

// CAS deleted entry
clusterDeleted := "clusterDeleted"
keyDeleted := fmt.Sprintf("%s/%s", user1, clusterDeleted)
descDeleted := &ReplicaDesc{
Replica: "replica-old",
ReceivedAt: timestamp.FromTime(time.Now()),
DeletedAt: timestamp.FromTime(time.Now()), // Marked as deleted
}
err = mockKV.CAS(ctx, keyDeleted, func(_ any) (any, bool, error) {
return descDeleted, true, nil
})
require.NoError(t, err)

cfg := HATrackerConfig{
EnableHATracker: true,
EnableStartupSync: true,
KVStore: kv.Config{Mock: mockKV}, // Use the seeded KV
UpdateTimeout: time.Second,
UpdateTimeoutJitterMax: 0,
FailoverTimeout: time.Second,
}

tracker, err := NewHATracker(cfg, trackerLimits{maxReplicaGroups: 100}, haTrackerStatusConfig, prometheus.WrapRegistererWithPrefix("cortex_", reg), "test-ha-tracker", log.NewNopLogger())
require.NoError(t, err)

// Start ha tracker
require.NoError(t, services.StartAndAwaitRunning(ctx, tracker))
defer services.StopAndAwaitTerminated(ctx, tracker) // nolint:errcheck

tracker.electedLock.Lock()
// Check local cache updated
desc1Cached, ok := tracker.elected[key1]
require.True(t, ok)
require.Equal(t, desc1.Replica, desc1Cached.Replica)

_, ok = tracker.elected[keyDeleted]
require.False(t, ok)

desc2Cached, ok := tracker.elected[key2]
require.True(t, ok)
require.Equal(t, desc2.Replica, desc2Cached.Replica)

// user1 should have 1 group (clusterUser1), ignoring clusterDeleted
require.NotNil(t, tracker.replicaGroups[user1])
require.Equal(t, 1, len(tracker.replicaGroups[user1]))
_, hasClusterUser1 := tracker.replicaGroups[user1][clusterUser1]
require.True(t, hasClusterUser1)

// user2 should have 1 group (clusterUser2), ignoring clusterDeleted
require.NotNil(t, tracker.replicaGroups[user2])
require.Equal(t, 1, len(tracker.replicaGroups[user2]))
_, hasClusterUser2 := tracker.replicaGroups[user2][clusterUser2]
require.True(t, hasClusterUser2)

tracker.electedLock.Unlock()

// Check metric updated
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_ha_tracker_user_replica_group_count Number of HA replica groups tracked for each user.
# TYPE cortex_ha_tracker_user_replica_group_count gauge
cortex_ha_tracker_user_replica_group_count{user="user1"} 1
cortex_ha_tracker_user_replica_group_count{user="user2"} 1
`), "cortex_ha_tracker_user_replica_group_count",
))
}

func checkUserReplicaGroups(t *testing.T, duration time.Duration, c *HATracker, user string, expectedReplicaGroups int) {
t.Helper()
test.Poll(t, duration, nil, func() any {
6 changes: 6 additions & 0 deletions schemas/cortex-config-schema.json
@@ -3722,6 +3722,12 @@
"type": "boolean",
"x-cli-flag": "distributor.ha-tracker.enable"
},
"enable_startup_sync": {
"default": false,
"description": "If enabled, fetch all tracked keys on startup to populate the local cache. This reduces CAS operations for existing replicas but causes a spike in GET during initialization.",
"type": "boolean",
"x-cli-flag": "distributor.ha-tracker.enable-startup-sync"
},
"ha_tracker_failover_timeout": {
"default": "30s",
"description": "If we don't receive any samples from the accepted replica for a cluster in this amount of time we will failover to the next replica we receive a sample from. This value must be greater than the update timeout",