Toshiki Teramura

asink; async sink for time-series data

I'd like to share a small library I wrote for storing time-series data produced by numerical simulations.

https://github.com/termoshtt/asink

asink is published to crates.io:

[dependencies]
asink = "0.1.1"

The full documentation is available on docs.rs.

Sink and mpsc::channel

Like most logging libraries, this library defines a Sink, which writes the data it receives to storage.

let sink = msgpack::MsgpackSink::from_str("test.msg");
let (s, th) = sink.run();
// execute simulation...
th.join().unwrap();
  • s, returned by sink.run(), is a Sender<T> from std::sync::mpsc. As the name "Multi-Producer, Single-Consumer" suggests, mpsc::channel creates a pair of a cloneable Sender<T> and a non-cloneable Receiver<T>. The Sink starts a thread that receives data from the main thread through this channel and writes it to the storage.
  • The second value, th, is a thread handle used to join this thread. When every clone of the Sender has been dropped, the channel is closed and the Sink thread finishes.
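The channel behavior described above can be sketched with the standard library alone. This is a minimal illustration, not asink's implementation: `run_sink` and `collect_from_two_producers` are hypothetical names, and the "storage" is just a Vec held by the consumer thread.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for a sink (not asink's code): spawns a consumer
/// thread and returns the sending half of the channel plus the thread handle.
fn run_sink() -> (Sender<f64>, JoinHandle<Vec<f64>>) {
    let (s, r) = channel();
    let th = thread::spawn(move || {
        let mut stored = Vec::new();
        // Iterating over the Receiver ends once every Sender has been dropped.
        for value in r {
            stored.push(value); // a real sink would write to storage here
        }
        stored
    });
    (s, th)
}

/// Sends values from two producers and returns what the consumer collected.
fn collect_from_two_producers() -> Vec<f64> {
    let (s, th) = run_sink();
    let s2 = s.clone(); // Sender is Clone, so several producers can share one sink
    let producer = thread::spawn(move || {
        for i in 0..3 {
            s2.send(i as f64).unwrap();
        }
        // s2 is dropped here, when the closure ends
    });
    producer.join().unwrap();
    s.send(10.0).unwrap();
    drop(s); // the last Sender is gone, so the channel closes
    th.join().unwrap() // the consumer thread finishes and hands back its data
}

fn main() {
    println!("{:?}", collect_from_two_producers());
}
```

Without the explicit `drop(s)`, the `join` would block forever, because the consumer keeps waiting as long as any Sender is still alive.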

In this case, the sink is a msgpack file. The data received from the main thread is encoded into msgpack and written to this file. Thanks to the serde library, any structure that implements Serialize can be serialized into JSON, msgpack, or BSON, and you can derive the Serialize trait for your own struct automatically using serde_derive:

extern crate serde;
#[macro_use]
extern crate serde_derive;

#[derive(Serialize)]
struct Doc {
    time: f64,
    data: Vec<f64>,
}

Working Example

extern crate asink;
extern crate serde;
#[macro_use]
extern crate serde_derive;

use asink::*;
use std::sync::mpsc::Sender;

/// This will be serialized into msgpack
#[derive(Serialize)]
struct Doc {
    id: usize,
    data: Vec<f64>,
}

fn experiment(s: Sender<Doc>) {
    for i in 0..5 {
        let doc = Doc {
            id: i,
            data: vec![i as f64],
        };
        s.send(doc).unwrap(); // Send data to sink
    }
}

fn main() {
    let sink = msgpack::MsgpackSink::from_str("test.msg");
    let (s, th) = sink.run(); // The sink starts a thread that writes received data to the msgpack file
    experiment(s);
    th.join().unwrap();
}

We can switch to a different sink by replacing one line with either of the following:

let sink = json::JsonSink::from_str("test.json"); // output to JSON file
let sink = mongo::MongoSink::local("mydb", "collection1"); // output to MongoDB on localhost
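One way such a one-line switch can work is to put the shared "spawn a thread and drain the channel" logic behind a trait, so that each backend only decides how a record is written. The sketch below is an assumption about the general design, not asink's actual definition: the `LineSink` trait, `CsvMemorySink`, `JsonMemorySink`, and `run_with` are all hypothetical, and they write into an in-memory Vec instead of a file or database.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

type Record = (usize, f64);

/// Hypothetical trait (not asink's API): a backend only implements `write`.
trait LineSink: Send + Sized + 'static {
    fn write(&mut self, rec: &Record);

    /// Shared logic: spawn a thread that drains the channel into the backend.
    fn run(mut self) -> (Sender<Record>, JoinHandle<Self>) {
        let (s, r) = channel();
        let th = thread::spawn(move || {
            for rec in r {
                self.write(&rec);
            }
            self // hand the backend back so the caller can inspect it
        });
        (s, th)
    }
}

/// Hypothetical backend that formats records as CSV lines in memory.
struct CsvMemorySink {
    lines: Vec<String>,
}

impl LineSink for CsvMemorySink {
    fn write(&mut self, rec: &Record) {
        self.lines.push(format!("{},{}", rec.0, rec.1));
    }
}

/// Hypothetical backend that formats records as JSON objects in memory.
struct JsonMemorySink {
    lines: Vec<String>,
}

impl LineSink for JsonMemorySink {
    fn write(&mut self, rec: &Record) {
        self.lines
            .push(format!("{{\"id\":{},\"value\":{}}}", rec.0, rec.1));
    }
}

/// The simulation only sees a Sender and does not know which sink is behind it.
fn experiment(s: Sender<Record>) {
    for i in 0..3 {
        s.send((i, i as f64 * 0.5)).unwrap();
    }
}

fn run_with<S: LineSink>(sink: S) -> S {
    let (s, th) = sink.run();
    experiment(s); // the Sender is dropped when experiment returns
    th.join().unwrap()
}

fn main() {
    // Switching the backend changes only this constructor.
    let csv = run_with(CsvMemorySink { lines: Vec::new() });
    let json = run_with(JsonMemorySink { lines: Vec::new() });
    println!("{:?}\n{:?}", csv.lines, json.lines);
}
```

The point of this layout is that `experiment` depends only on `Sender<Record>`, so the simulation code stays untouched when the output format changes.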

This library should be useful both for storing large binary data (e.g. velocity or magnetic fields) and for logging data (e.g. the residual of an iterative method such as BiCGStab).

I'd like to get your feedback. Thanks.
