-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathdelta_cache_manager.rs
148 lines (126 loc) · 4.42 KB
/
delta_cache_manager.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
use dashmap::DashMap;
use tokio::sync::broadcast;
use tracing::error;
use unleash_types::client_features::DeltaEvent;
use crate::delta_cache::DeltaCache;
/// Notification describing a change to one environment's delta cache.
///
/// Every variant carries the name of the affected environment. The type
/// derives `PartialEq`/`Eq` so notifications can be compared directly
/// (for example with `assert_eq!`) instead of being destructured first.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeltaCacheUpdate {
    /// Environment with a newly inserted cache.
    Full(String),
    /// Environment with an updated delta cache.
    Update(String),
    /// Environment whose cache was removed.
    Deletion(String),
}
/// Keeps one [`DeltaCache`] per environment and owns the sender used to
/// broadcast [`DeltaCacheUpdate`] notifications about those caches.
pub struct DeltaCacheManager {
/// Delta caches keyed by environment name.
caches: DashMap<String, DeltaCache>,
/// Sending half of the broadcast channel; receivers are created from it on demand.
update_sender: broadcast::Sender<DeltaCacheUpdate>,
}
impl Default for DeltaCacheManager {
fn default() -> Self {
Self::new()
}
}
impl DeltaCacheManager {
    /// Creates a manager with no caches and a broadcast channel of capacity 16.
    pub fn new() -> Self {
        // The receiver returned alongside the sender is not kept; callers
        // obtain their own receiver through `subscribe`.
        let (update_sender, _initial_receiver) = broadcast::channel::<DeltaCacheUpdate>(16);
        Self {
            caches: DashMap::new(),
            update_sender,
        }
    }

    /// Returns a new receiver for [`DeltaCacheUpdate`] notifications.
    pub fn subscribe(&self) -> broadcast::Receiver<DeltaCacheUpdate> {
        let sender = &self.update_sender;
        sender.subscribe()
    }

    /// Returns a clone of the cache stored for `env`, or `None` when there is no entry.
    pub fn get(&self, env: &str) -> Option<DeltaCache> {
        let entry = self.caches.get(env)?;
        Some(entry.value().clone())
    }

    /// Stores `cache` under `env` and broadcasts [`DeltaCacheUpdate::Full`].
    ///
    /// The result of the send is discarded.
    pub fn insert_cache(&self, env: &str, cache: DeltaCache) {
        self.caches.insert(env.to_owned(), cache);
        let notification = DeltaCacheUpdate::Full(env.to_owned());
        let _ = self.update_sender.send(notification);
    }

    /// Adds `events` to the cache stored for `env` and broadcasts
    /// [`DeltaCacheUpdate::Update`].
    ///
    /// Does nothing (no mutation, no broadcast) when `env` has no cache.
    /// A failed send is logged at error level.
    ///
    /// NOTE(review): tokio's broadcast `send` reports an error when there are
    /// no active receivers; the sibling methods discard that result while this
    /// one logs it as unexpected — confirm the difference is intended.
    pub fn update_cache(&self, env: &str, events: &[DeltaEvent]) {
        // The map guard stays alive until the end of the function, i.e. the
        // notification is sent while the entry is still held.
        let mut cache = match self.caches.get_mut(env) {
            Some(guard) => guard,
            None => return,
        };
        cache.add_events(events);

        let notification = DeltaCacheUpdate::Update(env.to_owned());
        if let Err(e) = self.update_sender.send(notification) {
            error!("Unexpected broadcast error: {:#?}", e);
        }
    }

    /// Removes the cache stored for `env` (if any) and broadcasts
    /// [`DeltaCacheUpdate::Deletion`].
    ///
    /// The notification is sent whether or not an entry existed; the result
    /// of the send is discarded.
    pub fn remove_cache(&self, env: &str) {
        self.caches.remove(env);
        let notification = DeltaCacheUpdate::Deletion(env.to_owned());
        let _ = self.update_sender.send(notification);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::delta_cache::{DeltaCache, DeltaHydrationEvent};
    use unleash_types::client_features::{ClientFeature, DeltaEvent, Segment};

    /// Builds a feature with only its name set; every other field is the default.
    fn named_feature(name: &str) -> ClientFeature {
        ClientFeature {
            name: name.to_string(),
            ..Default::default()
        }
    }

    /// Inserting a cache emits `Full`, updating it emits `Update`, and the
    /// stored cache ends with the event that was added.
    #[test]
    fn test_insert_and_update_delta_cache() {
        let env = "test-env";
        let initial_cache = DeltaCache::new(
            DeltaHydrationEvent {
                event_id: 1,
                features: vec![named_feature("feature1")],
                segments: vec![Segment {
                    id: 1,
                    constraints: vec![],
                }],
            },
            5,
        );

        let manager = DeltaCacheManager::new();
        let mut updates = manager.subscribe();

        manager.insert_cache(env, initial_cache);
        match updates.try_recv() {
            Ok(DeltaCacheUpdate::Full(notified)) => assert_eq!(notified, env),
            other => panic!("Expected Full update, got {:?}", other),
        }

        let feature_updated = DeltaEvent::FeatureUpdated {
            event_id: 2,
            feature: named_feature("feature1_updated"),
        };
        let batch = [feature_updated.clone()];
        manager.update_cache(env, &batch);
        match updates.try_recv() {
            Ok(DeltaCacheUpdate::Update(notified)) => assert_eq!(notified, env),
            other => panic!("Expected Update update, got {:?}", other),
        }

        let stored = manager.get(env).expect("Cache should exist");
        let recorded = stored.get_events();
        assert_eq!(recorded.last().unwrap(), &feature_updated);
    }

    /// Removing a cache emits `Deletion` and the environment is no longer retrievable.
    #[test]
    fn test_remove_delta_cache() {
        let env = "remove-env";
        let seeded_cache = DeltaCache::new(
            DeltaHydrationEvent {
                event_id: 1,
                features: vec![named_feature("feature-a")],
                segments: vec![],
            },
            3,
        );

        let manager = DeltaCacheManager::new();
        manager.insert_cache(env, seeded_cache);

        // The receiver is created after the insert; whatever `try_recv`
        // yields here is deliberately ignored before the removal is triggered.
        let mut updates = manager.subscribe();
        let _ = updates.try_recv();

        manager.remove_cache(env);
        match updates.try_recv() {
            Ok(DeltaCacheUpdate::Deletion(notified)) => assert_eq!(notified, env),
            other => panic!("Expected Deletion update, got {:?}", other),
        }
        assert!(manager.get(env).is_none());
    }
}