Skip to content

Commit 59ed846

Browse files
committed
util/singleflight: add fork of singleflight with generics
Forked from golang.org/x/sync/singleflight at the x/sync repo's commit 67f06af15bc961c363a7260195bcd53487529a21 Updates golang/go#53427 Change-Id: Iec2b47b7777940017bb9b3db9bd7d93ba4a2e394 Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
1 parent 757ecf7 commit 59ed846

File tree

2 files changed

+549
-0
lines changed

2 files changed

+549
-0
lines changed

util/singleflight/singleflight.go

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
// Copyright (c) 2022 Tailscale Inc & AUTHORS All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
// Copyright 2013 The Go Authors. All rights reserved.
6+
// Use of this source code is governed by a BSD-style
7+
// license that can be found in the LICENSE file.
8+
9+
// Package singleflight provides a duplicate function call suppression
10+
// mechanism.
11+
//
12+
// This is a Tailscale fork of Go's singleflight package which has had several
13+
// homes in the past:
14+
//
15+
// * https://github.com/golang/go/commit/61d3b2db6292581fc07a3767ec23ec94ad6100d1
16+
// * https://github.com/golang/groupcache/tree/master/singleflight
17+
// * https://pkg.go.dev/golang.org/x/sync/singleflight
18+
//
19+
// This fork adds generics.
20+
package singleflight // import "tailscale.com/util/singleflight"
21+
22+
import (
23+
"bytes"
24+
"errors"
25+
"fmt"
26+
"runtime"
27+
"runtime/debug"
28+
"sync"
29+
)
30+
31+
// errGoexit indicates the runtime.Goexit was called in
32+
// the user given function.
33+
var errGoexit = errors.New("runtime.Goexit was called")
34+
35+
// A panicError is an arbitrary value recovered from a panic
36+
// with the stack trace during the execution of given function.
37+
type panicError struct {
38+
value interface{}
39+
stack []byte
40+
}
41+
42+
// Error implements error interface.
43+
func (p *panicError) Error() string {
44+
return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
45+
}
46+
47+
func newPanicError(v interface{}) error {
48+
stack := debug.Stack()
49+
50+
// The first line of the stack trace is of the form "goroutine N [status]:"
51+
// but by the time the panic reaches Do the goroutine may no longer exist
52+
// and its status will have changed. Trim out the misleading line.
53+
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
54+
stack = stack[line+1:]
55+
}
56+
return &panicError{value: v, stack: stack}
57+
}
58+
59+
// call is an in-flight or completed singleflight.Do call
60+
type call[V any] struct {
61+
wg sync.WaitGroup
62+
63+
// These fields are written once before the WaitGroup is done
64+
// and are only read after the WaitGroup is done.
65+
val V
66+
err error
67+
68+
// forgotten indicates whether Forget was called with this call's key
69+
// while the call was still in flight.
70+
forgotten bool
71+
72+
// These fields are read and written with the singleflight
73+
// mutex held before the WaitGroup is done, and are read but
74+
// not written after the WaitGroup is done.
75+
dups int
76+
chans []chan<- Result[V]
77+
}
78+
79+
// Group represents a class of work and forms a namespace in
80+
// which units of work can be executed with duplicate suppression.
81+
type Group[K comparable, V any] struct {
82+
mu sync.Mutex // protects m
83+
m map[K]*call[V] // lazily initialized
84+
}
85+
86+
// Result holds the results of Do, so they can be passed
87+
// on a channel.
88+
type Result[V any] struct {
89+
Val V
90+
Err error
91+
Shared bool
92+
}
93+
94+
// Do executes and returns the results of the given function, making
95+
// sure that only one execution is in-flight for a given key at a
96+
// time. If a duplicate comes in, the duplicate caller waits for the
97+
// original to complete and receives the same results.
98+
// The return value shared indicates whether v was given to multiple callers.
99+
func (g *Group[K, V]) Do(key K, fn func() (V, error)) (v V, err error, shared bool) {
100+
g.mu.Lock()
101+
if g.m == nil {
102+
g.m = make(map[K]*call[V])
103+
}
104+
if c, ok := g.m[key]; ok {
105+
c.dups++
106+
g.mu.Unlock()
107+
c.wg.Wait()
108+
109+
if e, ok := c.err.(*panicError); ok {
110+
panic(e)
111+
} else if c.err == errGoexit {
112+
runtime.Goexit()
113+
}
114+
return c.val, c.err, true
115+
}
116+
c := new(call[V])
117+
c.wg.Add(1)
118+
g.m[key] = c
119+
g.mu.Unlock()
120+
121+
g.doCall(c, key, fn)
122+
return c.val, c.err, c.dups > 0
123+
}
124+
125+
// DoChan is like Do but returns a channel that will receive the
126+
// results when they are ready.
127+
//
128+
// The returned channel will not be closed.
129+
func (g *Group[K, V]) DoChan(key K, fn func() (V, error)) <-chan Result[V] {
130+
ch := make(chan Result[V], 1)
131+
g.mu.Lock()
132+
if g.m == nil {
133+
g.m = make(map[K]*call[V])
134+
}
135+
if c, ok := g.m[key]; ok {
136+
c.dups++
137+
c.chans = append(c.chans, ch)
138+
g.mu.Unlock()
139+
return ch
140+
}
141+
c := &call[V]{chans: []chan<- Result[V]{ch}}
142+
c.wg.Add(1)
143+
g.m[key] = c
144+
g.mu.Unlock()
145+
146+
go g.doCall(c, key, fn)
147+
148+
return ch
149+
}
150+
151+
// doCall handles the single call for a key.
152+
func (g *Group[K, V]) doCall(c *call[V], key K, fn func() (V, error)) {
153+
normalReturn := false
154+
recovered := false
155+
156+
// use double-defer to distinguish panic from runtime.Goexit,
157+
// more details see https://golang.org/cl/134395
158+
defer func() {
159+
// the given function invoked runtime.Goexit
160+
if !normalReturn && !recovered {
161+
c.err = errGoexit
162+
}
163+
164+
c.wg.Done()
165+
g.mu.Lock()
166+
defer g.mu.Unlock()
167+
if !c.forgotten {
168+
delete(g.m, key)
169+
}
170+
171+
if e, ok := c.err.(*panicError); ok {
172+
// In order to prevent the waiting channels from being blocked forever,
173+
// needs to ensure that this panic cannot be recovered.
174+
if len(c.chans) > 0 {
175+
go panic(e)
176+
select {} // Keep this goroutine around so that it will appear in the crash dump.
177+
} else {
178+
panic(e)
179+
}
180+
} else if c.err == errGoexit {
181+
// Already in the process of goexit, no need to call again
182+
} else {
183+
// Normal return
184+
for _, ch := range c.chans {
185+
ch <- Result[V]{c.val, c.err, c.dups > 0}
186+
}
187+
}
188+
}()
189+
190+
func() {
191+
defer func() {
192+
if !normalReturn {
193+
// Ideally, we would wait to take a stack trace until we've determined
194+
// whether this is a panic or a runtime.Goexit.
195+
//
196+
// Unfortunately, the only way we can distinguish the two is to see
197+
// whether the recover stopped the goroutine from terminating, and by
198+
// the time we know that, the part of the stack trace relevant to the
199+
// panic has been discarded.
200+
if r := recover(); r != nil {
201+
c.err = newPanicError(r)
202+
}
203+
}
204+
}()
205+
206+
c.val, c.err = fn()
207+
normalReturn = true
208+
}()
209+
210+
if !normalReturn {
211+
recovered = true
212+
}
213+
}
214+
215+
// Forget tells the singleflight to forget about a key. Future calls
216+
// to Do for this key will call the function rather than waiting for
217+
// an earlier call to complete.
218+
func (g *Group[K, V]) Forget(key K) {
219+
g.mu.Lock()
220+
if c, ok := g.m[key]; ok {
221+
c.forgotten = true
222+
}
223+
delete(g.m, key)
224+
g.mu.Unlock()
225+
}

0 commit comments

Comments
 (0)