Skip to content

Commit f0e0931

Browse files
authored
Add option to disable topic validation (dapr#271)
* Add option to disable topic validation Signed-off-by: yaron2 <schneider.yaron@live.com> * linter Signed-off-by: yaron2 <schneider.yaron@live.com>
1 parent f1f5c7a commit f0e0931

File tree

4 files changed

+51
-2
lines changed

4 files changed

+51
-2
lines changed

service/common/type.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ type Subscription struct {
9999
Match string `json:"match"`
100100
// Priority is the priority in which to evaluate the match (lower to higher).
101101
Priority int `json:"priority"`
102+
// DisableTopicValidation allows to receive events from publisher topics that differ from the subscribed topic.
103+
DisableTopicValidation bool `json:"disableTopicValidation"`
102104
}
103105

104106
const (

service/grpc/topic.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,17 @@ func (s *Server) OnTopicEvent(ctx context.Context, in *pb.TopicEventRequest) (*p
8585
return &pb.TopicEventResponse{Status: pb.TopicEventResponse_DROP}, errors.New("pub/sub and topic names required")
8686
}
8787
key := in.PubsubName + "-" + in.Topic
88-
if sub, ok := s.topicRegistrar[key]; ok {
88+
noValidationKey := in.PubsubName
89+
90+
var sub *internal.TopicRegistration
91+
var ok bool
92+
93+
sub, ok = s.topicRegistrar[key]
94+
if !ok {
95+
sub, ok = s.topicRegistrar[noValidationKey]
96+
}
97+
98+
if ok {
8999
data := interface{}(in.Data)
90100
if len(in.Data) > 0 {
91101
mediaType, _, err := mime.ParseMediaType(in.DataContentType)

service/grpc/topic_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,36 @@ func TestTopic(t *testing.T) {
137137
stopTestServer(t, server)
138138
}
139139

140+
func TestTopicWithValidationDisabled(t *testing.T) {
141+
ctx := context.Background()
142+
143+
sub := &common.Subscription{
144+
PubsubName: "messages",
145+
Topic: "*",
146+
DisableTopicValidation: true,
147+
}
148+
server := getTestServer()
149+
150+
err := server.AddTopicEventHandler(sub, eventHandler)
151+
assert.Nil(t, err)
152+
153+
startTestServer(server)
154+
155+
in := &runtime.TopicEventRequest{
156+
Id: "a123",
157+
Source: "test",
158+
Type: "test",
159+
SpecVersion: "v1.0",
160+
DataContentType: "text/plain",
161+
Data: []byte("test"),
162+
Topic: "test",
163+
PubsubName: sub.PubsubName,
164+
}
165+
166+
_, err = server.OnTopicEvent(ctx, in)
167+
assert.NoError(t, err)
168+
}
169+
140170
func TestTopicWithErrors(t *testing.T) {
141171
ctx := context.Background()
142172

service/internal/topicregistrar.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,14 @@ func (m TopicRegistrar) AddSubscription(sub *common.Subscription, fn common.Topi
2929
if fn == nil {
3030
return fmt.Errorf("topic handler required")
3131
}
32-
key := sub.PubsubName + "-" + sub.Topic
32+
33+
var key string
34+
if !sub.DisableTopicValidation {
35+
key = sub.PubsubName + "-" + sub.Topic
36+
} else {
37+
key = sub.PubsubName
38+
}
39+
3340
ts, ok := m[key]
3441
if !ok {
3542
ts = &TopicRegistration{

0 commit comments

Comments
 (0)