9
9
"sort"
10
10
"strings"
11
11
12
+ "github.com/prometheus/prometheus/pkg/value"
13
+
12
14
"github.com/improbable-eng/thanos/pkg/block"
13
15
"github.com/prometheus/tsdb/chunkenc"
14
16
@@ -21,6 +23,13 @@ import (
21
23
"github.com/prometheus/tsdb/labels"
22
24
)
23
25
26
+ // Standard downsampling resolution levels in Thanos.
27
+ const (
28
+ ResLevel0 = int64 (0 ) // raw data
29
+ ResLevel1 = int64 (5 * 60 * 1000 ) // 5 minutes in milliseconds
30
+ ResLevel2 = int64 (60 * 60 * 1000 ) // 1 hour in milliseconds
31
+ )
32
+
24
33
// Downsample downsamples the given block. It writes a new block into dir and returns its ID.
25
34
func Downsample (
26
35
ctx context.Context ,
@@ -54,7 +63,7 @@ func Downsample(
54
63
return id , errors .Wrap (err , "get all postings list" )
55
64
}
56
65
var (
57
- aggrChunks []* AggrChunk
66
+ aggrChunks []AggrChunk
58
67
all []sample
59
68
chks []chunks.Meta
60
69
)
@@ -88,7 +97,7 @@ func Downsample(
88
97
if err != nil {
89
98
return id , errors .Wrapf (err , "get chunk %d" , c .Ref )
90
99
}
91
- aggrChunks = append (aggrChunks , chk .(* AggrChunk ))
100
+ aggrChunks = append (aggrChunks , chk .(AggrChunk ))
92
101
}
93
102
res , err := downsampleAggr (aggrChunks , & all , chks [0 ].MinTime , chks [len (chks )- 1 ].MaxTime , resolution )
94
103
if err != nil {
@@ -99,7 +108,7 @@ func Downsample(
99
108
if pall .Err () != nil {
100
109
return id , errors .Wrap (pall .Err (), "iterate series set" )
101
110
}
102
- comp , err := tsdb .NewLeveledCompactor (nil , log .NewNopLogger (), []int64 {rng }, aggrPool {})
111
+ comp , err := tsdb .NewLeveledCompactor (nil , log .NewNopLogger (), []int64 {rng }, AggrChunkPool {})
103
112
if err != nil {
104
113
return id , errors .Wrap (err , "create compactor" )
105
114
}
@@ -340,7 +349,7 @@ func (b *aggrChunkBuilder) add(t int64, aggr *aggregator) {
340
349
b .added ++
341
350
}
342
351
343
- func (b * aggrChunkBuilder ) encode () * AggrChunk {
352
+ func (b * aggrChunkBuilder ) encode () AggrChunk {
344
353
return EncodeAggrChunk (b .chunks )
345
354
}
346
355
@@ -403,6 +412,9 @@ func downsampleBatch(data []sample, resolution int64, add func(int64, *aggregato
403
412
)
404
413
// Fill up one aggregate chunk with up to m samples.
405
414
for _ , s := range data {
415
+ if value .IsStaleNaN (s .v ) {
416
+ continue
417
+ }
406
418
if s .t > nextT {
407
419
if nextT != - 1 {
408
420
add (nextT , & aggr )
@@ -419,7 +431,7 @@ func downsampleBatch(data []sample, resolution int64, add func(int64, *aggregato
419
431
}
420
432
421
433
// downsampleAggr downsamples a sequence of aggregation chunks to the given resolution.
422
- func downsampleAggr (chks []* AggrChunk , buf * []sample , mint , maxt , resolution int64 ) ([]chunks.Meta , error ) {
434
+ func downsampleAggr (chks []AggrChunk , buf * []sample , mint , maxt , resolution int64 ) ([]chunks.Meta , error ) {
423
435
// We downsample aggregates only along chunk boundaries. This is required for counters
424
436
// to be downsampled correctly since a chunks' last counter value is the true last value
425
437
// of the original series. We need to preserve it even across multiple aggregation iterations.
@@ -442,7 +454,6 @@ func downsampleAggr(chks []*AggrChunk, buf *[]sample, mint, maxt, resolution int
442
454
if err != nil {
443
455
return nil , err
444
456
}
445
-
446
457
res = append (res , chunks.Meta {
447
458
MinTime : mint ,
448
459
MaxTime : maxt ,
@@ -460,8 +471,9 @@ func expandChunkIterator(it chunkenc.Iterator, buf *[]sample) error {
460
471
return it .Err ()
461
472
}
462
473
463
- func downsampleAggrBatch (chks []* AggrChunk , buf * []sample , resolution int64 ) (mint , maxt int64 , c * AggrChunk , err error ) {
474
+ func downsampleAggrBatch (chks []AggrChunk , buf * []sample , resolution int64 ) (mint , maxt int64 , c AggrChunk , err error ) {
464
475
ab := & aggrChunkBuilder {}
476
+ mint , maxt = math .MaxInt64 , math .MinInt64
465
477
466
478
// do does a generic aggregation for count, sum, min, and max aggregates.
467
479
// Counters need special treatment.
@@ -486,6 +498,11 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (mi
486
498
ab .apps [at ], _ = ab .chunks [at ].Appender ()
487
499
488
500
downsampleBatch (* buf , resolution , func (t int64 , a * aggregator ) {
501
+ if t < mint {
502
+ mint = t
503
+ } else if t > maxt {
504
+ maxt = t
505
+ }
489
506
ab .apps [at ].Append (t , f (a ))
490
507
})
491
508
return nil
@@ -529,17 +546,22 @@ func downsampleAggrBatch(chks []*AggrChunk, buf *[]sample, resolution int64) (mi
529
546
return 0 , 0 , nil , err
530
547
}
531
548
if len (* buf ) == 0 {
532
- return ab . mint , ab . maxt , ab .encode (), nil
549
+ return mint , maxt , ab .encode (), nil
533
550
}
534
551
ab .chunks [AggrCounter ] = chunkenc .NewXORChunk ()
535
552
ab .apps [AggrCounter ], _ = ab .chunks [AggrCounter ].Appender ()
536
553
537
554
lastT := downsampleBatch (* buf , resolution , func (t int64 , a * aggregator ) {
555
+ if t < mint {
556
+ mint = t
557
+ } else if t > maxt {
558
+ maxt = t
559
+ }
538
560
ab .apps [AggrCounter ].Append (t , a .counter )
539
561
})
540
562
ab .apps [AggrCounter ].Append (lastT , it .lastV )
541
563
542
- return ab . mint , ab . maxt , ab .encode (), nil
564
+ return mint , maxt , ab .encode (), nil
543
565
}
544
566
545
567
// isCounter guesses whether a series is a counter based on its label set.
@@ -552,9 +574,9 @@ func isCounter(lset labels.Labels) bool {
552
574
553
575
// targetChunkSize computes an intended sample count per chunk given a fixed length
554
576
// of samples in the series.
555
- // It aims to split the series into chunks of length 120 or higher.
577
+ // It aims to split the series into chunks of length 100 or higher.
556
578
func targetChunkSize (l int ) (c , s int ) {
557
- for c = 1 ; l / c > 240 ; c ++ {
579
+ for c = 1 ; l / c > 200 ; c ++ {
558
580
}
559
581
return c , (l / c ) + 1
560
582
}
@@ -571,13 +593,11 @@ type series struct {
571
593
572
594
// AggrChunk is a chunk that is composed of a set of aggregates for the same underlying data.
573
595
// Not all aggregates must be present.
574
- type AggrChunk struct {
575
- b []byte
576
- }
596
+ type AggrChunk []byte
577
597
578
598
// EncodeAggrChunk encodes a new aggregate chunk from the array of chunks for each aggregate.
579
599
// Each array entry corresponds to the respective AggrType number.
580
- func EncodeAggrChunk (chks [5 ]chunkenc.Chunk ) * AggrChunk {
600
+ func EncodeAggrChunk (chks [5 ]chunkenc.Chunk ) AggrChunk {
581
601
var mask byte
582
602
var all []chunkenc.Chunk
583
603
@@ -599,22 +619,22 @@ func EncodeAggrChunk(chks [5]chunkenc.Chunk) *AggrChunk {
599
619
b = append (b , byte (c .Encoding ()))
600
620
b = append (b , c .Bytes ()... )
601
621
}
602
- return & AggrChunk { b : b }
622
+ return AggrChunk ( b )
603
623
}
604
624
605
- func (c * AggrChunk ) Bytes () []byte {
606
- return c . b
625
+ func (c AggrChunk ) Bytes () []byte {
626
+ return [] byte ( c )
607
627
}
608
628
609
- func (c * AggrChunk ) Appender () (chunkenc.Appender , error ) {
629
+ func (c AggrChunk ) Appender () (chunkenc.Appender , error ) {
610
630
return nil , errors .New ("not implemented" )
611
631
}
612
632
613
- func (c * AggrChunk ) Iterator () chunkenc.Iterator {
633
+ func (c AggrChunk ) Iterator () chunkenc.Iterator {
614
634
return chunkenc .NewNopIterator ()
615
635
}
616
636
617
- func (c * AggrChunk ) NumSamples () int {
637
+ func (c AggrChunk ) NumSamples () int {
618
638
x , err := c .nth (0 )
619
639
if err != nil {
620
640
return 0
@@ -629,13 +649,13 @@ var ErrAggrNotExist = errors.New("aggregate does not exist")
629
649
// It picks the highest number possible to prevent future collisions with upstream encodings.
630
650
const ChunkEncAggr = chunkenc .Encoding (0xff )
631
651
632
- func (c * AggrChunk ) Encoding () chunkenc.Encoding {
652
+ func (c AggrChunk ) Encoding () chunkenc.Encoding {
633
653
return ChunkEncAggr
634
654
}
635
655
636
656
// nth returns the nth chunk present in the aggregated chunk.
637
- func (c * AggrChunk ) nth (n uint8 ) (chunkenc.Chunk , error ) {
638
- b := c . b [1 :]
657
+ func (c AggrChunk ) nth (n uint8 ) (chunkenc.Chunk , error ) {
658
+ b := c [1 :]
639
659
var x []byte
640
660
641
661
for i := uint8 (0 ); i <= n ; i ++ {
@@ -650,8 +670,8 @@ func (c *AggrChunk) nth(n uint8) (chunkenc.Chunk, error) {
650
670
}
651
671
652
672
// position returns at which position the chunk for the type is located.
653
- func (c * AggrChunk ) position (t AggrType ) (ok bool , p uint8 ) {
654
- mask := uint8 (c . b [0 ])
673
+ func (c AggrChunk ) position (t AggrType ) (ok bool , p uint8 ) {
674
+ mask := uint8 (c [0 ])
655
675
656
676
if mask & (1 << (7 - t )) == 0 {
657
677
return false , 0
@@ -665,7 +685,7 @@ func (c *AggrChunk) position(t AggrType) (ok bool, p uint8) {
665
685
}
666
686
667
687
// Get returns the sub-chunk for the given aggregate type if it exists.
668
- func (c * AggrChunk ) Get (t AggrType ) (chunkenc.Chunk , error ) {
688
+ func (c AggrChunk ) Get (t AggrType ) (chunkenc.Chunk , error ) {
669
689
ok , p := c .position (t )
670
690
if ! ok {
671
691
return nil , ErrAggrNotExist
@@ -700,6 +720,10 @@ func (it *CounterSeriesIterator) Next() bool {
700
720
return it .Next ()
701
721
}
702
722
t , v := it .chks [it .i ].At ()
723
+
724
+ if math .IsNaN (v ) {
725
+ return it .Next ()
726
+ }
703
727
// First sample sets the initial counter state.
704
728
if it .total == 0 {
705
729
it .total ++
@@ -748,12 +772,53 @@ func (it *CounterSeriesIterator) Err() error {
748
772
return it .chks [it .i ].Err ()
749
773
}
750
774
751
- type aggrPool struct {}
775
+ // AverageChunkIterator emits an artifical series of average samples based in aggregate
776
+ // chunks with sum and count aggregates.
777
+ type AverageChunkIterator struct {
778
+ cntIt chunkenc.Iterator
779
+ sumIt chunkenc.Iterator
780
+ t int64
781
+ v float64
782
+ err error
783
+ }
784
+
785
+ func NewAverageChunkIterator (cnt , sum chunkenc.Iterator ) * AverageChunkIterator {
786
+ return & AverageChunkIterator {cntIt : cnt , sumIt : sum }
787
+ }
788
+
789
+ func (it * AverageChunkIterator ) Next () bool {
790
+ cok , sok := it .cntIt .Next (), it .sumIt .Next ()
791
+ if cok != sok {
792
+ it .err = errors .New ("sum and count iterator not aligned" )
793
+ return false
794
+ }
795
+ if ! cok {
796
+ return false
797
+ }
798
+
799
+ cntT , cntV := it .cntIt .At ()
800
+ sumT , sumV := it .sumIt .At ()
801
+ if cntT != sumT {
802
+ it .err = errors .New ("sum and count timestamps not aligned" )
803
+ }
804
+ it .t , it .v = cntT , sumV / cntV
805
+ return true
806
+ }
807
+
808
+ func (it * AverageChunkIterator ) At () (int64 , float64 ) {
809
+ return it .t , it .v
810
+ }
811
+
812
+ func (it * AverageChunkIterator ) Err () error {
813
+ return it .err
814
+ }
815
+
816
+ type AggrChunkPool struct {}
752
817
753
- func (p aggrPool ) Get (e chunkenc.Encoding , b []byte ) (chunkenc.Chunk , error ) {
754
- return & AggrChunk { b : b } , nil
818
+ func (p AggrChunkPool ) Get (e chunkenc.Encoding , b []byte ) (chunkenc.Chunk , error ) {
819
+ return AggrChunk ( b ) , nil
755
820
}
756
821
757
- func (p aggrPool ) Put (c chunkenc.Chunk ) error {
822
+ func (p AggrChunkPool ) Put (c chunkenc.Chunk ) error {
758
823
return nil
759
824
}
0 commit comments