Skip to content

Commit c2dfec6

Browse files
authored
Add wait() method to block until Sidecar is up. (dapr#329)
* Add wait() method to block until Sidecar is up. App might depend on sidecar right away. This PR adds a Wait() method to enable app to wait for sidecar to be up before invoking the first call. GRPC client creation on Dapr Go SDK is blocking, so waiting for client readiness is less of a problem here than on SDKs where client connection establishment is async. Closes dapr#287 Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org> * Move Wait its own file. If anything, this will make testing and the change more localized. Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org> * Adding unresponsive TCP and Unix servers and tests Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org> * Remove comments, clean code up Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org> * Remove a bit of code duplication on tests Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org> * Fix Wait and test server setup. * Multiple state changes can happen for a single GRPC Connection. previous code assume a single one and was failing miserably. Fixed. * The logic for the test server's tear down was lacking. Fixed Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org> * Rename on aux. method Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org> * Add link to gRPC documentation about connectivity semantics Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org> * Fixing lint errors Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org> * Fixing more lint errors Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org> Signed-off-by: Tiago Alves Macambira <tmacam@burocrata.org>
1 parent b465b1f commit c2dfec6

File tree

3 files changed

+212
-0
lines changed

3 files changed

+212
-0
lines changed

client/client.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ type Client interface {
144144
// Shutdown the sidecar.
145145
Shutdown(ctx context.Context) error
146146

147+
// Wait for a sidecar to become available for at most `timeout` seconds. Returns errWaitTimedOut if timeout is reached.
148+
Wait(ctx context.Context, timeout time.Duration) error
149+
147150
// WithTraceID adds existing trace ID to the outgoing context.
148151
WithTraceID(ctx context.Context, id string) context.Context
149152

client/wait.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
Copyright 2021 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package client
15+
16+
import (
17+
"context"
18+
"errors"
19+
"time"
20+
21+
"google.golang.org/grpc/connectivity"
22+
)
23+
24+
// The following errors are returned from Wait.
25+
var (
26+
// A call to Wait timed out while waiting for a gRPC connection to reach a Ready state.
27+
errWaitTimedOut = errors.New("timed out waiting for client connectivity")
28+
)
29+
30+
func (c *GRPCClient) Wait(ctx context.Context, timeout time.Duration) error {
31+
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
32+
defer cancel()
33+
34+
// SDKs for other languages implement Wait by attempting to connect to a TCP endpoint
35+
// with a timeout. Go's SDKs handles more endpoints than just TCP ones. To simplify
36+
// the code here, we rely on GRPCs connectivity state management instead.
37+
// See https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
38+
for {
39+
curState := c.connection.GetState()
40+
if curState == connectivity.Ready {
41+
return nil
42+
}
43+
44+
select {
45+
case <-timeoutCtx.Done():
46+
return errWaitTimedOut
47+
default:
48+
// Multiple state changes can happen: keep waiting for a successful one or time out
49+
c.connection.WaitForStateChange(timeoutCtx, curState)
50+
}
51+
}
52+
}

client/wait_test.go

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
Copyright 2021 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package client
15+
16+
import (
17+
"context"
18+
"net"
19+
"os"
20+
"sync/atomic"
21+
"testing"
22+
"time"
23+
24+
"github.com/stretchr/testify/assert"
25+
"google.golang.org/grpc"
26+
"google.golang.org/grpc/credentials/insecure"
27+
)
28+
29+
const (
30+
unresponsiveServerHost = "127.0.0.1"
31+
unresponsiveTCPPort = "0" // Port set to 0 so O.S. auto-selects one for us
32+
unresponsiveUnixSocketFilePath = "/tmp/unresponsive-server.socket"
33+
34+
waitTimeout = 5 * time.Second
35+
connectionTimeout = 4 * waitTimeout // Larger than waitTimeout but still bounded
36+
autoCloseTimeout = 2 * connectionTimeout // Server will close connections after this
37+
)
38+
39+
type Server struct {
40+
listener net.Listener
41+
address string
42+
done chan bool
43+
nClientsSeen uint64
44+
}
45+
46+
func (s *Server) Close() {
47+
close(s.done)
48+
if err := s.listener.Close(); err != nil {
49+
logger.Fatal(err)
50+
}
51+
os.Remove(unresponsiveUnixSocketFilePath)
52+
}
53+
54+
func (s *Server) listenButKeepSilent() {
55+
for {
56+
conn, err := s.listener.Accept() // Accept connections but that's it!
57+
if err != nil {
58+
select {
59+
case <-s.done:
60+
return
61+
default:
62+
logger.Fatal(err)
63+
break
64+
}
65+
} else {
66+
go func(conn net.Conn) {
67+
atomic.AddUint64(&s.nClientsSeen, 1)
68+
time.Sleep(autoCloseTimeout)
69+
conn.Close()
70+
}(conn)
71+
}
72+
}
73+
}
74+
75+
func createUnresponsiveTCPServer() (*Server, error) {
76+
return createUnresponsiveServer("tcp", net.JoinHostPort(unresponsiveServerHost, unresponsiveTCPPort))
77+
}
78+
79+
func createUnresponsiveUnixServer() (*Server, error) {
80+
return createUnresponsiveServer("unix", unresponsiveUnixSocketFilePath)
81+
}
82+
83+
func createUnresponsiveServer(network string, unresponsiveServerAddress string) (*Server, error) {
84+
serverListener, err := net.Listen(network, unresponsiveServerAddress)
85+
if err != nil {
86+
logger.Fatalf("Creation of test server on network %s and address %s failed with error: %v",
87+
network, unresponsiveServerAddress, err)
88+
return nil, err
89+
}
90+
91+
server := &Server{
92+
listener: serverListener,
93+
address: serverListener.Addr().String(),
94+
done: make(chan bool),
95+
nClientsSeen: 0,
96+
}
97+
98+
go server.listenButKeepSilent()
99+
100+
return server, nil
101+
}
102+
103+
func createNonBlockingClient(ctx context.Context, serverAddr string) (client Client, err error) {
104+
conn, err := grpc.DialContext(
105+
ctx,
106+
serverAddr,
107+
grpc.WithTransportCredentials(insecure.NewCredentials()),
108+
)
109+
if err != nil {
110+
logger.Fatal(err)
111+
return nil, err
112+
}
113+
return NewClientWithConnection(conn), nil
114+
}
115+
116+
func TestGrpcWaitHappyCase(t *testing.T) {
117+
ctx := context.Background()
118+
119+
err := testClient.Wait(ctx, waitTimeout)
120+
assert.NoError(t, err)
121+
}
122+
123+
func TestGrpcWaitUnresponsiveTcpServer(t *testing.T) {
124+
ctx := context.Background()
125+
126+
server, err := createUnresponsiveTCPServer()
127+
assert.NoError(t, err)
128+
defer server.Close()
129+
130+
clientConnectionTimeoutCtx, cancel := context.WithTimeout(ctx, connectionTimeout)
131+
defer cancel()
132+
client, err := createNonBlockingClient(clientConnectionTimeoutCtx, server.address)
133+
assert.NoError(t, err)
134+
135+
err = client.Wait(ctx, waitTimeout)
136+
assert.Error(t, err)
137+
assert.Equal(t, errWaitTimedOut, err)
138+
assert.Equal(t, uint64(1), atomic.LoadUint64(&server.nClientsSeen))
139+
}
140+
141+
func TestGrpcWaitUnresponsiveUnixServer(t *testing.T) {
142+
ctx := context.Background()
143+
144+
server, err := createUnresponsiveUnixServer()
145+
assert.NoError(t, err)
146+
defer server.Close()
147+
148+
clientConnectionTimeoutCtx, cancel := context.WithTimeout(ctx, connectionTimeout)
149+
defer cancel()
150+
client, err := createNonBlockingClient(clientConnectionTimeoutCtx, "unix://"+server.address)
151+
assert.NoError(t, err)
152+
153+
err = client.Wait(ctx, waitTimeout)
154+
assert.Error(t, err)
155+
assert.Equal(t, errWaitTimedOut, err)
156+
assert.Equal(t, uint64(1), atomic.LoadUint64(&server.nClientsSeen))
157+
}

0 commit comments

Comments
 (0)