From 6cfb50d23ee23afec1e2df0059562f7b8ab08fbe Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 22 Dec 2025 20:32:04 +0900 Subject: [PATCH 1/2] ruler: Refactor query execution and tenant factory logic Signed-off-by: SungJin1212 --- pkg/ruler/compat.go | 143 ++++++++++++++++++++++++++++---------------- 1 file changed, 90 insertions(+), 53 deletions(-) diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 55b458a6f0a..74aec5a61f0 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -166,10 +166,51 @@ type RulesLimits interface { RulerExternalLabels(userID string) labels.Labels } -// EngineQueryFunc returns a new engine query function validating max queryLength. -// Modified from Prometheus rules.EngineQueryFunc -// https://github.com/prometheus/prometheus/blob/v2.39.1/rules/manager.go#L189. +type QueryExecutor func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) + func EngineQueryFunc(engine promql.QueryEngine, frontendClient *frontendClient, q storage.Queryable, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc { + var executor QueryExecutor + + if frontendClient != nil { + // query to query frontend + executor = frontendClient.InstantQuery + } else { + // query to engine + executor = func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { + return executeQuery(ctx, engine, q, qs, t) + } + } + + return wrapWithMiddleware(executor, overrides, userID, lookbackDelta) +} + +func executeQuery(ctx context.Context, engine promql.QueryEngine, q storage.Queryable, qs string, t time.Time) (promql.Vector, error) { + qry, err := engine.NewInstantQuery(ctx, q, nil, qs, t) + if err != nil { + return nil, err + } + defer qry.Close() + + res := qry.Exec(ctx) + if res.Err != nil { + return nil, res.Err + } + + switch v := res.Value.(type) { + case promql.Vector: + return v, nil + case promql.Scalar: + return promql.Vector{promql.Sample{ + T: v.T, + F: v.V, + Metric: labels.Labels{}, + }}, nil + default: + return nil, errors.New("rule result is not a vector or scalar") + } +} + +func wrapWithMiddleware(next QueryExecutor, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc { return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { // Enforce the max query length. maxQueryLength := overrides.MaxQueryLength(userID) @@ -192,35 +233,7 @@ func EngineQueryFunc(engine promql.QueryEngine, frontendClient *frontendClient, } ctx = requestmeta.ContextWithRequestSource(ctx, requestmeta.SourceRuler) - if frontendClient != nil { - v, err := frontendClient.InstantQuery(ctx, qs, t) - if err != nil { - return nil, err - } - - return v, nil - } else { - q, err := engine.NewInstantQuery(ctx, q, nil, qs, t) - if err != nil { - return nil, err - } - res := q.Exec(ctx) - if res.Err != nil { - return nil, res.Err - } - switch v := res.Value.(type) { - case promql.Vector: - return v, nil - case promql.Scalar: - return promql.Vector{promql.Sample{ - T: v.T, - F: v.V, - Metric: labels.Labels{}, - }}, nil - default: - return nil, errors.New("rule result is not a vector or scalar") - } - } + return next(ctx, qs, t) } } @@ -337,36 +350,25 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi // and errors returned by PromQL engine. Errors from Queryable can be either caused by user (limits) or internal errors. // Errors from PromQL are always "user" errors. q = querier.NewErrorTranslateQueryableWithFn(q, WrapQueryableErrors) - return func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, frontendPool *client.Pool, reg prometheus.Registerer) (RulesManager, error) { - var client *frontendClient - failedQueries := evalMetrics.FailedQueriesVec.WithLabelValues(userID) - totalQueries := evalMetrics.TotalQueriesVec.WithLabelValues(userID) - totalWrites := evalMetrics.TotalWritesVec.WithLabelValues(userID) - failedWrites := evalMetrics.FailedWritesVec.WithLabelValues(userID) - - if cfg.FrontendAddress != "" { - c, err := frontendPool.GetClientFor(cfg.FrontendAddress) - if err != nil { - return nil, err - } - client = c.(*frontendClient) + qfeClient, err := resolveFrontendClient(cfg.FrontendAddress, frontendPool) + if err != nil { + return nil, err } - var queryFunc rules.QueryFunc - engineQueryFunc := EngineQueryFunc(engine, client, q, overrides, userID, cfg.LookbackDelta) - metricsQueryFunc := MetricsQueryFunc(engineQueryFunc, totalQueries, failedQueries) - if cfg.EnableQueryStats { - queryFunc = RecordAndReportRuleQueryMetrics(metricsQueryFunc, userID, evalMetrics, logger) - } else { - queryFunc = metricsQueryFunc + + if qfeClient == nil && engine == nil { + return nil, fmt.Errorf("neither engine nor frontend client is configured for user %s", userID) } + queryFunc := buildQueryFunc(engine, qfeClient, q, overrides, userID, cfg, evalMetrics, logger) // We let the Prometheus rules manager control the context so that there is a chance // for graceful shutdown of rules that are still in execution even in case the cortex context is canceled. prometheusContext := user.InjectOrgID(context.WithoutCancel(ctx), userID) return rules.NewManager(&rules.ManagerOptions{ - Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites), + Appendable: NewPusherAppendable(p, userID, overrides, + evalMetrics.TotalWritesVec.WithLabelValues(userID), + evalMetrics.FailedWritesVec.WithLabelValues(userID)), Queryable: q, QueryFunc: queryFunc, Context: prometheusContext, @@ -387,6 +389,41 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi } } +func resolveFrontendClient(addr string, pool *client.Pool) (*frontendClient, error) { + if addr == "" { + return nil, nil + } + c, err := pool.GetClientFor(addr) + if err != nil { + return nil, err + } + return c.(*frontendClient), nil +} + +func buildQueryFunc( + engine promql.QueryEngine, + client *frontendClient, + q storage.Queryable, + overrides RulesLimits, + userID string, + cfg Config, + metrics *RuleEvalMetrics, + logger log.Logger, +) rules.QueryFunc { + baseQueryFunc := EngineQueryFunc(engine, client, q, overrides, userID, cfg.LookbackDelta) + + // apply metric middleware + totalQueries := metrics.TotalQueriesVec.WithLabelValues(userID) + failedQueries := metrics.FailedQueriesVec.WithLabelValues(userID) + metricsFunc := MetricsQueryFunc(baseQueryFunc, totalQueries, failedQueries) + + // apply statistic middleware + if cfg.EnableQueryStats { + return RecordAndReportRuleQueryMetrics(metricsFunc, userID, metrics, logger) + } + return metricsFunc +} + type QueryableError struct { err error } From e6fdbfc28c5718af660d1a6ff59092766da6fe9d Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Thu, 15 Jan 2026 17:10:01 +0900 Subject: [PATCH 2/2] change to private Signed-off-by: SungJin1212 --- pkg/ruler/compat.go | 12 ++++++------ pkg/ruler/compat_test.go | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 74aec5a61f0..0dc5c0210eb 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -168,7 +168,7 @@ type RulesLimits interface { type QueryExecutor func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) -func EngineQueryFunc(engine promql.QueryEngine, frontendClient *frontendClient, q storage.Queryable, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc { +func engineQueryFunc(engine promql.QueryEngine, frontendClient *frontendClient, q storage.Queryable, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc { var executor QueryExecutor if frontendClient != nil { @@ -237,7 +237,7 @@ func wrapWithMiddleware(next QueryExecutor, overrides RulesLimits, userID string } } -func MetricsQueryFunc(qf rules.QueryFunc, queries, failedQueries prometheus.Counter) rules.QueryFunc { +func metricsQueryFunc(qf rules.QueryFunc, queries, failedQueries prometheus.Counter) rules.QueryFunc { return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { queries.Inc() result, err := qf(ctx, qs, t) @@ -269,7 +269,7 @@ func MetricsQueryFunc(qf rules.QueryFunc, queries, failedQueries prometheus.Coun } } -func RecordAndReportRuleQueryMetrics(qf rules.QueryFunc, userID string, evalMetrics *RuleEvalMetrics, logger log.Logger) rules.QueryFunc { +func recordAndReportRuleQueryMetrics(qf rules.QueryFunc, userID string, evalMetrics *RuleEvalMetrics, logger log.Logger) rules.QueryFunc { queryTime := evalMetrics.RulerQuerySeconds.WithLabelValues(userID) querySeries := evalMetrics.RulerQuerySeries.WithLabelValues(userID) querySample := evalMetrics.RulerQuerySamples.WithLabelValues(userID) @@ -410,16 +410,16 @@ func buildQueryFunc( metrics *RuleEvalMetrics, logger log.Logger, ) rules.QueryFunc { - baseQueryFunc := EngineQueryFunc(engine, client, q, overrides, userID, cfg.LookbackDelta) + baseQueryFunc := engineQueryFunc(engine, client, q, overrides, userID, cfg.LookbackDelta) // apply metric middleware totalQueries := metrics.TotalQueriesVec.WithLabelValues(userID) failedQueries := metrics.FailedQueriesVec.WithLabelValues(userID) - metricsFunc := MetricsQueryFunc(baseQueryFunc, totalQueries, failedQueries) + metricsFunc := metricsQueryFunc(baseQueryFunc, totalQueries, failedQueries) // apply statistic middleware if cfg.EnableQueryStats { - return RecordAndReportRuleQueryMetrics(metricsFunc, userID, metrics, logger) + return recordAndReportRuleQueryMetrics(metricsFunc, userID, metrics, logger) } return metricsFunc } diff --git a/pkg/ruler/compat_test.go b/pkg/ruler/compat_test.go index d921171ee52..19d45062e74 100644 --- a/pkg/ruler/compat_test.go +++ b/pkg/ruler/compat_test.go @@ -381,7 +381,7 @@ func TestMetricsQueryFuncErrors(t *testing.T) { } return promql.Vector{}, err } - qf := MetricsQueryFunc(mockFunc, queries, failures) + qf := metricsQueryFunc(mockFunc, queries, failures) _, err := qf(context.Background(), "test", time.Now()) require.Equal(t, tc.returnedError, err) @@ -404,7 +404,7 @@ func TestRecordAndReportRuleQueryMetrics(t *testing.T) { time.Sleep(1 * time.Second) return promql.Vector{}, nil } - qf := RecordAndReportRuleQueryMetrics(mockFunc, "userID", metrics, log.NewNopLogger()) + qf := recordAndReportRuleQueryMetrics(mockFunc, "userID", metrics, log.NewNopLogger()) _, _ = qf(context.Background(), "test", time.Now()) require.GreaterOrEqual(t, testutil.ToFloat64(metrics.RulerQuerySeconds.WithLabelValues("userID")), float64(1))