Skip to content

Commit 6f2ae7d

Browse files
fix: config api (dapr#200)
* Fix: Finished Configuration API Signed-off-by: LaurenceLiZhixin <382673304@qq.com> * Fix: add configuration validation Signed-off-by: LaurenceLiZhixin <382673304@qq.com> * Fix: fix validation test Signed-off-by: LaurenceLiZhixin <382673304@qq.com> * Fix: remove validation Signed-off-by: LaurenceLiZhixin <382673304@qq.com> * add ut Signed-off-by: LaurenceLiZhixin <382673304@qq.com> * Fix: comment Signed-off-by: LaurenceLiZhixin <382673304@qq.com>
1 parent f0e0931 commit 6f2ae7d

File tree

12 files changed

+688
-13
lines changed

12 files changed

+688
-13
lines changed

actor/config/config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ func GetConfigFromOptions(opts ...Option) *ActorConfig {
3535
conf := &ActorConfig{
3636
SerializerType: constant.DefaultSerializerType,
3737
}
38-
for _, opt := range opts {
39-
opt(conf)
38+
for _, o := range opts {
39+
o(conf)
4040
}
4141
return conf
4242
}

client/client.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,18 @@ type Client interface {
110110
// ExecuteStateTransaction provides way to execute multiple operations on a specified store.
111111
ExecuteStateTransaction(ctx context.Context, storeName string, meta map[string]string, ops []*StateOperation) error
112112

113+
// GetConfigurationItem can get target configuration item by storeName and key
114+
GetConfigurationItem(ctx context.Context, storeName, key string, opts ...ConfigurationOpt) (*ConfigurationItem, error)
115+
116+
// GetConfigurationItems can get a list of configuration item by storeName and keys
117+
GetConfigurationItems(ctx context.Context, storeName string, keys []string, opts ...ConfigurationOpt) ([]*ConfigurationItem, error)
118+
119+
// SubscribeConfigurationItems can subscribe the change of configuration items by storeName and keys, and return subscription id
120+
SubscribeConfigurationItems(ctx context.Context, storeName string, keys []string, handler ConfigurationHandleFunction, opts ...ConfigurationOpt) error
121+
122+
// UnsubscribeConfigurationItems can stop the subscription with target store's and id
123+
UnsubscribeConfigurationItems(ctx context.Context, storeName string, id string, opts ...ConfigurationOpt) error
124+
113125
// DeleteBulkState deletes content for multiple keys from store.
114126
DeleteBulkState(ctx context.Context, storeName string, keys []string, meta map[string]string) error
115127

client/client_test.go

Lines changed: 71 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@ import (
1919
"fmt"
2020
"net"
2121
"os"
22+
"strconv"
23+
"sync"
2224
"testing"
25+
"time"
2326

2427
"github.com/golang/protobuf/ptypes/empty"
28+
"github.com/google/uuid"
29+
"github.com/pkg/errors"
2530
"github.com/stretchr/testify/assert"
26-
2731
"google.golang.org/grpc"
2832
"google.golang.org/grpc/test/bufconn"
2933
"google.golang.org/protobuf/types/known/anypb"
@@ -130,7 +134,8 @@ func TestShutdown(t *testing.T) {
130134
func getTestClient(ctx context.Context) (client Client, closer func()) {
131135
s := grpc.NewServer()
132136
pb.RegisterDaprServer(s, &testDaprServer{
133-
state: make(map[string][]byte),
137+
state: make(map[string][]byte),
138+
configurationSubscriptionID: map[string]chan struct{}{},
134139
})
135140

136141
l := bufconn.Listen(testBufSize)
@@ -161,7 +166,8 @@ func getTestClient(ctx context.Context) (client Client, closer func()) {
161166
func getTestClientWithSocket(ctx context.Context) (client Client, closer func()) {
162167
s := grpc.NewServer()
163168
pb.RegisterDaprServer(s, &testDaprServer{
164-
state: make(map[string][]byte),
169+
state: make(map[string][]byte),
170+
configurationSubscriptionID: map[string]chan struct{}{},
165171
})
166172

167173
var lc net.ListenConfig
@@ -191,7 +197,9 @@ func getTestClientWithSocket(ctx context.Context) (client Client, closer func())
191197

192198
type testDaprServer struct {
193199
pb.UnimplementedDaprServer
194-
state map[string][]byte
200+
state map[string][]byte
201+
configurationSubscriptionIDMapLoc sync.Mutex
202+
configurationSubscriptionID map[string]chan struct{}
195203
}
196204

197205
func (s *testDaprServer) InvokeService(ctx context.Context, req *pb.InvokeServiceRequest) (*commonv1pb.InvokeResponse, error) {
@@ -348,3 +356,62 @@ func (s *testDaprServer) UnregisterActorTimer(context.Context, *pb.UnregisterAct
348356
func (s *testDaprServer) Shutdown(ctx context.Context, req *empty.Empty) (*empty.Empty, error) {
349357
return &empty.Empty{}, nil
350358
}
359+
360+
func (s *testDaprServer) GetConfigurationAlpha1(ctx context.Context, in *pb.GetConfigurationRequest) (*pb.GetConfigurationResponse, error) {
361+
if in.GetStoreName() == "" {
362+
return &pb.GetConfigurationResponse{}, errors.New("store name notfound")
363+
}
364+
items := make([]*commonv1pb.ConfigurationItem, 0)
365+
for _, v := range in.GetKeys() {
366+
items = append(items, &commonv1pb.ConfigurationItem{
367+
Key: v,
368+
Value: v + valueSuffix,
369+
})
370+
}
371+
return &pb.GetConfigurationResponse{
372+
Items: items,
373+
}, nil
374+
}
375+
376+
func (s *testDaprServer) SubscribeConfigurationAlpha1(in *pb.SubscribeConfigurationRequest, server pb.Dapr_SubscribeConfigurationAlpha1Server) error {
377+
stopCh := make(chan struct{})
378+
id, _ := uuid.NewUUID()
379+
s.configurationSubscriptionIDMapLoc.Lock()
380+
s.configurationSubscriptionID[id.String()] = stopCh
381+
s.configurationSubscriptionIDMapLoc.Unlock()
382+
for i := 0; i < 5; i++ {
383+
select {
384+
case <-stopCh:
385+
return nil
386+
default:
387+
}
388+
items := make([]*commonv1pb.ConfigurationItem, 0)
389+
for _, v := range in.GetKeys() {
390+
items = append(items, &commonv1pb.ConfigurationItem{
391+
Key: v,
392+
Value: v + "_" + strconv.Itoa(i),
393+
},
394+
)
395+
}
396+
if err := server.Send(&pb.SubscribeConfigurationResponse{
397+
Id: id.String(),
398+
Items: items,
399+
}); err != nil {
400+
return err
401+
}
402+
time.Sleep(time.Second)
403+
}
404+
return nil
405+
}
406+
407+
func (s *testDaprServer) UnsubscribeConfigurationAlpha1(ctx context.Context, in *pb.UnsubscribeConfigurationRequest) (*pb.UnsubscribeConfigurationResponse, error) {
408+
s.configurationSubscriptionIDMapLoc.Lock()
409+
defer s.configurationSubscriptionIDMapLoc.Unlock()
410+
ch, ok := s.configurationSubscriptionID[in.Id]
411+
if !ok {
412+
return &pb.UnsubscribeConfigurationResponse{Ok: true}, nil
413+
}
414+
close(ch)
415+
delete(s.configurationSubscriptionID, in.Id)
416+
return &pb.UnsubscribeConfigurationResponse{Ok: true}, nil
417+
}

client/configuration.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
8+
"github.com/pkg/errors"
9+
10+
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
11+
)
12+
13+
type ConfigurationItem struct {
14+
Key string
15+
Value string
16+
Version string
17+
Metadata map[string]string
18+
}
19+
20+
type ConfigurationOpt func(map[string]string)
21+
22+
func WithConfigurationMetadata(key, value string) ConfigurationOpt {
23+
return func(m map[string]string) {
24+
m[key] = value
25+
}
26+
}
27+
28+
func (c *GRPCClient) GetConfigurationItem(ctx context.Context, storeName, key string, opts ...ConfigurationOpt) (*ConfigurationItem, error) {
29+
items, err := c.GetConfigurationItems(ctx, storeName, []string{key}, opts...)
30+
if err != nil {
31+
return nil, err
32+
}
33+
if len(items) == 0 {
34+
return nil, nil
35+
}
36+
return items[0], nil
37+
}
38+
39+
func (c *GRPCClient) GetConfigurationItems(ctx context.Context, storeName string, keys []string, opts ...ConfigurationOpt) ([]*ConfigurationItem, error) {
40+
metadata := make(map[string]string)
41+
for _, opt := range opts {
42+
opt(metadata)
43+
}
44+
rsp, err := c.protoClient.GetConfigurationAlpha1(ctx, &pb.GetConfigurationRequest{
45+
StoreName: storeName,
46+
Keys: keys,
47+
Metadata: metadata,
48+
})
49+
if err != nil {
50+
return nil, err
51+
}
52+
53+
configItems := make([]*ConfigurationItem, 0)
54+
for _, v := range rsp.Items {
55+
configItems = append(configItems, &ConfigurationItem{
56+
Key: v.Key,
57+
Value: v.Value,
58+
Version: v.Version,
59+
Metadata: v.Metadata,
60+
})
61+
}
62+
return configItems, nil
63+
}
64+
65+
type ConfigurationHandleFunction func(string, []*ConfigurationItem)
66+
67+
func (c *GRPCClient) SubscribeConfigurationItems(ctx context.Context, storeName string, keys []string, handler ConfigurationHandleFunction, opts ...ConfigurationOpt) error {
68+
metadata := make(map[string]string)
69+
for _, opt := range opts {
70+
opt(metadata)
71+
}
72+
73+
client, err := c.protoClient.SubscribeConfigurationAlpha1(ctx, &pb.SubscribeConfigurationRequest{
74+
StoreName: storeName,
75+
Keys: keys,
76+
Metadata: metadata,
77+
})
78+
if err != nil {
79+
return errors.Errorf("subscribe configuration failed with error = %s", err)
80+
}
81+
82+
var subcribeID string
83+
stopCh := make(chan struct{})
84+
go func() {
85+
for {
86+
rsp, err := client.Recv()
87+
if err == io.EOF || rsp == nil {
88+
// receive goroutine would close if unsubscribe is called
89+
fmt.Println("dapr configuration subscribe finished.")
90+
close(stopCh)
91+
break
92+
}
93+
subcribeID = rsp.Id
94+
configurationItems := make([]*ConfigurationItem, 0)
95+
for _, v := range rsp.Items {
96+
configurationItems = append(configurationItems, &ConfigurationItem{
97+
Key: v.Key,
98+
Value: v.Value,
99+
Version: v.Version,
100+
Metadata: v.Metadata,
101+
})
102+
}
103+
handler(rsp.Id, configurationItems)
104+
}
105+
}()
106+
select {
107+
case <-ctx.Done():
108+
return c.UnsubscribeConfigurationItems(context.Background(), storeName, subcribeID)
109+
case <-stopCh:
110+
return nil
111+
}
112+
}
113+
114+
func (c *GRPCClient) UnsubscribeConfigurationItems(ctx context.Context, storeName string, id string, opts ...ConfigurationOpt) error {
115+
alpha1, err := c.protoClient.UnsubscribeConfigurationAlpha1(ctx, &pb.UnsubscribeConfigurationRequest{
116+
StoreName: storeName,
117+
Id: id,
118+
})
119+
if err != nil {
120+
return err
121+
}
122+
if !alpha1.Ok {
123+
return errors.Errorf("unsubscribe error message = %s", alpha1.GetMessage())
124+
}
125+
return nil
126+
}

client/configuration_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"strconv"
6+
"testing"
7+
"time"
8+
9+
"go.uber.org/atomic"
10+
11+
"github.com/stretchr/testify/assert"
12+
)
13+
14+
const (
15+
valueSuffix = "_value"
16+
)
17+
18+
func TestGetConfigurationItem(t *testing.T) {
19+
ctx := context.Background()
20+
21+
t.Run("get configuration item", func(t *testing.T) {
22+
resp, err := testClient.GetConfigurationItem(ctx, "example-config", "mykey")
23+
assert.Nil(t, err)
24+
assert.Equal(t, "mykey"+valueSuffix, resp.Value)
25+
})
26+
27+
t.Run("get configuration item with invalid storeName", func(t *testing.T) {
28+
_, err := testClient.GetConfigurationItem(ctx, "", "mykey")
29+
assert.NotNil(t, err)
30+
})
31+
}
32+
33+
func TestGetConfigurationItems(t *testing.T) {
34+
ctx := context.Background()
35+
36+
t.Run("Test get configuration items", func(t *testing.T) {
37+
resp, err := testClient.GetConfigurationItems(ctx, "example-config", []string{"mykey1", "mykey2", "mykey3"})
38+
assert.Nil(t, err)
39+
for i, v := range resp {
40+
assert.Equal(t, "mykey"+strconv.Itoa(i+1)+valueSuffix, v.Value)
41+
}
42+
})
43+
}
44+
45+
func TestSubscribeConfigurationItems(t *testing.T) {
46+
ctx := context.Background()
47+
48+
counter := 0
49+
totalCounter := 0
50+
t.Run("Test subscribe configuration items", func(t *testing.T) {
51+
err := testClient.SubscribeConfigurationItems(ctx, "example-config",
52+
[]string{"mykey", "mykey2", "mykey3"}, func(s string, items []*ConfigurationItem) {
53+
counter++
54+
for _, v := range items {
55+
assert.Equal(t, v.Value, v.Key+"_"+strconv.Itoa(counter-1))
56+
totalCounter++
57+
}
58+
})
59+
assert.Nil(t, err)
60+
})
61+
time.Sleep(time.Second*5 + time.Millisecond*500)
62+
assert.Equal(t, 5, counter)
63+
assert.Equal(t, 15, totalCounter)
64+
}
65+
66+
func TestUnSubscribeConfigurationItems(t *testing.T) {
67+
ctx := context.Background()
68+
69+
counter := atomic.Int32{}
70+
totalCounter := atomic.Int32{}
71+
t.Run("Test unsubscribe configuration items", func(t *testing.T) {
72+
subscribeID := ""
73+
subscribeIDChan := make(chan string)
74+
go func() {
75+
err := testClient.SubscribeConfigurationItems(ctx, "example-config",
76+
[]string{"mykey", "mykey2", "mykey3"}, func(id string, items []*ConfigurationItem) {
77+
counter.Inc()
78+
for _, v := range items {
79+
assert.Equal(t, v.Value, v.Key+"_"+strconv.Itoa(int(counter.Load()-1)))
80+
totalCounter.Inc()
81+
}
82+
select {
83+
case subscribeIDChan <- id:
84+
default:
85+
}
86+
})
87+
assert.Nil(t, err)
88+
}()
89+
subscribeID = <-subscribeIDChan
90+
time.Sleep(time.Second * 2)
91+
time.Sleep(time.Millisecond * 500)
92+
err := testClient.UnsubscribeConfigurationItems(ctx, "example-config", subscribeID)
93+
assert.Nil(t, err)
94+
})
95+
time.Sleep(time.Second * 5)
96+
assert.Equal(t, 3, int(counter.Load()))
97+
assert.Equal(t, 9, int(totalCounter.Load()))
98+
}

client/pubsub.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ func (c *GRPCClient) PublishEvent(ctx context.Context, pubsubName, topicName str
4444
PubsubName: pubsubName,
4545
Topic: topicName,
4646
}
47-
for _, opt := range opts {
48-
opt(request)
47+
for _, o := range opts {
48+
o(request)
4949
}
5050

5151
if data != nil {

examples/actor/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,11 @@ dapr stop --app-id actor-serving
9696
== APP == get user = {Name: Age:1}
9797
== APP == get user = {Name: Age:2}
9898
✅ Exited App successfully
99-
10099
```
100+
101101
- server side
102-
```
103102

103+
```
104104
== APP == call get user req = &{abc 123}
105105
== APP == get req = laurence
106106
== APP == get post request = laurence

0 commit comments

Comments
 (0)