The Cargo configuration file (`Cargo.toml`) for the project.
# Package metadata for the `container-stats` crate.
[package]
name = "container-stats"
version = "0.1.0"
edition = "2024"
# Third-party crates used by the sources.
[dependencies]
# Command line parsing through `#[derive(Parser)]`.
clap = { version = "4.0", features = ["derive"] }
# Supplies `Utc::now()`, the timestamp attached to every published row.
chrono = "0.4.41"
# QuestDB client (imported in the sources as `questdb`).
# NOTE(review): `chrono_timestamp` presumably enables `TimestampNanos::from_datetime` - confirm.
questdb-rs = { version = "4.0.4", features = ["chrono_timestamp"] }
# `Deserialize` derive used by the statistics structures.
serde = { version = "1.0.140", features = ["derive"] }
# Parses each JSON line of the `docker stats` output.
serde_json = "1.0.140"
# `Substring` trait used when parsing sized values.
substring = "1.4.5"
The process entry point. It defines a few simple command-line arguments, retrieves the statistics for the currently running containers, and publishes them to QuestDB.
mod ilp;
mod stats;
use clap::Parser;
use std::process::Command;
use std::io::{BufRead, BufReader};
use ilp::publish;
use stats::{RawStats, createStats};
use stats::Stats;
// Command line options of the tool, parsed with clap's derive API.
//
// The doc comments on the fields are picked up by clap and shown as the help text of the
// corresponding option. The type itself is documented with a plain comment so that the `about`
// text configured through the attribute below is left untouched.
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Cli {
    /// Name of the QuestDB table that receives the container statistics
    #[arg(short, long, default_value = "containerStats")]
    table: String,
    /// Name of the node the containers run on; stored in the `host` symbol of every row
    #[arg(short = 'n', long = "node")]
    host: String,
    // Builds for macOS ignore this value and always connect to localhost (see `publish`).
    /// Host name or address of the QuestDB server; the connection uses TCP port 9009
    #[arg(short, long, default_value = "localhost")]
    questdb: String
}
/// Program entry point.
///
/// Runs `docker stats --no-stream --format=json`, converts every non-empty line of its output
/// into a [`Stats`] value and, when at least one value was collected, publishes them to QuestDB.
///
/// # Panics
///
/// Panics when `docker` cannot be started or exits unsuccessfully, when its output cannot be
/// read or parsed, or when publishing fails.
fn main()
{
    let args = Cli::parse();
    let output = Command::new("docker")
        .args(["stats", "--no-stream", "--format=json"])
        .output()
        .expect("failed to execute process");

    // A failing docker command leaves stdout empty; without this check the run would end
    // successfully without publishing anything and the failure would go unnoticed.
    if !output.status.success()
    {
        panic!("docker stats failed ({}): {}", output.status, String::from_utf8_lossy(&output.stderr).trim());
    }

    let mut reader = BufReader::new(output.stdout.as_slice());
    let mut collected : Vec<Stats> = Vec::new();
    let mut line = String::new();
    while reader.read_line(&mut line).expect("failed to read docker output") > 0
    {
        let json = line.trim();
        // Blank lines carry no statistics; feeding them to the JSON parser would abort the run.
        if !json.is_empty()
        {
            let raw : RawStats = serde_json::from_str(json).expect("failed to parse json");
            let stats = createStats(&raw);
            // Development aid: echo the parsed values when built for macOS.
            if cfg!(target_os = "macos") { println!("{:?}", stats); }
            collected.push(stats);
        }
        // `read_line` appends, so the buffer has to be emptied before the next line is read.
        line.clear();
    }

    if !collected.is_empty() { publish(&args, &collected).expect("Failed to publish stats"); }
}
Structures used to represent the docker statistics information. One structure captures the raw information as reported by docker; it is transformed into a processed structure, which is later converted to ILP (InfluxDB Line Protocol).
use substring::Substring;
use serde::Deserialize;
/// Raw statistics record for one container, deserialized in `main` from one JSON line of the
/// `docker stats` output. Every field is kept as the unparsed string.
///
/// The fields are not snake case (hence the lint allowance); no serde rename is configured, so
/// the field names are the JSON keys that are read.
#[derive(Deserialize, Debug)]
#[allow(non_snake_case)]
pub struct RawStats
{
/// Block I/O as a pair of sized values separated by `" / "`.
pub BlockIO: String,
/// CPU usage percentage, including the `%` sign.
pub CPUPerc: String,
/// Copied unchanged into `Stats::container`.
pub Container: String,
/// Copied unchanged into `Stats::id`.
pub ID: String,
/// Memory usage percentage, including the `%` sign.
pub MemPerc: String,
/// Used and total memory as a pair of sized values separated by `" / "`.
pub MemUsage: String,
/// Copied unchanged into `Stats::name`.
pub Name: String,
/// Network I/O as a pair of sized values separated by `" / "`.
pub NetIO: String,
/// Number of processes; parsed into a `u32` by `createStats`.
pub PIDs: String
}
/// A numeric value together with the unit it is expressed in.
#[derive(Deserialize, Debug)]
#[allow(non_snake_case)]
pub struct Measurement
{
/// Magnitude, expressed in `unit`.
pub value: f32,
/// Unit label as assigned by `createStats` (for example `"MB"` or `"MiB"`); empty when the
/// measurement was never filled in.
pub unit: String
}
impl Measurement
{
pub fn new() -> Measurement
{
Measurement{value: 0.0, unit: String::new()}
}
}
/// A pair of measurements describing traffic in both directions.
#[derive(Deserialize, Debug)]
#[allow(non_snake_case)]
pub struct IO
{
/// First value of the `" / "` separated pair reported by docker.
pub incoming: Measurement,
/// Second value of the `" / "` separated pair reported by docker.
pub outgoing: Measurement
}
impl IO
{
    /// Creates an `IO` whose incoming and outgoing sides are both empty measurements.
    pub fn new() -> Self
    {
        let incoming = Measurement::new();
        let outgoing = Measurement::new();
        Self { incoming, outgoing }
    }
}
/// Parsed statistics for one container, produced by `createStats` from a `RawStats` record and
/// consumed by `publish`.
#[derive(Deserialize, Debug)]
#[allow(non_snake_case)]
pub struct Stats
{
/// Copy of `RawStats::ID`.
pub id: String,
/// Copy of `RawStats::Container`.
pub container: String,
/// Copy of `RawStats::Name`.
pub name: String,
/// Parsed `RawStats::BlockIO`.
pub blockIO: IO,
/// `RawStats::CPUPerc` without the `%` sign.
pub cpuPercentage: f32,
/// `RawStats::MemPerc` without the `%` sign.
pub memoryPercentage: f32,
/// First part of `RawStats::MemUsage`.
pub memoryUsage: Measurement,
/// Second part of `RawStats::MemUsage`; left empty when that part is missing.
pub totalMemory: Measurement,
/// Parsed `RawStats::NetIO`.
pub netIO: IO,
/// Parsed `RawStats::PIDs`.
pub pids: u32,
}
#[allow(non_snake_case)]
pub fn createStats(rs: &RawStats) -> Stats
{
fn parseIO(value: &String, ioType: &str) -> IO
{
let mut io = IO::new();
let parts : Vec<&str> = value.split(" / ").collect();
if parts.len() != 2 { return io; }
let parseMeasurement = |part: &str| -> Measurement
{
if part.ends_with("GB") || part.ends_with("gB")
{
let v = part.substring(0, part.len() - 2);
return Measurement { value: v.parse::<f32>().expect(format!("Failed to parse {}", ioType).as_str()), unit: "GB".to_string() };
}
if part.ends_with("MB") || part.ends_with("mB")
{
let v = part.substring(0, part.len() - 2);
return Measurement{value: v.parse::<f32>().expect(format!("Failed to parse {}", ioType).as_str()), unit: "MB".to_string()};
}
if part.ends_with("KB") || part.ends_with("kB")
{
let v = part.substring(0, part.len() - 2);
return Measurement{value: v.parse::<f32>().expect(format!("Failed to parse {}", ioType).as_str()), unit: "KB".to_string()};
}
if part.ends_with("B")
{
let v = part.substring(0, part.len() - 1);
return Measurement{value: v.parse::<f32>().expect(format!("Failed to parse {}", ioType).as_str()), unit: "B".to_string()};
}
return Measurement::new();
};
io.incoming = parseMeasurement(parts[0]);
io.outgoing = parseMeasurement(parts[1]);
io
}
fn parseMemory(value: &str) -> Measurement
{
let v = value.substring(0, value.len() - 3);
Measurement{ value: v.parse::<f32>().expect("Failed to parse MemUsage"), unit: value.substring(v.len(), value.len()).to_string() }
}
let mut stats = Stats{ id: rs.ID.clone(), container: rs.Container.clone(), name: rs.Name.clone(),
blockIO: IO::new(), cpuPercentage: 0.0, memoryPercentage: 0.0,
memoryUsage: Measurement::new(), totalMemory: Measurement::new(),
netIO: IO::new(), pids: 0};
stats.cpuPercentage = rs.CPUPerc.replace("%", "").parse::<f32>().expect("Failed to parse CPUPerc");
stats.memoryPercentage = rs.MemPerc.replace("%", "").parse::<f32>().expect("Failed to parse CPUPerc");
stats.pids = rs.PIDs.parse::<u32>().expect("Failed to parse PIDs");
stats.blockIO = parseIO(&rs.BlockIO, "BlockIO");
stats.netIO = parseIO(&rs.NetIO, "NetIO");
let parts : Vec<&str> = rs.MemUsage.split(" / ").collect();
if parts.len() > 0
{
let mem = parseMemory(parts[0]);
stats.memoryUsage.value = mem.value;
stats.memoryUsage.unit = mem.unit.clone();
}
if parts.len() > 1
{
let mem = parseMemory(parts[1]);
stats.totalMemory.value = mem.value;
stats.totalMemory.unit = mem.unit.clone();
}
stats
}
Module with a function that converts the parsed docker statistics information into ILP format and publishes to QuestDB.
use questdb::{
Result,
ingress::{
Sender,
Buffer,
TimestampNanos
},
};
use chrono::Utc;
use super::Cli;
use super::stats::{Measurement, Stats, IO};
/// Publishes the collected container statistics to QuestDB over ILP/TCP.
///
/// One row per entry of `stats` is written to the table `cli.table`. Every row carries the
/// symbols `host`, `container` and `name`, the container id, the CPU and memory percentages, the
/// process count, and the block I/O, network I/O and memory figures converted to bytes. All rows
/// share the timestamp taken when the function is entered.
///
/// The server is reached at `cli.questdb` on port 9009; builds for macOS always connect to
/// `localhost` instead.
///
/// # Errors
///
/// Returns the client error when the connection cannot be set up, when a row cannot be added to
/// the buffer, or when flushing the buffer fails.
pub fn publish(cli: &Cli, stats: &[Stats]) -> Result<()>
{
    /// Converts an I/O measurement to bytes.
    ///
    /// The units docker prints for block and network I/O (`kB`, `MB`, `GB`) are decimal, so the
    /// factors are powers of 1000. The arithmetic is done in `f64` because an `f32` cannot
    /// represent byte counts in the gigabyte range exactly.
    fn bytes(measurement: &Measurement) -> f64
    {
        let value = f64::from(measurement.value);
        match measurement.unit.as_str()
        {
            "KB" => value * 1e3,
            "MB" => value * 1e6,
            "GB" => value * 1e9,
            _ => value
        }
    }

    /// Adds the `<prefix>_in` / `<prefix>_out` columns and their unit columns to the current row.
    fn add_io(buffer: &mut Buffer, io: &IO, prefix: &str) -> Result<()>
    {
        buffer
            .column_f64(format!("{}_in", prefix).as_str(), bytes(&io.incoming))?
            .column_str(format!("{}_in_unit", prefix).as_str(), "bytes")?
            .column_f64(format!("{}_out", prefix).as_str(), bytes(&io.outgoing))?
            .column_str(format!("{}_out_unit", prefix).as_str(), "bytes")?;
        Ok(())
    }

    /// Adds the `<prefix>` column (in bytes) and the `<prefix>_unit` column to the current row.
    ///
    /// Memory figures use binary units, so the factors are powers of 1024. A measurement with
    /// an unknown unit contributes only the unit column.
    fn add_memory(buffer: &mut Buffer, measurement: &Measurement, prefix: &str) -> Result<()>
    {
        let value = f64::from(measurement.value);
        let total = match measurement.unit.as_str()
        {
            "B" => Some(value),
            "KiB" => Some(value * 1024.0),
            "MiB" => Some(value * 1024.0 * 1024.0),
            "GiB" => Some(value * 1024.0 * 1024.0 * 1024.0),
            "TiB" => Some(value * 1024.0 * 1024.0 * 1024.0 * 1024.0),
            _ => None
        };
        if let Some(total) = total { buffer.column_f64(prefix, total)?; }
        buffer.column_str(format!("{}_unit", prefix).as_str(), "bytes")?;
        Ok(())
    }

    let now = Utc::now();
    let uri = if cfg!(target_os = "macos") { "tcp::addr=localhost:9009".to_string() }
        else { format!("tcp::addr={}:9009", cli.questdb) };
    let mut sender = Sender::from_conf(uri)?;
    let mut buffer = Buffer::new();

    for stat in stats
    {
        buffer.table(cli.table.as_str())?
            .symbol("host", cli.host.as_str())?
            .symbol("container", stat.container.as_str())?
            .symbol("name", stat.name.as_str())?
            .column_str("id", stat.id.as_str())?
            .column_f64("cpu", f64::from(stat.cpuPercentage))?
            .column_f64("memory_percentage", f64::from(stat.memoryPercentage))?
            .column_i64("pids", i64::from(stat.pids))?;
        add_io(&mut buffer, &stat.blockIO, "block_io")?;
        add_io(&mut buffer, &stat.netIO, "net_io")?;
        add_memory(&mut buffer, &stat.memoryUsage, "memory_use")?;
        add_memory(&mut buffer, &stat.totalMemory, "total_memory")?;
        buffer.at(TimestampNanos::from_datetime(now)?)?;
    }

    sender.flush(&mut buffer)?;
    Ok(())
}
A simple script that builds the executable deployed on our EC2 instances. A commented-out line shows the command that was used to install the dependencies needed to build the application.
#!/bin/sh
# Cross-compiles a release build for the aarch64-unknown-linux-musl target.
#
# One-time installation of the build dependencies, kept for reference:
# apt-get install musl-tools clang llvm -y
# C compiler and archiver to use for the musl target.
# NOTE(review): presumably read by build scripts of crates that compile C code - confirm.
export CC_aarch64_unknown_linux_musl=clang
export AR_aarch64_unknown_linux_musl=llvm-ar
# Extra rustc flags applied to this target only: self-contained linking with rust-lld as linker.
export CARGO_TARGET_AARCH64_UNKNOWN_LINUX_MUSL_RUSTFLAGS="-Clink-self-contained=yes -Clinker=rust-lld"
cargo build --target=aarch64-unknown-linux-musl --release