@@ -11,21 +11,20 @@ use graph::blockchain::block_stream::{
11
11
BlockStream , BlockStreamError , BlockStreamEvent , BlockWithTriggers , ChainHeadUpdateStream ,
12
12
FirehoseCursor , TriggersAdapterWrapper , BUFFERED_BLOCK_STREAM_SIZE ,
13
13
} ;
14
- use graph:: blockchain:: { Block , BlockPtr , Blockchain , TriggerFilterWrapper } ;
14
+ use graph:: blockchain:: { Block , BlockPtr , TriggerFilterWrapper } ;
15
15
use graph:: futures03:: { stream:: Stream , Future , FutureExt } ;
16
16
use graph:: prelude:: { ChainStore , CheapClone , DeploymentHash , NodeId , BLOCK_NUMBER_MAX } ;
17
17
use graph:: slog:: { debug, info, trace, warn, Logger } ;
18
18
19
19
use graph:: components:: store:: BlockNumber ;
20
20
use graph:: data:: subgraph:: UnifiedMappingApiVersion ;
21
21
22
+ use crate :: Chain ;
23
+
22
24
// A high number here forces a slow start.
23
25
const STARTING_PREVIOUS_TRIGGERS_PER_BLOCK : f64 = 1_000_000.0 ;
24
26
25
- enum BlockStreamState < C >
26
- where
27
- C : Blockchain ,
28
- {
27
+ enum BlockStreamState {
29
28
/// Starting or restarting reconciliation.
30
29
///
31
30
/// Valid next states: Reconciliation
@@ -34,13 +33,13 @@ where
34
33
/// The BlockStream is reconciling the subgraph store state with the chain store state.
35
34
///
36
35
/// Valid next states: YieldingBlocks, Idle, BeginReconciliation (in case of revert)
37
- Reconciliation ( Pin < Box < dyn Future < Output = Result < NextBlocks < C > , Error > > + Send > > ) ,
36
+ Reconciliation ( Pin < Box < dyn Future < Output = Result < NextBlocks , Error > > + Send > > ) ,
38
37
39
38
/// The BlockStream is emitting blocks that must be processed in order to bring the subgraph
40
39
/// store up to date with the chain store.
41
40
///
42
41
/// Valid next states: BeginReconciliation
43
- YieldingBlocks ( Box < VecDeque < BlockWithTriggers < C > > > ) ,
42
+ YieldingBlocks ( Box < VecDeque < BlockWithTriggers < Chain > > > ) ,
44
43
45
44
/// The BlockStream experienced an error and is pausing before attempting to produce
46
45
/// blocks again.
@@ -57,16 +56,13 @@ where
57
56
58
57
/// A single next step to take in reconciling the state of the subgraph store with the state of the
59
58
/// chain store.
60
- enum ReconciliationStep < C >
61
- where
62
- C : Blockchain ,
63
- {
59
+ enum ReconciliationStep {
64
60
/// Revert(to) the block the subgraph should be reverted to, so it becomes the new subgraph
65
61
/// head.
66
62
Revert ( BlockPtr ) ,
67
63
68
64
/// Move forwards, processing one or more blocks. Second element is the block range size.
69
- ProcessDescendantBlocks ( Vec < BlockWithTriggers < C > > , BlockNumber ) ,
65
+ ProcessDescendantBlocks ( Vec < BlockWithTriggers < Chain > > , BlockNumber ) ,
70
66
71
67
/// This step is a no-op, but we need to check again for a next step.
72
68
Retry ,
@@ -76,18 +72,15 @@ where
76
72
Done ,
77
73
}
78
74
79
- struct PollingBlockStreamContext < C >
80
- where
81
- C : Blockchain ,
82
- {
75
+ struct PollingBlockStreamContext {
83
76
chain_store : Arc < dyn ChainStore > ,
84
- adapter : Arc < TriggersAdapterWrapper < C > > ,
77
+ adapter : Arc < TriggersAdapterWrapper < Chain > > ,
85
78
node_id : NodeId ,
86
79
subgraph_id : DeploymentHash ,
87
80
// This is not really a block number, but the (unsigned) difference
88
81
// between two block numbers
89
82
reorg_threshold : BlockNumber ,
90
- filter : Arc < TriggerFilterWrapper < C > > ,
83
+ filter : Arc < TriggerFilterWrapper < Chain > > ,
91
84
start_blocks : Vec < BlockNumber > ,
92
85
logger : Logger ,
93
86
previous_triggers_per_block : f64 ,
100
93
current_block : Option < BlockPtr > ,
101
94
}
102
95
103
- impl < C : Blockchain > Clone for PollingBlockStreamContext < C > {
96
+ impl Clone for PollingBlockStreamContext {
104
97
fn clone ( & self ) -> Self {
105
98
Self {
106
99
chain_store : self . chain_store . cheap_clone ( ) ,
@@ -121,37 +114,31 @@ impl<C: Blockchain> Clone for PollingBlockStreamContext<C> {
121
114
}
122
115
}
123
116
124
- pub struct PollingBlockStream < C : Blockchain > {
125
- state : BlockStreamState < C > ,
117
+ pub struct PollingBlockStream {
118
+ state : BlockStreamState ,
126
119
consecutive_err_count : u32 ,
127
120
chain_head_update_stream : ChainHeadUpdateStream ,
128
- ctx : PollingBlockStreamContext < C > ,
121
+ ctx : PollingBlockStreamContext ,
129
122
}
130
123
131
124
// This is the same as `ReconciliationStep` but without retries.
132
- enum NextBlocks < C >
133
- where
134
- C : Blockchain ,
135
- {
125
+ enum NextBlocks {
136
126
/// Blocks and range size
137
- Blocks ( VecDeque < BlockWithTriggers < C > > , BlockNumber ) ,
127
+ Blocks ( VecDeque < BlockWithTriggers < Chain > > , BlockNumber ) ,
138
128
139
129
// The payload is block the subgraph should be reverted to, so it becomes the new subgraph head.
140
130
Revert ( BlockPtr ) ,
141
131
Done ,
142
132
}
143
133
144
- impl < C > PollingBlockStream < C >
145
- where
146
- C : Blockchain ,
147
- {
134
+ impl PollingBlockStream {
148
135
pub fn new (
149
136
chain_store : Arc < dyn ChainStore > ,
150
137
chain_head_update_stream : ChainHeadUpdateStream ,
151
- adapter : Arc < TriggersAdapterWrapper < C > > ,
138
+ adapter : Arc < TriggersAdapterWrapper < Chain > > ,
152
139
node_id : NodeId ,
153
140
subgraph_id : DeploymentHash ,
154
- filter : Arc < TriggerFilterWrapper < C > > ,
141
+ filter : Arc < TriggerFilterWrapper < Chain > > ,
155
142
start_blocks : Vec < BlockNumber > ,
156
143
reorg_threshold : BlockNumber ,
157
144
logger : Logger ,
@@ -184,12 +171,9 @@ where
184
171
}
185
172
}
186
173
187
- impl < C > PollingBlockStreamContext < C >
188
- where
189
- C : Blockchain ,
190
- {
174
+ impl PollingBlockStreamContext {
191
175
/// Perform reconciliation steps until there are blocks to yield or we are up-to-date.
192
- async fn next_blocks ( & self ) -> Result < NextBlocks < C > , Error > {
176
+ async fn next_blocks ( & self ) -> Result < NextBlocks , Error > {
193
177
let ctx = self . clone ( ) ;
194
178
195
179
loop {
@@ -214,7 +198,7 @@ where
214
198
}
215
199
216
200
/// Determine the next reconciliation step. Does not modify Store or ChainStore.
217
- async fn get_next_step ( & self ) -> Result < ReconciliationStep < C > , Error > {
201
+ async fn get_next_step ( & self ) -> Result < ReconciliationStep , Error > {
218
202
let ctx = self . clone ( ) ;
219
203
let start_blocks = self . start_blocks . clone ( ) ;
220
204
let max_block_range_size = self . max_block_range_size ;
@@ -500,14 +484,14 @@ where
500
484
}
501
485
}
502
486
503
- impl < C : Blockchain > BlockStream < C > for PollingBlockStream < C > {
487
+ impl BlockStream < Chain > for PollingBlockStream {
504
488
fn buffer_size_hint ( & self ) -> usize {
505
489
BUFFERED_BLOCK_STREAM_SIZE
506
490
}
507
491
}
508
492
509
- impl < C : Blockchain > Stream for PollingBlockStream < C > {
510
- type Item = Result < BlockStreamEvent < C > , BlockStreamError > ;
493
+ impl Stream for PollingBlockStream {
494
+ type Item = Result < BlockStreamEvent < Chain > , BlockStreamError > ;
511
495
512
496
fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
513
497
let result = loop {
0 commit comments