Skip to content

Kubernetes Operators & CLI Tools DevOps

Infrastructure as Code với Rust — Operators và CLI tools cấp độ production

1. Kubernetes Operators với kube-rs

Operator = Controller theo dõi Custom Resources và reconcile state.

Tại sao viết Operator bằng Rust?

| Aspect | Go (kubebuilder) | Rust (kube-rs) |
|---|---|---|
| Memory | 50-100MB | 10-20MB |
| Binary | 20-50MB | 5-10MB |
| Startup | 500ms+ | 50ms |
| Type Safety | Runtime panics | Compile-time |

Setup

bash
cargo new my-operator
cd my-operator
toml
# Cargo.toml
[dependencies]
kube = { version = "0.87", features = ["runtime", "derive"] }
kube-runtime = "0.87"
k8s-openapi = { version = "0.20", features = ["v1_28"] }
tokio = { version = "1", features = ["full"] }
schemars = "0.8"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_yaml = "0.9"      # CRD YAML generation (`serde_yaml::to_string`)
thiserror = "1"
anyhow = "1"            # `main` returns `anyhow::Result`
futures = "0.3"         # `StreamExt::for_each` on the controller stream
tracing = "0.1"
tracing-subscriber = "0.3"

Define Custom Resource Definition (CRD)

rust
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

/// Spec of the MyApp CRD
// The `#[kube(...)]` attribute below declares a namespaced `MyApp` kind in
// group `example.com`, version `v1`, names `MyAppStatus` as the status type,
// and adds a "Replicas" print column sourced from `.spec.replicas`.
#[derive(CustomResource, Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[kube(
    group = "example.com",
    version = "v1",
    kind = "MyApp",
    namespaced,
    status = "MyAppStatus",
    printcolumn = r#"{"name":"Replicas","type":"integer","jsonPath":".spec.replicas"}"#
)]
pub struct MyAppSpec {
    // Replica count; this is the value shown by the "Replicas" print column.
    pub replicas: i32,
    // Image string for the app (presumably a container image reference;
    // confirm against `build_deployment`).
    pub image: String,
    // `#[serde(default)]`: a missing `env` field deserializes to an empty list.
    #[serde(default)]
    pub env: Vec<EnvVar>,
}

// A name/value pair; the element type of `MyAppSpec::env`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct EnvVar {
    pub name: String,
    pub value: String,
}

// Status type for `MyApp`, referenced by `status = "MyAppStatus"` in the
// `#[kube(...)]` attribute on `MyAppSpec`. Derives `Default`, so an empty
// status (0 ready replicas, no conditions) can be constructed directly.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)]
pub struct MyAppStatus {
    pub ready_replicas: i32,
    pub conditions: Vec<Condition>,
}

// One entry of `MyAppStatus::conditions`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct Condition {
    // `type` is a Rust keyword, hence the raw identifier `r#type`.
    pub r#type: String,
    pub status: String,
    // Optional free-form detail; `None` when no message is attached.
    pub message: Option<String>,
}

Reconciler Logic

rust
use k8s_openapi::api::apps::v1::Deployment;
use kube::api::PostParams;
use kube::{Api, Client, ResourceExt};
use kube_runtime::controller::{Action, Controller};
use kube_runtime::watcher::Config;
use std::sync::Arc;
use tokio::time::Duration;

// Error type for reconciliation.
// `thiserror` generates the `Display` text from the `#[error(...)]` strings.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    // Wraps any `kube::Error`; `#[from]` lets `?` convert it automatically.
    #[error("Kubernetes API error: {0}")]
    Kube(#[from] kube::Error),
    // A required piece of object metadata was absent; the payload names it
    // (`reconcile` uses it with "namespace").
    #[error("Missing object key: {0}")]
    MissingObjectKey(&'static str),
}

// Context shared across reconciliations.
// It is handed to `reconcile` and `error_policy` wrapped in an `Arc`.
pub struct Context {
    // Kubernetes client used by `reconcile` to build typed `Api` handles.
    client: Client,
}

/// Reconcile function: called when a `MyApp` resource changes.
///
/// Builds the desired `Deployment` with `build_deployment`, then:
/// - creates it when the API server answers 404 for the current name,
/// - replaces it when `needs_update` reports a difference,
/// - leaves it untouched otherwise.
///
/// On success the object is requeued after 5 minutes for a periodic re-check.
///
/// # Errors
///
/// Returns `Error::MissingObjectKey("namespace")` when the object has no
/// namespace, and propagates any other Kubernetes API failure (as well as
/// whatever `build_deployment` returns) to the caller.
async fn reconcile(myapp: Arc<MyApp>, ctx: Arc<Context>) -> Result<Action, Error> {
    let name = myapp.name_any();
    let namespace = myapp.namespace().ok_or(Error::MissingObjectKey("namespace"))?;

    tracing::info!("Reconciling MyApp: {}/{}", namespace, name);

    let deployments: Api<Deployment> = Api::namespaced(ctx.client.clone(), &namespace);

    // Desired Deployment derived from the MyApp resource.
    let mut deployment = build_deployment(&myapp)?;

    match deployments.get(&name).await {
        Ok(existing) => {
            // Update only if the live object differs from the desired one.
            if needs_update(&existing, &deployment) {
                // `Api::replace` expects the resourceVersion of the object
                // being replaced, so carry it over from the one just fetched.
                deployment.metadata.resource_version = existing.resource_version();
                deployments.replace(&name, &PostParams::default(), &deployment).await?;
                tracing::info!("Updated deployment {}", name);
            }
        }
        Err(kube::Error::Api(ae)) if ae.code == 404 => {
            // Nothing exists under this name yet: create it.
            deployments.create(&PostParams::default(), &deployment).await?;
            tracing::info!("Created deployment {}", name);
        }
        Err(e) => return Err(e.into()),
    }

    // Requeue after 5 minutes for periodic reconciliation
    Ok(Action::requeue(Duration::from_secs(300)))
}

fn error_policy(myapp: Arc<MyApp>, error: &Error, _ctx: Arc<Context>) -> Action {
    tracing::error!("Reconciliation error for {}: {:?}", myapp.name_any(), error);
    // Requeue after 1 minute on error
    Action::requeue(Duration::from_secs(60))
}
)}

Main: Start Controller

rust
/// Operator entry point: initialises tracing, connects to the cluster with the
/// default client configuration and drives the `MyApp` controller until it
/// finishes (it is configured to shut down on a signal).
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt::init();

    let client = Client::try_default().await?;
    let myapps: Api<MyApp> = Api::all(client.clone());
    let context = Arc::new(Context { client: client.clone() });

    // Install CRD if needed (dev only)
    // install_crd(&client).await?;

    tracing::info!("Starting MyApp operator");

    let reconciliations = Controller::new(myapps, Config::default())
        .shutdown_on_signal()
        .run(reconcile, error_policy, context);

    // NOTE(review): `for_each` is presumably `futures::StreamExt::for_each`;
    // that trait is not imported in this snippet. Confirm it is in scope.
    reconciliations
        .for_each(|outcome| async move {
            match outcome {
                Err(e) => tracing::error!("Reconcile failed: {:?}", e),
                Ok(o) => tracing::debug!("Reconciled: {:?}", o),
            }
        })
        .await;

    Ok(())
}

Generate CRD YAML

rust
// In build.rs or separate script
fn main() {
    let crd = MyApp::crd();
    println!("{}", serde_yaml::to_string(&crd).unwrap());
}
bash
cargo run --bin generate-crd > crd.yaml
kubectl apply -f crd.yaml

2. High-Performance CLI Tools

Thay thế Bash/Python scripts bằng Rust CLI.

Setup với clap

toml
# Cargo.toml
[dependencies]
clap = { version = "4", features = ["derive"] }
clap_complete = "4"     # Shell completion generation
tokio = { version = "1", features = ["full"] }
indicatif = "0.17"      # Progress bars
console = "0.15"        # Terminal colors
dialoguer = "0.11"      # Interactive prompts
anyhow = "1"
self_update = { version = "0.39", optional = true }  # Only built with the `self-update` feature

[features]
# Gates the `self_update()` function (`#[cfg(feature = "self-update")]`).
self-update = ["dep:self_update"]

Basic CLI Structure

rust
use clap::{Parser, Subcommand};

// Top-level command-line definition for the `devtool` binary.
// Plain `//` comments are used for maintainer notes here because clap's derive
// turns `///` doc comments into user-visible help text.
#[derive(Parser)]
#[command(name = "devtool")]
#[command(author, version, about, long_about = None)]
struct Cli {
    /// Verbose output
    // `global = true`: the flag is accepted after any subcommand as well.
    #[arg(short, long, global = true)]
    verbose: bool,
    
    // The selected subcommand; dispatched on in `main`.
    #[command(subcommand)]
    command: Commands,
}

// Subcommands of `devtool`. The `///` lines below are clap help text, so
// maintainer notes are written as plain `//` comments.
#[derive(Subcommand)]
enum Commands {
    /// Deploy to environment
    Deploy {
        /// Target environment
        // Required option (no default value is declared).
        #[arg(short, long)]
        env: String,
        
        /// Docker image tag
        // Falls back to "latest" when the option is omitted.
        #[arg(short, long, default_value = "latest")]
        tag: String,
    },
    
    /// Run database migrations
    Migrate {
        /// Migration direction
        // Positional argument restricted to the `MigrateDirection` variants.
        #[arg(value_enum)]
        direction: MigrateDirection,
    },
    
    /// Health check all services
    Health,
}

// Allowed values for the `direction` argument of `Commands::Migrate`;
// `clap::ValueEnum` derives the command-line spellings from the variant names.
#[derive(Clone, clap::ValueEnum)]
enum MigrateDirection {
    Up,
    Down,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cli = Cli::parse();
    
    match cli.command {
        Commands::Deploy { env, tag } => {
            deploy(&env, &tag, cli.verbose).await?;
        }
        Commands::Migrate { direction } => {
            migrate(direction).await?;
        }
        Commands::Health => {
            health_check().await?;
        }
    }
    
    Ok(())
}

Progress Bars và Parallel Execution

rust
use indicatif::{ProgressBar, ProgressStyle, MultiProgress};
use tokio::task::JoinSet;

/// Runs a (simulated) deployment of every service concurrently, drawing one
/// progress bar per service, and prints a summary line when all are done.
///
/// * `env` - name of the target environment; only used in the final message.
/// * `tag` - image tag; accepted for interface stability, not used by the
///   simulation.
/// * `verbose` - verbosity flag; accepted for interface stability, not used by
///   the simulation.
///
/// # Errors
///
/// Returns an error if any spawned task panics or is cancelled, or if a task
/// itself reports a failure.
async fn deploy(env: &str, tag: &str, verbose: bool) -> anyhow::Result<()> {
    // Not consumed by the simulated rollout below; kept in the signature so
    // callers do not have to change once real deploy steps are added.
    let _ = (tag, verbose);

    let services = ["api", "worker", "scheduler", "frontend"];

    let multi = MultiProgress::new();
    let style = ProgressStyle::default_bar()
        .template("{spinner:.green} [{bar:40.cyan/blue}] {msg}")
        .expect("static progress-bar template should be valid");

    let mut handles = JoinSet::new();

    for service in services {
        let pb = multi.add(ProgressBar::new(100));
        pb.set_style(style.clone());
        pb.set_message(format!("Deploying {}", service));

        // `service` is a `&'static str`, so it can move into the task as is.
        handles.spawn(async move {
            // Simulate deploy steps
            for i in 0..100 {
                pb.set_position(i);
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            }
            pb.finish_with_message(service);
            Ok::<_, anyhow::Error>(())
        });
    }

    // The first `?` surfaces a join failure, the second the task's own error.
    while let Some(result) = handles.join_next().await {
        result??;
    }

    println!("✅ All services deployed to {}", env);
    Ok(())
}

Interactive Prompts

rust
use dialoguer::{Confirm, Select, Input, theme::ColorfulTheme};

/// Interactive front-end for `deploy`: asks for the environment and the
/// Docker tag, requires an explicit confirmation before touching production,
/// then deploys with verbose output enabled.
async fn interactive_deploy() -> anyhow::Result<()> {
    let envs = vec!["development", "staging", "production"];

    let chosen = Select::with_theme(&ColorfulTheme::default())
        .with_prompt("Select environment")
        .items(&envs)
        .default(0)
        .interact()?;
    let target = envs[chosen];

    let tag: String = Input::with_theme(&ColorfulTheme::default())
        .with_prompt("Docker tag")
        .default("latest".to_string())
        .interact_text()?;

    // Production is the only environment that needs a second opt-in; the
    // prompt defaults to "no".
    let is_production = target == "production";
    if is_production {
        let proceed = Confirm::with_theme(&ColorfulTheme::default())
            .with_prompt("⚠️ Deploy to PRODUCTION?")
            .default(false)
            .interact()?;

        if !proceed {
            println!("Aborted.");
            return Ok(());
        }
    }

    deploy(target, &tag, true).await
}

3. Best Practices

Shell Completion Generation

rust
use clap::CommandFactory;
use clap_complete::{generate, Shell};

/// Writes a Bash completion script for the `devtool` command to stdout.
fn generate_completions() {
    let mut command = Cli::command();
    let mut out = std::io::stdout();
    generate(Shell::Bash, &mut command, "devtool", &mut out);
}

Self-Update Mechanism

rust
/// Updates the `devtool` binary from the GitHub releases of
/// `mycompany/devtool` and prints the resulting version.
///
/// Only compiled when the `self-update` feature is enabled.
///
/// # Errors
///
/// Returns an error if the updater cannot be configured, the update itself
/// fails, or the blocking task panics or is cancelled.
#[cfg(feature = "self-update")]
async fn self_update() -> anyhow::Result<()> {
    use self_update::cargo_crate_version;

    // The updater does synchronous network and file I/O, so it runs on the
    // blocking thread pool instead of stalling the async runtime.
    let status = tokio::task::spawn_blocking(|| {
        self_update::backends::github::Update::configure()
            .repo_owner("mycompany")
            .repo_name("devtool")
            .bin_name("devtool")
            .current_version(cargo_crate_version!())
            .build()?
            .update()
    })
    .await??;

    println!("Updated to version {}", status.version());
    Ok(())
}

Bảng Tóm tắt

| Tool | Crate | Use Case |
|---|---|---|
| kube-rs | kube, kube-runtime | Kubernetes Operators |
| clap | clap + derive | CLI argument parsing |
| indicatif | indicatif | Progress bars |
| dialoguer | dialoguer | Interactive prompts |
| console | console | Terminal colors/styles |
| self_update | self_update | Auto-update mechanism |