Saturday, March 16, 2019

Avro Rust: A first look at data serialisation in Rust using Avro

Avro is a data serialisation system in which you define a schema in JSON (basically the fields of the record you want to serialise, along with their types) and then send the data in a compact binary format.




In the Rust language, structs can be serialised to multiple formats using a library called serde. In the last post we saved our word2vec model into a binary format using serde and bincode. In this post, we will look at serialising with serde and avro-rs, an Avro implementation in Rust, and implement sending serialised objects over a TCP stream using framing.

As usual the code is on [:github:]


Data (De)Serialisation

Avro supports several data types for serialisation including more complex types such as arrays.


use avro_rs::Schema;
use serde::{Deserialize, Serialize};

// The struct we want to send; field names and types mirror the schema below.
#[derive(Debug, Deserialize, Serialize)]
pub struct Test {
    a: i32,
    b: f64,
    c: String,
    d: Vec<i32>
}

// Parses the JSON schema describing the `Test` record.
fn schema() -> Schema {
    let raw_schema = r#"
            {
                "type": "record",
                "name": "test",
                "fields": [
                    {"name": "a", "type": "int",    "default": 0},
                    {"name": "b", "type": "double", "default": 1.0},
                    {"name": "c", "type": "string", "default": "dansen"},
                    {"name": "d", "type": {"type": "array", "items": "int"}}
                ]
            }
        "#;
    avro_rs::Schema::parse_str(raw_schema).unwrap()
}

In the code above, we define the struct that we want to serialise and its serialisation schema in JSON. Nothing more is needed. With this information we can serialise and deserialise the `Test` struct, or even a collection of these structs. Serialisation produces a vector of bytes, and from that vector of bytes we can deserialise back into a vector of our original struct.

use avro_rs::{from_value, Codec, Reader, Writer};

// Serialises all structs into one Avro container, compressed with deflate.
fn write_data(data: &[Test], schema: Schema) -> Vec<u8> {
    let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Deflate);
    for x in data.iter() {
        writer.append_ser(x).unwrap();
    }
    writer.flush().unwrap();
    writer.into_inner()
}

// Reads every record from the byte slice and converts it back into a `Test`.
fn read_data(data: &[u8], schema: Schema) -> Vec<Test> {
    let reader = Reader::with_schema(&schema, data).unwrap();
    reader
        .map(|record| from_value::<Test>(&record.unwrap()).unwrap())
        .collect()
}

Framed Writing and Reading over TCP/IP

In order to send Avro over the network, it seems recommended to frame the message. That means breaking our byte stream into chunks, each preceded by its length, and terminating the message with a zero-sized chunk. The Avro page describes framing as follows:

"Avro messages are framed as a list of buffers. Framing is a layer between messages and the transport. It exists to optimize certain operations.

"The format of framed message data is: a series of buffers, where each buffer consists of: a four-byte, big-endian buffer length, followed by that many bytes of buffer data. A message is always terminated by a zero-length buffer.

"Framing is transparent to request and response message formats (described below). Any message may be presented as a single or multiple buffers. Framing can permit readers to more efficiently get different buffers from different sources and for writers to more efficiently store different buffers to different destinations. In particular, it can reduce the number of times large binary objects are copied. For example, if an RPC parameter consists of a megabyte of file data, that data can be copied directly to a socket from a file descriptor, and, on the other end, it could be written directly to a file descriptor, never entering user space.

"A simple, recommended, framing policy is for writers to create a new segment whenever a single binary object is written that is larger than a normal output buffer. Small objects are then appended in buffers, while larger objects are written as their own buffers. When a reader then tries to read a large object the runtime can hand it an entire buffer directly, without having to copy it."

Writing over the network with a fixed buffer or chunk size can be implemented by sending slices of the given size, each preceded by its length (only the last slice can be shorter than the chosen size). As described above, we send the size of each buffer as a 4-byte, big-endian integer.


use byteorder::{BigEndian, ByteOrder};
use std::io::Write;
use std::net::TcpStream;

pub fn send_framed(objects: &[Test], address: String, schema: Schema, buf_size: usize) {
    let data = write_data(objects, schema);
    let mut stream = TcpStream::connect(address).unwrap();
    let n = data.len();
    for i in (0..n).step_by(buf_size) {
        // determine the byte range of this buffer
        let start = i;
        let stop = usize::min(i + buf_size, n);
        // send length of buffer as a 4-byte big-endian integer
        let mut buffer_length = [0; 4];
        BigEndian::write_u32(&mut buffer_length, (stop - start) as u32);
        // write_all retries until every byte is written; a plain write may stop early
        stream.write_all(&buffer_length).unwrap();
        // write actual data
        stream.write_all(&data[start..stop]).unwrap();
    }
    // terminate with a zero-length buffer
    let mut buffer_length = [0; 4];
    BigEndian::write_u32(&mut buffer_length, 0);
    stream.write_all(&buffer_length).unwrap();
    stream.flush().unwrap();
}
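
To check the framing format without opening a socket, the chunking and length-prefix logic can be pulled into a pure function. The following is only a sketch using the standard library: the name `frame_message` is hypothetical and not part of the post's repository, and it assumes each chunk length fits into a `u32`.

```rust
/// Splits `payload` into buffers of at most `buf_size` bytes. Each buffer is
/// preceded by its length as a 4-byte big-endian integer, and the message is
/// terminated by a zero-length buffer.
/// Assumption: `buf_size` is positive and small enough to fit into a u32.
fn frame_message(payload: &[u8], buf_size: usize) -> Vec<u8> {
    assert!(buf_size > 0, "buf_size must be positive");
    let mut framed = Vec::new();
    for chunk in payload.chunks(buf_size) {
        // length prefix, then the chunk itself
        framed.extend_from_slice(&(chunk.len() as u32).to_be_bytes());
        framed.extend_from_slice(chunk);
    }
    // zero-length buffer marks the end of the message
    framed.extend_from_slice(&0u32.to_be_bytes());
    framed
}
```

With a buffer size of 2, a five-byte payload becomes two full buffers, one buffer of length 1, and the terminator. The resulting bytes could then be handed to the stream in a single `write_all` call.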


When reading, we simply read the size of the next buffer, then read the buffer itself and append it to a vector of bytes, until we hit the zero-sized buffer marking the end. Then we use the method above to deserialise into a vector of structs.

use std::io::Read;

pub fn read_framed(stream: &mut TcpStream, schema: Schema) -> Vec<Test> {
    let mut message: Vec<u8> = Vec::new();
    let mut length_buf = [0; 4];
    // read_exact blocks until the buffer is full; a plain read may return fewer bytes
    stream.read_exact(&mut length_buf).unwrap();
    let mut next = BigEndian::read_u32(&length_buf);
    // read while we see non-empty buffers
    while next > 0 {
        // append all bytes to the final message
        let mut object = vec![0; next as usize];
        stream.read_exact(&mut object).unwrap();
        message.append(&mut object);
        // read next buffer size
        stream.read_exact(&mut length_buf).unwrap();
        next = BigEndian::read_u32(&length_buf);
    }
    read_data(&message, schema)
}
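
The reading loop does not depend on TCP as such. As a rough sketch, it can be written against any `std::io::Read`, which makes it possible to exercise it with an in-memory cursor. The name `read_framed_bytes` is hypothetical, and the function returns the raw reassembled bytes rather than deserialised structs, so it needs nothing beyond the standard library.

```rust
use std::io::{self, Read};

/// Reassembles one framed message from `reader`: repeatedly reads a 4-byte
/// big-endian length followed by that many bytes, until a zero length is seen.
/// Returns an error if the input ends in the middle of a buffer.
fn read_framed_bytes<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
    let mut message = Vec::new();
    loop {
        let mut len_buf = [0u8; 4];
        reader.read_exact(&mut len_buf)?;
        let len = u32::from_be_bytes(len_buf) as usize;
        if len == 0 {
            // zero-length buffer: the message is complete
            return Ok(message);
        }
        // grow the message and read the buffer straight into the new tail
        let start = message.len();
        message.resize(start + len, 0);
        reader.read_exact(&mut message[start..])?;
    }
}
```

Because `TcpStream` implements `Read`, the same function could be called with `&mut stream`, and its result passed on to `read_data`.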


That's it. Now we can send framed Avro messages over the network and deserialise them on the other end.

1 comment:

  1. Thanks for this helpful example.
    How would you deal with unions? I struggle to find a good example of how to model an Avro union in Rust.
