@@ -41,15 +41,27 @@ export class WalletAddressModel extends BaseModel<IWalletAddress> {
41
41
const unprocessedAddresses : Array < string > = [ ] ;
42
42
return new Promise ( async resolve => {
43
43
for ( let address of addresses ) {
44
- const updatedAddress = await this . collection . findOneAndUpdate ( {
45
- wallet : wallet . _id , address : address , chain, network
46
- } , { $setOnInsert : { wallet : wallet . _id , address : address , chain, network } } , { returnOriginal : false , upsert : true } ) ;
47
- if ( ! updatedAddress . value ! . processed ) {
48
- unprocessedAddresses . push ( address ) ;
49
- await CoinStorage . collection . updateMany (
50
- { chain, network, address } ,
51
- { $addToSet : { wallets : wallet . _id } }
44
+ try {
45
+ const updatedAddress = await this . collection . findOneAndUpdate (
46
+ {
47
+ wallet : wallet . _id ,
48
+ address : address ,
49
+ chain,
50
+ network
51
+ } ,
52
+ { $setOnInsert : { wallet : wallet . _id , address : address , chain, network } } ,
53
+ { returnOriginal : false , upsert : true }
52
54
) ;
55
+ if ( ! updatedAddress . value ! . processed ) {
56
+ unprocessedAddresses . push ( address ) ;
57
+ await CoinStorage . collection . updateMany (
58
+ { chain, network, address } ,
59
+ { $addToSet : { wallets : wallet . _id } }
60
+ ) ;
61
+ }
62
+ } catch ( err ) {
63
+ // Perhaps a race condition from multiple calls around the same time
64
+ console . error ( 'Likely an upsert race condition in walletAddress updates' ) ;
53
65
}
54
66
}
55
67
@@ -60,7 +72,7 @@ export class WalletAddressModel extends BaseModel<IWalletAddress> {
60
72
let txids = { } ;
61
73
coinStream . on ( 'data' , ( coin : ICoin ) => {
62
74
coinStream . pause ( ) ;
63
- if ( ! unprocessedAddresses . includes ( coin . address ) ) {
75
+ if ( ! unprocessedAddresses . includes ( coin . address ) ) {
64
76
return coinStream . resume ( ) ;
65
77
}
66
78
if ( ! txids [ coin . mintTxid ] ) {
@@ -80,8 +92,11 @@ export class WalletAddressModel extends BaseModel<IWalletAddress> {
80
92
return coinStream . resume ( ) ;
81
93
} ) ;
82
94
coinStream . on ( 'end' , async ( ) => {
83
- for ( const address of unprocessedAddresses ) {
84
- await this . collection . updateOne ( { chain, network, address, wallet : wallet . _id } , { $set : { processed : true } } ) ;
95
+ for ( const address of unprocessedAddresses ) {
96
+ await this . collection . updateOne (
97
+ { chain, network, address, wallet : wallet . _id } ,
98
+ { $set : { processed : true } }
99
+ ) ;
85
100
}
86
101
resolve ( ) ;
87
102
} ) ;
0 commit comments