Skip to content

Commit 95ab34b

Browse files
committed
bugfix: named schema resolution
1 parent e842201 commit 95ab34b

File tree

4 files changed

+68
-14
lines changed

4 files changed

+68
-14
lines changed

CHANGELOG.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

7-
# [Unreleased]
7+
## [Unreleased]
8+
9+
### Fixed
10+
- Named schema resolution outside of union variants.
11+
12+
### Updated
13+
- Documentation.
814

915
## 0.2.0 - 2020-10-10
1016

src/reader.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,7 @@ pub(crate) fn decode_with_resolution<R: Read>(
485485
pub(crate) fn decode<R: Read>(
486486
schema: &Variant,
487487
reader: &mut R,
488-
r_cxt: &Registry,
488+
w_cxt: &Registry,
489489
) -> Result<Value, AvrowErr> {
490490
let value = match schema {
491491
Variant::Null => Value::Null,
@@ -538,7 +538,7 @@ pub(crate) fn decode<R: Read>(
538538

539539
let mut it = Vec::with_capacity(block_count as usize);
540540
for _ in 0..block_count {
541-
let decoded = decode(&**items, reader, r_cxt)?;
541+
let decoded = decode(&**items, reader, w_cxt)?;
542542
it.push(decoded);
543543
}
544544

@@ -550,7 +550,7 @@ pub(crate) fn decode<R: Read>(
550550
let mut hm = HashMap::new();
551551
for _ in 0..block_count {
552552
let key = decode_string(reader)?;
553-
let value = decode(values, reader, r_cxt)?;
553+
let value = decode(values, reader, w_cxt)?;
554554
hm.insert(key, value);
555555
}
556556

@@ -560,7 +560,7 @@ pub(crate) fn decode<R: Read>(
560560
let mut v = IndexMap::with_capacity(fields.len());
561561
for (field_name, field) in fields {
562562
let field_name = field_name.to_string();
563-
let field_value = decode(&field.ty, reader, r_cxt)?;
563+
let field_value = decode(&field.ty, reader, w_cxt)?;
564564
let field_value = FieldValue::new(field_value);
565565
v.insert(field_name, field_value);
566566
}
@@ -571,15 +571,22 @@ pub(crate) fn decode<R: Read>(
571571
};
572572
Value::Record(rec)
573573
}
574+
Variant::Fixed { size, .. } => {
575+
let mut buf = vec![0; *size];
576+
reader
577+
.read_exact(&mut buf)
578+
.map_err(AvrowErr::DecodeFailed)?;
579+
Value::Fixed(buf)
580+
}
574581
Variant::Union { variants } => {
575582
let variant_idx: i64 = reader.read_varint().map_err(AvrowErr::DecodeFailed)?;
576-
decode(&variants[variant_idx as usize], reader, r_cxt)?
583+
decode(&variants[variant_idx as usize], reader, w_cxt)?
577584
}
578585
Variant::Named(schema_name) => {
579-
let schema_variant = r_cxt
586+
let schema_variant = w_cxt
580587
.get(schema_name)
581588
.ok_or(AvrowErr::NamedSchemaNotFound)?;
582-
decode(schema_variant, reader, r_cxt)?
589+
decode(schema_variant, reader, w_cxt)?
583590
}
584591
a => {
585592
return Err(AvrowErr::DecodeFailed(Error::new(

src/value.rs

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ impl Record {
7171
}
7272

7373
/// Creates a record from a [BTreeMap](https://doc.rust-lang.org/std/collections/struct.BTreeMap.html) by consuming it.
74-
/// The values in btree must implement Into<Value>. The name provided must match with the name in the record
74+
/// The values in BTreeMap must implement Into<Value>. The name provided must match with the name in the record
7575
/// schema being provided to the writer.
7676
pub fn from_btree<K: Into<String> + Ord + Display, V: Into<Value>>(
7777
name: &str,
@@ -242,9 +242,13 @@ impl Value {
242242
.write_f64::<LittleEndian>(*d)
243243
.map_err(AvrowErr::EncodeFailed)?;
244244
}
245+
(ref value, Variant::Named(name)) => {
246+
if let Some(schema) = cxt.get(name) {
247+
value.encode(writer, schema, cxt)?;
248+
}
249+
}
245250
// Match with union happens first than more specific match arms
246251
(ref value, Variant::Union { variants, .. }) => {
247-
// the get index function returns the index if the value's schema is in the variants of the union
248252
let (union_idx, schema) = resolve_union(&value, &variants, cxt)?;
249253
let union_idx = union_idx as i32;
250254
writer
@@ -301,7 +305,6 @@ impl Value {
301305
.write_varint(idx as i32)
302306
.map_err(AvrowErr::EncodeFailed)?;
303307
} else {
304-
// perf issues on creating error objects?
305308
return Err(AvrowErr::SchemaDataMismatch);
306309
}
307310
}
@@ -639,6 +642,44 @@ mod tests {
639642
let _r = Record::from_btree("test", rec).unwrap();
640643
}
641644

645+
#[derive(Debug, Serialize, Deserialize, PartialEq)]
646+
struct SomeRecord {
647+
one: Vec<u8>,
648+
two: Vec<u8>,
649+
}
650+
651+
#[test]
652+
fn named_schema_resolves() {
653+
let schema = r##"
654+
{
655+
"type": "record",
656+
"name": "SomeRecord",
657+
"aliases": ["MyRecord"],
658+
"fields" : [
659+
{"name": "one", "type":{"type": "fixed", "size": 5, "name": "md5"}},
660+
{"name": "two", "type":"md5"}
661+
]
662+
}
663+
"##;
664+
665+
let schema = crate::Schema::from_str(schema).unwrap();
666+
let mut writer = crate::Writer::with_codec(&schema, vec![], crate::Codec::Null).unwrap();
667+
668+
let value = SomeRecord {
669+
one: vec![0u8, 1, 2, 3, 4],
670+
two: vec![0u8, 1, 2, 3, 4],
671+
};
672+
673+
writer.serialize(&value).unwrap();
674+
675+
let output = writer.into_inner().unwrap();
676+
let reader = crate::Reader::new(output.as_slice()).unwrap();
677+
for i in reader {
678+
let r: SomeRecord = from_value(&i).unwrap();
679+
assert_eq!(r, value);
680+
}
681+
}
682+
642683
#[derive(Debug, Serialize, Deserialize)]
643684
struct Mentees {
644685
id: i32,

src/writer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
//! The Writer is the primary interface for writing values in avro encoded format.
22
33
use crate::codec::Codec;
4-
use crate::schema::Schema;
5-
use crate::value::Value;
64
use crate::config::{DEFAULT_FLUSH_INTERVAL, MAGIC_BYTES, SYNC_MARKER_SIZE};
75
use crate::error::{AvrowErr, AvrowResult};
86
use crate::schema::Registry;
7+
use crate::schema::Schema;
98
use crate::schema::Variant;
109
use crate::serde_avro;
1110
use crate::util::{encode_long, encode_raw_bytes};
1211
use crate::value::Map;
13-
use serde::Serialize;
12+
use crate::value::Value;
1413
use rand::{thread_rng, Rng};
14+
use serde::Serialize;
1515
use std::collections::HashMap;
1616
use std::default::Default;
1717
use std::io::Write;

0 commit comments

Comments
 (0)