@@ -67,8 +67,42 @@ func (q *sqlQuerier) Ping(ctx context.Context) (time.Duration, error) {
67
67
return time .Since (start ), err
68
68
}
69
69
70
- // InTx performs database operations inside a transaction.
71
70
func (q * sqlQuerier ) InTx (function func (Store ) error , txOpts * sql.TxOptions ) error {
71
+ _ , inTx := q .db .(* sqlx.Tx )
72
+ isolation := sql .LevelDefault
73
+ if txOpts != nil {
74
+ isolation = txOpts .Isolation
75
+ }
76
+
77
+ // If we are not already in a transaction, and we are running in serializable
78
+ // mode, we need to run the transaction in a retry loop. The caller should be
79
+ // prepared to allow retries if using serializable mode.
80
+ // If we are in a transaction already, the parent InTx call will handle the retry.
81
+ // We do not want to duplicate those retries.
82
+ if ! inTx && isolation == sql .LevelSerializable {
83
+ // This is an arbitrarily chosen number.
84
+ const retryAmount = 3
85
+ var err error
86
+ attempts := 0
87
+ for attempts = 0 ; attempts < retryAmount ; attempts ++ {
88
+ err = q .runTx (function , txOpts )
89
+ if err == nil {
90
+ // Transaction succeeded.
91
+ return nil
92
+ }
93
+ if err != nil && ! IsSerializedError (err ) {
94
+ // We should only retry if the error is a serialization error.
95
+ return err
96
+ }
97
+ }
98
+ // Transaction kept failing in serializable mode.
99
+ return xerrors .Errorf ("transaction failed after %d attempts: %w" , attempts , err )
100
+ }
101
+ return q .runTx (function , txOpts )
102
+ }
103
+
104
+ // InTx performs database operations inside a transaction.
105
+ func (q * sqlQuerier ) runTx (function func (Store ) error , txOpts * sql.TxOptions ) error {
72
106
if _ , ok := q .db .(* sqlx.Tx ); ok {
73
107
// If the current inner "db" is already a transaction, we just reuse it.
74
108
// We do not need to handle commit/rollback as the outer tx will handle
0 commit comments