package exporter

import (
	"context"
	"fmt"
	"log"
	"log/slog"
	"time"

	"github.com/thomaspoignant/go-feature-flag/utils/fflog"
)
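
// Default values applied by NewDataExporter when the corresponding Config
// fields are left at their zero value.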
const (
	defaultFlushInterval    = 60 * time.Second
	defaultMaxEventInMemory = int64(100000)
)

type DataExporter[T ExportableEvent] interface {
	// Start launches the ticker that periodically flushes the data.
	Start()
	// Stop stops the ticker and flushes any pending data.
	Stop()
	// Flush sends the pending data to the exporter.
	Flush()
	// IsBulk returns false if the data should be sent directly, as soon as it is produced.
	IsBulk() bool
	// GetConsumerID returns the consumer ID used in the event store.
	GetConsumerID() string
	// GetMaxEventInMemory returns the maximum number of events kept in the cache before Flush() is called.
	GetMaxEventInMemory() int64
}
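
// dataExporterImpl (defined below) is the package-private implementation of
// this interface; callers obtain one through NewDataExporter.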

// Config is the configuration of a DataExporter.
type Config struct {
	// Exporter is the exporter used to export the data.
	Exporter CommonExporter
	// FlushInterval is the interval between two automatic flushes.
	FlushInterval time.Duration
	// MaxEventInMemory is the maximum number of events kept in memory before forcing a flush.
	MaxEventInMemory int64
}

type dataExporterImpl[T ExportableEvent] struct {
	ctx        context.Context
	consumerID string
	eventStore *EventStore[T]
	logger     *fflog.FFLogger
	exporter   Config
	daemonChan chan struct{}
	ticker     *time.Ticker
}

// NewDataExporter creates a new DataExporter with the given exporter and the consumer information
// used to consume the data from the shared event store.
func NewDataExporter[T ExportableEvent](ctx context.Context, exporter Config, consumerID string,
	eventStore *EventStore[T], logger *fflog.FFLogger) DataExporter[T] {
	if ctx == nil {
		ctx = context.Background()
	}
	if exporter.FlushInterval == 0 {
		exporter.FlushInterval = defaultFlushInterval
	}
	if exporter.MaxEventInMemory == 0 {
		exporter.MaxEventInMemory = defaultMaxEventInMemory
	}
	return &dataExporterImpl[T]{
		ctx:        ctx,
		consumerID: consumerID,
		eventStore: eventStore,
		logger:     logger,
		exporter:   exporter,
		daemonChan: make(chan struct{}),
		ticker:     time.NewTicker(exporter.FlushInterval),
	}
}
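
// A minimal usage sketch (hypothetical names; assumes an event store, a
// concrete exporter inside Config, and a logger are already available):
//
//	exp := NewDataExporter[FeatureEvent](ctx, Config{Exporter: myExporter}, "consumer-1", store, logger)
//	go exp.Start() // Start blocks in bulk mode, so run it in its own goroutine.
//	defer exp.Stop()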

// Start launches the ticker that periodically flushes the data.
// If the exporter is a live (non-bulk) exporter, the daemon is not started.
func (d *dataExporterImpl[T]) Start() {
	// we don't start the daemon if we are not in bulk mode
	if !d.IsBulk() {
		return
	}
	for {
		select {
		case <-d.ticker.C:
			d.Flush()
		case <-d.daemonChan:
			// stop the daemon
			return
		}
	}
}

// Stop flushes the pending data and stops the ticker.
func (d *dataExporterImpl[T]) Stop() {
	// in non-bulk mode there is no daemon to stop, so we only flush
	if !d.IsBulk() {
		d.Flush()
		return
	}
	d.ticker.Stop()
	close(d.daemonChan)
	d.Flush()
}

// Flush sends the pending data to the exporter by delegating to the event
// store, which invokes sendEvents on the pending events.
func (d *dataExporterImpl[T]) Flush() {
	store := *d.eventStore
	err := store.ProcessPendingEvents(d.consumerID, d.sendEvents)
	if err != nil {
		d.logger.Error(err.Error())
		return
	}
}

// IsBulk returns false if the data should be sent directly, as soon as it is produced.
func (d *dataExporterImpl[T]) IsBulk() bool {
	return d.exporter.Exporter.IsBulk()
}

// GetConsumerID returns the consumer ID used in the event store.
func (d *dataExporterImpl[T]) GetConsumerID() string {
	return d.consumerID
}

// GetMaxEventInMemory returns the maximum number of events kept in the cache before Flush() is called.
func (d *dataExporterImpl[T]) GetMaxEventInMemory() int64 {
	return d.exporter.MaxEventInMemory
}

// sendEvents sends the events to the exporter, dispatching on the concrete
// exporter interface that is configured.
func (d *dataExporterImpl[T]) sendEvents(ctx context.Context, events []T) error {
	if len(events) == 0 {
		return nil
	}
	switch exp := d.exporter.Exporter.(type) {
	case DeprecatedExporterV1:
		var legacyLogger *log.Logger
		if d.logger != nil {
			legacyLogger = d.logger.GetLogLogger(slog.LevelError)
		}
		switch events := any(events).(type) {
		case []FeatureEvent:
			// use the exporter as a DeprecatedExporterV1
			err := exp.Export(ctx, legacyLogger, events)
			// warn unconditionally: reaching this branch means a deprecated exporter is configured
			slog.Warn("You are using an exporter with the old logger. "+
				"Please update your custom exporter to comply with the new Exporter interface.",
				slog.Any("err", err))
			if err != nil {
				return fmt.Errorf("error while exporting data (deprecated): %w", err)
			}
		default:
			return fmt.Errorf("trying to send unknown object to the exporter (deprecated)")
		}
	case DeprecatedExporterV2:
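		// DeprecatedExporterV2 already receives the structured FFLogger,
		// but it still only supports slices of FeatureEvent.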
		switch events := any(events).(type) {
		case []FeatureEvent:
			err := exp.Export(ctx, d.logger, events)
			if err != nil {
				return fmt.Errorf("error while exporting data: %w", err)
			}
		default:
			return fmt.Errorf("trying to send unknown object to the exporter")
		}
	case Exporter:
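		// The current Exporter interface accepts any ExportableEvent,
		// so widen the generic slice before exporting.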
		exportableEvents := make([]ExportableEvent, len(events))
		for i, event := range events {
			exportableEvents[i] = ExportableEvent(event)
		}
		err := exp.Export(ctx, d.logger, exportableEvents)
		if err != nil {
			return fmt.Errorf("error while exporting data: %w", err)
		}
	default:
		return fmt.Errorf("this is not a valid exporter")
	}
	return nil
}
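
// A minimal sketch of a custom exporter that sendEvents would dispatch to
// through the Exporter case above (hypothetical; a real implementation would
// ship the events to a durable destination):
//
//	type stdoutExporter struct{}
//
//	func (s *stdoutExporter) IsBulk() bool { return true }
//
//	func (s *stdoutExporter) Export(ctx context.Context, logger *fflog.FFLogger, events []ExportableEvent) error {
//		for _, e := range events {
//			fmt.Println(e)
//		}
//		return nil
//	}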