-
-
Notifications
You must be signed in to change notification settings - Fork 163
/
Copy pathevent_store.go
228 lines (202 loc) · 6.4 KB
/
event_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package exporter
import (
"context"
"fmt"
"math"
"sync"
"time"
)
// minOffset is the lowest value an int64 can hold. NewEventStore uses it as the
// initial lastOffset of an empty store, so the first event added gets offset
// minOffset+1, and cleanQueue treats a minimum consumer offset equal to it as
// "nothing to remove".
const minOffset int64 = math.MinInt64
// eventStoreImpl is the in-memory implementation of EventStore: a slice of
// events tagged with increasing offsets, plus one read position per registered
// consumer.
//
// It contains a sync.RWMutex, so it must be handled through a pointer and never
// copied.
type eventStoreImpl[T ExportableEvent] struct {
	// mutex protects events and consumers; lastOffset is also updated while
	// holding it.
	mutex sync.RWMutex
	// events is the list of stored events, in the order they were added.
	events []EventStoreItem[T]
	// consumers maps a consumer name to its current read position.
	consumers map[string]*consumer
	// lastOffset is the offset given to the most recently added event.
	lastOffset int64

	// cleanQueueInterval is the delay between two runs of the periodic cleaning.
	cleanQueueInterval time.Duration
	// stopPeriodicCleaning is closed to make the periodic cleaning goroutine exit.
	stopPeriodicCleaning chan struct{}
}
// NewEventStore builds an empty event store and immediately starts the
// background goroutine that cleans the queue every cleanQueueInterval.
//
// cleanQueueInterval is handed to time.NewTicker by that goroutine, so it should
// be a positive duration. Call Stop on the returned store to end the goroutine.
func NewEventStore[T ExportableEvent](cleanQueueInterval time.Duration) EventStore[T] {
	s := new(eventStoreImpl[T])
	s.events = make([]EventStoreItem[T], 0)
	s.consumers = make(map[string]*consumer)
	// Offsets start at the lowest possible value; the zero-value mutex is ready to use.
	s.lastOffset = minOffset
	s.cleanQueueInterval = cleanQueueInterval
	s.stopPeriodicCleaning = make(chan struct{})

	go s.periodicCleanQueue()
	return s
}
// EventList is the batch of events pending for one consumer, as assembled by
// fetchPendingEvents.
type EventList[T ExportableEvent] struct {
	// Events holds the data of every stored event whose offset is greater than
	// InitialOffset.
	Events []T
	// InitialOffset is the consumer's offset when the batch was built;
	// NewOffset is the store's last offset at that same moment.
	InitialOffset, NewOffset int64
}
// EventStore stores events and lets named consumers read them.
// It is a simple queue in which each consumer keeps its own offset.
type EventStore[T ExportableEvent] interface {
	// AddConsumer registers a new consumer identified by consumerID.
	// Note: consumers are expected to be registered up front, before the store
	// starts being used.
	AddConsumer(consumerID string)

	// Add appends one item of type T to the store.
	Add(data T)

	// GetPendingEventCount returns how many items are still waiting for the given
	// consumer, or an error when the consumer is not registered.
	GetPendingEventCount(consumerID string) (int64, error)

	// GetTotalEventCount returns the number of events currently held by the store.
	GetTotalEventCount() int64

	// ProcessPendingEvents passes every item available for the given consumer to
	// processEventsFunc and returns the error, if any, that prevented the
	// processing from completing.
	ProcessPendingEvents(consumerID string, processEventsFunc func(context.Context, []T) error) error

	// Stop shuts the store down and ends the periodic cleaning.
	Stop()
}
// EventStoreItem is one entry of the store: an event together with the offset
// it was given when it was added.
type EventStoreItem[T ExportableEvent] struct {
	// Offset is the position of the event in the store.
	Offset int64
	// Data is the stored event itself.
	Data T
}
// consumer is the read position of one registered consumer.
type consumer struct {
	// Offset is the offset of the last event this consumer is considered to have
	// processed; only events with a greater offset are still pending for it.
	Offset int64
}
// AddConsumer registers a consumer under consumerID, replacing any consumer
// already registered with that ID. The consumer starts at the store's current
// last offset, so it only receives events added after this call.
//
// The store mutex is held while registering: the cleaning goroutine started by
// NewEventStore iterates over the consumers map and Add updates lastOffset, so
// an unguarded write here would be a data race.
func (e *eventStoreImpl[T]) AddConsumer(consumerID string) {
	e.mutex.Lock()
	defer e.mutex.Unlock()
	e.consumers[consumerID] = &consumer{Offset: e.lastOffset}
}
// ProcessPendingEvents collects every event the consumer has not processed yet
// and passes them to processEventsFunc together with a background context.
//
// The consumer's offset is moved forward only when processEventsFunc returns
// nil; on error the offset is left untouched and the error is returned as is.
// An error is also returned when consumerID is not registered.
//
// The store mutex is held for the whole call, including while
// processEventsFunc runs.
func (e *eventStoreImpl[T]) ProcessPendingEvents(
	consumerID string, processEventsFunc func(context.Context, []T) error) error {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	pending, err := e.fetchPendingEvents(consumerID)
	if err != nil {
		return err
	}
	if err := processEventsFunc(context.Background(), pending.Events); err != nil {
		return err
	}
	return e.updateConsumerOffset(consumerID, pending.NewOffset)
}
// GetTotalEventCount reports how many events are currently held by the store,
// whether or not consumers have already processed them.
func (e *eventStoreImpl[T]) GetTotalEventCount() int64 {
	e.mutex.RLock()
	total := len(e.events)
	e.mutex.RUnlock()
	return int64(total)
}
// GetPendingEventCount reports how many events are waiting for the given
// consumer, computed as the distance between the store's last offset and the
// consumer's offset. It returns an error when consumerID is not registered.
func (e *eventStoreImpl[T]) GetPendingEventCount(consumerID string) (int64, error) {
	e.mutex.RLock()
	defer e.mutex.RUnlock()

	registered, err := e.getConsumer(consumerID)
	if err != nil {
		return 0, err
	}
	pending := e.lastOffset - registered.Offset
	return pending, nil
}
// Add appends data to the store and gives it the next offset, i.e. the
// previous last offset plus one.
func (e *eventStoreImpl[T]) Add(data T) {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	next := e.lastOffset + 1
	e.events = append(e.events, EventStoreItem[T]{Offset: next, Data: data})
	e.lastOffset = next
}
// fetchPendingEvents builds the list of events the given consumer has not
// processed yet: every stored event whose offset is greater than the consumer's
// offset. The returned list also carries the consumer's offset and the store's
// last offset. An error is returned when consumerID is not registered.
// WARNING: only call this function while holding the store mutex.
func (e *eventStoreImpl[T]) fetchPendingEvents(consumerID string) (*EventList[T], error) {
	reader, err := e.getConsumer(consumerID)
	if err != nil {
		return nil, err
	}

	// Always a non-nil slice, even when nothing is pending.
	pending := []T{}
	for i := range e.events {
		if e.events[i].Offset <= reader.Offset {
			// Already processed by this consumer.
			continue
		}
		pending = append(pending, e.events[i].Data)
	}

	list := &EventList[T]{
		Events:        pending,
		InitialOffset: reader.Offset,
		NewOffset:     e.lastOffset,
	}
	return list, nil
}
// getConsumer looks up the consumer registered under consumerID and returns an
// error when there is none.
// It reads the consumers map without taking the mutex itself, so the caller is
// expected to hold it.
func (e *eventStoreImpl[T]) getConsumer(consumerID string) (*consumer, error) {
	if found, exists := e.consumers[consumerID]; exists {
		return found, nil
	}
	return nil, fmt.Errorf("consumer with name %s not found", consumerID)
}
// updateConsumerOffset moves the given consumer to offset.
// It returns an error when offset is greater than the store's last offset or
// when consumerID is not registered; in both cases the consumer is left
// unchanged.
// WARNING: only call this function while holding the store mutex.
func (e *eventStoreImpl[T]) updateConsumerOffset(consumerID string, offset int64) error {
	if offset > e.lastOffset {
		return fmt.Errorf(
			"invalid offset: offset %d is greater than the last offset %d",
			offset,
			e.lastOffset,
		)
	}
	currentConsumer, err := e.getConsumer(consumerID)
	if err != nil {
		return err
	}
	// Store the offset that was asked for (and validated above), not the store's
	// last offset: using lastOffset would silently mark as consumed any event
	// located between offset and lastOffset.
	currentConsumer.Offset = offset
	return nil
}
// cleanQueue removes the events that every registered consumer has already
// consumed, i.e. the events whose offset is lower than or equal to the smallest
// consumer offset.
//
// Nothing is removed when the store holds no event, when no consumer is
// registered, or when at least one consumer is still at minOffset (it has not
// consumed anything yet).
func (e *eventStoreImpl[T]) cleanQueue() {
	e.mutex.Lock()
	defer e.mutex.Unlock()
	if len(e.events) == 0 {
		// nothing to remove
		return
	}

	// A flag records whether a consumer has been seen, instead of reusing
	// minOffset as the "not set yet" marker: minOffset is also the real offset of
	// a consumer that has consumed nothing. Mixing the two let a later consumer
	// overwrite the minimum (depending on map iteration order), which deleted
	// events the untouched consumer still had to read.
	var lowest int64
	seen := false
	for _, currentConsumer := range e.consumers {
		if !seen || currentConsumer.Offset < lowest {
			lowest, seen = currentConsumer.Offset, true
		}
	}
	if !seen || lowest <= minOffset {
		// nothing to remove
		return
	}

	// Cut the slice right after the event located at the smallest consumer
	// offset. When no stored event has that offset, nothing is removed.
	for i := range e.events {
		if e.events[i].Offset == lowest {
			e.events = e.events[i+1:]
			break
		}
	}
}
// periodicCleanQueue runs cleanQueue every cleanQueueInterval until
// stopPeriodicCleaning is closed. It blocks, so it is meant to run in its own
// goroutine.
func (e *eventStoreImpl[T]) periodicCleanQueue() {
	// time.NewTicker panics when given a non-positive duration, and a panic in
	// this background goroutine would take the whole process down. Fall back to
	// a fixed interval instead of crashing on a zero or negative configuration.
	const fallbackInterval = time.Minute
	interval := e.cleanQueueInterval
	if interval <= 0 {
		interval = fallbackInterval
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			e.cleanQueue()
		case <-e.stopPeriodicCleaning:
			return
		}
	}
}
// Stop closes the Event store and stops the periodic cleaning goroutine.
//
// Stop can be called several times, including from several goroutines: only
// the first call closes the stop channel, whereas closing it a second time
// would panic. The store mutex serialises the calls, so Stop waits for any
// operation currently holding that mutex to finish.
func (e *eventStoreImpl[T]) Stop() {
	e.mutex.Lock()
	defer e.mutex.Unlock()
	select {
	case <-e.stopPeriodicCleaning:
		// already stopped, nothing left to do
	default:
		close(e.stopPeriodicCleaning)
	}
}