-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathresp_buf.rs
121 lines (102 loc) · 3.27 KB
/
resp_buf.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use crate::{
resp::{
RespDeserializer, Value, ARRAY_TAG, BLOB_ERROR_TAG, ERROR_TAG, PUSH_TAG, SIMPLE_STRING_TAG,
},
Result,
};
use bytes::{BufMut, Bytes, BytesMut};
use serde::Deserialize;
use std::{fmt, ops::Deref};
/// Represents a [RESP](https://redis.io/docs/reference/protocol-spec/) Buffer incoming from the network
///
/// This is a newtype over a [`Bytes`] buffer that holds the raw, still-encoded RESP data.
/// Decoding only happens on demand, through [`RespBuf::to`]. The type dereferences to `[u8]`,
/// so the encoded bytes can be inspected directly.
#[derive(Clone)]
pub struct RespBuf(Bytes);
impl RespBuf {
    /// Constructs a new `RespBuf` from a `Bytes` buffer
    #[inline]
    pub fn new(bytes: Bytes) -> Self {
        Self(bytes)
    }

    /// Constructs a new `RespBuf` as a RESP Array from a collection of chunks (byte slices)
    ///
    /// The array header (`ARRAY_TAG`, the decimal number of chunks, then `\r\n`) is written
    /// first; every chunk is then appended verbatim, in order. No per-chunk framing is added.
    ///
    /// Any slice of byte slices is accepted; a `&Vec<&[u8]>` still works through deref coercion.
    pub fn from_chunks(chunks: &[&[u8]]) -> Self {
        let mut len_buf = itoa::Buffer::new();
        let count = len_buf.format(chunks.len());

        // The final size is known up front (tag + count + CRLF + all chunks),
        // so allocate once instead of growing the buffer while appending.
        let payload_len: usize = chunks.iter().map(|chunk| chunk.len()).sum();
        let mut bytes = BytesMut::with_capacity(1 + count.len() + 2 + payload_len);

        bytes.put_u8(ARRAY_TAG);
        bytes.put_slice(count.as_bytes());
        bytes.put_slice(b"\r\n");
        for chunk in chunks {
            bytes.put_slice(chunk);
        }

        Self(bytes.freeze())
    }

    /// Constructs a new `RespBuf` from a byte slice
    ///
    /// The data is copied into a newly created buffer.
    #[inline]
    pub fn from_slice(data: &[u8]) -> RespBuf {
        RespBuf(Bytes::copy_from_slice(data))
    }

    /// Returns `true` if the RESP Buffer is a push message
    ///
    /// A buffer counts as a push message when its first byte is `PUSH_TAG`,
    /// or when it is a monitor message (see [`RespBuf::is_monitor_message`]).
    #[inline]
    pub fn is_push_message(&self) -> bool {
        self.0.first() == Some(&PUSH_TAG) || self.is_monitor_message()
    }

    /// Returns `true` if the RESP Buffer is a monitor message
    ///
    /// A monitor message is detected as a simple string (`SIMPLE_STRING_TAG`)
    /// whose first payload byte is an ASCII digit.
    #[inline]
    pub fn is_monitor_message(&self) -> bool {
        // `is_ascii_digit` is used on the raw byte on purpose: casting the byte to `char`
        // and calling `is_numeric` would also accept Latin-1 code points such as
        // 0xB2 ('²') or 0xBC ('¼'), which are not digits on the wire.
        self.0.len() > 1 && self.0[0] == SIMPLE_STRING_TAG && self.0[1].is_ascii_digit()
    }

    /// Returns `true` if the RESP Buffer is a Redis error
    ///
    /// The buffer must be longer than one byte and start with `ERROR_TAG` or `BLOB_ERROR_TAG`.
    #[inline]
    pub fn is_error(&self) -> bool {
        self.0.len() > 1 && (self.0[0] == ERROR_TAG || self.0[0] == BLOB_ERROR_TAG)
    }

    /// Convert the RESP Buffer to a Rust type `T` by using serde deserialization
    ///
    /// # Errors
    /// Returns the error produced by `T::deserialize` when the buffer cannot be
    /// deserialized into `T`.
    #[inline]
    pub fn to<'de, T: Deserialize<'de>>(&'de self) -> Result<T> {
        let mut deserializer = RespDeserializer::new(&self.0);
        T::deserialize(&mut deserializer)
    }

    /// Returns the internal buffer as a byte slice
    #[inline]
    pub fn as_bytes(&self) -> &[u8] {
        &self.0
    }

    /// Constructs a new `RespBuf` as a RESP Ok message (+OK\r\n)
    #[inline]
    pub fn ok() -> RespBuf {
        RespBuf(Bytes::from_static(b"+OK\r\n"))
    }

    /// Constructs a new `RespBuf` as a RESP Nil message (_\r\n)
    #[inline]
    pub fn nil() -> RespBuf {
        RespBuf(Bytes::from_static(b"_\r\n"))
    }
}
impl Deref for RespBuf {
    type Target = [u8];

    /// Exposes the encoded buffer as a plain byte slice, so slice methods
    /// can be called directly on a `RespBuf`.
    #[inline]
    fn deref(&self) -> &Self::Target {
        self.as_bytes()
    }
}
impl fmt::Display for RespBuf {
    /// Writes the buffer decoded as a `Value`, using the value's `Debug` representation.
    ///
    /// The output is capped at 1000 bytes; when the cap falls inside a multi-byte
    /// character, the cut is moved back to the previous character boundary.
    /// If the buffer cannot be decoded, `RESP buffer error: ...` is written instead.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Upper bound, in bytes, of the rendered text.
        const MAX_DISPLAY_LEN: usize = 1000;

        match self.to::<Value>() {
            Ok(value) => {
                let text = format!("{value:?}");

                // Slicing a `str` at a byte offset that is not a char boundary panics,
                // and `Debug` output may contain multi-byte UTF-8 characters. Back off
                // until the cut is valid; offset 0 is always a boundary, so this ends.
                let mut end = text.len().min(MAX_DISPLAY_LEN);
                while !text.is_char_boundary(end) {
                    end -= 1;
                }

                f.write_str(&text[..end])
            }
            Err(e) => write!(f, "RESP buffer error: {e:?}"),
        }
    }
}
impl fmt::Debug for RespBuf {
    /// Debug output is identical to the `Display` output: the formatter is
    /// handed straight to the `Display` implementation.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <Self as fmt::Display>::fmt(self, f)
    }
}