-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy paths3.rs
146 lines (134 loc) · 5.2 KB
/
s3.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#![cfg(feature = "s3-persistence")]
pub mod s3_persister {
use std::collections::HashMap;
use async_trait::async_trait;
use unleash_types::client_features::ClientFeatures;
use crate::persistence::EdgePersistence;
use crate::{
error::EdgeError,
types::{EdgeResult, EdgeToken},
};
use aws_sdk_s3::{
self as s3,
error::SdkError,
operation::{get_object::GetObjectError, put_object::PutObjectError},
primitives::{ByteStream, SdkBody},
};
/// Object key under which the serialized features are read from and written to the bucket.
pub const FEATURES_KEY: &str = "/unleash-features.json";
/// Object key under which the serialized tokens are read from and written to the bucket.
pub const TOKENS_KEY: &str = "/unleash-tokens.json";
/// [`EdgePersistence`] implementation that stores tokens and features as JSON
/// objects in a single S3 bucket, under [`TOKENS_KEY`] and [`FEATURES_KEY`].
pub struct S3Persister {
/// S3 client used for every `get_object` / `put_object` request.
client: s3::Client,
/// Name of the bucket the objects are stored in.
bucket: String,
}
impl S3Persister {
    /// Builds a persister for `bucket_name` whose S3 client is created from
    /// the explicitly supplied `config`.
    pub fn new_with_config(bucket_name: &str, config: s3::config::Config) -> Self {
        Self {
            client: s3::Client::from_conf(config),
            bucket: bucket_name.to_owned(),
        }
    }

    /// Builds a persister for `bucket_name` whose S3 client is created from
    /// the shared configuration returned by `aws_config::load_from_env()`.
    pub async fn new_from_env(bucket_name: &str) -> Self {
        let sdk_config = aws_config::load_from_env().await;
        Self {
            client: s3::Client::new(&sdk_config),
            bucket: bucket_name.to_owned(),
        }
    }
}
impl From<SdkError<GetObjectError>> for EdgeError {
fn from(err: SdkError<GetObjectError>) -> Self {
EdgeError::PersistenceError(format!("failed to get object {}", err))
}
}
impl From<SdkError<PutObjectError>> for EdgeError {
fn from(err: SdkError<PutObjectError>) -> Self {
EdgeError::PersistenceError(format!("failed to put object {}", err))
}
}
#[async_trait]
impl EdgePersistence for S3Persister {
    /// Loads the tokens stored under [`TOKENS_KEY`].
    ///
    /// # Errors
    ///
    /// Returns [`EdgeError::PersistenceError`] when the S3 request fails, when
    /// the response body cannot be read, or when the body is not valid JSON
    /// for a list of [`EdgeToken`]s.
    async fn load_tokens(&self) -> EdgeResult<Vec<EdgeToken>> {
        let response = self
            .client
            .get_object()
            .bucket(self.bucket.clone())
            .key(TOKENS_KEY)
            .response_content_type("application/json")
            .send()
            .await?;
        // Reading the body is network I/O and can fail; report it instead of panicking.
        let data = response.body.collect().await.map_err(|err| {
            EdgeError::PersistenceError(format!("Failed to read tokens from S3: {}", err))
        })?;
        serde_json::from_slice(&data.into_bytes())
            .map_err(|_| EdgeError::PersistenceError("Failed to deserialize tokens".to_string()))
    }

    /// Serializes `tokens` to JSON and writes them to [`TOKENS_KEY`],
    /// replacing whatever was stored there before.
    ///
    /// # Errors
    ///
    /// Returns [`EdgeError::PersistenceError`] when serialization or the S3
    /// request fails.
    async fn save_tokens(&self, tokens: Vec<EdgeToken>) -> EdgeResult<()> {
        let body_data = serde_json::to_vec(&tokens)
            .map_err(|_| EdgeError::PersistenceError("Failed to serialize tokens".to_string()))?;
        let byte_stream = ByteStream::new(SdkBody::from(body_data));
        self.client
            .put_object()
            .bucket(self.bucket.clone())
            .key(TOKENS_KEY)
            .body(byte_stream)
            .send()
            .await
            .map(|_| ())
            .map_err(|_| EdgeError::PersistenceError("Failed to save tokens".to_string()))
    }

    /// Loads the features stored under [`FEATURES_KEY`], keyed by the string
    /// each entry was saved with.
    ///
    /// A failed `get_object` request yields an empty map rather than an error.
    ///
    /// # Errors
    ///
    /// Returns [`EdgeError::PersistenceError`] when the response body cannot
    /// be read or is not valid JSON for a list of `(String, ClientFeatures)`
    /// pairs.
    async fn load_features(&self) -> EdgeResult<HashMap<String, ClientFeatures>> {
        let request = self
            .client
            .get_object()
            .bucket(self.bucket.clone())
            .key(FEATURES_KEY)
            .response_content_type("application/json")
            .send()
            .await;
        let response = match request {
            Ok(response) => response,
            // Best effort: any failure to fetch the object (for example, the
            // key not existing yet) is treated as "nothing persisted".
            // NOTE(review): this also hides failures such as missing
            // permissions or an unreachable endpoint — confirm that is intended.
            Err(_) => return Ok(HashMap::new()),
        };
        // Reading the body is network I/O and can fail; report it instead of panicking.
        let data = response.body.collect().await.map_err(|err| {
            EdgeError::PersistenceError(format!("Failed to read features from S3: {}", err))
        })?;
        let entries: Vec<(String, ClientFeatures)> = serde_json::from_slice(&data.into_bytes())
            .map_err(|_| {
                EdgeError::PersistenceError("Failed to deserialize features".to_string())
            })?;
        // Move the pairs into the map; no need to clone each `ClientFeatures`.
        Ok(entries.into_iter().collect())
    }

    /// Serializes `features` to JSON and writes them to [`FEATURES_KEY`],
    /// replacing whatever was stored there before.
    ///
    /// # Errors
    ///
    /// Returns [`EdgeError::PersistenceError`] when serialization or the S3
    /// request fails.
    async fn save_features(&self, features: Vec<(String, ClientFeatures)>) -> EdgeResult<()> {
        let body_data = serde_json::to_vec(&features).map_err(|_| {
            EdgeError::PersistenceError("Failed to serialize features".to_string())
        })?;
        let byte_stream = ByteStream::new(SdkBody::from(body_data));
        self.client
            .put_object()
            .bucket(self.bucket.clone())
            .key(FEATURES_KEY)
            .body(byte_stream)
            .send()
            .await
            .map(|_| ())
            .map_err(|_| EdgeError::PersistenceError("Failed to save features".to_string()))
    }
}
}