diff --git a/pkg/ruler/rulestore/bucketclient/bucket_client.go b/pkg/ruler/rulestore/bucketclient/bucket_client.go index 62f6903275f..5276f2b3f81 100644 --- a/pkg/ruler/rulestore/bucketclient/bucket_client.go +++ b/pkg/ruler/rulestore/bucketclient/bucket_client.go @@ -8,12 +8,14 @@ import ( "io" "strings" "sync" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/extprom" "golang.org/x/sync/errgroup" @@ -25,6 +27,11 @@ import ( "github.com/cortexproject/cortex/pkg/util/users" ) +type cachedRuleGroup struct { + downloadedAt time.Time + ruleGroup *rulespb.RuleGroupDesc +} + const ( // The bucket prefix under which all tenants rule groups are stored. rulesPrefix = "rules" @@ -48,6 +55,10 @@ type BucketRuleStore struct { usersScanner users.Scanner userIndexUpdater *users.UserIndexUpdater + + ruleGroupCache map[string]*cachedRuleGroup + ruleGroupCacheMu sync.RWMutex + cacheOps *prometheus.CounterVec } func NewBucketRuleStore(bkt objstore.Bucket, userScannerCfg users.UsersScannerConfig, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer) (*BucketRuleStore, error) { @@ -74,6 +85,11 @@ func NewBucketRuleStore(bkt objstore.Bucket, userScannerCfg users.UsersScannerCo logger: logger, usersScanner: usersScanner, userIndexUpdater: userIndexUpdater, + ruleGroupCache: make(map[string]*cachedRuleGroup), + cacheOps: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ruler_rule_group_load_cache_operations_total", + Help: "Total number of rule group load operations by cache result (hit=skipped GET, miss=full GET).", + }, []string{"result"}), }, nil } @@ -82,21 +98,40 @@ func (b *BucketRuleStore) GetUserIndexUpdater() *users.UserIndexUpdater { } // getRuleGroup loads and return a rules group. If existing rule group is supplied, it is Reset and reused. If nil, new RuleGroupDesc is allocated. -func (b *BucketRuleStore) getRuleGroup(ctx context.Context, userID, namespace, groupName string, rg *rulespb.RuleGroupDesc) (*rulespb.RuleGroupDesc, error) { +// Uses conditional download: checks LastModified via HEAD before doing a full GET to avoid +// redundant downloads for unchanged rule groups. +func (b *BucketRuleStore) getRuleGroup(ctx context.Context, userID, namespace, groupName string, _ *rulespb.RuleGroupDesc) (*rulespb.RuleGroupDesc, error) { userBucket := bucket.NewUserBucketClient(userID, b.bucket, b.cfgProvider) objectKey := getRuleGroupObjectKey(namespace, groupName) + cacheKey := userID + "/" + objectKey + + // Only check S3 HEAD if we have a cached version to compare against. + b.ruleGroupCacheMu.RLock() + cached, hasCached := b.ruleGroupCache[cacheKey] + b.ruleGroupCacheMu.RUnlock() + + if hasCached { + attrs, err := userBucket.Attributes(ctx, objectKey) + if err == nil && cached.downloadedAt.After(attrs.LastModified) { + b.cacheOps.WithLabelValues("hit").Inc() + return cached.ruleGroup, nil + } + // HEAD failed or file changed — fall through to full GET. + } + + // Full GET: cold cache or file has changed. + b.cacheOps.WithLabelValues("miss").Inc() reader, err := userBucket.Get(ctx, objectKey) if userBucket.IsObjNotFoundErr(err) { + b.evictFromCache(cacheKey) level.Debug(b.logger).Log("msg", "rule group does not exist", "user", userID, "key", objectKey) return nil, rulestore.ErrGroupNotFound } - if userBucket.IsAccessDeniedErr(err) { level.Debug(b.logger).Log("msg", "permission denied when loading group", "user", userID, "key", objectKey) return nil, rulestore.ErrAccessDenied } - if err != nil { return nil, errors.Wrapf(err, "failed to get rule group %s", objectKey) } @@ -107,20 +142,53 @@ func (b *BucketRuleStore) getRuleGroup(ctx context.Context, userID, namespace, g return nil, errors.Wrapf(err, "failed to read rule group %s", objectKey) } - if rg == nil { - rg = &rulespb.RuleGroupDesc{} - } else { - rg.Reset() + rg := &rulespb.RuleGroupDesc{} + if err = proto.Unmarshal(buf, rg); err != nil { + return nil, errors.Wrapf(err, "failed to unmarshal rule group %s", objectKey) } - err = proto.Unmarshal(buf, rg) - if err != nil { - return nil, errors.Wrapf(err, "failed to unmarshal rule group %s", objectKey) + b.ruleGroupCacheMu.Lock() + b.ruleGroupCache[cacheKey] = &cachedRuleGroup{ + downloadedAt: time.Now(), + ruleGroup: rg, } + b.ruleGroupCacheMu.Unlock() return rg, nil } +// evictFromCache removes a rule group from the cache (e.g., when deleted). +func (b *BucketRuleStore) evictFromCache(cacheKey string) { + b.ruleGroupCacheMu.Lock() + delete(b.ruleGroupCache, cacheKey) + b.ruleGroupCacheMu.Unlock() +} + +// ClearCache removes all cached rule groups. Exposed for testing. +func (b *BucketRuleStore) ClearCache() { + b.ruleGroupCacheMu.Lock() + b.ruleGroupCache = make(map[string]*cachedRuleGroup) + b.ruleGroupCacheMu.Unlock() +} + +// pruneCache removes cache entries for rule groups not in the current groupsToLoad set. +func (b *BucketRuleStore) pruneCache(groupsToLoad map[string]rulespb.RuleGroupList) { + validKeys := make(map[string]struct{}, len(groupsToLoad)) + for user, groups := range groupsToLoad { + for _, g := range groups { + validKeys[user+"/"+getRuleGroupObjectKey(g.Namespace, g.Name)] = struct{}{} + } + } + + b.ruleGroupCacheMu.Lock() + for key := range b.ruleGroupCache { + if _, ok := validKeys[key]; !ok { + delete(b.ruleGroupCache, key) + } + } + b.ruleGroupCacheMu.Unlock() +} + // ListAllUsers implements rules.RuleStore. func (b *BucketRuleStore) ListAllUsers(ctx context.Context) ([]string, error) { active, deleting, _, err := b.usersScanner.ScanUsers(ctx) @@ -268,6 +336,9 @@ outer: return loadedGroups, e } + // Prune cache entries for rule groups no longer owned by this pod (e.g., after ring rebalance). + b.pruneCache(groupsToLoad) + return loadedGroups, errs.Err() } diff --git a/pkg/ruler/rulestore/bucketclient/bucket_client_test.go b/pkg/ruler/rulestore/bucketclient/bucket_client_test.go index 343a3354d7f..1d82e26ef2a 100644 --- a/pkg/ruler/rulestore/bucketclient/bucket_client_test.go +++ b/pkg/ruler/rulestore/bucketclient/bucket_client_test.go @@ -11,6 +11,7 @@ import ( "github.com/go-kit/log" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + promtestutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/rulefmt" "github.com/stretchr/testify/assert" @@ -201,7 +202,7 @@ func TestLoadRules(t *testing.T) { // Load with missing rule groups fails. require.NoError(t, rs.DeleteRuleGroup(context.Background(), "user1", "hello", "first testGroup")) _, err = rs.LoadRuleGroups(context.Background(), allGroupsMap) - require.EqualError(t, err, "get rule group user=\"user2\", namespace=\"world\", name=\"first testGroup\": group does not exist") + require.EqualError(t, err, "get rule group user=\"user1\", namespace=\"hello\", name=\"first testGroup\": group does not exist") }) } @@ -461,3 +462,101 @@ func (mb mockBucket) Iter(_ context.Context, dir string, f func(string) error, o } return nil } + +func TestLoadRuleGroupsCache(t *testing.T) { + bucketClient := objstore.NewInMemBucket() + reg := prometheus.NewPedanticRegistry() + usersScannerConfig := users.UsersScannerConfig{Strategy: users.UserScanStrategyList} + bucketStore, err := NewBucketRuleStore(bucketClient, usersScannerConfig, nil, log.NewNopLogger(), reg) + require.NoError(t, err) + + // Setup: create a rule group. + desc := rulespb.ToProto("user1", "ns", rulefmt.RuleGroup{Name: "group1", Interval: model.Duration(time.Minute)}) + require.NoError(t, bucketStore.SetRuleGroup(context.Background(), "user1", "ns", desc)) + + allGroups, err := bucketStore.ListAllRuleGroups(context.Background()) + require.NoError(t, err) + + // First load: cold cache, should do full GET (miss). + loaded, err := bucketStore.LoadRuleGroups(context.Background(), allGroups) + require.NoError(t, err) + require.Len(t, loaded["user1"], 1) + require.Equal(t, "group1", loaded["user1"][0].Name) + + // Second load: cache is warm, file unchanged → should be a cache hit. + time.Sleep(10 * time.Millisecond) // ensure downloadedAt is after LastModified + loaded2, err := bucketStore.LoadRuleGroups(context.Background(), allGroups) + require.NoError(t, err) + require.Len(t, loaded2["user1"], 1) + require.Equal(t, "group1", loaded2["user1"][0].Name) + + // Verify cache hit metric. + hitCount := promtestutil.ToFloat64(bucketStore.cacheOps.WithLabelValues("hit")) + require.Equal(t, float64(1), hitCount) +} + +func TestLoadRuleGroupsCacheMissOnModification(t *testing.T) { + bucketClient := objstore.NewInMemBucket() + reg := prometheus.NewPedanticRegistry() + usersScannerConfig := users.UsersScannerConfig{Strategy: users.UserScanStrategyList} + bucketStore, err := NewBucketRuleStore(bucketClient, usersScannerConfig, nil, log.NewNopLogger(), reg) + require.NoError(t, err) + + // Setup: create a rule group. + desc := rulespb.ToProto("user1", "ns", rulefmt.RuleGroup{Name: "group1", Interval: model.Duration(time.Minute)}) + require.NoError(t, bucketStore.SetRuleGroup(context.Background(), "user1", "ns", desc)) + + allGroups, err := bucketStore.ListAllRuleGroups(context.Background()) + require.NoError(t, err) + + // First load: populates cache. + _, err = bucketStore.LoadRuleGroups(context.Background(), allGroups) + require.NoError(t, err) + + // Modify the rule group in S3. + time.Sleep(10 * time.Millisecond) + desc2 := rulespb.ToProto("user1", "ns", rulefmt.RuleGroup{Name: "group1", Interval: model.Duration(2 * time.Minute)}) + require.NoError(t, bucketStore.SetRuleGroup(context.Background(), "user1", "ns", desc2)) + + // Second load: file modified → cache miss → should get new content. + loaded, err := bucketStore.LoadRuleGroups(context.Background(), allGroups) + require.NoError(t, err) + require.Equal(t, 2*time.Minute, loaded["user1"][0].Interval) +} + +func TestLoadRuleGroupsCachePrune(t *testing.T) { + bucketClient := objstore.NewInMemBucket() + reg := prometheus.NewPedanticRegistry() + usersScannerConfig := users.UsersScannerConfig{Strategy: users.UserScanStrategyList} + bucketStore, err := NewBucketRuleStore(bucketClient, usersScannerConfig, nil, log.NewNopLogger(), reg) + require.NoError(t, err) + + // Setup: create two rule groups. + desc1 := rulespb.ToProto("user1", "ns", rulefmt.RuleGroup{Name: "group1", Interval: model.Duration(time.Minute)}) + desc2 := rulespb.ToProto("user1", "ns", rulefmt.RuleGroup{Name: "group2", Interval: model.Duration(time.Minute)}) + require.NoError(t, bucketStore.SetRuleGroup(context.Background(), "user1", "ns", desc1)) + require.NoError(t, bucketStore.SetRuleGroup(context.Background(), "user1", "ns", desc2)) + + allGroups, err := bucketStore.ListAllRuleGroups(context.Background()) + require.NoError(t, err) + + // Load both groups → cache has 2 entries. + _, err = bucketStore.LoadRuleGroups(context.Background(), allGroups) + require.NoError(t, err) + + bucketStore.ruleGroupCacheMu.RLock() + require.Len(t, bucketStore.ruleGroupCache, 2) + bucketStore.ruleGroupCacheMu.RUnlock() + + // Now load only group1 (simulating ring rebalance where group2 is no longer owned). + partialGroups := map[string]rulespb.RuleGroupList{ + "user1": {allGroups["user1"][0]}, // only first group + } + _, err = bucketStore.LoadRuleGroups(context.Background(), partialGroups) + require.NoError(t, err) + + // Cache should be pruned to 1 entry. + bucketStore.ruleGroupCacheMu.RLock() + require.Len(t, bucketStore.ruleGroupCache, 1) + bucketStore.ruleGroupCacheMu.RUnlock() +} diff --git a/pkg/util/testutil/objstore.go b/pkg/util/testutil/objstore.go index efd9fb9aa66..bcdb7f34a30 100644 --- a/pkg/util/testutil/objstore.go +++ b/pkg/util/testutil/objstore.go @@ -97,6 +97,12 @@ func (m *MockBucketFailure) Attributes(ctx context.Context, name string) (objsto if e, ok := m.AttributesFailures[name]; ok { return objstore.ObjectAttributes{}, e } + // In real object storage, HEAD fails with the same errors as GET (e.g., access denied, not found). + for prefix, err := range m.GetFailures { + if strings.HasPrefix(name, prefix) { + return objstore.ObjectAttributes{}, err + } + } return m.Bucket.Attributes(ctx, name) }