use std::collections::HashMap; use std::collections::HashSet; use std::process::Stdio; use displaydoc::Display; use futures::FutureExt; use futures::StreamExt; use futures::stream::BoxStream; use nix_json::NixBuildLogLine; use nix_json::RawNixDerivationInfoOutput; use nix_json::helpers::NixBuildState; use petgraph::Directed; use petgraph::Graph; use petgraph::acyclic::Acyclic; use petgraph::data::Build; use petgraph::prelude::NodeIndex; use thiserror::Error; use tokio::io::AsyncBufReadExt; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; use tokio::process::Command; #[derive(Debug)] pub struct NixBuildResult { derivation: String, log: String, success: bool, } impl NixBuildResult { pub fn derivation(&self) -> &str { &self.derivation } pub fn log(&self) -> &str { &self.log } pub fn success(&self) -> bool { self.success } } #[derive(Debug, Error, Display)] pub enum NixBuildError { /// No error NoneYet, } pub trait NixInProgressBuild { fn next_log_line( &mut self, ) -> impl Future>>; } pub trait NixBackend { type InProgressBuild<'b>: NixInProgressBuild where Self: 'b; fn start_build( &self, derivation: &str, ) -> impl Future, NixBuildError>>; fn get_needed_builds( &self, derivation: &str, ) -> impl Future>; fn check_if_paths_in_cache( &self, store: String, paths: &[String], ) -> impl Future, NixBuildError>>; } pub struct NixCliBackend { command_path: String, } impl NixCliBackend { pub fn new(command_path: String) -> Self { Self { command_path } } } pub struct NixCliBackendBuild { child: BoxStream<'static, String>, } impl NixInProgressBuild for NixCliBackendBuild { fn next_log_line( &mut self, ) -> impl Future>> { self.child.next().map(|out| { out.map(|line| Ok(serde_json::from_str(line.trim_start_matches("@nix ")).unwrap())) }) } } impl NixBackend for NixCliBackend { type InProgressBuild<'b> = NixCliBackendBuild; async fn start_build( &self, derivation: &str, ) -> Result, NixBuildError> { let mut cmd = Command::new(&self.command_path) .args(["build", "--log-format", "internal-json"]) .arg(derivation) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() .unwrap(); let init = cmd.stderr.take().unwrap(); Ok(NixCliBackendBuild { child: futures::stream::unfold( tokio::io::BufReader::new(init), move |mut state| async move { let mut buffer = String::new(); let num_bytes = state.read_line(&mut buffer).await.unwrap(); if num_bytes == 0 { None } else { Some((std::mem::take(&mut buffer), state)) } }, ) .boxed(), }) } async fn get_needed_builds(&self, derivation: &str) -> Result { let cmd = Command::new(&self.command_path) .args(["derivation", "show", "--recursive"]) .arg(derivation) .output() .await .unwrap(); let output: RawNixDerivationInfoOutput = serde_json::from_slice(&cmd.stdout).unwrap(); let mut build_graph = NixBuildGraph::default(); for path in output.info().keys() { let internal_id = build_graph.dependencies.add_node(path.to_string()); build_graph.build_infos.insert( path.to_string(), NixBuildInfo { internal_id, present_in_binary_cache: false, }, ); } for (path, info) in output.info() { let build_info = &build_graph.build_infos[path]; let cur_node = build_info.internal_id; for dep_path in info.input_derivations.keys() { let other_node = build_graph.build_infos[dep_path].internal_id; build_graph.dependencies.add_edge( cur_node, other_node, NixBuildOutput { output_name: String::new(), }, ); } } Ok(build_graph) } async fn check_if_paths_in_cache( &self, store: String, paths: &[String], ) -> Result, NixBuildError> { let mut cmd = Command::new(&self.command_path) .args(["path-info", "--store", &store, "--json", "--stdin"]) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::null()) .spawn() .unwrap(); let mut stdout = cmd.stdout.take().unwrap(); // Write all to stdin, and then have it drop at the end of the block { let paths = paths.join(" "); let mut stdin = cmd.stdin.take().unwrap(); stdin.write_all(paths.as_ref()).await.unwrap(); stdin.shutdown().await.unwrap(); } println!("Wrote all to stdin!"); let mut output = vec![]; stdout.read_to_end(&mut output).await.unwrap(); tokio::spawn(async move { cmd.wait().await }); let store_info: HashMap> = serde_json::from_slice(&output).unwrap(); Ok(store_info .into_iter() .map(|(k, v)| (k, v.is_some())) .collect()) } } pub struct NixBuilder { backend: B, } #[derive(Debug, Clone)] pub struct NixBuildInfo { internal_id: NodeIndex, present_in_binary_cache: bool, } #[derive(Debug)] pub struct NixBuildOutput { output_name: String, } impl NixBuildOutput { pub fn output_name(&self) -> &str { &self.output_name } } #[derive(Debug, Default)] pub struct NixBuildGraph { build_infos: HashMap, dependencies: Acyclic>, } impl NixBuildGraph { pub fn get_non_binary_builds(&self) -> impl Iterator { self.build_infos .iter() .filter(|(_, value)| !value.present_in_binary_cache) .map(|(key, value)| (key.clone(), value.clone())) } } impl NixBuilder where B: NixBackend, { pub fn new(backend: B) -> Self { Self { backend } } pub async fn get_needed_builds( &self, derivation: &str, ) -> Result { self.backend.get_needed_builds(derivation).await } pub async fn build( &self, derivation: String, ) -> Result, NixBuildError> { let needed_builds = self.backend.get_needed_builds(&derivation).await?; let mut started_build = self.backend.start_build(&derivation).await?; let mut actually_built = HashMap::from_iter(needed_builds.get_non_binary_builds().map(|(k, _v)| { ( k.clone(), NixBuildResult { derivation: k, log: String::new(), success: false, }, ) })); let mut build_state = NixBuildState::default(); while let Some(next_log_line) = started_build.next_log_line().await { let next_log_line = next_log_line?; build_state.handle_log_line(next_log_line); } let mut actually_built_derivations = HashSet::new(); for drv in build_state.derivations() { let built = actually_built.get_mut(drv.derivation_path()).unwrap(); built.success = drv.is_success(); actually_built_derivations.insert(drv.derivation_path()); } actually_built.retain(|k, _| actually_built_derivations.contains(&k.as_ref())); Ok(actually_built) } }