Skip to content

Commit 01ffe70

Browse files
authored
downsample: handle staleness markers, add average chunk iterator (thanos-io#182)
* downsample: handle staleness markers, add average chunk iterator
* Add comment on downsampling levels
1 parent fb2f545 commit 01ffe70

File tree

2 files changed

+124
-36
lines changed

2 files changed

+124
-36
lines changed

pkg/compact/downsample/downsample.go

Lines changed: 96 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"sort"
1010
"strings"
1111

12+
"github.com/prometheus/prometheus/pkg/value"
13+
1214
"github.com/improbable-eng/thanos/pkg/block"
1315
"github.com/prometheus/tsdb/chunkenc"
1416

@@ -21,6 +23,13 @@ import (
2123
"github.com/prometheus/tsdb/labels"
2224
)
2325

26+
// Standard downsampling resolution levels in Thanos.
27+
const (
28+
ResLevel0 = int64(0) // raw data
29+
ResLevel1 = int64(5 * 60 * 1000) // 5 minutes in milliseconds
30+
ResLevel2 = int64(60 * 60 * 1000) // 1 hour in milliseconds
31+
)
32+
2433
// Downsample downsamples the given block. It writes a new block into dir and returns its ID.
2534
func Downsample(
2635
ctx context.Context,
@@ -54,7 +63,7 @@ func Downsample(
5463
return id, errors.Wrap(err, "get all postings list")
5564
}
5665
var (
57-
aggrChunks []*AggrChunk
66+
aggrChunks []AggrChunk
5867
all []sample
5968
chks []chunks.Meta
6069
)
@@ -88,7 +97,7 @@ func Downsample(
8897
if err != nil {
8998
return id, errors.Wrapf(err, "get chunk %d", c.Ref)
9099
}
91-
aggrChunks = append(aggrChunks, chk.(*AggrChunk))
100+
aggrChunks = append(aggrChunks, chk.(AggrChunk))
92101
}
93102
res, err := downsampleAggr(aggrChunks, &all, chks[0].MinTime, chks[len(chks)-1].MaxTime, resolution)
94103
if err != nil {
@@ -99,7 +108,7 @@ func Downsample(
99108
if pall.Err() != nil {
100109
return id, errors.Wrap(pall.Err(), "iterate series set")
101110
}
102-
comp, err := tsdb.NewLeveledCompactor(nil, log.NewNopLogger(), []int64{rng}, aggrPool{})
111+
comp, err := tsdb.NewLeveledCompactor(nil, log.NewNopLogger(), []int64{rng}, AggrChunkPool{})
103112
if err != nil {
104113
return id, errors.Wrap(err, "create compactor")
105114
}
@@ -340,7 +349,7 @@ func (b *aggrChunkBuilder) add(t int64, aggr *aggregator) {
340349
b.added++
341350
}
342351

343-
func (b *aggrChunkBuilder) encode() *AggrChunk {
352+
func (b *aggrChunkBuilder) encode() AggrChunk {
344353
return EncodeAggrChunk(b.chunks)
345354
}
346355

@@ -403,6 +412,9 @@ func downsampleBatch(data []sample, resolution int64, add func(int64, *aggregato
403412
)
404413
// Fill up one aggregate chunk with up to m samples.
405414
for _, s := range data {
415+
if value.IsStaleNaN(s.v) {
416+
continue
417+
}
406418
if s.t > nextT {
407419
if nextT != -1 {
408420
add(nextT, &aggr)
@@ -419,7 +431,7 @@ func downsampleBatch(data []sample, resolution int64, add func(int64, *aggregato
419431
}
420432

421433
// downsampleAggr downsamples a sequence of aggregation chunks to the given resolution.
422-
func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, resolution int64) ([]chunks.Meta, error) {
434+
func downsampleAggr(chks []AggrChunk, buf *[]sample, mint, maxt, resolution int64) ([]chunks.Meta, error) {
423435
// We downsample aggregates only along chunk boundaries. This is required for counters
424436
// to be downsampled correctly since a chunks' last counter value is the true last value
425437
// of the original series. We need to preserve it even across multiple aggregation iterations.
@@ -442,7 +454,6 @@ func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, resolution int
442454
if err != nil {
443455
return nil, err
444456
}
445-
446457
res = append(res, chunks.Meta{
447458
MinTime: mint,
448459
MaxTime: maxt,
@@ -460,8 +471,9 @@ func expandChunkIterator(it chunkenc.Iterator, buf *[]sample) error {
460471
return it.Err()
461472
}
462473

463-
func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (mint, maxt int64, c *AggrChunk, err error) {
474+
func downsampleAggrBatch(chks []AggrChunk, buf *[]sample, resolution int64) (mint, maxt int64, c AggrChunk, err error) {
464475
ab := &aggrChunkBuilder{}
476+
mint, maxt = math.MaxInt64, math.MinInt64
465477

466478
// do does a generic aggregation for count, sum, min, and max aggregates.
467479
// Counters need special treatment.
@@ -486,6 +498,11 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (mi
486498
ab.apps[at], _ = ab.chunks[at].Appender()
487499

488500
downsampleBatch(*buf, resolution, func(t int64, a *aggregator) {
501+
if t < mint {
502+
mint = t
503+
} else if t > maxt {
504+
maxt = t
505+
}
489506
ab.apps[at].Append(t, f(a))
490507
})
491508
return nil
@@ -529,17 +546,22 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (mi
529546
return 0, 0, nil, err
530547
}
531548
if len(*buf) == 0 {
532-
return ab.mint, ab.maxt, ab.encode(), nil
549+
return mint, maxt, ab.encode(), nil
533550
}
534551
ab.chunks[AggrCounter] = chunkenc.NewXORChunk()
535552
ab.apps[AggrCounter], _ = ab.chunks[AggrCounter].Appender()
536553

537554
lastT := downsampleBatch(*buf, resolution, func(t int64, a *aggregator) {
555+
if t < mint {
556+
mint = t
557+
} else if t > maxt {
558+
maxt = t
559+
}
538560
ab.apps[AggrCounter].Append(t, a.counter)
539561
})
540562
ab.apps[AggrCounter].Append(lastT, it.lastV)
541563

542-
return ab.mint, ab.maxt, ab.encode(), nil
564+
return mint, maxt, ab.encode(), nil
543565
}
544566

545567
// isCounter guesses whether a series is a counter based on its label set.
@@ -552,9 +574,9 @@ func isCounter(lset labels.Labels) bool {
552574

553575
// targetChunkSize computes an intended sample count per chunk given a fixed length
// of samples in the series.
// It aims to split the series into chunks of length 100 or higher.
func targetChunkSize(l int) (c, s int) {
	// Grow the chunk count until each chunk holds at most 200 samples.
	c = 1
	for l/c > 200 {
		c++
	}
	return c, (l / c) + 1
}
@@ -571,13 +593,11 @@ type series struct {
571593

572594
// AggrChunk is a chunk that is composed of a set of aggregates for the same
// underlying data. Not all aggregates must be present. The raw bytes start
// with a bitmask of the contained aggregate types; the encoded sub-chunks
// follow after it.
type AggrChunk []byte
577597

578598
// EncodeAggrChunk encodes a new aggregate chunk from the array of chunks for each aggregate.
579599
// Each array entry corresponds to the respective AggrType number.
580-
func EncodeAggrChunk(chks [5]chunkenc.Chunk) *AggrChunk {
600+
func EncodeAggrChunk(chks [5]chunkenc.Chunk) AggrChunk {
581601
var mask byte
582602
var all []chunkenc.Chunk
583603

@@ -599,22 +619,22 @@ func EncodeAggrChunk(chks [5]chunkenc.Chunk) *AggrChunk {
599619
b = append(b, byte(c.Encoding()))
600620
b = append(b, c.Bytes()...)
601621
}
602-
return &AggrChunk{b: b}
622+
return AggrChunk(b)
603623
}
604624

605-
func (c *AggrChunk) Bytes() []byte {
606-
return c.b
625+
func (c AggrChunk) Bytes() []byte {
626+
return []byte(c)
607627
}
608628

609-
func (c *AggrChunk) Appender() (chunkenc.Appender, error) {
629+
func (c AggrChunk) Appender() (chunkenc.Appender, error) {
610630
return nil, errors.New("not implemented")
611631
}
612632

613-
func (c *AggrChunk) Iterator() chunkenc.Iterator {
633+
func (c AggrChunk) Iterator() chunkenc.Iterator {
614634
return chunkenc.NewNopIterator()
615635
}
616636

617-
func (c *AggrChunk) NumSamples() int {
637+
func (c AggrChunk) NumSamples() int {
618638
x, err := c.nth(0)
619639
if err != nil {
620640
return 0
@@ -629,13 +649,13 @@ var ErrAggrNotExist = errors.New("aggregate does not exist")
629649
// It picks the highest number possible to prevent future collisions with upstream encodings.
630650
const ChunkEncAggr = chunkenc.Encoding(0xff)
631651

632-
func (c *AggrChunk) Encoding() chunkenc.Encoding {
652+
func (c AggrChunk) Encoding() chunkenc.Encoding {
633653
return ChunkEncAggr
634654
}
635655

636656
// nth returns the nth chunk present in the aggregated chunk.
637-
func (c *AggrChunk) nth(n uint8) (chunkenc.Chunk, error) {
638-
b := c.b[1:]
657+
func (c AggrChunk) nth(n uint8) (chunkenc.Chunk, error) {
658+
b := c[1:]
639659
var x []byte
640660

641661
for i := uint8(0); i <= n; i++ {
@@ -650,8 +670,8 @@ func (c *AggrChunk) nth(n uint8) (chunkenc.Chunk, error) {
650670
}
651671

652672
// position returns at which position the chunk for the type is located.
653-
func (c *AggrChunk) position(t AggrType) (ok bool, p uint8) {
654-
mask := uint8(c.b[0])
673+
func (c AggrChunk) position(t AggrType) (ok bool, p uint8) {
674+
mask := uint8(c[0])
655675

656676
if mask&(1<<(7-t)) == 0 {
657677
return false, 0
@@ -665,7 +685,7 @@ func (c *AggrChunk) position(t AggrType) (ok bool, p uint8) {
665685
}
666686

667687
// Get returns the sub-chunk for the given aggregate type if it exists.
668-
func (c *AggrChunk) Get(t AggrType) (chunkenc.Chunk, error) {
688+
func (c AggrChunk) Get(t AggrType) (chunkenc.Chunk, error) {
669689
ok, p := c.position(t)
670690
if !ok {
671691
return nil, ErrAggrNotExist
@@ -700,6 +720,10 @@ func (it *CounterSeriesIterator) Next() bool {
700720
return it.Next()
701721
}
702722
t, v := it.chks[it.i].At()
723+
724+
if math.IsNaN(v) {
725+
return it.Next()
726+
}
703727
// First sample sets the initial counter state.
704728
if it.total == 0 {
705729
it.total++
@@ -748,12 +772,53 @@ func (it *CounterSeriesIterator) Err() error {
748772
return it.chks[it.i].Err()
749773
}
750774

751-
type aggrPool struct{}
775+
// AverageChunkIterator emits an artifical series of average samples based in aggregate
776+
// chunks with sum and count aggregates.
777+
type AverageChunkIterator struct {
778+
cntIt chunkenc.Iterator
779+
sumIt chunkenc.Iterator
780+
t int64
781+
v float64
782+
err error
783+
}
784+
785+
func NewAverageChunkIterator(cnt, sum chunkenc.Iterator) *AverageChunkIterator {
786+
return &AverageChunkIterator{cntIt: cnt, sumIt: sum}
787+
}
788+
789+
func (it *AverageChunkIterator) Next() bool {
790+
cok, sok := it.cntIt.Next(), it.sumIt.Next()
791+
if cok != sok {
792+
it.err = errors.New("sum and count iterator not aligned")
793+
return false
794+
}
795+
if !cok {
796+
return false
797+
}
798+
799+
cntT, cntV := it.cntIt.At()
800+
sumT, sumV := it.sumIt.At()
801+
if cntT != sumT {
802+
it.err = errors.New("sum and count timestamps not aligned")
803+
}
804+
it.t, it.v = cntT, sumV/cntV
805+
return true
806+
}
807+
808+
func (it *AverageChunkIterator) At() (int64, float64) {
809+
return it.t, it.v
810+
}
811+
812+
func (it *AverageChunkIterator) Err() error {
813+
return it.err
814+
}
815+
816+
type AggrChunkPool struct{}
752817

753-
func (p aggrPool) Get(e chunkenc.Encoding, b []byte) (chunkenc.Chunk, error) {
754-
return &AggrChunk{b: b}, nil
818+
func (p AggrChunkPool) Get(e chunkenc.Encoding, b []byte) (chunkenc.Chunk, error) {
819+
return AggrChunk(b), nil
755820
}
756821

757-
func (p aggrPool) Put(c chunkenc.Chunk) error {
822+
func (p AggrChunkPool) Put(c chunkenc.Chunk) error {
758823
return nil
759824
}

pkg/compact/downsample/downsample_test.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@ package downsample
33
import (
44
"context"
55
"io/ioutil"
6+
"math"
67
"os"
78
"path/filepath"
89
"testing"
910

11+
"github.com/prometheus/prometheus/pkg/value"
12+
1013
"github.com/prometheus/tsdb/chunks"
1114

1215
"github.com/improbable-eng/thanos/pkg/block"
@@ -58,11 +61,13 @@ type testAggrSeries struct {
5861
}
5962

6063
func TestDownsampleRaw(t *testing.T) {
64+
staleMarker := math.Float64frombits(value.StaleNaN)
65+
6166
input := []*downsampleTestSet{
6267
{
6368
lset: labels.FromStrings("__name__", "a"),
6469
inRaw: []sample{
65-
{20, 1}, {40, 2}, {60, 3}, {80, 1}, {100, 2}, {120, 5}, {180, 10}, {250, 1},
70+
{20, 1}, {40, 2}, {60, 3}, {80, 1}, {100, 2}, {101, staleMarker}, {120, 5}, {180, 10}, {250, 1},
6671
},
6772
output: map[AggrType][]sample{
6873
AggrCount: {{99, 4}, {199, 3}, {299, 1}},
@@ -119,7 +124,7 @@ type testSeries struct {
119124
data []sample
120125
}
121126

122-
func encodeTestAggrSeries(v map[AggrType][]sample) (*AggrChunk, int64, int64) {
127+
func encodeTestAggrSeries(v map[AggrType][]sample) (AggrChunk, int64, int64) {
123128
b := newAggrChunkBuilder(false)
124129

125130
for at, d := range v {
@@ -193,7 +198,7 @@ func testDownsample(t *testing.T, data []*downsampleTestSet, meta *block.Meta, r
193198
testutil.Ok(t, err)
194199
defer indexr.Close()
195200

196-
chunkr, err := chunks.NewDirReader(filepath.Join(dir, id.String(), "chunks"), aggrPool{})
201+
chunkr, err := chunks.NewDirReader(filepath.Join(dir, id.String(), "chunks"), AggrChunkPool{})
197202
testutil.Ok(t, err)
198203
defer chunkr.Close()
199204

@@ -215,7 +220,7 @@ func testDownsample(t *testing.T, data []*downsampleTestSet, meta *block.Meta, r
215220
testutil.Ok(t, err)
216221

217222
for _, at := range []AggrType{AggrCount, AggrSum, AggrMin, AggrMax, AggrCounter} {
218-
c, err := chk.(*AggrChunk).Get(at)
223+
c, err := chk.(AggrChunk).Get(at)
219224
if err == ErrAggrNotExist {
220225
continue
221226
}
@@ -238,12 +243,30 @@ func testDownsample(t *testing.T, data []*downsampleTestSet, meta *block.Meta, r
238243
}
239244
}
240245

246+
func TestAverageChunkIterator(t *testing.T) {
247+
sum := []sample{{100, 30}, {200, 40}, {300, 5}, {400, -10}}
248+
cnt := []sample{{100, 1}, {200, 5}, {300, 2}, {400, 10}}
249+
exp := []sample{{100, 30}, {200, 8}, {300, 2.5}, {400, -1}}
250+
251+
x := NewAverageChunkIterator(newSampleIterator(cnt), newSampleIterator(sum))
252+
253+
var res []sample
254+
for x.Next() {
255+
t, v := x.At()
256+
res = append(res, sample{t, v})
257+
}
258+
testutil.Ok(t, x.Err())
259+
testutil.Equals(t, exp, res)
260+
}
261+
241262
func TestCounterSeriesIterator(t *testing.T) {
263+
staleMarker := math.Float64frombits(value.StaleNaN)
264+
242265
chunks := [][]sample{
243266
{{100, 10}, {200, 20}, {300, 10}, {400, 20}, {400, 5}},
244267
{{500, 10}, {600, 20}, {700, 30}, {800, 40}, {800, 10}}, // no actual reset
245268
{{900, 5}, {1000, 10}, {1100, 15}}, // actual reset
246-
{{1200, 20}, {1300, 40}}, // no special last sample, no reset
269+
{{1200, 20}, {1250, staleMarker}, {1300, 40}}, // no special last sample, no reset
247270
{{1400, 30}, {1500, 30}, {1600, 50}}, // no special last sample, reset
248271
}
249272
exp := []sample{

0 commit comments

Comments (0)