From 0069c1451d5ba4d921da736199b6b3bd10a43b31 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 4 Jun 2025 15:57:50 -0700 Subject: [PATCH] Weigh as you go --- graph/src/components/store/write.rs | 85 +++++++++++++++++------------ 1 file changed, 50 insertions(+), 35 deletions(-) diff --git a/graph/src/components/store/write.rs b/graph/src/components/store/write.rs index 76c71ce5e39..f765918c8d2 100644 --- a/graph/src/components/store/write.rs +++ b/graph/src/components/store/write.rs @@ -331,7 +331,11 @@ impl RowGroup { } } - pub fn push(&mut self, emod: EntityModification, block: BlockNumber) -> Result<(), StoreError> { + pub fn push( + &mut self, + emod: EntityModification, + block: BlockNumber, + ) -> Result { let is_forward = self .rows .last() @@ -441,8 +445,10 @@ impl RowGroup { } /// Append `row` to `self.rows` by combining it with a previously - /// existing row, if that is possible - fn append_row(&mut self, row: EntityModification) -> Result<(), StoreError> { + /// existing row, if that is possible. + /// + /// Returns the cache weight that was added by this operation + fn append_row(&mut self, row: EntityModification) -> Result { if self.immutable { match row { EntityModification::Insert { .. } => { @@ -456,10 +462,10 @@ impl RowGroup { )); } } - return Ok(()); + return Ok(0); } - if let Some(prev_row) = self.prev_row_mut(row.id()) { + let weight = if let Some(prev_row) = self.prev_row_mut(row.id()) { use EntityModification::*; if row.block() <= prev_row.block() { @@ -482,7 +488,7 @@ impl RowGroup { // `prev_row` and either ignore `row` since it is not needed, or // turn it into an `Insert`, which also does not require // clamping an old version - match (&*prev_row, &row) { + let weight: usize = match (&*prev_row, &row) { (Insert { end: None, .. } | Overwrite { end: None, .. }, Insert { .. }) | (Remove { .. }, Overwrite { .. }) | ( @@ -493,19 +499,20 @@ impl RowGroup { "impossible combination of entity operations: {:?} and then {:?}", prev_row, row - )) + )); } (Remove { .. }, Remove { .. }) => { // Ignore the new row, since prev_row is already a // delete. This can happen when subgraphs delete // entities without checking if they even exist + 0 } ( Insert { end: Some(_), .. } | Overwrite { end: Some(_), .. } | Remove { .. }, Insert { .. }, ) => { // prev_row was deleted - self.push_row(row); + self.push_row(row) } ( Insert { end: None, .. } | Overwrite { end: None, .. }, @@ -513,24 +520,28 @@ impl RowGroup { ) => { prev_row.clamp(*block)?; let row = row.as_insert(&self.entity_type)?; - self.push_row(row); + self.push_row(row) } (Insert { end: None, .. } | Overwrite { end: None, .. }, Remove { block, .. }) => { prev_row.clamp(*block)?; + 0 } - } + }; + weight } else { - self.push_row(row); - } - Ok(()) + self.push_row(row) + }; + Ok(weight) } - fn push_row(&mut self, row: EntityModification) { + fn push_row(&mut self, row: EntityModification) -> usize { self.last_mod.insert(row.id().clone(), self.rows.len()); + let weight = row.weight(); self.rows.push(row); + weight } - fn append(&mut self, group: RowGroup) -> Result<(), StoreError> { + fn append(&mut self, group: RowGroup) -> Result { if self.entity_type != group.entity_type { return Err(internal_error!( "Can not append a row group for {} to a row group for {}", @@ -540,11 +551,12 @@ impl RowGroup { } self.rows.reserve(group.rows.len()); + let mut weight = 0; for row in group.rows { - self.append_row(row)?; + weight += self.append_row(row)?; } - Ok(()) + Ok(weight) } pub fn ids(&self) -> impl Iterator { @@ -559,11 +571,15 @@ impl RowGroupForPerfTest { Self(RowGroup::new(entity_type, immutable)) } - pub fn push(&mut self, emod: EntityModification, block: BlockNumber) -> Result<(), StoreError> { + pub fn push( + &mut self, + emod: EntityModification, + block: BlockNumber, + ) -> Result { self.0.push(emod, block) } - pub fn append_row(&mut self, row: EntityModification) -> Result<(), StoreError> { + pub fn append_row(&mut self, row: EntityModification) -> Result { self.0.append_row(row) } } @@ -648,11 +664,12 @@ impl RowGroups { self.groups.iter().map(|group| group.row_count()).sum() } - fn append(&mut self, other: RowGroups) -> Result<(), StoreError> { + fn append(&mut self, other: RowGroups) -> Result { + let mut weight = 0; for group in other.groups { - self.group_entry(&group.entity_type).append(group)?; + weight += self.group_entry(&group.entity_type).append(group)?; } - Ok(()) + Ok(weight) } } @@ -750,15 +767,16 @@ impl Batch { let mut mods = RowGroups::new(); + let mut indirect_weight = 0; for m in raw_mods { - mods.group_entry(&m.key().entity_type).push(m, block)?; + indirect_weight += mods.group_entry(&m.key().entity_type).push(m, block)?; } let data_sources = DataSources::new(block_ptr.cheap_clone(), data_sources); let offchain_to_remove = DataSources::new(block_ptr.cheap_clone(), offchain_to_remove); let first_block = block_ptr.number; let block_times = vec![(block, block_time)]; - let mut batch = Self { + let batch = Self { block_ptr, first_block, block_times, @@ -769,9 +787,8 @@ impl Batch { offchain_to_remove, error: None, is_non_fatal_errors_active, - indirect_weight: 0, + indirect_weight, }; - batch.weigh(); Ok(batch) } @@ -783,7 +800,8 @@ impl Batch { self.block_ptr = batch.block_ptr; self.block_times.append(&mut batch.block_times); self.firehose_cursor = batch.firehose_cursor; - self.mods.append(batch.mods)?; + let weight = self.mods.append(batch.mods)?; + self.indirect_weight += weight; self.data_sources.append(batch.data_sources); self.deterministic_errors .append(&mut batch.deterministic_errors); @@ -803,7 +821,7 @@ impl Batch { if let Err(e) = &res { self.error = Some(e.clone()); } - self.weigh(); + // self.weigh(); res } @@ -853,10 +871,6 @@ impl Batch { pub fn groups<'a>(&'a self) -> impl Iterator { self.mods.groups.iter() } - - fn weigh(&mut self) { - self.indirect_weight = self.mods.indirect_weight(); - } } impl CacheWeight for Batch { @@ -1130,11 +1144,12 @@ mod test { } } - fn append(&mut self, mods: &[Mod]) -> Result<(), StoreError> { + fn append(&mut self, mods: &[Mod]) -> Result { + let mut weight = 0; for m in mods { - self.group.append_row(EntityModification::from(m))? + weight += self.group.append_row(EntityModification::from(m))?; } - Ok(()) + Ok(weight) } fn with(mods: &[Mod]) -> Result {