Skip to content

Commit f6242f5

Browse files
committed
Fixed io.Reader. Implemented io.Closer
1 parent 4619135 commit f6242f5

File tree

1 file changed

+25
-1
lines changed

1 file changed

+25
-1
lines changed

redisqueue/queue.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,19 @@ import (
66
"github.com/gomodule/redigo/redis"
77
"strconv"
88
"time"
9+
"io"
910
)
1011

12+
var ErrClosed = fmt.Errorf("use of closed queue")
13+
1114
// Queue holds a reference to a redis connection and a queue name.
1215
type Queue struct {
1316
pool *redis.Pool
1417
c redis.Conn
1518
readBuf bytes.Buffer
1619
usePool bool
1720
Name string
21+
closed bool
1822
}
1923

2024
// New defines a new Queue with redis.Pool or single redis.Conn
@@ -49,8 +53,19 @@ func (q *Queue) Write(p []byte) (n int, err error) {
4953
return
5054
}
5155

56+
func (q *Queue) Close () error {
57+
q.closed = true
58+
59+
return q.FlushQueue()
60+
}
61+
5262
// Implements io.Reader
5363
func (q *Queue) Read(p []byte) (n int, err error) {
64+
65+
if q.closed {
66+
return 0, ErrClosed
67+
}
68+
5469
var available int64
5570
available, err = q.Pending()
5671
if err != nil {
@@ -68,7 +83,12 @@ func (q *Queue) Read(p []byte) (n int, err error) {
6883
}
6984
}
7085

71-
return q.readBuf.Read(p)
86+
n, err = q.readBuf.Read(p)
87+
if err != nil && err == io.EOF {
88+
err = nil
89+
}
90+
91+
return
7292
}
7393

7494
// Push pushes a single job on to the queue. The job string can be any format, as the queue doesn't really care.
@@ -78,6 +98,10 @@ func (q *Queue) Push(job string) (bool, error) {
7898

7999
// Schedule schedule a job at some point in the future, or some point in the past. Scheduling a job far in the past is the same as giving it a high priority, as jobs are popped in order of due date.
80100
func (q *Queue) Schedule(job string, when time.Time) (bool, error) {
101+
if q.closed {
102+
return false, ErrClosed
103+
}
104+
81105
var c redis.Conn
82106
if q.usePool {
83107
c = q.pool.Get()

0 commit comments

Comments
 (0)