Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query: add initial time-based query pushdown #4712

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4680](https://github.com/thanos-io/thanos/pull/4680) Query: add `exemplar.partial-response` flag to control partial response.
- [#4679](https://github.com/thanos-io/thanos/pull/4679) Added `enable-feature` flag to enable negative offsets and @ modifier, similar to Prometheus.
- [#4696](https://github.com/thanos-io/thanos/pull/4696) Query: add cache name to tracing spans.
- [#4712](https://github.com/thanos-io/thanos/pull/4712) Query/Store: added experimental feature `store-pushdown` and corresponding flags to Thanos Store. You can enable it with `--enable-feature` on Thanos Query. Currently, it makes Thanos Query push a query down to a leaf node via the API if that node is the only one matching the provided time range. It should cover most cases where Sidecar/Ruler/Receive is responsible for a few days of data, and the rest of the data is covered by load-balanced Thanos Stores. Ad-hoc tests show a decrease of up to 50% in the duration of queries which touch lots of time series, because it is no longer necessary to transfer all of them over the wire.

### Fixed

Expand Down
18 changes: 16 additions & 2 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/metadata"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/pushdown"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/rules"
"github.com/thanos-io/thanos/pkg/runutil"
Expand All @@ -55,6 +56,7 @@ import (
// Names of the experimental features that can be listed in the
// --enable-feature flag of the query command.
const (
// promqlNegativeOffset enables negative offsets in PromQL.
promqlNegativeOffset = "promql-negative-offset"
// promqlAtModifier enables the PromQL @ modifier.
promqlAtModifier = "promql-at-modifier"
// storePushdown makes the querier construct a time-based pushdown adapter
// and hand it to the query API.
storePushdown = "store-pushdown"
)

// registerQuery registers a query command.
Expand Down Expand Up @@ -151,7 +153,7 @@ func registerQuery(app *extkingpin.App) {
enableMetricMetadataPartialResponse := cmd.Flag("metric-metadata.partial-response", "Enable partial response for metric metadata endpoint. --no-metric-metadata.partial-response for disabling.").
Hidden().Default("true").Bool()

featureList := cmd.Flag("enable-feature", "Comma separated experimental feature names to enable.The current list of features is "+promqlNegativeOffset+" and "+promqlAtModifier+".").Default("").Strings()
featureList := cmd.Flag("enable-feature", "Comma separated experimental feature names to enable.The current list of features is "+promqlNegativeOffset+", "+storePushdown+", and "+promqlAtModifier+".").Default("").Strings()

enableExemplarPartialResponse := cmd.Flag("exemplar.partial-response", "Enable partial response for exemplar endpoint. --no-exemplar.partial-response for disabling.").
Hidden().Default("true").Bool()
Expand All @@ -170,14 +172,17 @@ func registerQuery(app *extkingpin.App) {
return errors.Wrap(err, "parse federation labels")
}

var enableNegativeOffset, enableAtModifier bool
var enableNegativeOffset, enableAtModifier, enableStorePushdown bool
for _, feature := range *featureList {
if feature == promqlNegativeOffset {
enableNegativeOffset = true
}
if feature == promqlAtModifier {
enableAtModifier = true
}
if feature == storePushdown {
enableStorePushdown = true
}
}

if dup := firstDuplicate(*stores); dup != "" {
Expand Down Expand Up @@ -285,6 +290,7 @@ func registerQuery(app *extkingpin.App) {
*webDisableCORS,
enableAtModifier,
enableNegativeOffset,
enableStorePushdown,
component.Query,
)
})
Expand Down Expand Up @@ -350,6 +356,7 @@ func runQuery(
disableCORS bool,
enableAtModifier bool,
enableNegativeOffset bool,
enableStorePushdown bool,
comp component.Component,
) error {
// TODO(bplotka in PR #513 review): Move arguments into struct.
Expand Down Expand Up @@ -567,6 +574,12 @@ func runQuery(
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewQueryUI(logger, endpoints, webExternalPrefix, webPrefixHeaderName).Register(router, ins)

var pushdownAdapter *pushdown.TimeBasedPushdown

if enableStorePushdown {
pushdownAdapter = pushdown.NewTimeBasedPushdown(endpoints.GetStoreClients, extprom.WrapRegistererWithPrefix("thanos_query_pushdown_", reg))
}

api := v1.NewQueryAPI(
logger,
endpoints,
Expand Down Expand Up @@ -594,6 +607,7 @@ func runQuery(
maxConcurrentQueries,
),
reg,
pushdownAdapter,
)

api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware)
Expand Down
13 changes: 12 additions & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/pushdown"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
Expand Down Expand Up @@ -71,6 +72,9 @@ type storeConfig struct {
reqLogConfig *extflag.PathOrContent
lazyIndexReaderEnabled bool
lazyIndexReaderIdleTimeout time.Duration

maxSamples int
queryTimeout time.Duration
}

func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -162,6 +166,12 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("web.disable-cors", "Whether to disable CORS headers to be set by Thanos. By default Thanos sets CORS headers to be allowed by all.").
Default("false").BoolVar(&sc.webConfig.disableCORS)

cmd.Flag("pushdown.query-timeout", "Timeout of a query sent via the QueryAPI.").
Default("120s").DurationVar(&sc.queryTimeout)

cmd.Flag("pushdown.max-samples", "Maximum samples that could be loaded into memory when executing a query via the QueryAPI.").
Default("5000000").IntVar(&sc.maxSamples)

sc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)
}

Expand Down Expand Up @@ -382,7 +392,7 @@ func runStore(
cancel()
})
}
// Start query (proxy) gRPC StoreAPI.
// Start the gRPC server with APIs.
{
tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpcConfig.tlsSrvCert, conf.grpcConfig.tlsSrvKey, conf.grpcConfig.tlsSrvClientCA)
if err != nil {
Expand All @@ -391,6 +401,7 @@ func runStore(

s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, conf.component, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(bs)),
grpcserver.WithServer(pushdown.RegisterQueryServer(bs, conf.maxSamples, conf.queryTimeout, logger, extprom.WrapRegistererWithPrefix("thanos_bucket_queryapi_query_", reg))),
grpcserver.WithListen(conf.grpcConfig.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpcConfig.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
3 changes: 2 additions & 1 deletion docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ store nodes.
Flags:
--enable-feature= ... Comma separated experimental feature names to
enable.The current list of features is
promql-negative-offset and promql-at-modifier.
promql-negative-offset, store-pushdown, and
promql-at-modifier.
--grpc-address="0.0.0.0:10901"
Listen ip:port address for gRPC endpoints
(StoreAPI). Make sure this address is routable
Expand Down
5 changes: 5 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ Flags:
Path to YAML file that contains object store
configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--pushdown.max-samples=5000000
Maximum samples that could be loaded into
memory when executing a query via the QueryAPI.
--pushdown.query-timeout=120s
Timeout of a query sent via the QueryAPI.
--request.logging-config=<content>
Alternative to 'request.logging-config-file'
flag (mutually exclusive). Content of YAML file
Expand Down
48 changes: 48 additions & 0 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

cortexutil "github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -51,6 +52,8 @@ import (
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/metadata"
"github.com/thanos-io/thanos/pkg/metadata/metadatapb"
"github.com/thanos-io/thanos/pkg/pushdown"
"github.com/thanos-io/thanos/pkg/pushdown/querypb"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/rules"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
Expand Down Expand Up @@ -101,6 +104,8 @@ type QueryAPI struct {
defaultMetadataTimeRange time.Duration

queryRangeHist prometheus.Histogram

pushdownAdapter *pushdown.TimeBasedPushdown
}

// NewQueryAPI returns an initialized QueryAPI type.
Expand All @@ -127,6 +132,7 @@ func NewQueryAPI(
disableCORS bool,
gate gate.Gate,
reg *prometheus.Registry,
pushdownAdapter *pushdown.TimeBasedPushdown,
) *QueryAPI {
return &QueryAPI{
baseAPI: api.NewBaseAPI(logger, disableCORS, flagsMap),
Expand Down Expand Up @@ -157,6 +163,8 @@ func NewQueryAPI(
Help: "A histogram of the query range window in seconds",
Buckets: prometheus.ExponentialBuckets(15*60, 2, 12),
}),

pushdownAdapter: pushdownAdapter,
}
}

Expand Down Expand Up @@ -379,6 +387,16 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
}, res.Warnings, nil
}

// asisJSONMarshaler wraps a string and emits it verbatim, without quoting or
// escaping, when the value is marshaled into JSON. queryRange uses it to
// forward the response of a pushed-down query unchanged.
type asisJSONMarshaler struct {
	data string
}

// MarshalJSON implements json.Marshaler by returning the bytes of the wrapped
// string exactly as they are. It never returns an error itself.
func (a *asisJSONMarshaler) MarshalJSON() ([]byte, error) {
	raw := []byte(a.data)
	return raw, nil
}

func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.ApiError) {
start, err := parseTime(r.FormValue("start"))
if err != nil {
Expand Down Expand Up @@ -443,6 +461,36 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr
}

startNS := start.UnixNano()
endNS := end.UnixNano()
if qapi.pushdownAdapter != nil {
if node, match := qapi.pushdownAdapter.Match(startNS, endNS); match {
tracing.DoInSpan(ctx, "query_gate_ismyturn", func(ctx context.Context) {
err = qapi.gate.Start(ctx)
})
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
defer qapi.gate.Done()

level.Debug(qapi.logger).Log("msg", "pushing down", "query", r.FormValue("query"))

resp, err := node.Query(ctx, &querypb.QueryRequest{
Query: r.FormValue("query"),
StartNs: startNS,
EndNs: endNS,
Interval: int64(step),
ReplicaLabels: replicaLabels,
MaxSourceResolution: maxSourceResolution,
})
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}

return &asisJSONMarshaler{resp.Response}, nil, nil
}
}

enablePartialResponse, apiErr := qapi.parsePartialResponseParam(r, qapi.enableQueryPartialResponse)
if apiErr != nil {
return nil, nil, apiErr
Expand Down
Loading