DEV Community

Scott
Scott

Posted on

Using Cap'n Proto on Rust

Introduction

Recently at work, I have been working a lot with gRPC and Golang, and I felt enjoy using gRPC as an application interface. It is simple to implement and works well.. Anyway about 2 weeks ago while reading about various technologies related to my work. I came across Cap'n Proto and was interested to learn that it was created by one of the primary authors of Protocol Buffers v2. Since I was already using Protocol Buffers with gRPC at work, I decided to try out Cap'n Proto + Cap'n Proto RPC with Rust.

What is Cap'n Proto?

Cap'n Proto is a incredibly fast data serialization format. If you want to know more about Cap'n Proto in general, please visit their website!. If you are familiar with Protocol buffers, then a majority of the following concepts will be familiar to you.

My Goal

My goal here is to create a simple API that accepts a list of parameters, and a source directory then return a job ID, or accepts a job ID and returns the results. See the following diagrams.

Alt Text

Alt Text

NOTE: The example built in this article will be split into the following files. main.rs, server.rs, client.rs, build.rs, service.capnp

Cap'n Proto Interface Definition

With a goal in mind, I set out and defined my service.capnp as follows:

@0xeb1bbbd418f18514;

interface Job {
    register @0 Request -> RegistryItem;
    getResult @1 RegistryItem -> Result;
    struct Request {
        parms @0 :List(Text);
        source @1 :Text;
    }
    struct Result {
        union {
            link @0 :Text;
            error @1 :Text;
        }
    }
    struct RegistryItem {
        # Single Job Identity
        id @0 :Text;
    }
}

NOTE: You can write your methods in a more verbose format, however this is not recommended as you will have to pry! multiple times in your server implementation.

Right away you can see we have defined a Job interface with two methods register and getResults, and three data structures Request, Result, and RegistryItem.

To actually use the defined RPC interface, we will need to generate Rust code from the above service.capnp file.

Generate the Code

In order to generate the code, you will need to create a build.rs file as follows.

extern crate capnpc;

fn main() {
    ::capnpc::CompilerCommand::new().file("service.capnp").run().unwrap();
}

This will generate the RPC code from the service.capnp file defined in the previous section, it roughly translates into the following:

  • interface Job {...}

    • Creates pub mod job for client/server communications
  • register @0 Request -> RegistryItem

    • Adds register method to job::Server trait
    • Implements register_request for the job::Client struct to generating a Cap'nProto Request object for the register method.
    • Creates type job::RegisterParams which controls access to incoming data on the server side.
    • creates type job::RegisterResults which controls access to outgoing data on the server side.
  • getResult @1 RegistryItem -> Result

    • Adds get_result method to job::Server trait
    • Implements get_result_request for the job::Client struct to generating a Cap'nProto Request object for the get_result method.
    • Creates type job::GetResultParams which controls access to incoming data on the server side.
    • creates type job::GetResultResults which controls access to outgoing data on the server side.
  • struct Request {...}

    • Creates a pub mod request (job::request) submodule
    • Creates type job::request::Reader containing getters for each capnp parameter
    • Creates type job::request::Builder containing setters for each capnp parameter
  • struct Result {...}

    • Creates a pub mod result (job::result) submodule
    • Creates type job::result::Reader containing getters for each capnp parameter
    • Creates type job::result::Builder containing setters for each capnp parameter
  • struct RegistryItem {...}

    • Creates a pub mod registry_item (job::registry_item) submodule
    • Creates type job::registry_item::Reader containing getters for each capnp parameter
    • Creates type job::registry_item::Builder containing setters for each capnp parameter

Importing the Generated Code

To use the code we just generated, we must import into our source as follows.

use crate::service_capnp::{job};

To build our service we still need to create an RPC client and server.

Creating the Server

The requires implementing 2 things.

  • Create the local implementation of your interface
  • Create your interface methods

Implement the local interface

First things first, we need to create a local struct upon which we can implement our interface methods. This is easy.

struct Job;

Once this is finished we get to the interesting part. The implementation of our methods.

Implement the methods

Cap'nProto methods will always follow the below signature.

impl <imported_rpc_interface>::Server for <local_struct> {
    fn <rpc_method_name>(&mut self, params: <imported_rpc_interface>::<sentence_case_rpc_method_name>Params, mut results: <imported_rpc_interface>::<sentence_case_rpc_method_name>Results)
    -> Promise<(), capnp::Error> {
        // Do Something
    }
}

or in our case, the following..

impl job::Server for Job {
    fn register(&mut self, params: job::RegisterParams, mut results: job::RegisterResults)
    -> Promise<(), capnp::Error> {
        // Do Something
    }

    fn get_result(&mut self, params: job::GetResultParams, mut results: job::GetResultResults)
    -> Promise<(), capnp::Error> {
        // Do Something
    }
}

For the purposes of this article the business logic for the methods is scaled back to printing out received values, and sending back static responses.

impl job::Server for Job {
    fn register(&mut self, params: job::RegisterParams, mut results: job::RegisterResults)
    -> Promise<(), capnp::Error> {
        println!("Processing Register")
        // get a reader object for the sent request
        let request_reader = pry!(params.get());
        // get the value for the sent parms parameter
        let parms = request_reader.get_parms().unwrap();
        // get the value for the sent source parameter
        let source = request_reader.get_source().unwrap();
        for item in parms.iter() {
            // print each parms value
            println!("Parameter: {}", item);
        }
        // print the source value
        println!("DataSource: {}", source);
        // set return value for id
        results.get().set_id("01E3S4Q9SN3VHVXB2KCAGD9P62");
        Promise::ok(())
    }

    fn get_result(&mut self, params: job::GetResultParams, mut results: job::GetResultResults)
    -> Promise<(), capnp::Error> {
        println!("Processing Get Result");
        // get a reader object for the sent request
        let request_reader = pry!(params.get());
        // get the send ID
        let id = request_reader.get_id();
        // print the sent ID
        println!("ID: {}", id);
        // set return value for link
        results.get().set_link("https://link.to.your/output.csv");
        Promise::ok(())
    }
}

NOTE: pry! is a macro from the capnp_rpc crate that acts like try!(), but for functions that return a Promise rather than a Result.

We wrap it all together and build our server::main() with the following.

use capnp::capability::Promise;
use capnp_rpc::{pry, RpcSystem};
use capnp_rpc::twoparty::{VatNetwork};
use capnp_rpc::rpc_twoparty_capnp::{Side};
use async_std::net::{TcpListener};
use futures::{AsyncReadExt, FutureExt, StreamExt};
use futures::task::LocalSpawn;
use crate::service_capnp::{job};

struct Job;

impl job::Server for Job {
    fn register(&mut self, params: job::RegisterParams, mut results: job::RegisterResults)
    -> Promise<(), capnp::Error> {
        println!("Processing Register")
        // get a reader object for the sent request
        let request_reader = pry!(params.get());
        // get the value for the sent parms parameter
        let parms = request_reader.get_parms().unwrap();
        // get the value for the sent source parameter
        let source = request_reader.get_source().unwrap();
        for item in parms.iter() {
            // print each parms value
            println!("Parameter: {}", item);
        }
        // print the source value
        println!("DataSource: {}", source);
        // set return value for id
        results.get().set_id("01E3S4Q9SN3VHVXB2KCAGD9P62");
        Promise::ok(())
    }

    fn get_result(&mut self, params: job::GetResultParams, mut results: job::GetResultResults)
    -> Promise<(), capnp::Error> {
        println!("Processing Get Result");
        // get a reader object for the sent request
        let request_reader = pry!(params.get());
        // get the send ID
        let id = request_reader.get_id();
        // print the sent ID
        println!("ID: {}", id);
        // set return value for link
        results.get().set_link("https://link.to.your/output.csv");
        Promise::ok(())
    }
}

pub fn main(arg_addr: &String) {
    use std::net::ToSocketAddrs;
    // parse input address..
    let addr = arg_addr.to_socket_addrs().unwrap().next().expect("Invalid Address");
    let mut exec = futures::executor::LocalPool::new();
    let spawner = exec.spawner();

    // Start Server
    let _server: Result<(), Box<dyn std::error::Error>> = exec.run_until(async move {
        // Initialize TCP listener @ addr
        let listener = TcpListener::bind(&addr).await?;

        // Initialize local interface defined above
        let job_client = job::ToClient::new(Job).into_client::<capnp_rpc::Server>();

        // Initialize and execute async handler
        let mut incoming = listener.incoming();
        let handle_incoming = async move {
            while let Some(socket_result) = incoming.next().await {
                let sock = socket_result?;
                println!("Accepted connection from {:?}", sock.peer_addr());
                sock.set_nodelay(true)?;
                let (reader, writer) = sock.split();
                let network = VatNetwork::new(reader, writer, Side::Server, Default::default());
                let rpc_system = RpcSystem::new(Box::new(network), Some(job_client.clone().client));
                spawner.spawn_local_obj(Box::pin(rpc_system.map(|_| ())).into())?;
            }
            Ok::<(), Box<dyn std::error::Error>>(())
        };
        handle_incoming.await?;
        Ok(())
    });
}

Creating the Client

Creating the client only requires creating the main loop for sending our requests as follows.

use capnp_rpc::RpcSystem;
use capnp_rpc::twoparty::VatNetwork;
use capnp_rpc::rpc_twoparty_capnp::Side;
use async_std::net::TcpStream;
use futures::{AsyncReadExt, FutureExt};
use futures::task::LocalSpawn;
use crate::service_capnp::{job};

pub fn main(arg_addr: &String) {
    use std::net::ToSocketAddrs;

    // Resolve input server address
    let addr = arg_addr.to_socket_addrs().unwrap().next().expect("Invalid Address");

    // Create a pool for executing async requests
    let mut exec = futures::executor::LocalPool::new();
    let spawner = exec.spawner();

    // Execute the client
    let _client: Result<(), Box<dyn std::error::Error>> = exec.run_until(async move {
        // Create RPC connection to the server
        let stream = TcpStream::connect(&addr).await?;
        stream.set_nodelay(true)?;
        let (reader, writer) = stream.split();
        let network = Box::new(
            VatNetwork::new(reader, writer, Side::Client, Default::default())
        );
        let mut rpc_system = RpcSystem::new(network, None);

        // Setup the client interface
        let job_client: job::Client = rpc_system.bootstrap(Side::Server);
        spawner.spawn_local_obj(Box::pin(rpc_system.map(|_|())).into())?;

        {
            println!("Sending Register Job Request...");

            // Create a register request object
            let mut request = job_client.register_request();

            // Set parms - list parameters require initialization
            let mut parms_setter = request.get().init_parms(2);
            parms_setter.set(0, "dark");
            parms_setter.set(1, "world");

            // Set source - strings do not require prior initialization
            request.get().set_source("/link/to/the/path");

            // Send request, and await response
            let response = request.send().promise.await?;
            println!("Received JobID: {}", response.get().unwrap().get_id().unwrap())
        }

        {
            println!("Sending Get Result Request...");

            // Create get_result request object
            let mut request = job_client.get_result_request();

            // Set JobID
            request.get().set_id("01E3S4Q9SN3VHVXB2KCAGD9P62");

            // Send request, and await response
            let response = request.send().promise.await?;

            // The result is of the union type, so we must determine which type it is
            match response.get()?.which()? {
                job::result::Link(t) => {
                   println!("Received Link: {}", t?);
                }
                job::result::Error(t) =>  {
                   println!("Received Error: {}", t?);
                }
            }
        }
        Ok(())
    });
}

Creating The main()

For the purposes of this article, both the client and server will be part of the same binary. As such, we have a dedicated main.rs which looks as follows.

// Include capnp generated code
pub mod source_capnp {
    include!(concat!(env!("OUT_DIR"), "/source_capnp.rs"));
}

pub mod client;
pub mod server;

fn main() {
    let args: Vec<String> = ::std::env::args().collect();
    if args.len() >= 2 {
        match &args[1][..] {
            "client" => return client::main(&args[2]),
            "server" => return server::main(&args[2]),
            _ => ()
        }
    }
    println!("usage: {} [client | server] ADDRESS", args[0]);
}

Running the server

We will run our server on localhost:2020, and should receive the following output (pre-client)

16:46:22 🖎 export-rust master* ± cargo run server localhost:2020
   Compiling export-rust v0.1.0 (/home/procyclinsur/test/export-rust)
    Finished dev [unoptimized + debuginfo] target(s) in 2.90s
     Running `target/debug/export-rust server 'localhost:2020'`

Running the client

Similarly the client will run against the server running at localhost:2020. When the client is run however there should be output in both the server terminal and the client terminal as follows.

Client

16:50:53 🖎 export-rust master* ± cargo run client localhost:2020
    Finished dev [unoptimized + debuginfo] target(s) in 0.04s
     Running `target/debug/export-rust client 'localhost:2020'`
Sending Register Job Request...
Received JobID: 01E3S4Q9SN3VHVXB2KCAGD9P62
Sending Get Result Request...
https://link.to.your/output.csv

Server

16:46:22 🖎 export-rust master* ± cargo run server localhost:2020
   Compiling export-rust v0.1.0 (/home/procyclinsur/test/export-rust)
    Finished dev [unoptimized + debuginfo] target(s) in 2.90s
     Running `target/debug/export-rust server 'localhost:2020'`
Accepted connection from Ok(V6([::1]:36374))
Processing Register
Parameter: dark
Parameter: world
DataSource: /link/to/the/path
Processing Get Result
ID: 01E3S4Q9SN3VHVXB2KCAGD9P62

As you can see, we are able to send and receive data via our API powered by Cap'n Proto RPC. If you love it, hate it, have any questions, or suggestions for improvement please let me know!

Top comments (1)

Collapse
 
dotxlem profile image
Dan

Thanks for this! Great resource.

Can I ask what you use to make your sequence diagrams? I really like the aesthetic :)