
Commit af2a23e

fabxc authored and bwplotka committed

store: add response trace, small fixes (thanos-io#162)

1 parent 52abd09 commit af2a23e
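
In short: the Prometheus remote-read call in Series is extracted into a new promSeries helper traced as "query_prometheus", the remaining transform-and-send loop in Series gets its own "transform_and_respond" span, and encodeChunk drops its mint parameter together with the sample filtering it controlled.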

File tree

1 file changed (+49, -46 lines)


pkg/store/prometheus.go

Lines changed: 49 additions & 46 deletions
@@ -123,85 +123,93 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
         q.Matchers = append(q.Matchers, pm)
     }
 
+    resp, err := p.promSeries(s.Context(), q)
+    if err != nil {
+        return errors.Wrap(err, "query Prometheus")
+    }
+
+    span, _ := tracing.StartSpan(s.Context(), "transform_and_respond")
+    defer span.Finish()
+
+    var res storepb.SeriesResponse
+
+    for _, e := range resp.Results[0].Timeseries {
+        lset := p.translateAndExtendLabels(e.Labels, ext)
+        // We generally expect all samples of the requested range to be traversed
+        // so we just encode all samples into one big chunk regardless of size.
+        enc, cb, err := p.encodeChunk(e.Samples)
+        if err != nil {
+            return status.Error(codes.Unknown, err.Error())
+        }
+        res.Series = storepb.Series{
+            Labels: lset,
+            Chunks: []storepb.Chunk{{
+                MinTime: int64(e.Samples[0].Timestamp),
+                MaxTime: int64(e.Samples[len(e.Samples)-1].Timestamp),
+                Type:    enc,
+                Data:    cb,
+            }},
+        }
+        if err := s.Send(&res); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+func (p *PrometheusStore) promSeries(ctx context.Context, q prompb.Query) (*prompb.ReadResponse, error) {
+    span, ctx := tracing.StartSpan(ctx, "query_prometheus")
+    defer span.Finish()
+
     reqb, err := proto.Marshal(&prompb.ReadRequest{Queries: []prompb.Query{q}})
     if err != nil {
-        return errors.Wrap(err, "marshal read request")
+        return nil, errors.Wrap(err, "marshal read request")
     }
 
     u := *p.base
     u.Path = "/api/v1/read"
 
     preq, err := http.NewRequest("POST", u.String(), bytes.NewReader(snappy.Encode(nil, reqb)))
     if err != nil {
-        return errors.Wrap(err, "unable to create request")
+        return nil, errors.Wrap(err, "unable to create request")
     }
     preq.Header.Add("Content-Encoding", "snappy")
     preq.Header.Set("Content-Type", "application/x-protobuf")
     preq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")
 
-    span, ctx := tracing.StartSpan(s.Context(), "/prom_v1_read_series HTTP[client]")
-    defer span.Finish()
-
     preq = preq.WithContext(ctx)
 
     presp, err := p.client.Do(preq)
     if err != nil {
-        return errors.Wrap(err, "send request")
+        return nil, errors.Wrap(err, "send request")
     }
     defer presp.Body.Close()
 
     if presp.StatusCode/100 != 2 {
-        return errors.Errorf("request failed with code %s", presp.Status)
+        return nil, errors.Errorf("request failed with code %s", presp.Status)
     }
 
     buf := bytes.NewBuffer(p.getBuffer())
     defer func() {
         p.putBuffer(buf.Bytes())
     }()
     if _, err := io.Copy(buf, presp.Body); err != nil {
-        return errors.Wrap(err, "copy response")
+        return nil, errors.Wrap(err, "copy response")
     }
     decomp, err := snappy.Decode(p.getBuffer(), buf.Bytes())
     defer p.putBuffer(decomp)
     if err != nil {
-        return errors.Wrap(err, "decompress response")
+        return nil, errors.Wrap(err, "decompress response")
     }
 
     var data prompb.ReadResponse
     if err := proto.Unmarshal(decomp, &data); err != nil {
-        return errors.Wrap(err, "unmarshal response")
+        return nil, errors.Wrap(err, "unmarshal response")
     }
     if len(data.Results) != 1 {
-        return errors.Errorf("unexepected result size %d", len(data.Results))
-    }
-
-    var res storepb.SeriesResponse
-
-    for _, e := range data.Results[0].Timeseries {
-        lset := p.translateAndExtendLabels(e.Labels, ext)
-        // We generally expect all samples of the requested range to be traversed
-        // so we just encode all samples into one big chunk regardless of size.
-        //
-        // Drop all data before r.MinTime since we might have fetched more than
-        // the requested range (see above).
-        enc, b, err := p.encodeChunk(e.Samples, r.MinTime)
-        if err != nil {
-            return status.Error(codes.Unknown, err.Error())
-        }
-        res.Series = storepb.Series{
-            Labels: lset,
-            Chunks: []storepb.Chunk{{
-                MinTime: int64(e.Samples[0].Timestamp),
-                MaxTime: int64(e.Samples[len(e.Samples)-1].Timestamp),
-                Type:    enc,
-                Data:    b,
-            }},
-        }
-        if err := s.Send(&res); err != nil {
-            return err
-        }
+        return nil, errors.Errorf("unexepected result size %d", len(data.Results))
     }
-    return nil
+    return &data, nil
 }
 
 func extLabelsMatches(extLabels labels.Labels, ms []storepb.LabelMatcher) (bool, []storepb.LabelMatcher, error) {
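
The tracing calls introduced above follow a span-per-operation pattern: promSeries now runs under its own "query_prometheus" span while the transform-and-send loop in Series runs under "transform_and_respond", so a trace can separate time spent waiting on Prometheus from time spent re-encoding chunks. As a rough sketch of the contract the helper provides (an assumed stand-in built on opentracing-go, not Thanos' actual pkg/tracing implementation):

package tracing

import (
    "context"

    opentracing "github.com/opentracing/opentracing-go"
)

// StartSpan derives a child span from whatever span is already stored in
// ctx (or starts a root span if there is none) and returns a context
// carrying the new span. Callers pair it with `defer span.Finish()`,
// exactly as the diff does. Illustrative only; the real helper lives in
// Thanos' pkg/tracing package.
func StartSpan(ctx context.Context, operationName string) (opentracing.Span, context.Context) {
    return opentracing.StartSpanFromContext(ctx, operationName)
}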
@@ -225,23 +233,18 @@ func extLabelsMatches(extLabels labels.Labels, ms []storepb.LabelMatcher) (bool,
             return false, nil, nil
         }
     }
-
     return true, newMatcher, nil
 }
 
-// encodeChunk translates the sample pairs into a chunk. It takes a minimum timestamp
-// and drops all samples before that one.
-func (p *PrometheusStore) encodeChunk(ss []prompb.Sample, mint int64) (storepb.Chunk_Encoding, []byte, error) {
+// encodeChunk translates the sample pairs into a chunk.
+func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encoding, []byte, error) {
     c := chunkenc.NewXORChunk()
 
     a, err := c.Appender()
     if err != nil {
         return 0, nil, err
     }
     for _, s := range ss {
-        if int64(s.Timestamp) < mint {
-            continue
-        }
         a.Append(int64(s.Timestamp), float64(s.Value))
     }
     return storepb.Chunk_XOR, c.Bytes(), nil
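
Since encodeChunk now packs every sample into one XOR chunk, the bytes placed in storepb.Chunk.Data can be decoded with the same tsdb chunkenc package. A standalone round-trip sketch (sample values invented for illustration; assumes the prometheus/tsdb chunkenc API of this era, where Chunk.Iterator() takes no argument):

package main

import (
    "fmt"

    "github.com/prometheus/tsdb/chunkenc"
)

func main() {
    // Encode a few (timestamp, value) pairs, as encodeChunk does.
    c := chunkenc.NewXORChunk()
    a, err := c.Appender()
    if err != nil {
        panic(err)
    }
    for ts := int64(0); ts < 5; ts++ {
        a.Append(ts*1000, float64(ts)) // one sample per second, values invented
    }

    // Re-open the raw bytes, as a reader of storepb.Chunk.Data would.
    rc, err := chunkenc.FromData(chunkenc.EncXOR, c.Bytes())
    if err != nil {
        panic(err)
    }
    it := rc.Iterator()
    for it.Next() {
        ts, v := it.At()
        fmt.Println(ts, v)
    }
    if err := it.Err(); err != nil {
        panic(err)
    }
}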
