@@ -123,85 +123,93 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
123
123
q .Matchers = append (q .Matchers , pm )
124
124
}
125
125
126
+ resp , err := p .promSeries (s .Context (), q )
127
+ if err != nil {
128
+ return errors .Wrap (err , "query Prometheus" )
129
+ }
130
+
131
+ span , _ := tracing .StartSpan (s .Context (), "transform_and_respond" )
132
+ defer span .Finish ()
133
+
134
+ var res storepb.SeriesResponse
135
+
136
+ for _ , e := range resp .Results [0 ].Timeseries {
137
+ lset := p .translateAndExtendLabels (e .Labels , ext )
138
+ // We generally expect all samples of the requested range to be traversed
139
+ // so we just encode all samples into one big chunk regardless of size.
140
+ enc , cb , err := p .encodeChunk (e .Samples )
141
+ if err != nil {
142
+ return status .Error (codes .Unknown , err .Error ())
143
+ }
144
+ res .Series = storepb.Series {
145
+ Labels : lset ,
146
+ Chunks : []storepb.Chunk {{
147
+ MinTime : int64 (e .Samples [0 ].Timestamp ),
148
+ MaxTime : int64 (e .Samples [len (e .Samples )- 1 ].Timestamp ),
149
+ Type : enc ,
150
+ Data : cb ,
151
+ }},
152
+ }
153
+ if err := s .Send (& res ); err != nil {
154
+ return err
155
+ }
156
+ }
157
+ return nil
158
+ }
159
+
160
+ func (p * PrometheusStore ) promSeries (ctx context.Context , q prompb.Query ) (* prompb.ReadResponse , error ) {
161
+ span , ctx := tracing .StartSpan (ctx , "query_prometheus" )
162
+ defer span .Finish ()
163
+
126
164
reqb , err := proto .Marshal (& prompb.ReadRequest {Queries : []prompb.Query {q }})
127
165
if err != nil {
128
- return errors .Wrap (err , "marshal read request" )
166
+ return nil , errors .Wrap (err , "marshal read request" )
129
167
}
130
168
131
169
u := * p .base
132
170
u .Path = "/api/v1/read"
133
171
134
172
preq , err := http .NewRequest ("POST" , u .String (), bytes .NewReader (snappy .Encode (nil , reqb )))
135
173
if err != nil {
136
- return errors .Wrap (err , "unable to create request" )
174
+ return nil , errors .Wrap (err , "unable to create request" )
137
175
}
138
176
preq .Header .Add ("Content-Encoding" , "snappy" )
139
177
preq .Header .Set ("Content-Type" , "application/x-protobuf" )
140
178
preq .Header .Set ("X-Prometheus-Remote-Read-Version" , "0.1.0" )
141
179
142
- span , ctx := tracing .StartSpan (s .Context (), "/prom_v1_read_series HTTP[client]" )
143
- defer span .Finish ()
144
-
145
180
preq = preq .WithContext (ctx )
146
181
147
182
presp , err := p .client .Do (preq )
148
183
if err != nil {
149
- return errors .Wrap (err , "send request" )
184
+ return nil , errors .Wrap (err , "send request" )
150
185
}
151
186
defer presp .Body .Close ()
152
187
153
188
if presp .StatusCode / 100 != 2 {
154
- return errors .Errorf ("request failed with code %s" , presp .Status )
189
+ return nil , errors .Errorf ("request failed with code %s" , presp .Status )
155
190
}
156
191
157
192
buf := bytes .NewBuffer (p .getBuffer ())
158
193
defer func () {
159
194
p .putBuffer (buf .Bytes ())
160
195
}()
161
196
if _ , err := io .Copy (buf , presp .Body ); err != nil {
162
- return errors .Wrap (err , "copy response" )
197
+ return nil , errors .Wrap (err , "copy response" )
163
198
}
164
199
decomp , err := snappy .Decode (p .getBuffer (), buf .Bytes ())
165
200
defer p .putBuffer (decomp )
166
201
if err != nil {
167
- return errors .Wrap (err , "decompress response" )
202
+ return nil , errors .Wrap (err , "decompress response" )
168
203
}
169
204
170
205
var data prompb.ReadResponse
171
206
if err := proto .Unmarshal (decomp , & data ); err != nil {
172
- return errors .Wrap (err , "unmarshal response" )
207
+ return nil , errors .Wrap (err , "unmarshal response" )
173
208
}
174
209
if len (data .Results ) != 1 {
175
- return errors .Errorf ("unexepected result size %d" , len (data .Results ))
176
- }
177
-
178
- var res storepb.SeriesResponse
179
-
180
- for _ , e := range data .Results [0 ].Timeseries {
181
- lset := p .translateAndExtendLabels (e .Labels , ext )
182
- // We generally expect all samples of the requested range to be traversed
183
- // so we just encode all samples into one big chunk regardless of size.
184
- //
185
- // Drop all data before r.MinTime since we might have fetched more than
186
- // the requested range (see above).
187
- enc , b , err := p .encodeChunk (e .Samples , r .MinTime )
188
- if err != nil {
189
- return status .Error (codes .Unknown , err .Error ())
190
- }
191
- res .Series = storepb.Series {
192
- Labels : lset ,
193
- Chunks : []storepb.Chunk {{
194
- MinTime : int64 (e .Samples [0 ].Timestamp ),
195
- MaxTime : int64 (e .Samples [len (e .Samples )- 1 ].Timestamp ),
196
- Type : enc ,
197
- Data : b ,
198
- }},
199
- }
200
- if err := s .Send (& res ); err != nil {
201
- return err
202
- }
210
+ return nil , errors .Errorf ("unexepected result size %d" , len (data .Results ))
203
211
}
204
- return nil
212
+ return & data , nil
205
213
}
206
214
207
215
func extLabelsMatches (extLabels labels.Labels , ms []storepb.LabelMatcher ) (bool , []storepb.LabelMatcher , error ) {
@@ -225,23 +233,18 @@ func extLabelsMatches(extLabels labels.Labels, ms []storepb.LabelMatcher) (bool,
225
233
return false , nil , nil
226
234
}
227
235
}
228
-
229
236
return true , newMatcher , nil
230
237
}
231
238
232
- // encodeChunk translates the sample pairs into a chunk. It takes a minimum timestamp
233
- // and drops all samples before that one.
234
- func (p * PrometheusStore ) encodeChunk (ss []prompb.Sample , mint int64 ) (storepb.Chunk_Encoding , []byte , error ) {
239
+ // encodeChunk translates the sample pairs into a chunk.
240
+ func (p * PrometheusStore ) encodeChunk (ss []prompb.Sample ) (storepb.Chunk_Encoding , []byte , error ) {
235
241
c := chunkenc .NewXORChunk ()
236
242
237
243
a , err := c .Appender ()
238
244
if err != nil {
239
245
return 0 , nil , err
240
246
}
241
247
for _ , s := range ss {
242
- if int64 (s .Timestamp ) < mint {
243
- continue
244
- }
245
248
a .Append (int64 (s .Timestamp ), float64 (s .Value ))
246
249
}
247
250
return storepb .Chunk_XOR , c .Bytes (), nil
0 commit comments