Skip to content

Commit bb6ac4b

Browse files
authored
query: Added support for warning gRPC user about partial errors and support for it in querier. (thanos-io#167)
* Added support for warn gRPC user about partial errors and support for it in querier. Signed-off-by: Bartek Plotka <bwplotka@gmail.com> * Fixed storeAPI srv implementations. Signed-off-by: Bartek Plotka <bwplotka@gmail.com> * Addressed comments. Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
1 parent ac26757 commit bb6ac4b

File tree

9 files changed

+460
-147
lines changed

9 files changed

+460
-147
lines changed

pkg/query/querier.go

Lines changed: 62 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,10 @@ func storeMatches(s *StoreInfo, mint, maxt int64, matchers ...*labels.Matcher) b
166166
return true
167167
}
168168

169+
func (q *querier) isDedupEnabled(o *opts) bool {
170+
return o.deduplicate && q.replicaLabel != ""
171+
}
172+
169173
func (q *querier) Select(ms ...*labels.Matcher) (storage.SeriesSet, error) {
170174
var (
171175
mtx sync.Mutex
@@ -192,13 +196,24 @@ func (q *querier) Select(ms ...*labels.Matcher) (storage.SeriesSet, error) {
192196
store := s
193197

194198
g.Go(func() error {
195-
set, err := q.selectSingle(ctx, store.Client, opts.deduplicate, sms...)
199+
set, warnings, err := q.selectSingle(ctx, store.Client, sms...)
196200
if err != nil {
197201
opts.partialErrReporter(errors.Wrapf(err, "querying store failed"))
198202
return nil
199203
}
204+
205+
for _, w := range warnings {
206+
opts.partialErrReporter(errors.New(w))
207+
}
208+
209+
if q.isDedupEnabled(opts) {
210+
// TODO(fabxc): this could potentially pushed further down into the store API
211+
// to make true streaming possible.
212+
sortDedupLabels(set, q.replicaLabel)
213+
}
214+
200215
mtx.Lock()
201-
all = append(all, set)
216+
all = append(all, newStoreSeriesSet(set))
202217
mtx.Unlock()
203218

204219
return nil
@@ -213,7 +228,7 @@ func (q *querier) Select(ms ...*labels.Matcher) (storage.SeriesSet, error) {
213228
set: storepb.MergeSeriesSets(all...),
214229
}
215230

216-
if !opts.deduplicate || q.replicaLabel == "" {
231+
if !q.isDedupEnabled(opts) {
217232
// Return data without any deduplication.
218233
return set, nil
219234
}
@@ -223,59 +238,63 @@ func (q *querier) Select(ms ...*labels.Matcher) (storage.SeriesSet, error) {
223238
return newDedupSeriesSet(set, q.replicaLabel), nil
224239
}
225240

241+
// sortDedupLabels resorts the set so that the same series with different replica
242+
// labels are coming right after each other.
243+
func sortDedupLabels(set []storepb.Series, replicaLabel string) {
244+
for _, s := range set {
245+
// Move the replica label to the very end.
246+
sort.Slice(s.Labels, func(i, j int) bool {
247+
if s.Labels[i].Name == replicaLabel {
248+
return false
249+
}
250+
if s.Labels[j].Name == replicaLabel {
251+
return true
252+
}
253+
return s.Labels[i].Name < s.Labels[j].Name
254+
})
255+
}
256+
// With the re-ordered label sets, re-sorting all series aligns the same series
257+
// from different replicas sequentially.
258+
sort.Slice(set, func(i, j int) bool {
259+
return storepb.CompareLabels(set[i].Labels, set[j].Labels) < 0
260+
})
261+
}
226262
func (q *querier) selectSingle(
227263
ctx context.Context,
228264
client storepb.StoreClient,
229-
deduplicate bool,
230265
ms ...storepb.LabelMatcher,
231-
) (storepb.SeriesSet, error) {
266+
) ([]storepb.Series, []string, error) {
232267
sc, err := client.Series(ctx, &storepb.SeriesRequest{
233268
MinTime: q.mint,
234269
MaxTime: q.maxt,
235270
Matchers: ms,
236271
})
237272
if err != nil {
238-
return nil, errors.Wrap(err, "fetch series")
273+
return nil, nil, errors.Wrap(err, "fetch series")
239274
}
240-
var set []storepb.Series
275+
var (
276+
set []storepb.Series
277+
warnings []string
278+
)
241279

242280
for {
243281
r, err := sc.Recv()
244282
if err == io.EOF {
245283
break
246284
}
247285
if err != nil {
248-
return nil, errors.Wrap(err, "receive series")
286+
return nil, nil, errors.Wrap(err, "receive series")
249287
}
250-
set = append(set, r.Series)
251-
}
252-
res := newStoreSeriesSet(set)
253288

254-
if !deduplicate || q.replicaLabel == "" {
255-
return res, nil
256-
}
257-
// Resort the result so that the same series with different replica
258-
// labels are coming right after each other.
259-
// TODO(fabxc): this could potentially pushed further down into the store API
260-
// to make true streaming possible.
261-
for _, s := range set {
262-
// Move the replica label to the very end.
263-
sort.Slice(s.Labels, func(i, j int) bool {
264-
if s.Labels[i].Name == q.replicaLabel {
265-
return false
266-
}
267-
if s.Labels[j].Name == q.replicaLabel {
268-
return true
269-
}
270-
return s.Labels[i].Name < s.Labels[j].Name
271-
})
289+
if w := r.GetWarning(); w != "" {
290+
warnings = append(warnings, w)
291+
continue
292+
}
293+
294+
set = append(set, *r.GetSeries())
272295
}
273-
// With the re-ordered label sets, re-sorting all series aligns the same series
274-
// from different replicas sequentially.
275-
sort.Slice(set, func(i, j int) bool {
276-
return storepb.CompareLabels(set[i].Labels, set[j].Labels) < 0
277-
})
278-
return res, nil
296+
297+
return set, warnings, nil
279298
}
280299

281300
func (q *querier) LabelValues(name string) ([]string, error) {
@@ -295,12 +314,16 @@ func (q *querier) LabelValues(name string) ([]string, error) {
295314
store := s
296315

297316
g.Go(func() error {
298-
values, err := q.labelValuesSingle(ctx, store.Client, name)
317+
values, warnings, err := q.labelValuesSingle(ctx, store.Client, name)
299318
if err != nil {
300319
opts.partialErrReporter(errors.Wrap(err, "querying store failed"))
301320
return nil
302321
}
303322

323+
for _, w := range warnings {
324+
opts.partialErrReporter(errors.New(w))
325+
}
326+
304327
mtx.Lock()
305328
all = append(all, values)
306329
mtx.Unlock()
@@ -314,14 +337,14 @@ func (q *querier) LabelValues(name string) ([]string, error) {
314337
return strutil.MergeUnsortedSlices(all...), nil
315338
}
316339

317-
func (q *querier) labelValuesSingle(ctx context.Context, client storepb.StoreClient, name string) ([]string, error) {
340+
func (q *querier) labelValuesSingle(ctx context.Context, client storepb.StoreClient, name string) ([]string, []string, error) {
318341
resp, err := client.LabelValues(ctx, &storepb.LabelValuesRequest{
319342
Label: name,
320343
})
321344
if err != nil {
322-
return nil, errors.Wrap(err, "fetch series")
345+
return nil, nil, errors.Wrap(err, "fetch series")
323346
}
324-
return resp.Values, nil
347+
return resp.Values, resp.Warning, nil
325348
}
326349

327350
func (q *querier) Close() error {

pkg/query/querier_test.go

Lines changed: 35 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"io/ioutil"
88
"math"
99
"math/rand"
10-
"sort"
1110
"testing"
1211

1312
"google.golang.org/grpc/codes"
@@ -114,73 +113,57 @@ func TestQuerier_Series(t *testing.T) {
114113
testutil.Ok(t, res.Err())
115114
}
116115

117-
func TestStoreSelectSingle(t *testing.T) {
118-
c := &testStoreClient{
119-
series: []storepb.Series{
120-
{Labels: []storepb.Label{
121-
{"a", "1"},
122-
{"b", "replica-1"},
123-
{"c", "3"},
124-
}},
125-
{Labels: []storepb.Label{
126-
{"a", "1"},
127-
{"b", "replica-1"},
128-
{"c", "3"},
129-
{"d", "4"},
130-
}},
131-
{Labels: []storepb.Label{
132-
{"a", "1"},
133-
{"b", "replica-1"},
134-
{"c", "4"},
135-
}},
136-
{Labels: []storepb.Label{
137-
{"a", "1"},
138-
{"b", "replica-2"},
139-
{"c", "3"},
140-
}},
141-
},
116+
func TestSortReplicaLabel(t *testing.T) {
117+
set := []storepb.Series{
118+
{Labels: []storepb.Label{
119+
{"a", "1"},
120+
{"b", "replica-1"},
121+
{"c", "3"},
122+
}},
123+
{Labels: []storepb.Label{
124+
{"a", "1"},
125+
{"b", "replica-1"},
126+
{"c", "3"},
127+
{"d", "4"},
128+
}},
129+
{Labels: []storepb.Label{
130+
{"a", "1"},
131+
{"b", "replica-1"},
132+
{"c", "4"},
133+
}},
134+
{Labels: []storepb.Label{
135+
{"a", "1"},
136+
{"b", "replica-2"},
137+
{"c", "3"},
138+
}},
142139
}
143-
// Just verify we assembled the input data according to the store API contract.
144-
ok := sort.SliceIsSorted(c.series, func(i, j int) bool {
145-
return storepb.CompareLabels(c.series[i].Labels, c.series[j].Labels) < 0
146-
})
147-
testutil.Assert(t, ok, "input data unoreded")
148140

149-
q := newQuerier(context.Background(), nil, nil, 0, 0, "b")
141+
sortDedupLabels(set, "b")
150142

151-
res, err := q.selectSingle(context.Background(), c, true)
152-
testutil.Ok(t, err)
153-
154-
exp := [][]storepb.Label{
155-
{
143+
exp := []storepb.Series{
144+
{Labels: []storepb.Label{
156145
{"a", "1"},
157146
{"c", "3"},
158147
{"b", "replica-1"},
159-
},
160-
{
148+
}},
149+
{Labels: []storepb.Label{
161150
{"a", "1"},
162151
{"c", "3"},
163152
{"b", "replica-2"},
164-
},
165-
{
153+
}},
154+
{Labels: []storepb.Label{
166155
{"a", "1"},
167156
{"c", "3"},
168157
{"d", "4"},
169158
{"b", "replica-1"},
170-
},
171-
{
159+
}},
160+
{Labels: []storepb.Label{
172161
{"a", "1"},
173162
{"c", "4"},
174163
{"b", "replica-1"},
175-
},
176-
}
177-
var got [][]storepb.Label
178-
179-
for res.Next() {
180-
lset, _ := res.At()
181-
got = append(got, lset)
164+
}},
182165
}
183-
testutil.Equals(t, exp, got)
166+
testutil.Equals(t, exp, set)
184167
}
185168

186169
func TestStoreMatches(t *testing.T) {
@@ -326,7 +309,7 @@ func (c *testStoreSeriesClient) Recv() (*storepb.SeriesResponse, error) {
326309
}
327310
s := c.series[c.i]
328311
c.i++
329-
return &storepb.SeriesResponse{Series: s}, nil
312+
return storepb.NewSeriesResponse(&s), nil
330313
}
331314

332315
func (c *testStoreSeriesClient) Context() context.Context {

pkg/store/bucket.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -601,19 +601,19 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
601601
defer span.Finish()
602602

603603
begin := time.Now()
604-
var resp storepb.SeriesResponse
604+
var series storepb.Series
605605

606606
// Merge series set into an union of all block sets. This exposes all blocks are single seriesSet.
607607
// Returned set is can be out of order in terms of series time ranges. It is fixed later on, inside querier.
608608
set := storepb.MergeSeriesSets(res...)
609609
for set.Next() {
610-
resp.Series.Labels, resp.Series.Chunks = set.At()
610+
series.Labels, series.Chunks = set.At()
611611

612612
stats.mergedSeriesCount++
613-
stats.mergedChunksCount += len(resp.Series.Chunks)
614-
s.metrics.chunkSizeBytes.Observe(float64(chunksSize(resp.Series.Chunks)))
613+
stats.mergedChunksCount += len(series.Chunks)
614+
s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks)))
615615

616-
if err := srv.Send(&resp); err != nil {
616+
if err := srv.Send(storepb.NewSeriesResponse(&series)); err != nil {
617617
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
618618
}
619619
}

pkg/store/bucket_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,10 @@ type testStoreSeriesServer struct {
292292
}
293293

294294
func (s *testStoreSeriesServer) Send(r *storepb.SeriesResponse) error {
295-
s.series = append(s.series, r.Series)
295+
if r.GetSeries() == nil {
296+
return errors.New("no series")
297+
}
298+
s.series = append(s.series, *r.GetSeries())
296299
return nil
297300
}
298301

pkg/store/prometheus.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,6 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
131131
span, _ := tracing.StartSpan(s.Context(), "transform_and_respond")
132132
defer span.Finish()
133133

134-
var res storepb.SeriesResponse
135-
136134
for _, e := range resp.Results[0].Timeseries {
137135
lset := p.translateAndExtendLabels(e.Labels, ext)
138136
// We generally expect all samples of the requested range to be traversed
@@ -141,16 +139,16 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
141139
if err != nil {
142140
return status.Error(codes.Unknown, err.Error())
143141
}
144-
res.Series = storepb.Series{
142+
resp := storepb.NewSeriesResponse(&storepb.Series{
145143
Labels: lset,
146144
Chunks: []storepb.Chunk{{
147145
MinTime: int64(e.Samples[0].Timestamp),
148146
MaxTime: int64(e.Samples[len(e.Samples)-1].Timestamp),
149147
Type: enc,
150148
Data: cb,
151149
}},
152-
}
153-
if err := s.Send(&res); err != nil {
150+
})
151+
if err := s.Send(resp); err != nil {
154152
return err
155153
}
156154
}

pkg/store/storepb/custom.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,23 @@ import (
44
"strings"
55
)
66

7+
func NewWarnSeriesResponse(err error) *SeriesResponse {
8+
return &SeriesResponse{
9+
Result: &SeriesResponse_Warning{
10+
Warning: err.Error(),
11+
},
12+
}
13+
}
14+
15+
func NewSeriesResponse(series *Series) *SeriesResponse {
16+
return &SeriesResponse{
17+
Result: &SeriesResponse_Series{
18+
Series: series,
19+
},
20+
}
21+
}
22+
23+
724
// CompareLabels compares two sets of labels.
825
func CompareLabels(a, b []Label) int {
926
l := len(a)

0 commit comments

Comments
 (0)