diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 0daac24c568..1eb1e6489e5 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -1043,21 +1043,31 @@ impl TriggersAdapterTrait for TriggersAdapter { ptr: BlockPtr, offset: BlockNumber, root: Option, - ) -> Result, Error> { - let block: Option = self + ) -> Result)>, Error> { + let (ptr, data) = match self .chain_store .cheap_clone() .ancestor_block(ptr, offset, root) .await? - .map(|x| x.0) - .map(json::from_value) - .transpose()?; - Ok(block.map(|block| { - BlockFinality::NonFinal(EthereumBlockWithCalls { - ethereum_block: block, - calls: None, - }) - })) + { + Some(pair) => pair, + None => return Ok(None), + }; + + let block = data + .map(|data| json::from_value::(data)) + .transpose()? + .map(Arc::new) + .map(BlockFinality::Final); + + Ok(Some((ptr, block))) + } + + async fn load_block_by_hash( + &self, + _block_hash: &BlockHash, + ) -> Result, Error> { + unimplemented!() } async fn parent_ptr(&self, block: &BlockPtr) -> Result, Error> { diff --git a/chain/ethereum/src/polling_block_stream.rs b/chain/ethereum/src/polling_block_stream.rs index a215f775685..6d8d08f85ea 100644 --- a/chain/ethereum/src/polling_block_stream.rs +++ b/chain/ethereum/src/polling_block_stream.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, Error}; -use graph::tokio; +use graph::{bail, tokio}; use std::cmp; use std::collections::VecDeque; use std::pin::Pin; @@ -430,9 +430,9 @@ impl PollingBlockStreamContext { // It's easiest to start over at this point. Ok(ReconciliationStep::Retry) } - Some(head_ancestor) => { + Some((head_ancestor, block)) => { // Check if there was an interceding skipped (null) block. - if head_ancestor.number() != subgraph_ptr.number + 1 { + if head_ancestor.number != subgraph_ptr.number + 1 { warn!( ctx.logger, "skipped block detected: {}", @@ -442,7 +442,16 @@ impl PollingBlockStreamContext { // We stopped one block short, so we'll compare the parent hash to the // subgraph ptr. - if head_ancestor.parent_hash().as_ref() == Some(&subgraph_ptr.hash) { + if head_ancestor.parent_hash == subgraph_ptr.hash { + let head_ancestor = + match self.adapter.load_block_by_hash(&head_ancestor.hash).await? { + Some(blk) => blk, + None => bail!( + "error loading block data for block {:?}", + head_ancestor.hash.hash_hex() + ), + }; + // The subgraph ptr is an ancestor of the head block. // We cannot use an RPC call here to find the first interesting block // due to the race conditions previously mentioned, diff --git a/chain/near/src/chain.rs b/chain/near/src/chain.rs index 5e0b4060d6a..db7ef8b8a5f 100644 --- a/chain/near/src/chain.rs +++ b/chain/near/src/chain.rs @@ -4,7 +4,7 @@ use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor; use graph::blockchain::substreams_block_stream::SubstreamsBlockStream; use graph::blockchain::{ BasicBlockchainBuilder, BlockIngestor, BlockTime, BlockchainBuilder, BlockchainKind, - NoopDecoderHook, NoopRuntimeAdapter, Trigger, TriggerFilterWrapper, + ExtendedBlockPtr, NoopDecoderHook, NoopRuntimeAdapter, Trigger, TriggerFilterWrapper, }; use graph::cheap_clone::CheapClone; use graph::components::network_provider::ChainName; @@ -325,6 +325,10 @@ impl TriggersAdapterTrait for TriggersAdapter { panic!("Should never be called since not used by FirehoseBlockStream") } + async fn load_block_by_hash(&self, _block_hash: &BlockHash) -> Result> { + unimplemented!() + } + async fn load_block_ptrs_by_numbers( &self, _logger: Logger, @@ -408,7 +412,7 @@ impl TriggersAdapterTrait for TriggersAdapter { _ptr: BlockPtr, _offset: BlockNumber, _root: Option, - ) -> Result, Error> { + ) -> Result, Error> { panic!("Should never be called since FirehoseBlockStream cannot resolve it") } diff --git a/chain/substreams/src/trigger.rs b/chain/substreams/src/trigger.rs index 405b6f8a116..f02ce117d82 100644 --- a/chain/substreams/src/trigger.rs +++ b/chain/substreams/src/trigger.rs @@ -1,7 +1,8 @@ use anyhow::Error; use graph::{ blockchain::{ - self, block_stream::BlockWithTriggers, BlockPtr, EmptyNodeCapabilities, MappingTriggerTrait, + self, block_stream::BlockWithTriggers, BlockPtr, EmptyNodeCapabilities, ExtendedBlockPtr, + MappingTriggerTrait, }, components::{ store::{DeploymentLocator, SubgraphFork}, @@ -131,7 +132,10 @@ impl blockchain::TriggersAdapter for TriggersAdapter { _ptr: BlockPtr, _offset: BlockNumber, _root: Option, - ) -> Result, Error> { + ) -> Result, Error> { + unimplemented!() + } + async fn load_block_by_hash(&self, _block_hash: &BlockHash) -> Result, Error> { unimplemented!() } diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 3189265499f..8b0753ac5b0 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -15,7 +15,9 @@ use thiserror::Error; use tokio::sync::mpsc::{self, Receiver, Sender}; use super::substreams_block_stream::SubstreamsLogData; -use super::{Block, BlockPtr, BlockTime, Blockchain, Trigger, TriggerFilterWrapper}; +use super::{ + Block, BlockPtr, BlockTime, Blockchain, ExtendedBlockPtr, Trigger, TriggerFilterWrapper, +}; use crate::anyhow::Result; use crate::components::store::{BlockNumber, DeploymentLocator, SourceableStore}; use crate::data::subgraph::UnifiedMappingApiVersion; @@ -415,6 +417,10 @@ impl TriggersAdapterWrapper { futures03::future::try_join_all(futures).await } + + pub async fn load_block_by_hash(&self, block_hash: &BlockHash) -> Result> { + self.adapter.load_block_by_hash(block_hash).await + } } fn create_subgraph_trigger_from_entities( @@ -507,7 +513,7 @@ impl TriggersAdapterWrapper { ptr: BlockPtr, offset: BlockNumber, root: Option, - ) -> Result, Error> { + ) -> Result)>, Error> { self.adapter.ancestor_block(ptr, offset, root).await } @@ -593,7 +599,7 @@ impl TriggersAdapterWrapper { #[async_trait] pub trait TriggersAdapter: Send + Sync { - // Return the block that is `offset` blocks before the block pointed to by `ptr` from the local + // Return the block pointer to the block that is `offset` blocks before the block pointed to by `ptr` from the local // cache. An offset of 0 means the block itself, an offset of 1 means the block's parent etc. If // `root` is passed, short-circuit upon finding a child of `root`. If the block is not in the // local cache, return `None`. @@ -602,7 +608,7 @@ pub trait TriggersAdapter: Send + Sync { ptr: BlockPtr, offset: BlockNumber, root: Option, - ) -> Result, Error>; + ) -> Result)>, Error>; // Returns a sequence of blocks in increasing order of block number. // Each block will include all of its triggers that match the given `filter`. @@ -635,6 +641,8 @@ pub trait TriggersAdapter: Send + Sync { /// Get pointer to parent of `block`. This is called when reverting `block`. async fn chain_head_ptr(&self) -> Result, Error>; + async fn load_block_by_hash(&self, block_hash: &BlockHash) -> Result>; + async fn load_block_ptrs_by_numbers( &self, logger: Logger, diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index d53557f8160..217f099ba9e 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -257,9 +257,12 @@ impl TriggersAdapter for MockTriggersAdapter { _ptr: BlockPtr, _offset: BlockNumber, _root: Option, - ) -> Result, Error> { + ) -> Result)>, Error> { todo!() } + async fn load_block_by_hash(&self, _block_hash: &BlockHash) -> Result> { + unimplemented!() + } async fn load_block_ptrs_by_numbers( &self, @@ -531,7 +534,15 @@ impl ChainStore for MockChainStore { _block_ptr: BlockPtr, _offset: BlockNumber, _root: Option, - ) -> Result, Error> { + ) -> Result)>, Error> { + unimplemented!() + } + async fn ancestor_block_ptr( + self: Arc, + _block_ptr: BlockPtr, + _offset: BlockNumber, + _root: Option, + ) -> Result, Error> { unimplemented!() } fn cleanup_cached_blocks( diff --git a/graph/src/blockchain/types.rs b/graph/src/blockchain/types.rs index f3e2642e840..b0137a91682 100644 --- a/graph/src/blockchain/types.rs +++ b/graph/src/blockchain/types.rs @@ -385,6 +385,13 @@ impl ExtendedBlockPtr { } } + pub fn as_block_ptr(&self) -> BlockPtr { + BlockPtr { + hash: self.hash.clone(), + number: self.number, + } + } + /// Encodes the block hash into a hexadecimal string **without** a "0x" prefix. /// Hashes are stored in the database in this format. pub fn hash_hex(&self) -> String { @@ -613,7 +620,13 @@ impl BlockTime { (10, 0) }; - u64::from_str_radix(&ts[idx..], radix).map(|ts| BlockTime::since_epoch(ts as i64, 0)) + u64::from_str_radix(&ts[idx..], radix).map(|ts| { + if ts == 0 { + BlockTime::NONE + } else { + BlockTime::since_epoch(ts as i64, 0) + } + }) } /// Construct a block time that is the given number of seconds and diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 83073f557b8..f75e957d625 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -553,7 +553,14 @@ pub trait ChainStore: ChainHeadStore { block_ptr: BlockPtr, offset: BlockNumber, root: Option, - ) -> Result, Error>; + ) -> Result)>, Error>; + + async fn ancestor_block_ptr( + self: Arc, + block_ptr: BlockPtr, + offset: BlockNumber, + root: Option, + ) -> Result, Error>; /// Remove old blocks from the cache we maintain in the database and /// return a pair containing the number of the oldest block retained diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index 905568a5637..5cfef42d5f5 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -117,10 +117,11 @@ pub async fn info( let head_block = chain_store.cheap_clone().chain_head_ptr().await?; let ancestor = match &head_block { None => None, - Some(head_block) => chain_store - .ancestor_block(head_block.clone(), offset, None) - .await? - .map(|x| x.1), + Some(head_block) => { + chain_store + .ancestor_block(head_block.clone(), offset, None) + .await? + } }; row("name", chain.name); @@ -132,7 +133,11 @@ pub async fn info( } print_ptr("head block", head_block, hashes); row("reorg threshold", offset); - print_ptr("reorg ancestor", ancestor, hashes); + print_ptr( + "reorg ancestor", + ancestor.map(|ptr| ptr.as_block_ptr()), + hashes, + ); Ok(()) } diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index 0ef848aa07d..0aba33786b3 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -98,7 +98,7 @@ mod data { sql_types::{BigInt, Bytea, Integer, Jsonb}, update, }; - use graph::blockchain::{Block, BlockHash, BlockTime}; + use graph::blockchain::{Block, BlockHash, BlockTime, ExtendedBlockPtr}; use graph::data::store::scalar::Bytes; use graph::internal_error; use graph::prelude::ethabi::ethereum_types::H160; @@ -1224,7 +1224,7 @@ mod data { and a.block_offset < $2 {short_circuit_predicate} ) - select a.block_hash as hash, b.number as number + select a.block_hash as hash, b.number as number, b.parent_hash from ancestors a inner join {block_ptrs_table_name} b on a.block_hash = b.hash order by a.block_offset desc limit 1 @@ -1243,13 +1243,17 @@ mod data { block_ptr: BlockPtr, offset: BlockNumber, root: Option, - ) -> Result, Error> { + load_block: bool, + ) -> Result)>, Error> { + const TIMESTAMP_QUERY: &str = + "coalesce(data->'block'->>'timestamp', data->>'timestamp')"; + let short_circuit_predicate = match root { Some(_) => "and b.parent_hash <> $3", None => "", }; - let data_and_ptr = match self { + let ptr: Option = match self { Storage::Shared => { let query = self.ancestor_block_query(short_circuit_predicate, "ethereum_blocks"); @@ -1261,6 +1265,8 @@ mod data { hash: String, #[diesel(sql_type = BigInt)] number: i64, + #[diesel(sql_type = Text)] + parent_hash: String, } let block = match root { @@ -1280,21 +1286,48 @@ mod data { match block { None => None, - Some(block) => Some(( - b::table - .filter(b::hash.eq(&block.hash)) - .select(b::data) - .first::(conn)?, - BlockPtr::new( - BlockHash::from_str(&block.hash)?, - i32::try_from(block.number).unwrap(), - ), - )), + Some(block) => { + if load_block { + let (ts, data) = b::table + .filter(b::hash.eq(&block.hash)) + .select((sql::>(TIMESTAMP_QUERY), b::data)) + .first::<(Option, json::Value)>(conn)?; + let ts = ts + .map(|ts| BlockTime::from_hex_str(&ts)) + .transpose()? + .unwrap_or(BlockTime::NONE); + + return Ok(Some(( + ExtendedBlockPtr { + hash: BlockHash::from_str(&block.hash)?, + number: block.number as i32, + parent_hash: BlockHash::from_str(&block.parent_hash)?, + timestamp: ts, + }, + Some(data), + ))); + } else { + let ts = b::table + .filter(b::hash.eq(&block.hash)) + .select(sql::>(TIMESTAMP_QUERY)) + .first::>(conn)? + .map(|ts| BlockTime::from_hex_str(&ts)) + .transpose()? + .unwrap_or(BlockTime::NONE); + + Some(ExtendedBlockPtr { + hash: BlockHash::from_str(&block.hash)?, + number: block.number as i32, + parent_hash: BlockHash::from_str(&block.parent_hash)?, + timestamp: ts, + }) + } + } } } Storage::Private(Schema { - blocks, block_pointers, + blocks, .. }) => { let query = self.ancestor_block_query( @@ -1308,6 +1341,8 @@ mod data { hash: Vec, #[diesel(sql_type = BigInt)] number: i64, + #[diesel(sql_type = Bytea)] + parent_hash: Vec, } let block = match root { @@ -1325,38 +1360,46 @@ mod data { match block { None => None, - Some(block) => Some(( - blocks - .table() - .filter(blocks.hash().eq(&block.hash)) - .select(blocks.data()) - .first::(conn)?, - BlockPtr::from((block.hash, block.number)), - )), + Some(block) => { + if load_block { + let (ts, data) = block_pointers + .table() + .filter(block_pointers.hash().eq(&block.hash)) + .select((block_pointers.timestamp(), blocks.data().nullable())) + .inner_join( + blocks.table().on(block_pointers.hash().eq(blocks.hash())), + ) + .first::<(BlockTime, Option)>(conn)?; + + return Ok(Some(( + ExtendedBlockPtr { + hash: BlockHash::from(block.hash), + number: block.number as i32, + parent_hash: BlockHash::from(block.parent_hash), + timestamp: ts, + }, + data, + ))); + } else { + let ts = block_pointers + .table() + .filter(block_pointers.hash().eq(&block.hash)) + .select(block_pointers.timestamp()) + .first::(conn)?; + + Some(ExtendedBlockPtr { + hash: BlockHash::from(block.hash), + number: block.number as i32, + parent_hash: BlockHash::from(block.parent_hash), + timestamp: ts, + }) + } + } } } }; - // We need to deal with chain stores where some entries have a - // toplevel 'blocks' field and others directly contain what - // would be in the 'blocks' field. Make sure the value we return - // has a 'block' entry - // - // see also 7736e440-4c6b-11ec-8c4d-b42e99f52061 - let data_and_ptr = { - use graph::prelude::serde_json::json; - - data_and_ptr.map(|(data, ptr)| { - ( - match data.get("block") { - Some(_) => data, - None => json!({ "block": data, "transaction_receipts": [] }), - }, - ptr, - ) - }) - }; - Ok(data_and_ptr) + Ok(ptr.map(|ptr| (ptr, None))) } pub(super) fn delete_blocks_before( @@ -2605,7 +2648,7 @@ impl ChainStoreTrait for ChainStore { block_ptr: BlockPtr, offset: BlockNumber, root: Option, - ) -> Result, Error> { + ) -> Result)>, Error> { ensure!( block_ptr.number >= offset, "block offset {} for block `{}` points to before genesis block", @@ -2614,12 +2657,9 @@ impl ChainStoreTrait for ChainStore { ); // Check the local cache first. - let block_cache = self - .recent_blocks_cache - .get_ancestor(&block_ptr, offset) - .and_then(|x| Some(x.0).zip(x.1)); - if let Some((ptr, data)) = block_cache { - return Ok(Some((data, ptr))); + let block_cache = self.recent_blocks_cache.get_ancestor(&block_ptr, offset); + if let Some(ptr) = block_cache { + return Ok(Some(ptr)); } let block_ptr_clone = block_ptr.clone(); @@ -2629,7 +2669,42 @@ impl ChainStoreTrait for ChainStore { .with_conn(move |conn, _| { chain_store .storage - .ancestor_block(conn, block_ptr_clone, offset, root) + .ancestor_block(conn, block_ptr_clone, offset, root, true) + .map_err(StoreError::from) + .map_err(CancelableError::from) + }) + .await + .map_err(Into::into) + } + + async fn ancestor_block_ptr( + self: Arc, + block_ptr: BlockPtr, + offset: BlockNumber, + root: Option, + ) -> Result, Error> { + ensure!( + block_ptr.number >= offset, + "block offset {} for block `{}` points to before genesis block", + offset, + block_ptr.hash_hex() + ); + + // Check the local cache first. + let block_cache = self.recent_blocks_cache.get_ancestor(&block_ptr, offset); + if let Some((ptr, _)) = block_cache { + return Ok(Some(ptr)); + } + + let block_ptr_clone = block_ptr.clone(); + let chain_store = self.cheap_clone(); + + self.pool + .with_conn(move |conn, _| { + chain_store + .storage + .ancestor_block(conn, block_ptr_clone, offset, root, false) + .map(|b| b.map(|b| b.0)) .map_err(StoreError::from) .map_err(CancelableError::from) }) @@ -2802,6 +2877,8 @@ impl ChainStoreTrait for ChainStore { } mod recent_blocks_cache { + use serde_json::Value; + use super::*; use std::collections::BTreeMap; @@ -2833,7 +2910,7 @@ mod recent_blocks_cache { &self, child_ptr: &BlockPtr, offset: BlockNumber, - ) -> Option<(&BlockPtr, Option<&json::Value>)> { + ) -> Option<(ExtendedBlockPtr, Option)> { let child = self.blocks.get(&child_ptr.number)?; if &child.ptr != child_ptr { return None; @@ -2847,7 +2924,18 @@ mod recent_blocks_cache { } child = parent; } - Some((&child.ptr, child.data.as_ref())) + Some(( + ExtendedBlockPtr { + hash: child.ptr.hash.clone(), + number: child.ptr.number, + parent_hash: child.parent_hash.clone(), + timestamp: child + .timestamp() + .and_then(|ts| Some(BlockTime::try_from(ts)))? + .unwrap_or(BlockTime::NONE), + }, + child.data.clone(), + )) } fn chain_head(&self) -> Option<&BlockPtr> { @@ -2919,12 +3007,8 @@ mod recent_blocks_cache { &self, child: &BlockPtr, offset: BlockNumber, - ) -> Option<(BlockPtr, Option)> { - let block_opt = self - .inner - .read() - .get_ancestor(child, offset) - .map(|b| (b.0.clone(), b.1.cloned())); + ) -> Option<(ExtendedBlockPtr, Option)> { + let block_opt = self.inner.read().get_ancestor(child, offset); let inner = self.inner.read(); if block_opt.is_some() { diff --git a/store/test-store/tests/postgres/chain_head.rs b/store/test-store/tests/postgres/chain_head.rs index cf501f1438f..413da9b50ed 100644 --- a/store/test-store/tests/postgres/chain_head.rs +++ b/store/test-store/tests/postgres/chain_head.rs @@ -310,14 +310,19 @@ fn check_ancestor( ))? .ok_or_else(|| anyhow!("block {} has no ancestor at offset {}", child.hash, offset))?; - let act_ptr = act.1; + let act_ptr = act.as_block_ptr(); let exp_ptr = exp.block_ptr(); if exp_ptr != act_ptr { return Err(anyhow!("expected ptr `{}` but got `{}`", exp_ptr, act_ptr)); } - let act_block = json::from_value::(act.0)?; + let act_block = executor::block_on(store.cheap_clone().blocks(vec![act_ptr.hash])) + .expect("block should exist and be loaded"); + assert_eq!(1, act_block.len()); + + let act_block = act_block.into_iter().next().unwrap(); + let act_block = json::from_value::(act_block)?; let act_hash = format!("{:x}", act_block.block.hash.unwrap()); let exp_hash = &exp.hash; diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index cc99e406c1c..4c6c6677594 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -13,8 +13,8 @@ use graph::blockchain::block_stream::{ BlockWithTriggers, FirehoseCursor, }; use graph::blockchain::{ - Block, BlockHash, BlockPtr, Blockchain, BlockchainMap, ChainIdentifier, RuntimeAdapter, - TriggerFilterWrapper, TriggersAdapter, TriggersAdapterSelector, + Block, BlockHash, BlockPtr, Blockchain, BlockchainMap, ChainIdentifier, ExtendedBlockPtr, + RuntimeAdapter, TriggerFilterWrapper, TriggersAdapter, TriggersAdapterSelector, }; use graph::cheap_clone::CheapClone; use graph::components::link_resolver::{ArweaveClient, ArweaveResolver, FileSizeLimit}; @@ -985,10 +985,14 @@ impl TriggersAdapter for MockTriggersAdapter { _ptr: BlockPtr, _offset: BlockNumber, _root: Option, - ) -> Result::Block>, Error> { + ) -> Result, Error> { todo!() } + async fn load_block_by_hash(&self, _block_hash: &BlockHash) -> Result, Error> { + unimplemented!() + } + async fn load_block_ptrs_by_numbers( &self, _logger: Logger,