diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ef69c15ca..c22d0983fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7852](https://github.com/thanos-io/thanos/pull/7852) Query Frontend: pass "stats" parameter forward to queriers and fix Prometheus stats merging. - [#7832](https://github.com/thanos-io/thanos/pull/7832) Query Frontend: Fix cache keys for dynamic split intervals. - [#7885](https://github.com/thanos-io/thanos/pull/7885) Store: Return chunks to the pool after completing a Series call. +- [#7893](https://github.com/thanos-io/thanos/pull/7893) Sidecar: Fix retrieval of external labels for Prometheus v3.0.0. +- [#7903](https://github.com/thanos-io/thanos/pull/7903) Query: Fix panic on regex store matchers. +- [#7915](https://github.com/thanos-io/thanos/pull/7915) Store: Close block series client at the end to not reuse chunk buffer +- [#7941](https://github.com/thanos-io/thanos/pull/7941) Receive: Fix race condition when adding multiple new tenants, see [issue-7892](https://github.com/thanos-io/thanos/issues/7892). ### Added - [#7763](https://github.com/thanos-io/thanos/pull/7763) Ruler: use native histograms for client latency metrics. diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index 3ce069c1b2..f190f40ef0 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -32,10 +32,15 @@ type grpcConfig struct { maxConnectionAge time.Duration } -func (gc *grpcConfig) registerFlag(cmd extkingpin.FlagClause) *grpcConfig { +func (gc *grpcConfig) registerFlag(cmd extkingpin.FlagClause, optionalListener bool) *grpcConfig { + var addr = "0.0.0.0:10901" + if optionalListener { + addr = "" + } cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components."). - Default("0.0.0.0:10901").StringVar(&gc.bindAddress) + Default(addr).StringVar(&gc.bindAddress) + cmd.Flag("grpc-server-tls-cert", "TLS Certificate for gRPC server, leave blank to disable TLS"). Default("").StringVar(&gc.tlsSrvCert) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 1506fdd7ba..fb152f5df7 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -83,7 +83,7 @@ func registerQuery(app *extkingpin.App) { httpBindAddr, httpGracePeriod, httpTLSConfig := extkingpin.RegisterHTTPFlags(cmd) var grpcServerConfig grpcConfig - grpcServerConfig.registerFlag(cmd) + grpcServerConfig.registerFlag(cmd, false) secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool() skipVerify := cmd.Flag("grpc-client-tls-skip-verify", "Disable TLS certificate verification i.e self signed, signed by fake CA").Default("false").Bool() diff --git a/cmd/thanos/query_frontend.go b/cmd/thanos/query_frontend.go index 36b9ed53e8..dd15912093 100644 --- a/cmd/thanos/query_frontend.go +++ b/cmd/thanos/query_frontend.go @@ -34,9 +34,11 @@ import ( "github.com/thanos-io/thanos/pkg/logging" "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/queryfrontend" + grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/server/http/middleware" "github.com/thanos-io/thanos/pkg/tenancy" + "github.com/thanos-io/thanos/pkg/tls" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -67,6 +69,9 @@ func registerQueryFrontend(app *extkingpin.App) { cfg.http.registerFlag(cmd) + var grpcServerConfig grpcConfig + grpcServerConfig.registerFlag(cmd, true) + cmd.Flag("web.disable-cors", "Whether to disable CORS headers to be set by Thanos. By default Thanos sets CORS headers to be allowed by all."). Default("false").BoolVar(&cfg.webDisableCORS) @@ -171,7 +176,7 @@ func registerQueryFrontend(app *extkingpin.App) { return errors.Wrap(err, "error while parsing config for request logging") } - return runQueryFrontend(g, logger, reg, tracer, httpLogOpts, cfg, comp) + return runQueryFrontend(g, logger, reg, tracer, httpLogOpts, cfg, comp, grpcServerConfig) }) } @@ -237,6 +242,7 @@ func runQueryFrontend( httpLogOpts []logging.Option, cfg *queryFrontendConfig, comp component.Component, + grpcServerConfig grpcConfig, ) error { tenantHeaderProvided := cfg.TenantHeader != "" && cfg.TenantHeader != tenancy.DefaultTenantHeader // If tenant header is set and different from the default tenant header, add it to the list of org id headers. @@ -328,8 +334,11 @@ func runQueryFrontend( } httpProbe := prober.NewHTTP() + grpcProbe := prober.NewGRPC() + statusProber := prober.Combine( httpProbe, + grpcProbe, prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)), ) @@ -337,6 +346,31 @@ func runQueryFrontend( logMiddleware := logging.NewHTTPServerMiddleware(logger, httpLogOpts...) ins := extpromhttp.NewTenantInstrumentationMiddleware(cfg.TenantHeader, cfg.DefaultTenant, reg, nil) + instr := func(f http.HandlerFunc) http.HandlerFunc { + hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + orgId := extractOrgId(cfg, r) + name := "query-frontend" + if !cfg.webDisableCORS { + api.SetCORS(w) + } + middleware.RequestID( + tracing.HTTPMiddleware( + tracer, + name, + logger, + ins.NewHandler( + name, + logMiddleware.HTTPMiddleware(name, f), + ), + // Cortex frontend middlewares require orgID. + ), + ).ServeHTTP(w, r.WithContext(user.InjectOrgID(r.Context(), orgId))) + }) + return hf + } + + instrumentedHandler := instr(handler.ServeHTTP) + // Start metrics HTTP server. { srv := httpserver.New(logger, reg, comp, httpProbe, @@ -345,29 +379,7 @@ func runQueryFrontend( httpserver.WithTLSConfig(cfg.http.tlsConfig), ) - instr := func(f http.HandlerFunc) http.HandlerFunc { - hf := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - orgId := extractOrgId(cfg, r) - name := "query-frontend" - if !cfg.webDisableCORS { - api.SetCORS(w) - } - middleware.RequestID( - tracing.HTTPMiddleware( - tracer, - name, - logger, - ins.NewHandler( - name, - logMiddleware.HTTPMiddleware(name, f), - ), - // Cortex frontend middlewares require orgID. - ), - ).ServeHTTP(w, r.WithContext(user.InjectOrgID(r.Context(), orgId))) - }) - return hf - } - srv.Handle("/", instr(handler.ServeHTTP)) + srv.Handle("/", instrumentedHandler) g.Add(func() error { statusProber.Healthy() @@ -381,8 +393,37 @@ func runQueryFrontend( }) } + if grpcServerConfig.bindAddress != "" { + level.Info(logger).Log("msg", "starting gRPC server", "address", grpcServerConfig.bindAddress) + tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), grpcServerConfig.tlsSrvCert, grpcServerConfig.tlsSrvKey, grpcServerConfig.tlsSrvClientCA) + if err != nil { + return errors.Wrap(err, "setup gRPC server") + } + + httpgrpcServer := queryfrontend.NewHTTPGRPCServer(instrumentedHandler) + + s := grpcserver.New(logger, reg, tracer, nil, nil, comp, grpcProbe, + grpcserver.WithServer(queryfrontend.RegisterHTTPGRPCServer(httpgrpcServer)), + grpcserver.WithListen(grpcServerConfig.bindAddress), + grpcserver.WithGracePeriod(grpcServerConfig.gracePeriod), + grpcserver.WithMaxConnAge(grpcServerConfig.maxConnectionAge), + grpcserver.WithTLSConfig(tlsCfg), + ) + + g.Add(func() error { + statusProber.Ready() + + return s.ListenAndServe() + }, func(error) { + statusProber.NotReady(err) + s.Shutdown(err) + }) + } else { + level.Info(logger).Log("msg", "gRPC server is disabled") + statusProber.Ready() + } + level.Info(logger).Log("msg", "starting query frontend") - statusProber.Ready() return nil } diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index d700b56986..62097f3308 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -895,7 +895,7 @@ type receiveConfig struct { func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { rc.httpBindAddr, rc.httpGracePeriod, rc.httpTLSConfig = extkingpin.RegisterHTTPFlags(cmd) - rc.grpcConfig.registerFlag(cmd) + rc.grpcConfig.registerFlag(cmd, false) rc.storeRateLimits.RegisterFlags(cmd) cmd.Flag("remote-write.address", "Address to listen on for remote write requests."). diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index a2424efa10..1e4e11d089 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -120,7 +120,7 @@ type Expression struct { func (rc *ruleConfig) registerFlag(cmd extkingpin.FlagClause) { rc.http.registerFlag(cmd) - rc.grpc.registerFlag(cmd) + rc.grpc.registerFlag(cmd, false) rc.web.registerFlag(cmd) rc.shipper.registerFlag(cmd) rc.query.registerFlag(cmd) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 91f7feee54..f33de52136 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -551,7 +551,7 @@ type sidecarConfig struct { func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) { sc.http.registerFlag(cmd) - sc.grpc.registerFlag(cmd) + sc.grpc.registerFlag(cmd, false) sc.prometheus.registerFlag(cmd) sc.tsdb.registerFlag(cmd) sc.reloader.registerFlag(cmd) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 747bc09145..662a6f056c 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -106,7 +106,7 @@ type storeConfig struct { func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { sc.httpConfig = *sc.httpConfig.registerFlag(cmd) - sc.grpcConfig = *sc.grpcConfig.registerFlag(cmd) + sc.grpcConfig = *sc.grpcConfig.registerFlag(cmd, false) sc.storeRateLimits.RegisterFlags(cmd) cmd.Flag("data-dir", "Local data directory used for caching purposes (index-header, in-mem cache items and meta.jsons). If removed, no data will be lost, just store will have to rebuild the cache. NOTE: Putting raw blocks here will not cause the store to read them. For such use cases use Prometheus + sidecar. Ignored if --no-cache-index-header option is specified."). diff --git a/docs/sharding.md b/docs/sharding.md index 9cfa0fbf8a..f943ec071b 100644 --- a/docs/sharding.md +++ b/docs/sharding.md @@ -18,7 +18,7 @@ Queries against store gateway which are touching large number of blocks (no matt # Relabelling -Similar to [promtail](https://grafana.com/docs/loki/latest/send-data/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax. +Similar to [promtail](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax. Currently, thanos only supports the following relabel actions: diff --git a/go.mod b/go.mod index b856043127..d035f0839d 100644 --- a/go.mod +++ b/go.mod @@ -217,7 +217,7 @@ require ( github.com/gobwas/pool v0.2.1 // indirect github.com/gobwas/ws v1.2.1 // indirect github.com/gofrs/flock v0.8.1 // indirect - github.com/gogo/googleapis v1.4.0 // indirect + github.com/gogo/googleapis v1.4.0 github.com/google/go-cmp v0.6.0 github.com/google/go-querystring v1.1.0 // indirect github.com/google/pprof v0.0.0-20240827171923-fa2c70bbbfe5 // indirect @@ -285,7 +285,7 @@ replace ( // Required by Cortex https://github.com/cortexproject/cortex/pull/3051. github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab - github.com/prometheus/prometheus => github.com/vinted/prometheus v1.8.2-0.20241104091239-2b97618294c8 + github.com/prometheus/prometheus => github.com/vinted/prometheus v0.0.0-20241212114003-53a808bbaf43 // Pin kuberesolver/v5 to support new grpc version. Need to upgrade kuberesolver version on weaveworks/common. github.com/sercand/kuberesolver/v4 => github.com/sercand/kuberesolver/v5 v5.1.1 diff --git a/go.sum b/go.sum index 3dc07f2b3b..ec41112b78 100644 --- a/go.sum +++ b/go.sum @@ -2271,8 +2271,8 @@ github.com/uber/jaeger-client-go v2.30.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMW github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg= github.com/uber/jaeger-lib v2.4.1+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= -github.com/vinted/prometheus v1.8.2-0.20241104091239-2b97618294c8 h1:yLIYtyb+wGxhvSDYGV7ySkXXMx+S/TQj6dlzsWMG35w= -github.com/vinted/prometheus v1.8.2-0.20241104091239-2b97618294c8/go.mod h1:n3/PY5be8xgYe+DUCjKdK0eSmDSQBQ6ZOoIUGmO9vEY= +github.com/vinted/prometheus v0.0.0-20241212114003-53a808bbaf43 h1:3PZq7phU4tTEF8It55AHuM8mAx18edD9guvlWiAHyz8= +github.com/vinted/prometheus v0.0.0-20241212114003-53a808bbaf43/go.mod h1:n3/PY5be8xgYe+DUCjKdK0eSmDSQBQ6ZOoIUGmO9vEY= github.com/vultr/govultr/v2 v2.17.2 h1:gej/rwr91Puc/tgh+j33p/BLR16UrIPnSr+AIwYWZQs= github.com/vultr/govultr/v2 v2.17.2/go.mod h1:ZFOKGWmgjytfyjeyAdhQlSWwTjh2ig+X49cAp50dzXI= github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5 h1:nORobjToZAvi54wcuUXLq+XG2Rsr0XEizy5aHBHvqWQ= diff --git a/pkg/queryfrontend/httpgrpc.pb.go b/pkg/queryfrontend/httpgrpc.pb.go new file mode 100644 index 0000000000..94e7dafbcf --- /dev/null +++ b/pkg/queryfrontend/httpgrpc.pb.go @@ -0,0 +1,1278 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: queryfrontend/httpgrpc.proto + +package queryfrontend + +import ( + bytes "bytes" + context "context" + fmt "fmt" + io "io" + math "math" + math_bits "math/bits" + reflect "reflect" + strings "strings" + + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type HTTPRequest struct { + Method string `protobuf:"bytes,1,opt,name=method,proto3" json:"method,omitempty"` + Url string `protobuf:"bytes,2,opt,name=url,proto3" json:"url,omitempty"` + Headers []*Header `protobuf:"bytes,3,rep,name=headers,proto3" json:"headers,omitempty"` + Body []byte `protobuf:"bytes,4,opt,name=body,proto3" json:"body,omitempty"` +} + +func (m *HTTPRequest) Reset() { *m = HTTPRequest{} } +func (*HTTPRequest) ProtoMessage() {} +func (*HTTPRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_534680489746a24d, []int{0} +} +func (m *HTTPRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *HTTPRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_HTTPRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *HTTPRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_HTTPRequest.Merge(m, src) +} +func (m *HTTPRequest) XXX_Size() int { + return m.Size() +} +func (m *HTTPRequest) XXX_DiscardUnknown() { + xxx_messageInfo_HTTPRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_HTTPRequest proto.InternalMessageInfo + +func (m *HTTPRequest) GetMethod() string { + if m != nil { + return m.Method + } + return "" +} + +func (m *HTTPRequest) GetUrl() string { + if m != nil { + return m.Url + } + return "" +} + +func (m *HTTPRequest) GetHeaders() []*Header { + if m != nil { + return m.Headers + } + return nil +} + +func (m *HTTPRequest) GetBody() []byte { + if m != nil { + return m.Body + } + return nil +} + +type HTTPResponse struct { + Code int32 `protobuf:"varint,1,opt,name=Code,proto3" json:"Code,omitempty"` + Headers []*Header `protobuf:"bytes,2,rep,name=headers,proto3" json:"headers,omitempty"` + Body []byte `protobuf:"bytes,3,opt,name=body,proto3" json:"body,omitempty"` +} + +func (m *HTTPResponse) Reset() { *m = HTTPResponse{} } +func (*HTTPResponse) ProtoMessage() {} +func (*HTTPResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_534680489746a24d, []int{1} +} +func (m *HTTPResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *HTTPResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_HTTPResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *HTTPResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_HTTPResponse.Merge(m, src) +} +func (m *HTTPResponse) XXX_Size() int { + return m.Size() +} +func (m *HTTPResponse) XXX_DiscardUnknown() { + xxx_messageInfo_HTTPResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_HTTPResponse proto.InternalMessageInfo + +func (m *HTTPResponse) GetCode() int32 { + if m != nil { + return m.Code + } + return 0 +} + +func (m *HTTPResponse) GetHeaders() []*Header { + if m != nil { + return m.Headers + } + return nil +} + +func (m *HTTPResponse) GetBody() []byte { + if m != nil { + return m.Body + } + return nil +} + +type Header struct { + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Values []string `protobuf:"bytes,2,rep,name=values,proto3" json:"values,omitempty"` +} + +func (m *Header) Reset() { *m = Header{} } +func (*Header) ProtoMessage() {} +func (*Header) Descriptor() ([]byte, []int) { + return fileDescriptor_534680489746a24d, []int{2} +} +func (m *Header) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Header) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Header.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Header) XXX_Merge(src proto.Message) { + xxx_messageInfo_Header.Merge(m, src) +} +func (m *Header) XXX_Size() int { + return m.Size() +} +func (m *Header) XXX_DiscardUnknown() { + xxx_messageInfo_Header.DiscardUnknown(m) +} + +var xxx_messageInfo_Header proto.InternalMessageInfo + +func (m *Header) GetKey() string { + if m != nil { + return m.Key + } + return "" +} + +func (m *Header) GetValues() []string { + if m != nil { + return m.Values + } + return nil +} + +func init() { + proto.RegisterType((*HTTPRequest)(nil), "httpgrpc.HTTPRequest") + proto.RegisterType((*HTTPResponse)(nil), "httpgrpc.HTTPResponse") + proto.RegisterType((*Header)(nil), "httpgrpc.Header") +} + +func init() { proto.RegisterFile("queryfrontend/httpgrpc.proto", fileDescriptor_534680489746a24d) } + +var fileDescriptor_534680489746a24d = []byte{ + // 307 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x29, 0x2c, 0x4d, 0x2d, + 0xaa, 0x4c, 0x2b, 0xca, 0xcf, 0x2b, 0x49, 0xcd, 0x4b, 0xd1, 0xcf, 0x28, 0x29, 0x29, 0x48, 0x2f, + 0x2a, 0x48, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x80, 0xf1, 0xa5, 0x44, 0xd2, 0xf3, + 0xd3, 0xf3, 0xc1, 0x82, 0xfa, 0x20, 0x16, 0x44, 0x5e, 0xa9, 0x9c, 0x8b, 0xdb, 0x23, 0x24, 0x24, + 0x20, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x44, 0x48, 0x8c, 0x8b, 0x2d, 0x37, 0xb5, 0x24, 0x23, + 0x3f, 0x45, 0x82, 0x51, 0x81, 0x51, 0x83, 0x33, 0x08, 0xca, 0x13, 0x12, 0xe0, 0x62, 0x2e, 0x2d, + 0xca, 0x91, 0x60, 0x02, 0x0b, 0x82, 0x98, 0x42, 0x5a, 0x5c, 0xec, 0x19, 0xa9, 0x89, 0x29, 0xa9, + 0x45, 0xc5, 0x12, 0xcc, 0x0a, 0xcc, 0x1a, 0xdc, 0x46, 0x02, 0x7a, 0x70, 0xab, 0x3d, 0xc0, 0x12, + 0x41, 0x30, 0x05, 0x42, 0x42, 0x5c, 0x2c, 0x49, 0xf9, 0x29, 0x95, 0x12, 0x2c, 0x0a, 0x8c, 0x1a, + 0x3c, 0x41, 0x60, 0xb6, 0x52, 0x12, 0x17, 0x0f, 0xc4, 0xe2, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, + 0x90, 0x1a, 0xe7, 0xfc, 0x94, 0x54, 0xb0, 0xbd, 0xac, 0x41, 0x60, 0x36, 0xb2, 0x1d, 0x4c, 0xc4, + 0xda, 0xc1, 0x8c, 0x64, 0x87, 0x11, 0x17, 0x1b, 0x44, 0x19, 0xc8, 0xfd, 0xd9, 0xa9, 0x95, 0x50, + 0x4f, 0x81, 0x98, 0x20, 0x9f, 0x96, 0x25, 0xe6, 0x94, 0xa6, 0x42, 0x8c, 0xe6, 0x0c, 0x82, 0xf2, + 0x8c, 0x1c, 0xb9, 0x58, 0x40, 0xee, 0x12, 0xb2, 0xe4, 0x62, 0xf3, 0x48, 0xcc, 0x4b, 0xc9, 0x49, + 0x15, 0x12, 0x45, 0xb2, 0x14, 0x11, 0x54, 0x52, 0x62, 0xe8, 0xc2, 0x10, 0x8f, 0x28, 0x31, 0x38, + 0x99, 0xdf, 0x78, 0x28, 0xc7, 0xf0, 0xe1, 0xa1, 0x1c, 0x63, 0xc3, 0x23, 0x39, 0xc6, 0x15, 0x8f, + 0xe4, 0x18, 0x4f, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x09, + 0x8f, 0xe5, 0x18, 0x2e, 0x3c, 0x96, 0x63, 0xb8, 0xf1, 0x58, 0x8e, 0x21, 0x8a, 0x17, 0x25, 0xee, + 0x92, 0xd8, 0xc0, 0x71, 0x62, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0xde, 0xc5, 0x68, 0xfc, 0xd3, + 0x01, 0x00, 0x00, +} + +func (this *HTTPRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*HTTPRequest) + if !ok { + that2, ok := that.(HTTPRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Method != that1.Method { + return false + } + if this.Url != that1.Url { + return false + } + if len(this.Headers) != len(that1.Headers) { + return false + } + for i := range this.Headers { + if !this.Headers[i].Equal(that1.Headers[i]) { + return false + } + } + if !bytes.Equal(this.Body, that1.Body) { + return false + } + return true +} +func (this *HTTPResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*HTTPResponse) + if !ok { + that2, ok := that.(HTTPResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Code != that1.Code { + return false + } + if len(this.Headers) != len(that1.Headers) { + return false + } + for i := range this.Headers { + if !this.Headers[i].Equal(that1.Headers[i]) { + return false + } + } + if !bytes.Equal(this.Body, that1.Body) { + return false + } + return true +} +func (this *Header) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Header) + if !ok { + that2, ok := that.(Header) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Key != that1.Key { + return false + } + if len(this.Values) != len(that1.Values) { + return false + } + for i := range this.Values { + if this.Values[i] != that1.Values[i] { + return false + } + } + return true +} +func (this *HTTPRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 8) + s = append(s, "&queryfrontend.HTTPRequest{") + s = append(s, "Method: "+fmt.Sprintf("%#v", this.Method)+",\n") + s = append(s, "Url: "+fmt.Sprintf("%#v", this.Url)+",\n") + if this.Headers != nil { + s = append(s, "Headers: "+fmt.Sprintf("%#v", this.Headers)+",\n") + } + s = append(s, "Body: "+fmt.Sprintf("%#v", this.Body)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *HTTPResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&queryfrontend.HTTPResponse{") + s = append(s, "Code: "+fmt.Sprintf("%#v", this.Code)+",\n") + if this.Headers != nil { + s = append(s, "Headers: "+fmt.Sprintf("%#v", this.Headers)+",\n") + } + s = append(s, "Body: "+fmt.Sprintf("%#v", this.Body)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *Header) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&queryfrontend.Header{") + s = append(s, "Key: "+fmt.Sprintf("%#v", this.Key)+",\n") + s = append(s, "Values: "+fmt.Sprintf("%#v", this.Values)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringHttpgrpc(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// HTTPClient is the client API for HTTP service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type HTTPClient interface { + Handle(ctx context.Context, in *HTTPRequest, opts ...grpc.CallOption) (*HTTPResponse, error) +} + +type hTTPClient struct { + cc *grpc.ClientConn +} + +func NewHTTPClient(cc *grpc.ClientConn) HTTPClient { + return &hTTPClient{cc} +} + +func (c *hTTPClient) Handle(ctx context.Context, in *HTTPRequest, opts ...grpc.CallOption) (*HTTPResponse, error) { + out := new(HTTPResponse) + err := c.cc.Invoke(ctx, "/httpgrpc.HTTP/Handle", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// HTTPServer is the server API for HTTP service. +type HTTPServer interface { + Handle(context.Context, *HTTPRequest) (*HTTPResponse, error) +} + +// UnimplementedHTTPServer can be embedded to have forward compatible implementations. +type UnimplementedHTTPServer struct { +} + +func (*UnimplementedHTTPServer) Handle(ctx context.Context, req *HTTPRequest) (*HTTPResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Handle not implemented") +} + +func RegisterHTTPServer(s *grpc.Server, srv HTTPServer) { + s.RegisterService(&_HTTP_serviceDesc, srv) +} + +func _HTTP_Handle_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(HTTPRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HTTPServer).Handle(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/httpgrpc.HTTP/Handle", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HTTPServer).Handle(ctx, req.(*HTTPRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _HTTP_serviceDesc = grpc.ServiceDesc{ + ServiceName: "httpgrpc.HTTP", + HandlerType: (*HTTPServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Handle", + Handler: _HTTP_Handle_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "queryfrontend/httpgrpc.proto", +} + +func (m *HTTPRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *HTTPRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *HTTPRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Body) > 0 { + i -= len(m.Body) + copy(dAtA[i:], m.Body) + i = encodeVarintHttpgrpc(dAtA, i, uint64(len(m.Body))) + i-- + dAtA[i] = 0x22 + } + if len(m.Headers) > 0 { + for iNdEx := len(m.Headers) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Headers[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintHttpgrpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + } + if len(m.Url) > 0 { + i -= len(m.Url) + copy(dAtA[i:], m.Url) + i = encodeVarintHttpgrpc(dAtA, i, uint64(len(m.Url))) + i-- + dAtA[i] = 0x12 + } + if len(m.Method) > 0 { + i -= len(m.Method) + copy(dAtA[i:], m.Method) + i = encodeVarintHttpgrpc(dAtA, i, uint64(len(m.Method))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *HTTPResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *HTTPResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *HTTPResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Body) > 0 { + i -= len(m.Body) + copy(dAtA[i:], m.Body) + i = encodeVarintHttpgrpc(dAtA, i, uint64(len(m.Body))) + i-- + dAtA[i] = 0x1a + } + if len(m.Headers) > 0 { + for iNdEx := len(m.Headers) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Headers[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintHttpgrpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + } + if m.Code != 0 { + i = encodeVarintHttpgrpc(dAtA, i, uint64(m.Code)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func (m *Header) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Header) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Header) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Values) > 0 { + for iNdEx := len(m.Values) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Values[iNdEx]) + copy(dAtA[i:], m.Values[iNdEx]) + i = encodeVarintHttpgrpc(dAtA, i, uint64(len(m.Values[iNdEx]))) + i-- + dAtA[i] = 0x12 + } + } + if len(m.Key) > 0 { + i -= len(m.Key) + copy(dAtA[i:], m.Key) + i = encodeVarintHttpgrpc(dAtA, i, uint64(len(m.Key))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintHttpgrpc(dAtA []byte, offset int, v uint64) int { + offset -= sovHttpgrpc(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *HTTPRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Method) + if l > 0 { + n += 1 + l + sovHttpgrpc(uint64(l)) + } + l = len(m.Url) + if l > 0 { + n += 1 + l + sovHttpgrpc(uint64(l)) + } + if len(m.Headers) > 0 { + for _, e := range m.Headers { + l = e.Size() + n += 1 + l + sovHttpgrpc(uint64(l)) + } + } + l = len(m.Body) + if l > 0 { + n += 1 + l + sovHttpgrpc(uint64(l)) + } + return n +} + +func (m *HTTPResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Code != 0 { + n += 1 + sovHttpgrpc(uint64(m.Code)) + } + if len(m.Headers) > 0 { + for _, e := range m.Headers { + l = e.Size() + n += 1 + l + sovHttpgrpc(uint64(l)) + } + } + l = len(m.Body) + if l > 0 { + n += 1 + l + sovHttpgrpc(uint64(l)) + } + return n +} + +func (m *Header) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Key) + if l > 0 { + n += 1 + l + sovHttpgrpc(uint64(l)) + } + if len(m.Values) > 0 { + for _, s := range m.Values { + l = len(s) + n += 1 + l + sovHttpgrpc(uint64(l)) + } + } + return n +} + +func sovHttpgrpc(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozHttpgrpc(x uint64) (n int) { + return sovHttpgrpc(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *HTTPRequest) String() string { + if this == nil { + return "nil" + } + repeatedStringForHeaders := "[]*Header{" + for _, f := range this.Headers { + repeatedStringForHeaders += strings.Replace(f.String(), "Header", "Header", 1) + "," + } + repeatedStringForHeaders += "}" + s := strings.Join([]string{`&HTTPRequest{`, + `Method:` + fmt.Sprintf("%v", this.Method) + `,`, + `Url:` + fmt.Sprintf("%v", this.Url) + `,`, + `Headers:` + repeatedStringForHeaders + `,`, + `Body:` + fmt.Sprintf("%v", this.Body) + `,`, + `}`, + }, "") + return s +} +func (this *HTTPResponse) String() string { + if this == nil { + return "nil" + } + repeatedStringForHeaders := "[]*Header{" + for _, f := range this.Headers { + repeatedStringForHeaders += strings.Replace(f.String(), "Header", "Header", 1) + "," + } + repeatedStringForHeaders += "}" + s := strings.Join([]string{`&HTTPResponse{`, + `Code:` + fmt.Sprintf("%v", this.Code) + `,`, + `Headers:` + repeatedStringForHeaders + `,`, + `Body:` + fmt.Sprintf("%v", this.Body) + `,`, + `}`, + }, "") + return s +} +func (this *Header) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Header{`, + `Key:` + fmt.Sprintf("%v", this.Key) + `,`, + `Values:` + fmt.Sprintf("%v", this.Values) + `,`, + `}`, + }, "") + return s +} +func valueToStringHttpgrpc(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *HTTPRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: HTTPRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: HTTPRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Method", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthHttpgrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Method = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Url", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthHttpgrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Url = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Headers", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthHttpgrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Headers = append(m.Headers, &Header{}) + if err := m.Headers[len(m.Headers)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Body", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthHttpgrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Body = append(m.Body[:0], dAtA[iNdEx:postIndex]...) + if m.Body == nil { + m.Body = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipHttpgrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthHttpgrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *HTTPResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: HTTPResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: HTTPResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Code", wireType) + } + m.Code = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Code |= int32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Headers", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthHttpgrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Headers = append(m.Headers, &Header{}) + if err := m.Headers[len(m.Headers)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Body", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthHttpgrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Body = append(m.Body[:0], dAtA[iNdEx:postIndex]...) + if m.Body == nil { + m.Body = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipHttpgrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthHttpgrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Header) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Header: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Header: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Key", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthHttpgrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Key = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Values", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthHttpgrpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthHttpgrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Values = append(m.Values, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipHttpgrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthHttpgrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipHttpgrpc(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowHttpgrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthHttpgrpc + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupHttpgrpc + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthHttpgrpc + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthHttpgrpc = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowHttpgrpc = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupHttpgrpc = fmt.Errorf("proto: unexpected end of group") +) diff --git a/pkg/queryfrontend/httpgrpc.proto b/pkg/queryfrontend/httpgrpc.proto new file mode 100644 index 0000000000..3d848f1ecd --- /dev/null +++ b/pkg/queryfrontend/httpgrpc.proto @@ -0,0 +1,36 @@ +syntax = "proto3"; + +package httpgrpc; + +import "gogoproto/gogo.proto"; + +option (gogoproto.equal_all) = true; +option (gogoproto.gostring_all) = true; +option (gogoproto.stringer_all) = true; +option (gogoproto.goproto_stringer_all) = false; +option (gogoproto.goproto_unkeyed_all) = false; +option (gogoproto.goproto_unrecognized_all) = false; +option (gogoproto.goproto_sizecache_all) = false; +option go_package = "queryfrontend"; + +service HTTP { + rpc Handle(HTTPRequest) returns (HTTPResponse) {}; +} + +message HTTPRequest { + string method = 1; + string url = 2; + repeated Header headers = 3; + bytes body = 4; +} + +message HTTPResponse { + int32 Code = 1; + repeated Header headers = 2; + bytes body = 3; +} + +message Header { + string key = 1; + repeated string values = 2; +} \ No newline at end of file diff --git a/pkg/queryfrontend/httpgrpcserver.go b/pkg/queryfrontend/httpgrpcserver.go new file mode 100644 index 0000000000..21aa53410e --- /dev/null +++ b/pkg/queryfrontend/httpgrpcserver.go @@ -0,0 +1,91 @@ +package queryfrontend + +import ( + bytes "bytes" + "context" + "net/http" + "net/http/httptest" + + spb "github.com/gogo/googleapis/google/rpc" + "github.com/gogo/protobuf/types" + "github.com/gogo/status" + grpc "google.golang.org/grpc" +) + +type HTTPGRPCServer struct { + handler http.HandlerFunc +} + +type nopCloser struct { + *bytes.Buffer +} + +func toHeader(hs []*Header, header http.Header) { + for _, h := range hs { + header[h.Key] = h.Values + } +} + +func fromHeader(hs http.Header) []*Header { + result := make([]*Header, 0, len(hs)) + for k, vs := range hs { + result = append(result, &Header{ + Key: k, + Values: vs, + }) + } + return result +} + +func (nopCloser) Close() error { return nil } + +// BytesBuffer returns the underlaying `bytes.buffer` used to build this io.ReadCloser. +func (n nopCloser) BytesBuffer() *bytes.Buffer { return n.Buffer } + +var _ HTTPServer = &HTTPGRPCServer{} + +func (s *HTTPGRPCServer) Handle(ctx context.Context, r *HTTPRequest) (*HTTPResponse, error) { + req, err := http.NewRequest(r.Method, r.Url, nopCloser{Buffer: bytes.NewBuffer(r.Body)}) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + req.RequestURI = r.Url + req.ContentLength = int64(len(r.Body)) + toHeader(r.Headers, req.Header) + + recorder := httptest.NewRecorder() + s.handler.ServeHTTP(recorder, req) + resp := &HTTPResponse{ + Code: int32(recorder.Code), + Headers: fromHeader(recorder.Header()), + Body: recorder.Body.Bytes(), + } + if recorder.Code/100 == 5 { + return nil, ErrorFromHTTPResponse(resp) + } + return resp, nil +} + +func ErrorFromHTTPResponse(resp *HTTPResponse) error { + a, err := types.MarshalAny(resp) + if err != nil { + return err + } + + return status.ErrorProto(&spb.Status{ + Code: resp.Code, + Message: string(resp.Body), + Details: []*types.Any{a}, + }) +} + +func RegisterHTTPGRPCServer(httpgrpcServer HTTPServer) func(*grpc.Server) { + return func(s *grpc.Server) { + RegisterHTTPServer(s, httpgrpcServer) + } +} + +func NewHTTPGRPCServer(handler http.HandlerFunc) *HTTPGRPCServer { + return &HTTPGRPCServer{handler: handler} +} diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 1997e0fd38..da974d140c 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -64,11 +64,8 @@ type MultiTSDB struct { hashFunc metadata.HashFunc hashringConfigs []HashringConfig - tsdbClients []store.Client - tsdbClientsNeedUpdate bool - - exemplarClients map[string]*exemplars.TSDB - exemplarClientsNeedUpdate bool + tsdbClients []store.Client + exemplarClients map[string]*exemplars.TSDB metricNameFilterEnabled bool @@ -117,19 +114,19 @@ func NewMultiTSDB( } mt := &MultiTSDB{ - dataDir: dataDir, - logger: log.With(l, "component", "multi-tsdb"), - reg: reg, - tsdbOpts: tsdbOpts, - mtx: &sync.RWMutex{}, - tenants: map[string]*tenant{}, - labels: labels, - tsdbClientsNeedUpdate: true, - exemplarClientsNeedUpdate: true, - tenantLabelName: tenantLabelName, - bucket: bucket, - allowOutOfOrderUpload: allowOutOfOrderUpload, - hashFunc: hashFunc, + dataDir: dataDir, + logger: log.With(l, "component", "multi-tsdb"), + reg: reg, + tsdbOpts: tsdbOpts, + mtx: &sync.RWMutex{}, + tenants: map[string]*tenant{}, + labels: labels, + tsdbClients: make([]store.Client, 0), + exemplarClients: map[string]*exemplars.TSDB{}, + tenantLabelName: tenantLabelName, + bucket: bucket, + allowOutOfOrderUpload: allowOutOfOrderUpload, + hashFunc: hashFunc, } for _, option := range options { @@ -139,6 +136,49 @@ func NewMultiTSDB( return mt } +// testGetTenant returns the tenant with the given tenantID for testing purposes. +func (t *MultiTSDB) testGetTenant(tenantID string) *tenant { + t.mtx.RLock() + defer t.mtx.RUnlock() + return t.tenants[tenantID] +} + +func (t *MultiTSDB) updateTSDBClients() { + t.tsdbClients = t.tsdbClients[:0] + for _, tenant := range t.tenants { + client := tenant.client() + if client != nil { + t.tsdbClients = append(t.tsdbClients, client) + } + } +} + +func (t *MultiTSDB) addTenantUnlocked(tenantID string, newTenant *tenant) { + t.tenants[tenantID] = newTenant + t.updateTSDBClients() + if newTenant.exemplars() != nil { + t.exemplarClients[tenantID] = newTenant.exemplars() + } +} + +func (t *MultiTSDB) addTenantLocked(tenantID string, newTenant *tenant) { + t.mtx.Lock() + defer t.mtx.Unlock() + t.addTenantUnlocked(tenantID, newTenant) +} + +func (t *MultiTSDB) removeTenantUnlocked(tenantID string) { + delete(t.tenants, tenantID) + delete(t.exemplarClients, tenantID) + t.updateTSDBClients() +} + +func (t *MultiTSDB) removeTenantLocked(tenantID string) { + t.mtx.Lock() + defer t.mtx.Unlock() + t.removeTenantUnlocked(tenantID) +} + type localClient struct { store *store.TSDBStore @@ -433,9 +473,7 @@ func (t *MultiTSDB) Prune(ctx context.Context) error { } level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID) - delete(t.tenants, tenantID) - t.tsdbClientsNeedUpdate = true - t.exemplarClientsNeedUpdate = true + t.removeTenantUnlocked(tenantID) } return merr.Err() @@ -595,58 +633,17 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error { return merr.Err() } +// TSDBLocalClients should be used as read-only. func (t *MultiTSDB) TSDBLocalClients() []store.Client { t.mtx.RLock() - if !t.tsdbClientsNeedUpdate { - t.mtx.RUnlock() - return t.tsdbClients - } - - t.mtx.RUnlock() - t.mtx.Lock() - defer t.mtx.Unlock() - if !t.tsdbClientsNeedUpdate { - return t.tsdbClients - } - - res := make([]store.Client, 0, len(t.tenants)) - for _, tenant := range t.tenants { - client := tenant.client() - if client != nil { - res = append(res, client) - } - } - - t.tsdbClientsNeedUpdate = false - t.tsdbClients = res - + defer t.mtx.RUnlock() return t.tsdbClients } +// TSDBExemplars should be used as read-only. func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB { t.mtx.RLock() - if !t.exemplarClientsNeedUpdate { - t.mtx.RUnlock() - return t.exemplarClients - } - t.mtx.RUnlock() - t.mtx.Lock() - defer t.mtx.Unlock() - - if !t.exemplarClientsNeedUpdate { - return t.exemplarClients - } - - res := make(map[string]*exemplars.TSDB, len(t.tenants)) - for k, tenant := range t.tenants { - e := tenant.exemplars() - if e != nil { - res[k] = e - } - } - - t.exemplarClientsNeedUpdate = false - t.exemplarClients = res + defer t.mtx.RUnlock() return t.exemplarClients } @@ -740,11 +737,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant nil, ) if err != nil { - t.mtx.Lock() - delete(t.tenants, tenantID) - t.tsdbClientsNeedUpdate = true - t.exemplarClientsNeedUpdate = true - t.mtx.Unlock() + t.removeTenantLocked(tenantID) return err } var ship *shipper.Shipper @@ -767,6 +760,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant options = append(options, store.WithCuckooMetricNameStoreFilter()) } tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset)) + t.addTenantLocked(tenantID, tenant) // need to update the client list once store is ready & client != nil level.Info(logger).Log("msg", "TSDB is now ready") return nil } @@ -795,9 +789,7 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan } tenant = newTenant() - t.tenants[tenantID] = tenant - t.tsdbClientsNeedUpdate = true - t.exemplarClientsNeedUpdate = true + t.addTenantUnlocked(tenantID, tenant) t.mtx.Unlock() logger := log.With(t.logger, "tenant", tenantID) diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index eb82c22281..bafd882e0e 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -5,6 +5,7 @@ package receive import ( "context" + "fmt" "io" "math" "os" @@ -191,7 +192,7 @@ func TestMultiTSDB(t *testing.T) { testutil.Ok(t, m.Open()) testutil.Ok(t, appendSample(m, testTenant, time.Now())) - tenant := m.tenants[testTenant] + tenant := m.testGetTenant(testTenant) db := tenant.readyStorage().Get() testutil.Equals(t, 0, len(db.Blocks())) @@ -535,6 +536,47 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) { testutil.Equals(t, 1, len(m.TSDBLocalClients())) } +func TestMultiTSDBAddNewTenant(t *testing.T) { + t.Parallel() + const iterations = 10 + // This test detects race conditions, so we run it multiple times to increase the chance of catching the issue. + for i := 0; i < iterations; i++ { + t.Run(fmt.Sprintf("iteration-%d", i), func(t *testing.T) { + dir := t.TempDir() + m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(), + &tsdb.Options{ + MinBlockDuration: (2 * time.Hour).Milliseconds(), + MaxBlockDuration: (2 * time.Hour).Milliseconds(), + RetentionDuration: (6 * time.Hour).Milliseconds(), + }, + labels.FromStrings("replica", "test"), + "tenant_id", + objstore.NewInMemBucket(), + false, + metadata.NoneFunc, + ) + defer func() { testutil.Ok(t, m.Close()) }() + + concurrency := 50 + var wg sync.WaitGroup + for i := 0; i < concurrency; i++ { + wg.Add(1) + // simulate remote write with new tenant concurrently + go func(i int) { + defer wg.Done() + testutil.Ok(t, appendSample(m, fmt.Sprintf("tenant-%d", i), time.UnixMilli(int64(10)))) + }(i) + // simulate read request concurrently + go func() { + m.TSDBLocalClients() + }() + } + wg.Wait() + testutil.Equals(t, concurrency, len(m.TSDBLocalClients())) + }) + } +} + func TestAlignedHeadFlush(t *testing.T) { hourInSeconds := int64(1 * 60 * 60) @@ -787,7 +829,10 @@ func appendSampleWithLabels(m *MultiTSDB, tenant string, lbls labels.Labels, tim func queryLabelValues(ctx context.Context, m *MultiTSDB) error { proxy := store.NewProxyStore(nil, nil, func() []store.Client { - clients := m.TSDBLocalClients() + m.mtx.Lock() + defer m.mtx.Unlock() + clients := make([]store.Client, len(m.tsdbClients)) + copy(clients, m.tsdbClients) if len(clients) > 0 { clients[0] = &slowClient{clients[0]} } diff --git a/pkg/receive/receive_test.go b/pkg/receive/receive_test.go index ea1b6d81fd..08fe68e229 100644 --- a/pkg/receive/receive_test.go +++ b/pkg/receive/receive_test.go @@ -208,7 +208,7 @@ func TestAddingExternalLabelsForTenants(t *testing.T) { for _, c := range tc.cfg { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -290,7 +290,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { for _, c := range initialConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -315,7 +315,7 @@ func TestLabelSetsOfTenantsWhenAddingTenants(t *testing.T) { for _, c := range changedConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -528,7 +528,7 @@ func TestLabelSetsOfTenantsWhenChangingLabels(t *testing.T) { for _, c := range initialConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -696,7 +696,7 @@ func TestAddingLabelsWhenTenantAppearsInMultipleHashrings(t *testing.T) { for _, c := range initialConfig { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) } @@ -768,7 +768,7 @@ func TestReceiverLabelsNotOverwrittenByExternalLabels(t *testing.T) { for _, c := range cfg { for _, tenantId := range c.Tenants { - if m.tenants[tenantId] == nil { + if m.testGetTenant(tenantId) == nil { err = appendSample(m, tenantId, time.Now()) require.NoError(t, err) }