In the Rust language, structs can be serialised to multiple formats using a library called serde. In the last post we saved our word2vec model to a binary format using serde and bincode. In this post, we will look at serialising with serde and avro-rs, an Avro implementation in Rust, and implement sending serialised objects over a TCP stream using framing.
As usual the code is on [:github:]
Data (De) Serialisation
Avro supports several data types for serialisation including more complex types such as arrays.
#[derive(Debug, Deserialize, Serialize)]
pub struct Test {
    a: i32,
    b: f64,
    c: String,
    d: Vec<i32>,
}
fn schema() -> Schema {
    let raw_schema = r#"
    {
        "type": "record",
        "name": "test",
        "fields": [
            {"name": "a", "type": "int", "default": 0},
            {"name": "b", "type": "double", "default": 1.0},
            {"name": "c", "type": "string", "default": "dansen"},
            {"name": "d", "type": {"type": "array", "items": "int"}}
        ]
    }
    "#;
    avro_rs::Schema::parse_str(raw_schema).unwrap()
}
In the code above, we define the struct that we want to serialise and define the serialisation schema in JSON. Nothing more is needed. With this information we can serialise and deserialise the `Test` struct. We can even (de)serialise a collection of these structs: serialisation produces a vector of bytes, and given that vector of bytes we can deserialise back into a vector of the original struct.
fn write_data(data: &[Test], schema: Schema) -> Vec<u8> {
    // serialise each struct into an Avro container, compressed with Deflate
    let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);
    for x in data.iter() {
        writer.append_ser(x).unwrap();
    }
    writer.flush().unwrap();
    writer.into_inner()
}

fn read_data(data: &[u8], schema: Schema) -> Vec<Test> {
    // deserialise every record in the byte slice back into a Test
    let reader = Reader::with_schema(&schema, data).unwrap();
    reader
        .map(|record| from_value::<Test>(&record.unwrap()).unwrap())
        .collect()
}
Framed Writing and Reading over TCP/IP
In order to send Avro over the network, the specification recommends framing the message. That means breaking our byte stream into chunks, each preceded by its length, and terminating with a zero-sized chunk.
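The framing layout itself is independent of both Avro and the transport, so it can be sketched with nothing but the standard library. In this sketch, `frame` and `deframe` are hypothetical helper names (not part of the post's code): `frame` length-prefixes each chunk and appends the zero-length terminator, and `deframe` reverses it.

```rust
// Hypothetical helpers illustrating the framing layout: each chunk is
// preceded by its 4-byte big-endian length, and the whole message is
// terminated by a zero-length buffer.

fn frame(data: &[u8], buf_size: usize) -> Vec<u8> {
    let mut out = Vec::new();
    for chunk in data.chunks(buf_size) {
        out.extend_from_slice(&(chunk.len() as u32).to_be_bytes());
        out.extend_from_slice(chunk);
    }
    // terminating zero-length buffer
    out.extend_from_slice(&0u32.to_be_bytes());
    out
}

fn deframe(mut framed: &[u8]) -> Vec<u8> {
    let mut out = Vec::new();
    loop {
        // read the 4-byte big-endian length prefix
        let (len_bytes, rest) = framed.split_at(4);
        let len = u32::from_be_bytes(len_bytes.try_into().unwrap()) as usize;
        if len == 0 {
            break; // zero-length buffer marks the end of the message
        }
        out.extend_from_slice(&rest[..len]);
        framed = &rest[len..];
    }
    out
}

fn main() {
    let framed = frame(b"hello avro", 4);
    assert_eq!(deframe(&framed), b"hello avro");
}
```

Note that framing a ten-byte payload with a buffer size of four produces chunks of 4, 4 and 2 bytes plus the four zero bytes of the terminator; the length prefix really only carries information for that last, short chunk.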
> Avro messages are framed as a list of buffers. Framing is a layer between messages and the transport. It exists to optimize certain operations. The format of framed message data is: a series of buffers, where each buffer consists of: a four-byte, big-endian buffer length, followed by that many bytes of buffer data. A message is always terminated by a zero-length buffer.
>
> Framing is transparent to request and response message formats (described below). Any message may be presented as a single or multiple buffers.
>
> Framing can permit readers to more efficiently get different buffers from different sources and for writers to more efficiently store different buffers to different destinations. In particular, it can reduce the number of times large binary objects are copied. For example, if an RPC parameter consists of a megabyte of file data, that data can be copied directly to a socket from a file descriptor, and, on the other end, it could be written directly to a file descriptor, never entering user space.
>
> A simple, recommended, framing policy is for writers to create a new segment whenever a single binary object is written that is larger than a normal output buffer. Small objects are then appended in buffers, while larger objects are written as their own buffers. When a reader then tries to read a large object the runtime can hand it an entire buffer directly, without having to copy it.

(from the Avro specification)
Writing over the network with a fixed buffer (chunk) size can be implemented by sending slices of that size, each preceded by its length (which really only matters for the last, possibly shorter slice). As described above, the size of each buffer is sent as a 4-byte big-endian integer.
pub fn send_framed(objects: &[Test], address: String, schema: Schema, buf_size: usize) {
    let data = write_data(objects, schema);
    let mut stream = TcpStream::connect(address).unwrap();
    let n = data.len();
    for i in (0..n).step_by(buf_size) {
        // determine the slice of bytes to write
        let start = i;
        let stop = usize::min(i + buf_size, n);
        // send the length of the buffer as a 4-byte big-endian integer
        let mut buffer_length = [0; 4];
        BigEndian::write_u32(&mut buffer_length, (stop - start) as u32);
        stream.write_all(&buffer_length).unwrap();
        // write the actual data
        stream.write_all(&data[start..stop]).unwrap();
    }
    // terminate the message with a zero-length buffer
    let mut buffer_length = [0; 4];
    BigEndian::write_u32(&mut buffer_length, 0);
    stream.write_all(&buffer_length).unwrap();
    stream.flush().unwrap();
}
When reading, we simply read the size of the next buffer, then read that many bytes and append them to a vector of bytes, until we hit the zero-sized buffer marking the end. Then we use the method above to deserialise into a vector of structs. Note that we use `read_exact` rather than `read`, since a single `read` call may return fewer bytes than requested.
pub fn read_framed(stream: &mut TcpStream, schema: Schema) -> Vec<Test> {
    let mut message: Vec<u8> = Vec::new();
    let mut first_buf = [0; 4];
    stream.read_exact(&mut first_buf).unwrap();
    let mut next = BigEndian::read_u32(&first_buf);
    // read while we see non-empty buffers
    while next > 0 {
        // append all bytes to the final message
        let mut object = vec![0; next as usize];
        stream.read_exact(&mut object).unwrap();
        message.append(&mut object);
        // read the size of the next buffer
        let mut next_buf = [0; 4];
        stream.read_exact(&mut next_buf).unwrap();
        next = BigEndian::read_u32(&next_buf);
    }
    read_data(&message, schema)
}
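To see the framing work end to end without a second machine, here is a sketch that round-trips a byte payload over a loopback socket using only the standard library. The payload merely stands in for the serialised Avro bytes that `write_data` would produce, and `framed_roundtrip` is a hypothetical helper for this illustration, not part of the post's code.

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

// Send `payload` over a loopback TCP connection using the framing scheme
// above, and return the bytes the reading side reassembles.
fn framed_roundtrip(payload: &[u8], buf_size: usize) -> Vec<u8> {
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    // reader side: reassemble buffers until the zero-length terminator
    let reader = thread::spawn(move || {
        let (mut stream, _) = listener.accept().unwrap();
        let mut message = Vec::new();
        loop {
            let mut len_buf = [0u8; 4];
            stream.read_exact(&mut len_buf).unwrap();
            let len = u32::from_be_bytes(len_buf) as usize;
            if len == 0 {
                break;
            }
            let mut chunk = vec![0u8; len];
            stream.read_exact(&mut chunk).unwrap();
            message.extend_from_slice(&chunk);
        }
        message
    });

    // writer side: length-prefixed chunks, then the zero-length terminator
    let mut stream = TcpStream::connect(addr).unwrap();
    for chunk in payload.chunks(buf_size) {
        stream.write_all(&(chunk.len() as u32).to_be_bytes()).unwrap();
        stream.write_all(chunk).unwrap();
    }
    stream.write_all(&0u32.to_be_bytes()).unwrap();

    reader.join().unwrap()
}

fn main() {
    // stand-in for the serialised Avro bytes produced by write_data
    let payload = b"serialised avro bytes";
    assert_eq!(framed_roundtrip(payload, 8), payload);
}
```

Binding to port 0 lets the OS pick a free port, so the sketch runs anywhere; in the real `send_framed`/`read_framed` pair the address would of course be fixed on both ends.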
That's it: now we can send framed Avro over the network and deserialise it on the other end.