Skip to content

Commit 121e268

Browse files
committed
store: Do not crash the node on invalid StoreEvent
It is possible to receive invalid store events when a node is starting up because Postgres still has events sent by a previous version of the code. In that case, the best we can do is ignore these events. Once we've seen enough valid events, go back to panicing on invalid events.
1 parent 18f2269 commit 121e268

File tree

1 file changed

+32
-11
lines changed

1 file changed

+32
-11
lines changed

store/postgres/src/store_events.rs

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use futures::sync::mpsc::{channel, Sender};
2-
use std::collections::HashMap;
3-
use std::sync::{Arc, Mutex, RwLock};
2+
use std::sync::{atomic::Ordering, Arc, Mutex, RwLock};
3+
use std::{collections::HashMap, sync::atomic::AtomicUsize};
44
use uuid::Uuid;
55

66
use crate::notification_listener::{NotificationListener, SafeChannelName};
@@ -9,12 +9,14 @@ use graph::prelude::serde_json;
99
use graph::prelude::*;
1010

1111
pub struct StoreEventListener {
12+
logger: Logger,
1213
notification_listener: NotificationListener,
1314
}
1415

1516
impl StoreEventListener {
1617
pub fn new(logger: &Logger, postgres_url: String) -> Self {
1718
StoreEventListener {
19+
logger: logger.clone(),
1820
notification_listener: NotificationListener::new(
1921
logger,
2022
postgres_url,
@@ -34,17 +36,36 @@ impl EventProducer<StoreEvent> for StoreEventListener {
3436
) -> Option<Box<dyn Stream<Item = StoreEvent, Error = ()> + Send>> {
3537
self.notification_listener.take_event_stream().map(
3638
|stream| -> Box<dyn Stream<Item = _, Error = _> + Send> {
37-
Box::new(stream.map(|notification| {
38-
// Create StoreEvent from JSON
39-
let change: StoreEvent = serde_json::from_value(notification.payload.clone())
40-
.unwrap_or_else(|_| {
41-
panic!(
39+
let logger = self.logger.clone();
40+
Box::new(stream.filter_map(move |notification| {
41+
// When graph-node is starting up, it is possible that
42+
// Postgres still has old messages queued up that we
43+
// can't decode anymore. It is safe to skip them; once
44+
// We've seen 10 valid messages, we can assume that
45+
// whatever old messages Postgres had queued have been
46+
// cleared. Seeing an invalid message after that
47+
// definitely indicates trouble.
48+
let num_valid = AtomicUsize::new(0);
49+
serde_json::from_value(notification.payload.clone()).map_or_else(
50+
|_err| {
51+
error!(
52+
&logger,
4253
"invalid store event received from database: {:?}",
4354
notification.payload
44-
)
45-
});
46-
47-
change
55+
);
56+
if num_valid.load(Ordering::SeqCst) > 10 {
57+
panic!(
58+
"invalid store event received from database: {:?}",
59+
notification.payload
60+
);
61+
}
62+
None
63+
},
64+
|change| {
65+
num_valid.fetch_add(1, Ordering::SeqCst);
66+
Some(change)
67+
},
68+
)
4869
}))
4970
},
5071
)

0 commit comments

Comments
 (0)