Skip to content

Commit d577345

Browse files
committed
chain/ethereum: Monomorphize PollingBlockStream
It is only used for ethereum, so there's not point in pretending that other chains could be using it.
1 parent 7bbd26e commit d577345

File tree

1 file changed

+26
-42
lines changed

1 file changed

+26
-42
lines changed

chain/ethereum/src/polling_block_stream.rs

Lines changed: 26 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,20 @@ use graph::blockchain::block_stream::{
1111
BlockStream, BlockStreamError, BlockStreamEvent, BlockWithTriggers, ChainHeadUpdateStream,
1212
FirehoseCursor, TriggersAdapterWrapper, BUFFERED_BLOCK_STREAM_SIZE,
1313
};
14-
use graph::blockchain::{Block, BlockPtr, Blockchain, TriggerFilterWrapper};
14+
use graph::blockchain::{Block, BlockPtr, TriggerFilterWrapper};
1515
use graph::futures03::{stream::Stream, Future, FutureExt};
1616
use graph::prelude::{ChainStore, CheapClone, DeploymentHash, NodeId, BLOCK_NUMBER_MAX};
1717
use graph::slog::{debug, info, trace, warn, Logger};
1818

1919
use graph::components::store::BlockNumber;
2020
use graph::data::subgraph::UnifiedMappingApiVersion;
2121

22+
use crate::Chain;
23+
2224
// A high number here forces a slow start.
2325
const STARTING_PREVIOUS_TRIGGERS_PER_BLOCK: f64 = 1_000_000.0;
2426

25-
enum BlockStreamState<C>
26-
where
27-
C: Blockchain,
28-
{
27+
enum BlockStreamState {
2928
/// Starting or restarting reconciliation.
3029
///
3130
/// Valid next states: Reconciliation
@@ -34,13 +33,13 @@ where
3433
/// The BlockStream is reconciling the subgraph store state with the chain store state.
3534
///
3635
/// Valid next states: YieldingBlocks, Idle, BeginReconciliation (in case of revert)
37-
Reconciliation(Pin<Box<dyn Future<Output = Result<NextBlocks<C>, Error>> + Send>>),
36+
Reconciliation(Pin<Box<dyn Future<Output = Result<NextBlocks, Error>> + Send>>),
3837

3938
/// The BlockStream is emitting blocks that must be processed in order to bring the subgraph
4039
/// store up to date with the chain store.
4140
///
4241
/// Valid next states: BeginReconciliation
43-
YieldingBlocks(Box<VecDeque<BlockWithTriggers<C>>>),
42+
YieldingBlocks(Box<VecDeque<BlockWithTriggers<Chain>>>),
4443

4544
/// The BlockStream experienced an error and is pausing before attempting to produce
4645
/// blocks again.
@@ -57,16 +56,13 @@ where
5756

5857
/// A single next step to take in reconciling the state of the subgraph store with the state of the
5958
/// chain store.
60-
enum ReconciliationStep<C>
61-
where
62-
C: Blockchain,
63-
{
59+
enum ReconciliationStep {
6460
/// Revert(to) the block the subgraph should be reverted to, so it becomes the new subgraph
6561
/// head.
6662
Revert(BlockPtr),
6763

6864
/// Move forwards, processing one or more blocks. Second element is the block range size.
69-
ProcessDescendantBlocks(Vec<BlockWithTriggers<C>>, BlockNumber),
65+
ProcessDescendantBlocks(Vec<BlockWithTriggers<Chain>>, BlockNumber),
7066

7167
/// This step is a no-op, but we need to check again for a next step.
7268
Retry,
@@ -76,18 +72,15 @@ where
7672
Done,
7773
}
7874

79-
struct PollingBlockStreamContext<C>
80-
where
81-
C: Blockchain,
82-
{
75+
struct PollingBlockStreamContext {
8376
chain_store: Arc<dyn ChainStore>,
84-
adapter: Arc<TriggersAdapterWrapper<C>>,
77+
adapter: Arc<TriggersAdapterWrapper<Chain>>,
8578
node_id: NodeId,
8679
subgraph_id: DeploymentHash,
8780
// This is not really a block number, but the (unsigned) difference
8881
// between two block numbers
8982
reorg_threshold: BlockNumber,
90-
filter: Arc<TriggerFilterWrapper<C>>,
83+
filter: Arc<TriggerFilterWrapper<Chain>>,
9184
start_blocks: Vec<BlockNumber>,
9285
logger: Logger,
9386
previous_triggers_per_block: f64,
@@ -100,7 +93,7 @@ where
10093
current_block: Option<BlockPtr>,
10194
}
10295

103-
impl<C: Blockchain> Clone for PollingBlockStreamContext<C> {
96+
impl Clone for PollingBlockStreamContext {
10497
fn clone(&self) -> Self {
10598
Self {
10699
chain_store: self.chain_store.cheap_clone(),
@@ -121,37 +114,31 @@ impl<C: Blockchain> Clone for PollingBlockStreamContext<C> {
121114
}
122115
}
123116

124-
pub struct PollingBlockStream<C: Blockchain> {
125-
state: BlockStreamState<C>,
117+
pub struct PollingBlockStream {
118+
state: BlockStreamState,
126119
consecutive_err_count: u32,
127120
chain_head_update_stream: ChainHeadUpdateStream,
128-
ctx: PollingBlockStreamContext<C>,
121+
ctx: PollingBlockStreamContext,
129122
}
130123

131124
// This is the same as `ReconciliationStep` but without retries.
132-
enum NextBlocks<C>
133-
where
134-
C: Blockchain,
135-
{
125+
enum NextBlocks {
136126
/// Blocks and range size
137-
Blocks(VecDeque<BlockWithTriggers<C>>, BlockNumber),
127+
Blocks(VecDeque<BlockWithTriggers<Chain>>, BlockNumber),
138128

139129
// The payload is block the subgraph should be reverted to, so it becomes the new subgraph head.
140130
Revert(BlockPtr),
141131
Done,
142132
}
143133

144-
impl<C> PollingBlockStream<C>
145-
where
146-
C: Blockchain,
147-
{
134+
impl PollingBlockStream {
148135
pub fn new(
149136
chain_store: Arc<dyn ChainStore>,
150137
chain_head_update_stream: ChainHeadUpdateStream,
151-
adapter: Arc<TriggersAdapterWrapper<C>>,
138+
adapter: Arc<TriggersAdapterWrapper<Chain>>,
152139
node_id: NodeId,
153140
subgraph_id: DeploymentHash,
154-
filter: Arc<TriggerFilterWrapper<C>>,
141+
filter: Arc<TriggerFilterWrapper<Chain>>,
155142
start_blocks: Vec<BlockNumber>,
156143
reorg_threshold: BlockNumber,
157144
logger: Logger,
@@ -184,12 +171,9 @@ where
184171
}
185172
}
186173

187-
impl<C> PollingBlockStreamContext<C>
188-
where
189-
C: Blockchain,
190-
{
174+
impl PollingBlockStreamContext {
191175
/// Perform reconciliation steps until there are blocks to yield or we are up-to-date.
192-
async fn next_blocks(&self) -> Result<NextBlocks<C>, Error> {
176+
async fn next_blocks(&self) -> Result<NextBlocks, Error> {
193177
let ctx = self.clone();
194178

195179
loop {
@@ -214,7 +198,7 @@ where
214198
}
215199

216200
/// Determine the next reconciliation step. Does not modify Store or ChainStore.
217-
async fn get_next_step(&self) -> Result<ReconciliationStep<C>, Error> {
201+
async fn get_next_step(&self) -> Result<ReconciliationStep, Error> {
218202
let ctx = self.clone();
219203
let start_blocks = self.start_blocks.clone();
220204
let max_block_range_size = self.max_block_range_size;
@@ -500,14 +484,14 @@ where
500484
}
501485
}
502486

503-
impl<C: Blockchain> BlockStream<C> for PollingBlockStream<C> {
487+
impl BlockStream<Chain> for PollingBlockStream {
504488
fn buffer_size_hint(&self) -> usize {
505489
BUFFERED_BLOCK_STREAM_SIZE
506490
}
507491
}
508492

509-
impl<C: Blockchain> Stream for PollingBlockStream<C> {
510-
type Item = Result<BlockStreamEvent<C>, BlockStreamError>;
493+
impl Stream for PollingBlockStream {
494+
type Item = Result<BlockStreamEvent<Chain>, BlockStreamError>;
511495

512496
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
513497
let result = loop {

0 commit comments

Comments
 (0)