Skip to content

Commit c8f3533

Browse files
authored
Add PubSub event metadata (dapr#490)
* Map event metadata Signed-off-by: Joni Collinge <jonathancollinge@live.com> * Feedback Signed-off-by: Joni Collinge <jonathancollinge@live.com> * Lint Signed-off-by: Joni Collinge <jonathancollinge@live.com> --------- Signed-off-by: Joni Collinge <jonathancollinge@live.com>
1 parent 61158e8 commit c8f3533

File tree

7 files changed

+163
-17
lines changed

7 files changed

+163
-17
lines changed

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ require (
2020
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
2121
go.opentelemetry.io/otel v1.16.0 // indirect
2222
go.opentelemetry.io/otel/trace v1.16.0 // indirect
23-
golang.org/x/net v0.15.0 // indirect
24-
golang.org/x/sys v0.12.0 // indirect
25-
golang.org/x/text v0.13.0 // indirect
23+
golang.org/x/net v0.19.0 // indirect
24+
golang.org/x/sys v0.15.0 // indirect
25+
golang.org/x/text v0.14.0 // indirect
2626
google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577 // indirect
2727
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
2828
)

go.sum

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,22 +36,22 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
3636
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
3737
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
3838
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
39-
golang.org/x/net v0.15.0 h1:ugBLEUaxABaB5AJqW9enI0ACdci2RUd4eP51NTBvuJ8=
40-
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
39+
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
40+
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
4141
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
4242
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
4343
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
4444
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
4545
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
4646
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
4747
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
48-
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
49-
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
48+
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
49+
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
5050
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
5151
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
5252
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
53-
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
54-
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
53+
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
54+
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
5555
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
5656
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
5757
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=

service/common/type.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ type TopicEvent struct {
4545
Topic string `json:"topic"`
4646
// PubsubName is name of the pub/sub this message came from
4747
PubsubName string `json:"pubsubname"`
48+
// Metadata is the custom metadata attached to the event.
49+
Metadata map[string]string `json:"metadata,omitempty"`
4850
}
4951

5052
func (e *TopicEvent) Struct(target interface{}) error {

service/grpc/topic.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"strings"
2323

2424
"github.com/golang/protobuf/ptypes/empty"
25+
"google.golang.org/grpc/metadata"
2526

2627
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
2728
"github.com/dapr/go-sdk/service/common"
@@ -127,6 +128,7 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventReq
127128
RawData: in.GetData(),
128129
Topic: in.GetTopic(),
129130
PubsubName: in.GetPubsubName(),
131+
Metadata: getCustomMetadataFromContext(ctx),
130132
}
131133
h := sub.DefaultHandler
132134
if in.GetPath() != "" {
@@ -154,3 +156,16 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *runtimev1pb.TopicEventReq
154156
in.GetPubsubName(), in.GetTopic(),
155157
)
156158
}
159+
160+
func getCustomMetadataFromContext(ctx context.Context) map[string]string {
161+
md := make(map[string]string)
162+
meta, ok := metadata.FromIncomingContext(ctx)
163+
if ok {
164+
for k, v := range meta {
165+
if strings.HasPrefix(strings.ToLower(k), "metadata.") {
166+
md[k[9:]] = v[0]
167+
}
168+
}
169+
}
170+
return md
171+
}

service/grpc/topic_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"testing"
2020

2121
"github.com/stretchr/testify/require"
22+
"google.golang.org/grpc/metadata"
2223

2324
"github.com/golang/protobuf/ptypes/empty"
2425
"github.com/stretchr/testify/assert"
@@ -136,6 +137,32 @@ func TestTopic(t *testing.T) {
136137
require.NoError(t, err)
137138
})
138139

140+
t.Run("topic event for valid topic with metadata", func(t *testing.T) {
141+
sub2 := &common.Subscription{
142+
PubsubName: "messages",
143+
Topic: "test2",
144+
}
145+
err := server.AddTopicEventHandler(sub2, func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
146+
assert.Equal(t, "value1", e.Metadata["key1"])
147+
return false, nil
148+
})
149+
require.NoError(t, err)
150+
151+
in := &runtime.TopicEventRequest{
152+
Id: "a123",
153+
Source: "test",
154+
Type: "test",
155+
SpecVersion: "v1.0",
156+
DataContentType: "text/plain",
157+
Data: []byte("test"),
158+
Topic: sub2.Topic,
159+
PubsubName: sub2.PubsubName,
160+
}
161+
ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"Metadata.key1": "value1"}))
162+
_, err = server.OnTopicEvent(ctx, in)
163+
require.NoError(t, err)
164+
})
165+
139166
stopTestServer(t, server)
140167
}
141168

service/http/topic.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"errors"
2020
"io"
2121
"net/http"
22+
"strings"
2223

2324
"github.com/go-chi/chi/v5"
2425

@@ -278,6 +279,7 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
278279
Subject: in.Subject,
279280
PubsubName: in.PubsubName,
280281
Topic: in.Topic,
282+
Metadata: getCustomMetdataFromHeaders(r),
281283
}
282284

283285
w.Header().Add("Content-Type", "application/json")
@@ -301,6 +303,16 @@ func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicE
301303
return nil
302304
}
303305

306+
func getCustomMetdataFromHeaders(r *http.Request) map[string]string {
307+
md := make(map[string]string)
308+
for k, v := range r.Header {
309+
if strings.HasPrefix(strings.ToLower(k), "metadata.") {
310+
md[k[9:]] = v[0]
311+
}
312+
}
313+
return md
314+
}
315+
304316
func writeStatus(w http.ResponseWriter, s string) {
305317
status := &common.SubscriptionResponse{Status: s}
306318
if err := json.NewEncoder(w).Encode(status); err != nil {

service/http/topic_test.go

Lines changed: 98 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ func TestEventHandler(t *testing.T) {
148148

149149
func TestEventDataHandling(t *testing.T) {
150150
tests := map[string]struct {
151-
data string
152-
result interface{}
151+
data string
152+
expectedData interface{}
153153
}{
154154
"JSON nested": {
155155
data: `{
@@ -166,7 +166,7 @@ func TestEventDataHandling(t *testing.T) {
166166
"message":"hello"
167167
}
168168
}`,
169-
result: map[string]interface{}{
169+
expectedData: map[string]interface{}{
170170
"message": "hello",
171171
},
172172
},
@@ -183,7 +183,7 @@ func TestEventDataHandling(t *testing.T) {
183183
"datacontenttype" : "application/json",
184184
"data" : "eyJtZXNzYWdlIjoiaGVsbG8ifQ=="
185185
}`,
186-
result: map[string]interface{}{
186+
expectedData: map[string]interface{}{
187187
"message": "hello",
188188
},
189189
},
@@ -200,7 +200,7 @@ func TestEventDataHandling(t *testing.T) {
200200
"datacontenttype" : "application/json",
201201
"data_base64" : "eyJtZXNzYWdlIjoiaGVsbG8ifQ=="
202202
}`,
203-
result: map[string]interface{}{
203+
expectedData: map[string]interface{}{
204204
"message": "hello",
205205
},
206206
},
@@ -217,7 +217,7 @@ func TestEventDataHandling(t *testing.T) {
217217
"datacontenttype" : "application/octet-stream",
218218
"data_base64" : "eyJtZXNzYWdlIjoiaGVsbG8ifQ=="
219219
}`,
220-
result: []byte(`{"message":"hello"}`),
220+
expectedData: []byte(`{"message":"hello"}`),
221221
},
222222
"JSON string escaped": {
223223
data: `{
@@ -232,7 +232,7 @@ func TestEventDataHandling(t *testing.T) {
232232
"datacontenttype" : "application/json",
233233
"data" : "{\"message\":\"hello\"}"
234234
}`,
235-
result: map[string]interface{}{
235+
expectedData: map[string]interface{}{
236236
"message": "hello",
237237
},
238238
},
@@ -264,7 +264,85 @@ func TestEventDataHandling(t *testing.T) {
264264
t.Run(name, func(t *testing.T) {
265265
makeEventRequest(t, s, "/test", tt.data, http.StatusOK)
266266
<-recv
267-
assert.Equal(t, tt.result, topicEvent.Data)
267+
assert.Equal(t, tt.expectedData, topicEvent.Data)
268+
})
269+
}
270+
}
271+
272+
func TestEventMetadataHandling(t *testing.T) {
273+
tests := map[string]struct {
274+
metadata map[string]string
275+
expectedMetadata map[string]string
276+
}{
277+
"single key-value pair with prefix": {
278+
metadata: map[string]string{
279+
"metadata.key1": "value1",
280+
},
281+
expectedMetadata: map[string]string{
282+
"key1": "value1",
283+
},
284+
},
285+
"multiple key-value pairs with prefix": {
286+
metadata: map[string]string{
287+
"metadata.key1": "value1",
288+
"metadata.key2": "value2",
289+
},
290+
expectedMetadata: map[string]string{
291+
"key1": "value1",
292+
"key2": "value2",
293+
},
294+
},
295+
"some keys with prefix and some without": {
296+
metadata: map[string]string{
297+
"metadata.key1": "value1",
298+
"key2": "value2",
299+
},
300+
expectedMetadata: map[string]string{
301+
"key1": "value1",
302+
},
303+
},
304+
}
305+
306+
s := newServer("", nil)
307+
308+
sub := &common.Subscription{
309+
PubsubName: "messages",
310+
Topic: "test",
311+
Route: "/test",
312+
Metadata: map[string]string{},
313+
}
314+
315+
recv := make(chan struct{}, 1)
316+
var topicEvent *common.TopicEvent
317+
handler := func(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
318+
topicEvent = e
319+
recv <- struct{}{}
320+
321+
return false, nil
322+
}
323+
err := s.AddTopicEventHandler(sub, handler)
324+
require.NoErrorf(t, err, "error adding event handler")
325+
326+
s.registerBaseHandler()
327+
328+
for name, tt := range tests {
329+
t.Run(name, func(t *testing.T) {
330+
makeEventRequestWithMetadata(t, s, "/test", `{
331+
"specversion" : "1.0",
332+
"type" : "com.github.pull.create",
333+
"source" : "https://github.com/cloudevents/spec/pull",
334+
"subject" : "123",
335+
"id" : "A234-1234-1234",
336+
"time" : "2018-04-05T17:31:00Z",
337+
"comexampleextension1" : "value",
338+
"comexampleothervalue" : 5,
339+
"datacontenttype" : "application/json",
340+
"data" : {
341+
"message":"hello"
342+
}
343+
}`, http.StatusOK, tt.metadata)
344+
<-recv
345+
assert.Equal(t, tt.expectedMetadata, topicEvent.Metadata)
268346
})
269347
}
270348
}
@@ -357,6 +435,18 @@ func makeEventRequest(t *testing.T, s *Server, route, data string, expectedStatu
357435
testRequest(t, s, req, expectedStatusCode)
358436
}
359437

438+
func makeEventRequestWithMetadata(t *testing.T, s *Server, route, data string, expectedStatusCode int, metadata map[string]string) {
439+
t.Helper()
440+
441+
req, err := http.NewRequest(http.MethodPost, route, strings.NewReader(data))
442+
require.NoErrorf(t, err, "error creating request: %s", data)
443+
req.Header.Set("Content-Type", "application/json")
444+
for k, v := range metadata {
445+
req.Header.Set(k, v)
446+
}
447+
testRequest(t, s, req, expectedStatusCode)
448+
}
449+
360450
func TestAddingInvalidEventHandlers(t *testing.T) {
361451
s := newServer("", nil)
362452
err := s.AddTopicEventHandler(nil, testTopicFunc)

0 commit comments

Comments
 (0)