Skip to content

Commit e03fdaa

Browse files
authored
add support for recovery section in event streams (zalando#2421)
1 parent 102a22e commit e03fdaa

File tree

7 files changed

+133
-18
lines changed

7 files changed

+133
-18
lines changed

pkg/apis/acid.zalan.do/v1/postgresql_type.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -247,16 +247,18 @@ type ConnectionPooler struct {
247247

248248
// Stream defines properties for creating FabricEventStream resources
249249
type Stream struct {
250-
ApplicationId string `json:"applicationId"`
251-
Database string `json:"database"`
252-
Tables map[string]StreamTable `json:"tables"`
253-
Filter map[string]*string `json:"filter,omitempty"`
254-
BatchSize *uint32 `json:"batchSize,omitempty"`
250+
ApplicationId string `json:"applicationId"`
251+
Database string `json:"database"`
252+
Tables map[string]StreamTable `json:"tables"`
253+
Filter map[string]*string `json:"filter,omitempty"`
254+
BatchSize *uint32 `json:"batchSize,omitempty"`
255+
EnableRecovery *bool `json:"enableRecovery,omitempty"`
255256
}
256257

257258
// StreamTable defines properties of outbox tables for FabricEventStreams
258259
type StreamTable struct {
259-
EventType string `json:"eventType"`
260-
IdColumn *string `json:"idColumn,omitempty"`
261-
PayloadColumn *string `json:"payloadColumn,omitempty"`
260+
EventType string `json:"eventType"`
261+
RecoveryEventType string `json:"recoveryEventType"`
262+
IdColumn *string `json:"idColumn,omitempty"`
263+
PayloadColumn *string `json:"payloadColumn,omitempty"`
262264
}

pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/apis/zalando.org/v1/fabriceventstream.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,10 @@ type FabricEventStreamList struct {
3333

3434
// EventStream defines the source, flow and sink of the event stream
3535
type EventStream struct {
36-
EventStreamFlow EventStreamFlow `json:"flow"`
37-
EventStreamSink EventStreamSink `json:"sink"`
38-
EventStreamSource EventStreamSource `json:"source"`
36+
EventStreamFlow EventStreamFlow `json:"flow"`
37+
EventStreamSink EventStreamSink `json:"sink"`
38+
EventStreamSource EventStreamSource `json:"source"`
39+
EventStreamRecovery EventStreamRecovery `json:"recovery"`
3940
}
4041

4142
// EventStreamFlow defines the flow characteristics of the event stream
@@ -51,6 +52,12 @@ type EventStreamSink struct {
5152
MaxBatchSize *uint32 `json:"maxBatchSize,omitempty"`
5253
}
5354

55+
// EventStreamRecovery defines the target of dead letter queue
56+
type EventStreamRecovery struct {
57+
Type string `json:"type"`
58+
Sink *EventStreamSink `json:"sink"`
59+
}
60+
5461
// EventStreamSource defines the source of the event stream and connection for FES operator
5562
type EventStreamSource struct {
5663
Type string `json:"type"`

pkg/apis/zalando.org/v1/zz_generated.deepcopy.go

Lines changed: 53 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/cluster/streams.go

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -145,11 +145,13 @@ func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEvent
145145
streamSource := c.getEventStreamSource(stream, tableName, table.IdColumn)
146146
streamFlow := getEventStreamFlow(stream, table.PayloadColumn)
147147
streamSink := getEventStreamSink(stream, table.EventType)
148+
streamRecovery := getEventStreamRecovery(stream, table.RecoveryEventType, table.EventType)
148149

149150
eventStreams = append(eventStreams, zalandov1.EventStream{
150-
EventStreamFlow: streamFlow,
151-
EventStreamSink: streamSink,
152-
EventStreamSource: streamSource})
151+
EventStreamFlow: streamFlow,
152+
EventStreamRecovery: streamRecovery,
153+
EventStreamSink: streamSink,
154+
EventStreamSource: streamSource})
153155
}
154156
}
155157

@@ -204,6 +206,28 @@ func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1.EventS
204206
}
205207
}
206208

209+
func getEventStreamRecovery(stream acidv1.Stream, recoveryEventType, eventType string) zalandov1.EventStreamRecovery {
210+
if (stream.EnableRecovery != nil && !*stream.EnableRecovery) ||
211+
(stream.EnableRecovery == nil && recoveryEventType == "") {
212+
return zalandov1.EventStreamRecovery{
213+
Type: constants.EventStreamRecoveryNoneType,
214+
}
215+
}
216+
217+
if stream.EnableRecovery != nil && *stream.EnableRecovery && recoveryEventType == "" {
218+
recoveryEventType = fmt.Sprintf("%s-%s", eventType, constants.EventStreamRecoverySuffix)
219+
}
220+
221+
return zalandov1.EventStreamRecovery{
222+
Type: constants.EventStreamRecoveryDLQType,
223+
Sink: &zalandov1.EventStreamSink{
224+
Type: constants.EventStreamSinkNakadiType,
225+
EventType: recoveryEventType,
226+
MaxBatchSize: stream.BatchSize,
227+
},
228+
}
229+
}
230+
207231
func getTableSchema(fullTableName string) (tableName, schemaName string) {
208232
schemaName = "public"
209233
tableName = fullTableName
@@ -381,7 +405,8 @@ func sameStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (matc
381405
for _, curStream := range curEventStreams {
382406
if reflect.DeepEqual(newStream.EventStreamSource, curStream.EventStreamSource) &&
383407
reflect.DeepEqual(newStream.EventStreamFlow, curStream.EventStreamFlow) &&
384-
reflect.DeepEqual(newStream.EventStreamSink, curStream.EventStreamSink) {
408+
reflect.DeepEqual(newStream.EventStreamSink, curStream.EventStreamSink) &&
409+
reflect.DeepEqual(newStream.EventStreamRecovery, curStream.EventStreamRecovery) {
385410
match = true
386411
break
387412
}

pkg/cluster/streams_test.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,11 @@ var (
6565
PayloadColumn: k8sutil.StringToPointer("b_payload"),
6666
},
6767
"data.foobar": acidv1.StreamTable{
68-
EventType: "stream-type-b",
68+
EventType: "stream-type-b",
69+
RecoveryEventType: "stream-type-b-dlq",
6970
},
7071
},
72+
EnableRecovery: util.True(),
7173
Filter: map[string]*string{
7274
"data.bar": k8sutil.StringToPointer("[?(@.source.txId > 500 && @.source.lsn > 123456)]"),
7375
},
@@ -106,6 +108,14 @@ var (
106108
PayloadColumn: k8sutil.StringToPointer("b_payload"),
107109
Type: constants.EventStreamFlowPgGenericType,
108110
},
111+
EventStreamRecovery: zalandov1.EventStreamRecovery{
112+
Type: constants.EventStreamRecoveryDLQType,
113+
Sink: &zalandov1.EventStreamSink{
114+
EventType: fmt.Sprintf("%s-%s", "stream-type-a", constants.EventStreamRecoverySuffix),
115+
MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)),
116+
Type: constants.EventStreamSinkNakadiType,
117+
},
118+
},
109119
EventStreamSink: zalandov1.EventStreamSink{
110120
EventType: "stream-type-a",
111121
MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)),
@@ -136,6 +146,14 @@ var (
136146
EventStreamFlow: zalandov1.EventStreamFlow{
137147
Type: constants.EventStreamFlowPgGenericType,
138148
},
149+
EventStreamRecovery: zalandov1.EventStreamRecovery{
150+
Type: constants.EventStreamRecoveryDLQType,
151+
Sink: &zalandov1.EventStreamSink{
152+
EventType: "stream-type-b-dlq",
153+
MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)),
154+
Type: constants.EventStreamSinkNakadiType,
155+
},
156+
},
139157
EventStreamSink: zalandov1.EventStreamSink{
140158
EventType: "stream-type-b",
141159
MaxBatchSize: k8sutil.UInt32ToPointer(uint32(100)),
@@ -251,7 +269,8 @@ func TestSameStreams(t *testing.T) {
251269
testName := "TestSameStreams"
252270

253271
stream1 := zalandov1.EventStream{
254-
EventStreamFlow: zalandov1.EventStreamFlow{},
272+
EventStreamFlow: zalandov1.EventStreamFlow{},
273+
EventStreamRecovery: zalandov1.EventStreamRecovery{},
255274
EventStreamSink: zalandov1.EventStreamSink{
256275
EventType: "stream-type-a",
257276
},
@@ -263,7 +282,8 @@ func TestSameStreams(t *testing.T) {
263282
}
264283

265284
stream2 := zalandov1.EventStream{
266-
EventStreamFlow: zalandov1.EventStreamFlow{},
285+
EventStreamFlow: zalandov1.EventStreamFlow{},
286+
EventStreamRecovery: zalandov1.EventStreamRecovery{},
267287
EventStreamSink: zalandov1.EventStreamSink{
268288
EventType: "stream-type-b",
269289
},

pkg/util/constants/streams.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,7 @@ const (
1111
EventStreamSourceAuthType = "DatabaseAuthenticationSecret"
1212
EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent"
1313
EventStreamSinkNakadiType = "Nakadi"
14+
EventStreamRecoveryNoneType = "None"
15+
EventStreamRecoveryDLQType = "DeadLetter"
16+
EventStreamRecoverySuffix = "dead-letter-queue"
1417
)

0 commit comments

Comments
 (0)