Skip to content

Commit 0c36331

Browse files
authored
Exposes FailurePolicy to the Jobs api (dapr#737)
* Exposes FailurePolicy to the Jobs api Signed-off-by: Albert Callarisa <albert@diagrid.io> * Added documentation for the jobs api Signed-off-by: Albert Callarisa <albert@diagrid.io> * Use pointers for optional fields Also: - Refactor how exposed type is converted to proto type - Remove unnecesary log - Input validation for job api calls Signed-off-by: Albert Callarisa <albert@diagrid.io> * clearer variable type Signed-off-by: Albert Callarisa <albert@diagrid.io> * Remove unnecessary helper function Signed-off-by: Albert Callarisa <albert@diagrid.io> * Remove unnecessary (and non-idiomatic) constructor. We can pass the right types directly when building jobs, there's no need for a constructor. The constructor was used previously when the types were private, but now they are exposed so the constructor is unnecessary. Signed-off-by: Albert Callarisa <albert@diagrid.io> * Use `dapr/kit` for inline pointers. Adjusted docs and examples. Signed-off-by: Albert Callarisa <albert@diagrid.io> * Use functional options to build job objects Signed-off-by: Albert Callarisa <albert@diagrid.io> * Add unit tests for the job builder Signed-off-by: Albert Callarisa <albert@diagrid.io> * fix examples validator Signed-off-by: Albert Callarisa <albert@diagrid.io> * Fix lint errors Signed-off-by: Albert Callarisa <albert@diagrid.io> * fix examples dependencies Signed-off-by: Albert Callarisa <albert@diagrid.io> * go mod tidy Signed-off-by: Albert Callarisa <albert@diagrid.io> * Fix job example validation Signed-off-by: Albert Callarisa <albert@diagrid.io> --------- Signed-off-by: Albert Callarisa <albert@diagrid.io>
1 parent 16374cd commit 0c36331

File tree

17 files changed

+581
-281
lines changed

17 files changed

+581
-281
lines changed

.github/workflows/validate_examples.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ jobs:
166166
"configuration",
167167
"conversation",
168168
"crypto",
169-
"dist-scheduler",
169+
"jobs",
170170
"grpc-service",
171171
"hello-world",
172172
"pubsub",

client/client_test.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"google.golang.org/grpc/credentials/insecure"
3333
"google.golang.org/grpc/test/bufconn"
3434
"google.golang.org/protobuf/types/known/anypb"
35+
"google.golang.org/protobuf/types/known/durationpb"
3536
"google.golang.org/protobuf/types/known/emptypb"
3637

3738
commonv1pb "github.com/dapr/dapr/pkg/proto/common/v1"
@@ -563,10 +564,11 @@ func (s *testDaprServer) ScheduleJobAlpha1(ctx context.Context, in *pb.ScheduleJ
563564

564565
func (s *testDaprServer) GetJobAlpha1(ctx context.Context, in *pb.GetJobRequest) (*pb.GetJobResponse, error) {
565566
var (
566-
schedule = "@every 10s"
567-
dueTime = "10s"
568-
repeats uint32 = 4
569-
ttl = "10s"
567+
schedule = "@every 10s"
568+
dueTime = "10s"
569+
repeats uint32 = 4
570+
ttl = "10s"
571+
maxRetries uint32 = 4
570572
)
571573
return &pb.GetJobResponse{
572574
Job: &pb.Job{
@@ -576,6 +578,14 @@ func (s *testDaprServer) GetJobAlpha1(ctx context.Context, in *pb.GetJobRequest)
576578
DueTime: &dueTime,
577579
Ttl: &ttl,
578580
Data: nil,
581+
FailurePolicy: &commonv1pb.JobFailurePolicy{
582+
Policy: &commonv1pb.JobFailurePolicy_Constant{
583+
Constant: &commonv1pb.JobFailurePolicyConstant{
584+
MaxRetries: &maxRetries,
585+
Interval: &durationpb.Duration{Seconds: 10},
586+
},
587+
},
588+
},
579589
},
580590
}, nil
581591
}

client/jobs.go

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
/*
2+
Copyright 2021 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package client
15+
16+
import (
17+
"context"
18+
"errors"
19+
"time"
20+
21+
"google.golang.org/protobuf/types/known/anypb"
22+
"google.golang.org/protobuf/types/known/durationpb"
23+
24+
commonpb "github.com/dapr/dapr/pkg/proto/common/v1"
25+
runtimepb "github.com/dapr/dapr/pkg/proto/runtime/v1"
26+
"github.com/dapr/kit/ptr"
27+
)
28+
29+
type FailurePolicy interface {
30+
GetPBFailurePolicy() *commonpb.JobFailurePolicy
31+
}
32+
33+
type JobFailurePolicyConstant struct {
34+
MaxRetries *uint32
35+
Interval *time.Duration
36+
}
37+
38+
func (f *JobFailurePolicyConstant) GetPBFailurePolicy() *commonpb.JobFailurePolicy {
39+
constantfp := &commonpb.JobFailurePolicyConstant{}
40+
if f.MaxRetries != nil {
41+
constantfp.MaxRetries = f.MaxRetries
42+
}
43+
if f.Interval != nil {
44+
constantfp.Interval = durationpb.New(*f.Interval)
45+
}
46+
return &commonpb.JobFailurePolicy{
47+
Policy: &commonpb.JobFailurePolicy_Constant{
48+
Constant: constantfp,
49+
},
50+
}
51+
}
52+
53+
type JobFailurePolicyDrop struct{}
54+
55+
func (f *JobFailurePolicyDrop) GetPBFailurePolicy() *commonpb.JobFailurePolicy {
56+
return &commonpb.JobFailurePolicy{
57+
Policy: &commonpb.JobFailurePolicy_Drop{
58+
Drop: &commonpb.JobFailurePolicyDrop{},
59+
},
60+
}
61+
}
62+
63+
type Job struct {
64+
Name string
65+
Schedule *string
66+
Repeats *uint32
67+
DueTime *string
68+
TTL *string
69+
Data *anypb.Any
70+
FailurePolicy FailurePolicy
71+
}
72+
73+
type JobOption func(*Job)
74+
75+
func NewJob(name string, opts ...JobOption) *Job {
76+
job := &Job{
77+
Name: name,
78+
}
79+
for _, opt := range opts {
80+
opt(job)
81+
}
82+
return job
83+
}
84+
85+
func WithJobSchedule(schedule string) JobOption {
86+
return func(job *Job) {
87+
job.Schedule = &schedule
88+
}
89+
}
90+
91+
func WithJobRepeats(repeats uint32) JobOption {
92+
return func(job *Job) {
93+
job.Repeats = &repeats
94+
}
95+
}
96+
97+
func WithJobDueTime(dueTime string) JobOption {
98+
return func(job *Job) {
99+
job.DueTime = &dueTime
100+
}
101+
}
102+
103+
func WithJobTTL(ttl string) JobOption {
104+
return func(job *Job) {
105+
job.TTL = &ttl
106+
}
107+
}
108+
109+
func WithJobData(data *anypb.Any) JobOption {
110+
return func(job *Job) {
111+
job.Data = data
112+
}
113+
}
114+
115+
func WithJobConstantFailurePolicy() JobOption {
116+
return func(job *Job) {
117+
job.FailurePolicy = &JobFailurePolicyConstant{}
118+
}
119+
}
120+
121+
func WithJobConstantFailurePolicyMaxRetries(maxRetries uint32) JobOption {
122+
return func(job *Job) {
123+
if job.FailurePolicy == nil {
124+
job.FailurePolicy = &JobFailurePolicyConstant{}
125+
}
126+
if constantPolicy, ok := job.FailurePolicy.(*JobFailurePolicyConstant); ok {
127+
constantPolicy.MaxRetries = &maxRetries
128+
} else {
129+
job.FailurePolicy = &JobFailurePolicyConstant{
130+
MaxRetries: &maxRetries,
131+
}
132+
}
133+
}
134+
}
135+
136+
func WithJobConstantFailurePolicyInterval(interval time.Duration) JobOption {
137+
return func(job *Job) {
138+
if job.FailurePolicy == nil {
139+
job.FailurePolicy = &JobFailurePolicyConstant{}
140+
}
141+
if constantPolicy, ok := job.FailurePolicy.(*JobFailurePolicyConstant); ok {
142+
constantPolicy.Interval = &interval
143+
} else {
144+
job.FailurePolicy = &JobFailurePolicyConstant{
145+
Interval: &interval,
146+
}
147+
}
148+
}
149+
}
150+
151+
func WithJobDropFailurePolicy() JobOption {
152+
return func(job *Job) {
153+
job.FailurePolicy = &JobFailurePolicyDrop{}
154+
}
155+
}
156+
157+
// ScheduleJobAlpha1 raises and schedules a job.
158+
func (c *GRPCClient) ScheduleJobAlpha1(ctx context.Context, job *Job) error {
159+
if job.Name == "" {
160+
return errors.New("job name is required")
161+
}
162+
if job.Data == nil {
163+
return errors.New("job data is required")
164+
}
165+
166+
jobRequest := &runtimepb.Job{
167+
Name: job.Name,
168+
Data: job.Data,
169+
Schedule: job.Schedule,
170+
Repeats: job.Repeats,
171+
DueTime: job.DueTime,
172+
Ttl: job.TTL,
173+
}
174+
175+
if job.FailurePolicy != nil {
176+
jobRequest.FailurePolicy = job.FailurePolicy.GetPBFailurePolicy()
177+
}
178+
_, err := c.protoClient.ScheduleJobAlpha1(ctx, &runtimepb.ScheduleJobRequest{
179+
Job: jobRequest,
180+
})
181+
return err
182+
}
183+
184+
// GetJobAlpha1 retrieves a scheduled job.
185+
func (c *GRPCClient) GetJobAlpha1(ctx context.Context, name string) (*Job, error) {
186+
if name == "" {
187+
return nil, errors.New("job name is required")
188+
}
189+
190+
resp, err := c.protoClient.GetJobAlpha1(ctx, &runtimepb.GetJobRequest{
191+
Name: name,
192+
})
193+
if err != nil {
194+
return nil, err
195+
}
196+
197+
var failurePolicy FailurePolicy
198+
switch policy := resp.GetJob().GetFailurePolicy().GetPolicy().(type) {
199+
case *commonpb.JobFailurePolicy_Constant:
200+
interval := time.Duration(policy.Constant.GetInterval().GetSeconds()) * time.Second
201+
failurePolicy = &JobFailurePolicyConstant{
202+
MaxRetries: ptr.Of(policy.Constant.GetMaxRetries()),
203+
Interval: &interval,
204+
}
205+
case *commonpb.JobFailurePolicy_Drop:
206+
failurePolicy = &JobFailurePolicyDrop{}
207+
}
208+
209+
return &Job{
210+
Name: resp.GetJob().GetName(),
211+
Schedule: ptr.Of(resp.GetJob().GetSchedule()),
212+
Repeats: ptr.Of(resp.GetJob().GetRepeats()),
213+
DueTime: ptr.Of(resp.GetJob().GetDueTime()),
214+
TTL: ptr.Of(resp.GetJob().GetTtl()),
215+
Data: resp.GetJob().GetData(),
216+
FailurePolicy: failurePolicy,
217+
}, nil
218+
}
219+
220+
// DeleteJobAlpha1 deletes a scheduled job.
221+
func (c *GRPCClient) DeleteJobAlpha1(ctx context.Context, name string) error {
222+
if name == "" {
223+
return errors.New("job name is required")
224+
}
225+
226+
_, err := c.protoClient.DeleteJobAlpha1(ctx, &runtimepb.DeleteJobRequest{
227+
Name: name,
228+
})
229+
return err
230+
}

client/jobs_test.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
Copyright 2021 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package client
15+
16+
import (
17+
"testing"
18+
"time"
19+
20+
"github.com/stretchr/testify/assert"
21+
"github.com/stretchr/testify/require"
22+
"google.golang.org/protobuf/types/known/anypb"
23+
)
24+
25+
func TestSchedulingAlpha1(t *testing.T) {
26+
ctx := t.Context()
27+
28+
t.Run("schedule job - valid", func(t *testing.T) {
29+
job := NewJob("test",
30+
WithJobSchedule("test"),
31+
WithJobData(&anypb.Any{}),
32+
WithJobConstantFailurePolicy(),
33+
)
34+
35+
err := testClient.ScheduleJobAlpha1(ctx, job)
36+
37+
require.NoError(t, err)
38+
})
39+
40+
t.Run("get job - valid", func(t *testing.T) {
41+
expected := NewJob("name",
42+
WithJobSchedule("@every 10s"),
43+
WithJobRepeats(4),
44+
WithJobDueTime("10s"),
45+
WithJobTTL("10s"),
46+
WithJobConstantFailurePolicy(),
47+
WithJobConstantFailurePolicyMaxRetries(4),
48+
WithJobConstantFailurePolicyInterval(time.Second*10),
49+
)
50+
51+
resp, err := testClient.GetJobAlpha1(ctx, "name")
52+
require.NoError(t, err)
53+
assert.Equal(t, expected, resp)
54+
})
55+
56+
t.Run("delete job - valid", func(t *testing.T) {
57+
err := testClient.DeleteJobAlpha1(ctx, "name")
58+
59+
require.NoError(t, err)
60+
})
61+
}
62+
63+
func TestJobBuilder(t *testing.T) {
64+
t.Run("basic job creation", func(t *testing.T) {
65+
job := NewJob("test-job")
66+
67+
assert.Equal(t, "test-job", job.Name)
68+
assert.Nil(t, job.Schedule)
69+
assert.Nil(t, job.Repeats)
70+
assert.Nil(t, job.DueTime)
71+
assert.Nil(t, job.TTL)
72+
assert.Nil(t, job.Data)
73+
assert.Nil(t, job.FailurePolicy)
74+
})
75+
76+
t.Run("job with all options and constant failure policy", func(t *testing.T) {
77+
job := NewJob("full-job",
78+
WithJobSchedule("@every 10m"),
79+
WithJobRepeats(5),
80+
WithJobDueTime("2024-12-31T23:59:59Z"),
81+
WithJobTTL("2h"),
82+
WithJobData(&anypb.Any{TypeUrl: "test", Value: []byte("test-data")}),
83+
WithJobConstantFailurePolicy(),
84+
WithJobConstantFailurePolicyMaxRetries(3),
85+
WithJobConstantFailurePolicyInterval(time.Minute*2),
86+
)
87+
88+
assert.Equal(t, "full-job", job.Name)
89+
assert.Equal(t, "@every 10m", *job.Schedule)
90+
assert.Equal(t, uint32(5), *job.Repeats)
91+
assert.Equal(t, "2024-12-31T23:59:59Z", *job.DueTime)
92+
assert.Equal(t, "2h", *job.TTL)
93+
assert.Equal(t, &anypb.Any{TypeUrl: "test", Value: []byte("test-data")}, job.Data)
94+
constantPolicy, ok := job.FailurePolicy.(*JobFailurePolicyConstant)
95+
require.True(t, ok)
96+
assert.Equal(t, uint32(3), *constantPolicy.MaxRetries)
97+
assert.Equal(t, time.Minute*2, *constantPolicy.Interval)
98+
})
99+
100+
t.Run("job with all options and drop failure policy", func(t *testing.T) {
101+
job := NewJob("full-job",
102+
WithJobSchedule("@every 10m"),
103+
WithJobRepeats(5),
104+
WithJobDueTime("2024-12-31T23:59:59Z"),
105+
WithJobTTL("2h"),
106+
WithJobData(&anypb.Any{TypeUrl: "test", Value: []byte("test-data")}),
107+
WithJobDropFailurePolicy(),
108+
)
109+
110+
assert.Equal(t, "full-job", job.Name)
111+
assert.Equal(t, "@every 10m", *job.Schedule)
112+
assert.Equal(t, uint32(5), *job.Repeats)
113+
assert.Equal(t, "2024-12-31T23:59:59Z", *job.DueTime)
114+
assert.Equal(t, "2h", *job.TTL)
115+
assert.Equal(t, &anypb.Any{TypeUrl: "test", Value: []byte("test-data")}, job.Data)
116+
_, ok := job.FailurePolicy.(*JobFailurePolicyDrop)
117+
require.True(t, ok)
118+
})
119+
}

0 commit comments

Comments
 (0)