Skip to content

Commit 0cd3e1e

Browse files
authored
add support for keyspace notifications (RedisJSON#317)
* add support for keyspace notifications * upgrade redismodule-rs to 0.15.0
1 parent 6c7a9a1 commit 0cd3e1e

File tree

3 files changed

+55
-23
lines changed

3 files changed

+55
-23
lines changed

Cargo.lock

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ serde_json = "1.0"
1414
serde = "1.0"
1515
libc = "0.2"
1616
jsonpath_lib = { git="https://github.com/RedisJSON/jsonpath.git", branch="public-parser" }
17-
redis-module = { version="0.14", features = ["experimental-api"]}
17+
redis-module = { version="0.15", features = ["experimental-api"]}
1818

1919
[features]
2020
# Workaround to allow cfg(feature = "test") in redismodue-rs dependencies:

src/lib.rs

+52-20
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
#[macro_use]
22
extern crate redis_module;
33

4-
use redis_module::native_types::RedisType;
54
use redis_module::raw::RedisModuleTypeMethods;
5+
use redis_module::{native_types::RedisType, NotifyEvent};
66
use redis_module::{raw as rawmod, NextArg};
77
use redis_module::{Context, RedisError, RedisResult, RedisValue, REDIS_OK};
88
use serde_json::{Number, Value};
@@ -41,6 +41,11 @@ static REDIS_JSON_TYPE: RedisType = RedisType::new(
4141
aux_load: None,
4242
aux_save: None,
4343
aux_save_triggers: rawmod::Aux::Before as i32,
44+
45+
free_effort: None,
46+
unlink: None,
47+
copy: None,
48+
defrag: None,
4449
},
4550
);
4651

@@ -69,15 +74,16 @@ fn json_del(ctx: &Context, args: Vec<String>) -> RedisResult {
6974
let key = args.next_string()?;
7075
let path = backwards_compat_path(args.next_string()?);
7176

72-
let key = ctx.open_key_writable(&key);
73-
let deleted = match key.get_value::<RedisJSON>(&REDIS_JSON_TYPE)? {
77+
let redis_key = ctx.open_key_writable(&key);
78+
let deleted = match redis_key.get_value::<RedisJSON>(&REDIS_JSON_TYPE)? {
7479
Some(doc) => {
7580
let res = if path == "$" {
76-
key.delete()?;
81+
redis_key.delete()?;
7782
1
7883
} else {
7984
doc.delete_path(&path)?
8085
};
86+
ctx.notify_keyspace_event(NotifyEvent::MODULE, "json_del", key.as_str());
8187
ctx.replicate_verbatim();
8288
res
8389
}
@@ -116,6 +122,7 @@ fn json_set(ctx: &Context, args: Vec<String>) -> RedisResult {
116122
match (current, set_option) {
117123
(Some(ref mut doc), ref op) => {
118124
if doc.set_value(&value, &path, op, format)? {
125+
ctx.notify_keyspace_event(NotifyEvent::MODULE, "json_set", key.as_str());
119126
ctx.replicate_verbatim();
120127
REDIS_OK
121128
} else {
@@ -127,6 +134,7 @@ fn json_set(ctx: &Context, args: Vec<String>) -> RedisResult {
127134
let doc = RedisJSON::from_str(&value, format)?;
128135
if path == "$" {
129136
redis_key.set_value(&REDIS_JSON_TYPE, doc)?;
137+
ctx.notify_keyspace_event(NotifyEvent::MODULE, "json_set", key.as_str());
130138
ctx.replicate_verbatim();
131139
REDIS_OK
132140
} else {
@@ -256,24 +264,36 @@ fn json_type(ctx: &Context, args: Vec<String>) -> RedisResult {
256264
/// JSON.NUMINCRBY <key> <path> <number>
257265
///
258266
fn json_num_incrby(ctx: &Context, args: Vec<String>) -> RedisResult {
259-
json_num_op(ctx, args, |i1, i2| i1 + i2, |f1, f2| f1 + f2)
267+
json_num_op(ctx, "json_incrby", args, |i1, i2| i1 + i2, |f1, f2| f1 + f2)
260268
}
261269

262270
///
263271
/// JSON.NUMMULTBY <key> <path> <number>
264272
///
265273
fn json_num_multby(ctx: &Context, args: Vec<String>) -> RedisResult {
266-
json_num_op(ctx, args, |i1, i2| i1 * i2, |f1, f2| f1 * f2)
274+
json_num_op(ctx, "json_multby", args, |i1, i2| i1 * i2, |f1, f2| f1 * f2)
267275
}
268276

269277
///
270278
/// JSON.NUMPOWBY <key> <path> <number>
271279
///
272280
fn json_num_powby(ctx: &Context, args: Vec<String>) -> RedisResult {
273-
json_num_op(ctx, args, |i1, i2| i1.pow(i2 as u32), |f1, f2| f1.powf(f2))
281+
json_num_op(
282+
ctx,
283+
"json_numpowby",
284+
args,
285+
|i1, i2| i1.pow(i2 as u32),
286+
|f1, f2| f1.powf(f2),
287+
)
274288
}
275289

276-
fn json_num_op<I, F>(ctx: &Context, args: Vec<String>, op_i64: I, op_f64: F) -> RedisResult
290+
fn json_num_op<I, F>(
291+
ctx: &Context,
292+
cmd: &str,
293+
args: Vec<String>,
294+
op_i64: I,
295+
op_f64: F,
296+
) -> RedisResult
277297
where
278298
I: Fn(i64, i64) -> i64,
279299
F: Fn(f64, f64) -> f64,
@@ -284,15 +304,17 @@ where
284304
let path = backwards_compat_path(args.next_string()?);
285305
let number = args.next_string()?;
286306

287-
let key = ctx.open_key_writable(&key);
307+
let redis_key = ctx.open_key_writable(&key);
288308

289-
key.get_value::<RedisJSON>(&REDIS_JSON_TYPE)?
309+
redis_key
310+
.get_value::<RedisJSON>(&REDIS_JSON_TYPE)?
290311
.ok_or_else(RedisError::nonexistent_key)
291312
.and_then(|doc| {
292313
doc.value_op(&path, |value| {
293314
do_json_num_op(&number, value, &op_i64, &op_f64)
294315
})
295316
.map(|v| {
317+
ctx.notify_keyspace_event(NotifyEvent::MODULE, cmd, key.as_str());
296318
ctx.replicate_verbatim();
297319
v.to_string().into()
298320
})
@@ -360,13 +382,15 @@ fn json_str_append(ctx: &Context, args: Vec<String>) -> RedisResult {
360382
json = path_or_json;
361383
}
362384

363-
let key = ctx.open_key_writable(&key);
385+
let redis_key = ctx.open_key_writable(&key);
364386

365-
key.get_value::<RedisJSON>(&REDIS_JSON_TYPE)?
387+
redis_key
388+
.get_value::<RedisJSON>(&REDIS_JSON_TYPE)?
366389
.ok_or_else(RedisError::nonexistent_key)
367390
.and_then(|doc| {
368391
doc.value_op(&path, |value| do_json_str_append(&json, value))
369392
.map(|v| {
393+
ctx.notify_keyspace_event(NotifyEvent::MODULE, "json_strappend", key.as_str());
370394
ctx.replicate_verbatim();
371395
v.as_str().map_or(usize::MAX, |v| v.len()).into()
372396
})
@@ -401,13 +425,15 @@ fn json_arr_append(ctx: &Context, args: Vec<String>) -> RedisResult {
401425
// We require at least one JSON item to append
402426
args.peek().ok_or(RedisError::WrongArity)?;
403427

404-
let key = ctx.open_key_writable(&key);
428+
let redis_key = ctx.open_key_writable(&key);
405429

406-
key.get_value::<RedisJSON>(&REDIS_JSON_TYPE)?
430+
redis_key
431+
.get_value::<RedisJSON>(&REDIS_JSON_TYPE)?
407432
.ok_or_else(RedisError::nonexistent_key)
408433
.and_then(|doc| {
409434
doc.value_op(&path, |value| do_json_arr_append(args.clone(), value))
410435
.map(|v| {
436+
ctx.notify_keyspace_event(NotifyEvent::MODULE, "json_arrappend", key.as_str());
411437
ctx.replicate_verbatim();
412438
v.as_array().map_or(usize::MAX, |v| v.len()).into()
413439
})
@@ -470,15 +496,17 @@ fn json_arr_insert(ctx: &Context, args: Vec<String>) -> RedisResult {
470496
// We require at least one JSON item to append
471497
args.peek().ok_or(RedisError::WrongArity)?;
472498

473-
let key = ctx.open_key_writable(&key);
499+
let redis_key = ctx.open_key_writable(&key);
474500

475-
key.get_value::<RedisJSON>(&REDIS_JSON_TYPE)?
501+
redis_key
502+
.get_value::<RedisJSON>(&REDIS_JSON_TYPE)?
476503
.ok_or_else(RedisError::nonexistent_key)
477504
.and_then(|doc| {
478505
doc.value_op(&path, |value| {
479506
do_json_arr_insert(args.clone(), index, value)
480507
})
481508
.map(|v| {
509+
ctx.notify_keyspace_event(NotifyEvent::MODULE, "json_arrinsert", key.as_str());
482510
ctx.replicate_verbatim();
483511
v.as_array().map_or(usize::MAX, |v| v.len()).into()
484512
})
@@ -537,14 +565,16 @@ fn json_arr_pop(ctx: &Context, args: Vec<String>) -> RedisResult {
537565
})
538566
.unwrap_or(("$".to_string(), i64::MAX));
539567

540-
let key = ctx.open_key_writable(&key);
568+
let redis_key = ctx.open_key_writable(&key);
541569
let mut res = Value::Null;
542570

543-
key.get_value::<RedisJSON>(&REDIS_JSON_TYPE)?
571+
redis_key
572+
.get_value::<RedisJSON>(&REDIS_JSON_TYPE)?
544573
.ok_or_else(RedisError::nonexistent_key)
545574
.and_then(|doc| {
546575
doc.value_op(&path, |value| do_json_arr_pop(index, &mut res, value))
547576
.map(|v| {
577+
ctx.notify_keyspace_event(NotifyEvent::MODULE, "json_arrpop", key.as_str());
548578
ctx.replicate_verbatim();
549579
v
550580
})
@@ -587,13 +617,15 @@ fn json_arr_trim(ctx: &Context, args: Vec<String>) -> RedisResult {
587617
let start = args.next_i64()?;
588618
let stop = args.next_i64()?;
589619

590-
let key = ctx.open_key_writable(&key);
620+
let redis_key = ctx.open_key_writable(&key);
591621

592-
key.get_value::<RedisJSON>(&REDIS_JSON_TYPE)?
622+
redis_key
623+
.get_value::<RedisJSON>(&REDIS_JSON_TYPE)?
593624
.ok_or_else(RedisError::nonexistent_key)
594625
.and_then(|doc| {
595626
doc.value_op(&path, |value| do_json_arr_trim(start, stop, &value))
596627
.map(|v| {
628+
ctx.notify_keyspace_event(NotifyEvent::MODULE, "json_arrtrim", key.as_str());
597629
ctx.replicate_verbatim();
598630
v.as_array().map_or(usize::MAX, |v| v.len()).into()
599631
})

0 commit comments

Comments
 (0)