diff --git a/Cargo.toml b/Cargo.toml index 6ad92f065..343579876 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ mime_guess = "2" dotenv = "0.15" zstd = "0.5" git2 = { version = "0.13.6", default-features = false } +once_cell = "1.2.0" # Data serialization and deserialization serde = { version = "1.0", features = ["derive"] } @@ -80,7 +81,6 @@ procfs = "0.7" path-slash = "0.1.1" [dev-dependencies] -once_cell = "1.2.0" criterion = "0.3" rand = "0.7.3" diff --git a/src/bin/cratesfyi.rs b/src/bin/cratesfyi.rs index d30a29723..e5e311936 100644 --- a/src/bin/cratesfyi.rs +++ b/src/bin/cratesfyi.rs @@ -3,10 +3,10 @@ use std::path::PathBuf; use std::sync::Arc; use cratesfyi::db::{self, add_path_into_database, Pool}; -use cratesfyi::utils::{add_crate_to_queue, remove_crate_priority, set_crate_priority}; -use cratesfyi::{Config, DocBuilder, DocBuilderOptions, RustwideBuilder, Server}; +use cratesfyi::utils::{remove_crate_priority, set_crate_priority}; +use cratesfyi::{BuildQueue, Config, DocBuilder, DocBuilderOptions, RustwideBuilder, Server}; use failure::Error; -use postgres::Connection; +use once_cell::sync::OnceCell; use structopt::StructOpt; pub fn main() -> Result<(), Error> { @@ -82,26 +82,31 @@ enum CommandLine { impl CommandLine { pub fn handle_args(self) -> Result<(), Error> { - let config = Arc::new(Config::from_env()?); - let pool = Pool::new(&config)?; + let ctx = Context::new(); match self { - Self::Build(build) => build.handle_args(pool), + Self::Build(build) => build.handle_args(ctx)?, Self::StartWebServer { socket_addr, reload_templates, } => { - Server::start(Some(&socket_addr), reload_templates, pool, config)?; + Server::start( + Some(&socket_addr), + reload_templates, + ctx.pool()?, + ctx.config()?, + ctx.build_queue()?, + )?; } Self::Daemon { foreground } => { if foreground { log::warn!("--foreground was passed, but there is no need for it anymore"); } - cratesfyi::utils::start_daemon(config, pool)?; + cratesfyi::utils::start_daemon(ctx.config()?, ctx.pool()?, ctx.build_queue()?)?; } - Self::Database { subcommand } => subcommand.handle_args(&*pool.get()?), - Self::Queue { subcommand } => subcommand.handle_args(&*pool.get()?), + Self::Database { subcommand } => subcommand.handle_args(ctx)?, + Self::Queue { subcommand } => subcommand.handle_args(ctx)?, } Ok(()) @@ -136,19 +141,19 @@ enum QueueSubcommand { } impl QueueSubcommand { - pub fn handle_args(self, conn: &Connection) { + pub fn handle_args(self, ctx: Context) -> Result<(), Error> { match self { Self::Add { crate_name, crate_version, build_priority, - } => { - add_crate_to_queue(&conn, &crate_name, &crate_version, build_priority) - .expect("Could not add crate to queue"); - } + } => ctx + .build_queue()? + .add_crate(&crate_name, &crate_version, build_priority)?, - Self::DefaultPriority { subcommand } => subcommand.handle_args(conn), + Self::DefaultPriority { subcommand } => subcommand.handle_args(ctx)?, } + Ok(()) } } @@ -172,15 +177,15 @@ enum PrioritySubcommand { } impl PrioritySubcommand { - pub fn handle_args(self, conn: &Connection) { + pub fn handle_args(self, ctx: Context) -> Result<(), Error> { match self { Self::Set { pattern, priority } => { - set_crate_priority(&conn, &pattern, priority) + set_crate_priority(&*ctx.conn()?, &pattern, priority) .expect("Could not set pattern's priority"); } Self::Remove { pattern } => { - if let Some(priority) = remove_crate_priority(&conn, &pattern) + if let Some(priority) = remove_crate_priority(&*ctx.conn()?, &pattern) .expect("Could not remove pattern's priority") { println!("Removed pattern with priority {}", priority); @@ -189,6 +194,7 @@ impl PrioritySubcommand { } } } + Ok(()) } } @@ -233,7 +239,7 @@ struct Build { } impl Build { - pub fn handle_args(self, pool: Pool) { + pub fn handle_args(self, ctx: Context) -> Result<(), Error> { let docbuilder = { let mut doc_options = DocBuilderOptions::from_prefix(self.prefix); @@ -249,10 +255,10 @@ impl Build { .check_paths() .expect("The given paths were invalid"); - DocBuilder::new(doc_options, pool.clone()) + DocBuilder::new(doc_options, ctx.pool()?, ctx.build_queue()?) }; - self.subcommand.handle_args(docbuilder, pool); + self.subcommand.handle_args(ctx, docbuilder) } } @@ -300,12 +306,12 @@ enum BuildSubcommand { } impl BuildSubcommand { - pub fn handle_args(self, mut docbuilder: DocBuilder, pool: cratesfyi::db::Pool) { + pub fn handle_args(self, ctx: Context, mut docbuilder: DocBuilder) -> Result<(), Error> { match self { Self::World => { docbuilder.load_cache().expect("Failed to load cache"); - let mut builder = RustwideBuilder::init(pool).unwrap(); + let mut builder = RustwideBuilder::init(ctx.pool()?).unwrap(); builder .build_world(&mut docbuilder) .expect("Failed to build world"); @@ -320,7 +326,7 @@ impl BuildSubcommand { } => { docbuilder.load_cache().expect("Failed to load cache"); let mut builder = - RustwideBuilder::init(pool).expect("failed to initialize rustwide"); + RustwideBuilder::init(ctx.pool()?).expect("failed to initialize rustwide"); if let Some(path) = local { builder @@ -342,25 +348,28 @@ impl BuildSubcommand { Self::UpdateToolchain { only_first_time } => { if only_first_time { - let conn = pool.get().expect("failed to get a database connection"); + let conn = ctx + .pool()? + .get() + .expect("failed to get a database connection"); let res = conn .query("SELECT * FROM config WHERE name = 'rustc_version';", &[]) .unwrap(); if !res.is_empty() { println!("update-toolchain was already called in the past, exiting"); - return; + return Ok(()); } } - let mut builder = RustwideBuilder::init(pool).unwrap(); + let mut builder = RustwideBuilder::init(ctx.pool()?).unwrap(); builder .update_toolchain() .expect("failed to update toolchain"); } Self::AddEssentialFiles => { - let mut builder = RustwideBuilder::init(pool).unwrap(); + let mut builder = RustwideBuilder::init(ctx.pool()?).unwrap(); builder .add_essential_files() .expect("failed to add essential files"); @@ -370,6 +379,8 @@ impl BuildSubcommand { Self::Unlock => docbuilder.unlock().expect("Failed to unlock"), Self::PrintOptions => println!("{:?}", docbuilder.options()), } + + Ok(()) } } @@ -412,31 +423,33 @@ enum DatabaseSubcommand { } impl DatabaseSubcommand { - pub fn handle_args(self, conn: &Connection) { + pub fn handle_args(self, ctx: Context) -> Result<(), Error> { match self { Self::Migrate { version } => { - db::migrate(version, &conn).expect("Failed to run database migrations"); + db::migrate(version, &*ctx.conn()?).expect("Failed to run database migrations"); } Self::UpdateGithubFields => { - cratesfyi::utils::github_updater(&conn).expect("Failed to update github fields"); + cratesfyi::utils::github_updater(&*ctx.conn()?) + .expect("Failed to update github fields"); } Self::AddDirectory { directory, prefix } => { - add_path_into_database(&conn, &prefix, directory) + add_path_into_database(&*ctx.conn()?, &prefix, directory) .expect("Failed to add directory into database"); } // FIXME: This is actually util command not database - Self::UpdateReleaseActivity => cratesfyi::utils::update_release_activity(&conn) + Self::UpdateReleaseActivity => cratesfyi::utils::update_release_activity(&*ctx.conn()?) .expect("Failed to update release activity"), Self::DeleteCrate { crate_name } => { - db::delete_crate(&conn, &crate_name).expect("failed to delete the crate"); + db::delete_crate(&*ctx.conn()?, &crate_name).expect("failed to delete the crate"); } - Self::Blacklist { command } => command.handle_args(&conn), + Self::Blacklist { command } => command.handle_args(ctx)?, } + Ok(()) } } @@ -461,7 +474,8 @@ enum BlacklistSubcommand { } impl BlacklistSubcommand { - fn handle_args(self, conn: &Connection) { + fn handle_args(self, ctx: Context) -> Result<(), Error> { + let conn = &*ctx.conn()?; match self { Self::List => { let crates = @@ -476,5 +490,51 @@ impl BlacklistSubcommand { Self::Remove { crate_name } => db::blacklist::remove_crate(&conn, &crate_name) .expect("failed to remove crate from blacklist"), } + Ok(()) + } +} + +struct Context { + build_queue: OnceCell>, + config: OnceCell>, + pool: OnceCell, +} + +impl Context { + fn new() -> Self { + Self { + build_queue: OnceCell::new(), + config: OnceCell::new(), + pool: OnceCell::new(), + } + } + + fn build_queue(&self) -> Result, Error> { + Ok(self + .build_queue + .get_or_try_init::<_, Error>(|| { + Ok(Arc::new(BuildQueue::new(self.pool()?, &*self.config()?))) + })? + .clone()) + } + + fn config(&self) -> Result, Error> { + Ok(self + .config + .get_or_try_init::<_, Error>(|| Ok(Arc::new(Config::from_env()?)))? + .clone()) + } + + fn pool(&self) -> Result { + Ok(self + .pool + .get_or_try_init::<_, Error>(|| Ok(Pool::new(&*self.config()?)?))? + .clone()) + } + + fn conn( + &self, + ) -> Result, Error> { + Ok(self.pool()?.get()?) } } diff --git a/src/build_queue.rs b/src/build_queue.rs new file mode 100644 index 000000000..e3c36b92c --- /dev/null +++ b/src/build_queue.rs @@ -0,0 +1,321 @@ +use crate::config::Config; +use crate::db::Pool; +use crate::error::Result; +use log::error; + +#[derive(Debug, Eq, PartialEq, serde::Serialize)] +pub(crate) struct QueuedCrate { + #[serde(skip)] + id: i32, + pub(crate) name: String, + pub(crate) version: String, + pub(crate) priority: i32, +} + +#[derive(Debug)] +pub struct BuildQueue { + db: Pool, + max_attempts: i32, +} + +impl BuildQueue { + pub fn new(db: Pool, config: &Config) -> Self { + BuildQueue { + db, + max_attempts: config.build_attempts.into(), + } + } + + pub fn add_crate(&self, name: &str, version: &str, priority: i32) -> Result<()> { + self.db.get()?.execute( + "INSERT INTO queue (name, version, priority) VALUES ($1, $2, $3);", + &[&name, &version, &priority], + )?; + Ok(()) + } + + pub(crate) fn pending_count(&self) -> Result { + let res = self.db.get()?.query( + "SELECT COUNT(*) FROM queue WHERE attempt < $1;", + &[&self.max_attempts], + )?; + Ok(res.get(0).get::<_, i64>(0) as usize) + } + + pub(crate) fn prioritized_count(&self) -> Result { + let res = self.db.get()?.query( + "SELECT COUNT(*) FROM queue WHERE attempt < $1 AND priority <= 0;", + &[&self.max_attempts], + )?; + Ok(res.get(0).get::<_, i64>(0) as usize) + } + + pub(crate) fn failed_count(&self) -> Result { + let res = self.db.get()?.query( + "SELECT COUNT(*) FROM queue WHERE attempt >= $1;", + &[&self.max_attempts], + )?; + Ok(res.get(0).get::<_, i64>(0) as usize) + } + + pub(crate) fn queued_crates(&self) -> Result> { + let query = self.db.get()?.query( + "SELECT id, name, version, priority + FROM queue + WHERE attempt < $1 + ORDER BY priority ASC, attempt ASC, id ASC", + &[&self.max_attempts], + )?; + + Ok(query + .into_iter() + .map(|row| QueuedCrate { + id: row.get("id"), + name: row.get("name"), + version: row.get("version"), + priority: row.get("priority"), + }) + .collect()) + } + + pub(crate) fn process_next_crate( + &self, + f: impl FnOnce(&QueuedCrate) -> Result<()>, + ) -> Result<()> { + let conn = self.db.get()?; + + let queued = self.queued_crates()?; + let to_process = match queued.get(0) { + Some(krate) => krate, + None => return Ok(()), + }; + + let res = f(&to_process); + crate::web::metrics::TOTAL_BUILDS.inc(); + match res { + Ok(()) => { + conn.execute("DELETE FROM queue WHERE id = $1;", &[&to_process.id])?; + } + Err(e) => { + // Increase attempt count + let rows = conn.query( + "UPDATE queue SET attempt = attempt + 1 WHERE id = $1 RETURNING attempt;", + &[&to_process.id], + )?; + let attempt: i32 = rows.get(0).get(0); + + if attempt >= self.max_attempts { + crate::web::metrics::FAILED_BUILDS.inc(); + } + + error!( + "Failed to build package {}-{} from queue: {}\nBacktrace: {}", + to_process.name, + to_process.version, + e, + e.backtrace() + ); + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_add_and_process_crates() { + const MAX_ATTEMPTS: u16 = 3; + + crate::test::wrapper(|env| { + env.override_config(|config| { + config.build_attempts = MAX_ATTEMPTS; + }); + + let queue = env.build_queue(); + + let test_crates = [ + ("low-priority", "1.0.0", 1000), + ("high-priority-foo", "1.0.0", -1000), + ("medium-priority", "1.0.0", -10), + ("high-priority-bar", "1.0.0", -1000), + ("standard-priority", "1.0.0", 0), + ("high-priority-baz", "1.0.0", -1000), + ]; + for krate in &test_crates { + queue.add_crate(krate.0, krate.1, krate.2)?; + } + + let assert_next = |name| -> Result<()> { + queue.process_next_crate(|krate| { + assert_eq!(name, krate.name); + Ok(()) + })?; + Ok(()) + }; + let assert_next_and_fail = |name| -> Result<()> { + queue.process_next_crate(|krate| { + assert_eq!(name, krate.name); + failure::bail!("simulate a failure"); + })?; + Ok(()) + }; + + // The first processed item is the one with the highest priority added first. + assert_next("high-priority-foo")?; + + // Simulate a failure in high-priority-bar. + assert_next_and_fail("high-priority-bar")?; + + // Continue with the next high priority crate. + assert_next("high-priority-baz")?; + + // After all the crates with the max priority are processed, before starting to process + // crates with a lower priority the failed crates with the max priority will be tried + // again. + assert_next("high-priority-bar")?; + + // Continue processing according to the priority. + assert_next("medium-priority")?; + assert_next("standard-priority")?; + + // Simulate the crate failing many times. + for _ in 0..MAX_ATTEMPTS { + assert_next_and_fail("low-priority")?; + } + + // Since low-priority failed many times it will be removed from the queue. Because of + // that the queue should now be empty. + let mut called = false; + queue.process_next_crate(|_| { + called = true; + Ok(()) + })?; + assert!(!called, "there were still items in the queue"); + + Ok(()) + }) + } + + #[test] + fn test_pending_count() { + crate::test::wrapper(|env| { + let queue = env.build_queue(); + + assert_eq!(queue.pending_count()?, 0); + queue.add_crate("foo", "1.0.0", 0)?; + assert_eq!(queue.pending_count()?, 1); + queue.add_crate("bar", "1.0.0", 0)?; + assert_eq!(queue.pending_count()?, 2); + + queue.process_next_crate(|krate| { + assert_eq!("foo", krate.name); + Ok(()) + })?; + assert_eq!(queue.pending_count()?, 1); + + Ok(()) + }); + } + + #[test] + fn test_prioritized_count() { + crate::test::wrapper(|env| { + let queue = env.build_queue(); + + assert_eq!(queue.prioritized_count()?, 0); + queue.add_crate("foo", "1.0.0", 0)?; + assert_eq!(queue.prioritized_count()?, 1); + queue.add_crate("bar", "1.0.0", -100)?; + assert_eq!(queue.prioritized_count()?, 2); + queue.add_crate("baz", "1.0.0", 100)?; + assert_eq!(queue.prioritized_count()?, 2); + + queue.process_next_crate(|krate| { + assert_eq!("bar", krate.name); + Ok(()) + })?; + assert_eq!(queue.prioritized_count()?, 1); + + Ok(()) + }); + } + + #[test] + fn test_failed_count() { + const MAX_ATTEMPTS: u16 = 3; + crate::test::wrapper(|env| { + env.override_config(|config| { + config.build_attempts = MAX_ATTEMPTS; + }); + let queue = env.build_queue(); + + assert_eq!(queue.failed_count()?, 0); + queue.add_crate("foo", "1.0.0", -100)?; + assert_eq!(queue.failed_count()?, 0); + queue.add_crate("bar", "1.0.0", 0)?; + + for _ in 0..MAX_ATTEMPTS { + assert_eq!(queue.failed_count()?, 0); + queue.process_next_crate(|krate| { + assert_eq!("foo", krate.name); + failure::bail!("this failed"); + })?; + } + assert_eq!(queue.failed_count()?, 1); + + queue.process_next_crate(|krate| { + assert_eq!("bar", krate.name); + Ok(()) + })?; + assert_eq!(queue.failed_count()?, 1); + + Ok(()) + }); + } + + #[test] + fn test_queued_crates() { + crate::test::wrapper(|env| { + let queue = env.build_queue(); + + let test_crates = [ + ("foo", "1.0.0", -10), + ("bar", "1.0.0", 0), + ("baz", "1.0.0", 10), + ]; + for krate in &test_crates { + queue.add_crate(krate.0, krate.1, krate.2)?; + } + + assert_eq!( + vec![ + QueuedCrate { + id: 1, + name: "foo".into(), + version: "1.0.0".into(), + priority: -10, + }, + QueuedCrate { + id: 2, + name: "bar".into(), + version: "1.0.0".into(), + priority: 0, + }, + QueuedCrate { + id: 3, + name: "baz".into(), + version: "1.0.0".into(), + priority: 10, + }, + ], + queue.queued_crates()? + ); + + Ok(()) + }); + } +} diff --git a/src/config.rs b/src/config.rs index 85920639d..8f5c60f7f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -4,6 +4,9 @@ use std::str::FromStr; #[derive(Debug)] pub struct Config { + // Build params + pub(crate) build_attempts: u16, + // Database connection params pub(crate) database_url: String, pub(crate) max_pool_size: u32, @@ -17,6 +20,8 @@ pub struct Config { impl Config { pub fn from_env() -> Result { Ok(Self { + build_attempts: env("DOCSRS_BUILD_ATTEMPTS", 5)?, + database_url: require_env("CRATESFYI_DATABASE_URL")?, max_pool_size: env("DOCSRS_MAX_POOL_SIZE", 90)?, min_pool_idle: env("DOCSRS_MIN_POOL_IDLE", 10)?, diff --git a/src/docbuilder/mod.rs b/src/docbuilder/mod.rs index 240b2f86c..8aedff9aa 100644 --- a/src/docbuilder/mod.rs +++ b/src/docbuilder/mod.rs @@ -13,6 +13,7 @@ pub use self::rustwide_builder::RustwideBuilder; use crate::db::Pool; use crate::error::Result; use crate::index::Index; +use crate::BuildQueue; use crate::DocBuilderOptions; use log::debug; use std::collections::BTreeSet; @@ -20,20 +21,23 @@ use std::fs; use std::io::prelude::*; use std::io::BufReader; use std::path::PathBuf; +use std::sync::Arc; /// chroot based documentation builder pub struct DocBuilder { options: DocBuilderOptions, index: Index, db: Pool, + build_queue: Arc, cache: BTreeSet, db_cache: BTreeSet, } impl DocBuilder { - pub fn new(options: DocBuilderOptions, db: Pool) -> DocBuilder { + pub fn new(options: DocBuilderOptions, db: Pool, build_queue: Arc) -> DocBuilder { let index = Index::new(&options.registry_index_path).expect("valid index"); DocBuilder { + build_queue, options, index, db, diff --git a/src/docbuilder/queue.rs b/src/docbuilder/queue.rs index 5dc9aed6e..9be78eb20 100644 --- a/src/docbuilder/queue.rs +++ b/src/docbuilder/queue.rs @@ -2,7 +2,7 @@ use super::{DocBuilder, RustwideBuilder}; use crate::error::Result; -use crate::utils::{add_crate_to_queue, get_crate_priority}; +use crate::utils::get_crate_priority; use crates_index_diff::ChangeKind; use log::{debug, error}; @@ -44,7 +44,10 @@ impl DocBuilder { ChangeKind::Added => { let priority = get_crate_priority(&conn, &krate.name)?; - match add_crate_to_queue(&conn, &krate.name, &krate.version, priority) { + match self + .build_queue + .add_crate(&krate.name, &krate.version, priority) + { Ok(()) => { debug!("{}-{} added into build queue", krate.name, krate.version); crates_added += 1; @@ -63,15 +66,6 @@ impl DocBuilder { Ok(crates_added) } - pub fn get_queue_count(&self) -> Result { - let conn = self.db.get()?; - - Ok(conn - .query("SELECT COUNT(*) FROM queue WHERE attempt < 5", &[])? - .get(0) - .get(0)) - } - /// Builds the top package from the queue. Returns whether there was a package in the queue. /// /// Note that this will return `Ok(true)` even if the package failed to build. @@ -79,58 +73,15 @@ impl DocBuilder { &mut self, builder: &mut RustwideBuilder, ) -> Result { - // This is in a nested scope to drop the connection before build_package is called, - // otherwise the borrow checker will complain. - let (id, name, version): (i32, String, String) = { - let conn = self.db.get()?; - - let query = conn.query( - "SELECT id, name, version - FROM queue - WHERE attempt < 5 - ORDER BY priority ASC, attempt ASC, id ASC - LIMIT 1", - &[], - )?; - - if query.is_empty() { - // nothing in the queue; bail - return Ok(false); - } + let mut processed = false; + let queue = self.build_queue.clone(); + queue.process_next_crate(|krate| { + processed = true; - let row = query.get(0); - (row.get("id"), row.get("name"), row.get("version")) - }; - - match builder.build_package(self, &name, &version, None) { - Ok(_) => { - let conn = self.db.get()?; - - let _ = conn.execute("DELETE FROM queue WHERE id = $1", &[&id]); - } - Err(e) => { - let conn = self.db.get()?; - - // Increase attempt count - let rows = conn.query( - "UPDATE queue SET attempt = attempt + 1 WHERE id = $1 RETURNING attempt", - &[&id], - )?; - let attempt: i32 = rows.get(0).get(0); - if attempt >= 5 { - crate::web::metrics::FAILED_BUILDS.inc(); - } - error!( - "Failed to build package {}-{} from queue: {}\nBacktrace: {}", - name, - version, - e, - e.backtrace() - ) - } - } + builder.build_package(self, &krate.name, &krate.version, None)?; + Ok(()) + })?; - crate::web::metrics::TOTAL_BUILDS.inc(); - Ok(true) + Ok(processed) } } diff --git a/src/lib.rs b/src/lib.rs index 905798f0c..7afcca18e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,12 +2,14 @@ //! documentation of crates for the Rust Programming Language. #![allow(clippy::cognitive_complexity)] +pub use self::build_queue::BuildQueue; pub use self::config::Config; pub use self::docbuilder::options::DocBuilderOptions; pub use self::docbuilder::DocBuilder; pub use self::docbuilder::RustwideBuilder; pub use self::web::Server; +mod build_queue; mod config; pub mod db; mod docbuilder; diff --git a/src/test/mod.rs b/src/test/mod.rs index f7173b8bf..95239ff58 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -3,6 +3,7 @@ mod fakes; use crate::db::{Pool, PoolConnection}; use crate::storage::s3::TestS3; use crate::web::Server; +use crate::BuildQueue; use crate::Config; use failure::Error; use log::error; @@ -92,6 +93,7 @@ pub(crate) fn assert_redirect( } pub(crate) struct TestEnvironment { + build_queue: OnceCell>, config: OnceCell>, db: OnceCell, frontend: OnceCell, @@ -107,6 +109,7 @@ impl TestEnvironment { fn new() -> Self { init_logger(); Self { + build_queue: OnceCell::new(), config: OnceCell::new(), db: OnceCell::new(), frontend: OnceCell::new(), @@ -139,6 +142,12 @@ impl TestEnvironment { } } + pub(crate) fn build_queue(&self) -> Arc { + self.build_queue + .get_or_init(|| Arc::new(BuildQueue::new(self.db().pool(), &self.config()))) + .clone() + } + pub(crate) fn config(&self) -> Arc { self.config .get_or_init(|| Arc::new(self.base_config())) @@ -152,7 +161,7 @@ impl TestEnvironment { pub(crate) fn frontend(&self) -> &TestFrontend { self.frontend - .get_or_init(|| TestFrontend::new(self.db(), self.config())) + .get_or_init(|| TestFrontend::new(self.db(), self.config(), self.build_queue())) } pub(crate) fn s3(&self) -> &TestS3 { @@ -187,6 +196,10 @@ impl TestDatabase { }) } + pub(crate) fn pool(&self) -> Pool { + self.pool.clone() + } + pub(crate) fn conn(&self) -> PoolConnection { self.pool .get() @@ -216,10 +229,16 @@ pub(crate) struct TestFrontend { } impl TestFrontend { - fn new(db: &TestDatabase, config: Arc) -> Self { + fn new(db: &TestDatabase, config: Arc, build_queue: Arc) -> Self { Self { - server: Server::start(Some("127.0.0.1:0"), false, db.pool.clone(), config) - .expect("failed to start the web server"), + server: Server::start( + Some("127.0.0.1:0"), + false, + db.pool.clone(), + config, + build_queue, + ) + .expect("failed to start the web server"), client: Client::new(), } } diff --git a/src/utils/daemon.rs b/src/utils/daemon.rs index 48ba449c5..d57e06028 100644 --- a/src/utils/daemon.rs +++ b/src/utils/daemon.rs @@ -6,7 +6,7 @@ use crate::{ db::Pool, docbuilder::RustwideBuilder, utils::{github_updater, pubsubhubbub, update_release_activity}, - Config, DocBuilder, DocBuilderOptions, + BuildQueue, Config, DocBuilder, DocBuilderOptions, }; use chrono::{Timelike, Utc}; use failure::Error; @@ -17,7 +17,11 @@ use std::sync::Arc; use std::time::Duration; use std::{env, thread}; -pub fn start_daemon(config: Arc, db: Pool) -> Result<(), Error> { +pub fn start_daemon( + config: Arc, + db: Pool, + build_queue: Arc, +) -> Result<(), Error> { const CRATE_VARIABLES: [&str; 3] = [ "CRATESFYI_PREFIX", "CRATESFYI_GITHUB_USERNAME", @@ -38,6 +42,7 @@ pub fn start_daemon(config: Arc, db: Pool) -> Result<(), Error> { // check new crates every minute let cloned_db = db.clone(); + let cloned_build_queue = build_queue.clone(); thread::Builder::new() .name("registry index reader".to_string()) .spawn(move || { @@ -45,7 +50,8 @@ pub fn start_daemon(config: Arc, db: Pool) -> Result<(), Error> { thread::sleep(Duration::from_secs(30)); loop { let opts = opts(); - let mut doc_builder = DocBuilder::new(opts, cloned_db.clone()); + let mut doc_builder = + DocBuilder::new(opts, cloned_db.clone(), cloned_build_queue.clone()); if doc_builder.is_locked() { debug!("Lock file exists, skipping checking new crates"); @@ -65,9 +71,10 @@ pub fn start_daemon(config: Arc, db: Pool) -> Result<(), Error> { // build new crates every minute // REFACTOR: Break this into smaller functions let cloned_db = db.clone(); + let cloned_build_queue = build_queue.clone(); thread::Builder::new().name("build queue reader".to_string()).spawn(move || { let opts = opts(); - let mut doc_builder = DocBuilder::new(opts, cloned_db.clone()); + let mut doc_builder = DocBuilder::new(opts, cloned_db.clone(), cloned_build_queue.clone()); /// Represents the current state of the builder thread. enum BuilderState { @@ -119,7 +126,7 @@ pub fn start_daemon(config: Arc, db: Pool) -> Result<(), Error> { // Only build crates if there are any to build debug!("Checking build queue"); - match doc_builder.get_queue_count() { + match cloned_build_queue.pending_count() { Err(e) => { error!("Failed to read the number of crates in the queue: {}", e); continue; @@ -225,7 +232,7 @@ pub fn start_daemon(config: Arc, db: Pool) -> Result<(), Error> { // at least start web server info!("Starting web server"); - crate::Server::start(None, false, db, config)?; + crate::Server::start(None, false, db, config, build_queue)?; Ok(()) } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 69b666c26..453445a68 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -5,9 +5,7 @@ pub(crate) use self::copy::copy_doc_dir; pub use self::daemon::start_daemon; pub use self::github_updater::github_updater; pub use self::html::extract_head_and_body; -pub use self::queue::{ - add_crate_to_queue, get_crate_priority, remove_crate_priority, set_crate_priority, -}; +pub use self::queue::{get_crate_priority, remove_crate_priority, set_crate_priority}; pub use self::release_activity_updater::update_release_activity; pub(crate) use self::rustc_version::parse_rustc_version; diff --git a/src/utils/queue.rs b/src/utils/queue.rs index abe5472bf..1e3952994 100644 --- a/src/utils/queue.rs +++ b/src/utils/queue.rs @@ -46,21 +46,6 @@ pub fn remove_crate_priority(conn: &Connection, pattern: &str) -> Result Result<()> { - conn.execute( - "INSERT INTO queue (name, version, priority) VALUES ($1, $2, $3)", - &[&name, &version, &priority], - )?; - - Ok(()) -} - #[cfg(test)] mod tests { use super::*; @@ -164,37 +149,4 @@ mod tests { Ok(()) }) } - - #[test] - fn add_to_queue() { - wrapper(|env| { - let db = env.db(); - - let test_crates = [ - ("rcc", "0.1.0", 2), - ("lasso", "0.1.0", -1), - ("hexponent", "0.1.0", 0), - ("destroy-all-humans", "0.0.0-alpha", -100000), - ("totally-not-destroying-humans", "0.0.1", 0), - ]; - - for (name, version, priority) in test_crates.iter() { - add_crate_to_queue(&db.conn(), name, version, *priority)?; - - let query = db.conn().query( - "SELECT name, version, priority FROM queue WHERE name = $1", - &[&name], - )?; - - assert!(query.len() == 1); - let row = query.iter().next().unwrap(); - - assert_eq!(&row.get::<_, String>(0), name); - assert_eq!(&row.get::<_, String>(1), version); - assert_eq!(row.get::<_, i32>(2), *priority); - } - - Ok(()) - }) - } } diff --git a/src/web/extensions.rs b/src/web/extensions.rs index 0054f72cd..b0b78074f 100644 --- a/src/web/extensions.rs +++ b/src/web/extensions.rs @@ -1,11 +1,13 @@ use crate::config::Config; use crate::db::Pool; use crate::web::page::TemplateData; +use crate::BuildQueue; use iron::{BeforeMiddleware, IronResult, Request}; use std::sync::Arc; #[derive(Debug, Clone)] pub(super) struct InjectExtensions { + pub(super) build_queue: Arc, pub(super) pool: Pool, pub(super) config: Arc, pub(super) template_data: Arc, @@ -13,6 +15,8 @@ pub(super) struct InjectExtensions { impl BeforeMiddleware for InjectExtensions { fn before(&self, req: &mut Request) -> IronResult<()> { + req.extensions + .insert::(self.build_queue.clone()); req.extensions.insert::(self.pool.clone()); req.extensions.insert::(self.config.clone()); req.extensions @@ -30,6 +34,7 @@ macro_rules! key { }; } +key!(BuildQueue => Arc); key!(Pool => Pool); key!(Config => Arc); key!(TemplateData => Arc); diff --git a/src/web/metrics.rs b/src/web/metrics.rs index 2cdca0976..bf947bcfa 100644 --- a/src/web/metrics.rs +++ b/src/web/metrics.rs @@ -1,4 +1,5 @@ use crate::db::Pool; +use crate::BuildQueue; use iron::headers::ContentType; use iron::prelude::*; use iron::status::Status; @@ -117,33 +118,15 @@ lazy_static::lazy_static! { } pub fn metrics_handler(req: &mut Request) -> IronResult { - // Database calls are scoped in order to minimize the time a db connection is locked - { - let pool = extension!(req, Pool); - let conn = pool.get()?; - - USED_DB_CONNECTIONS.set(pool.used_connections() as i64); - IDLE_DB_CONNECTIONS.set(pool.idle_connections() as i64); - - QUEUED_CRATES_COUNT.set( - ctry!(conn.query("SELECT COUNT(*) FROM queue WHERE attempt < 5;", &[])) - .get(0) - .get(0), - ); - PRIORITIZED_CRATES_COUNT.set( - ctry!(conn.query( - "SELECT COUNT(*) FROM queue WHERE attempt < 5 AND priority <= 0;", - &[] - )) - .get(0) - .get(0), - ); - FAILED_CRATES_COUNT.set( - ctry!(conn.query("SELECT COUNT(*) FROM queue WHERE attempt >= 5;", &[])) - .get(0) - .get(0), - ); - } + let pool = extension!(req, Pool); + let queue = extension!(req, BuildQueue); + + USED_DB_CONNECTIONS.set(pool.used_connections() as i64); + IDLE_DB_CONNECTIONS.set(pool.idle_connections() as i64); + + QUEUED_CRATES_COUNT.set(ctry!(queue.pending_count()) as i64); + PRIORITIZED_CRATES_COUNT.set(ctry!(queue.prioritized_count()) as i64); + FAILED_CRATES_COUNT.set(ctry!(queue.failed_count()) as i64); #[cfg(not(windows))] { diff --git a/src/web/mod.rs b/src/web/mod.rs index 2c3921cb5..d820a05d5 100644 --- a/src/web/mod.rs +++ b/src/web/mod.rs @@ -64,6 +64,7 @@ use self::extensions::InjectExtensions; use self::page::TemplateData; use crate::config::Config; use crate::db::Pool; +use crate::BuildQueue; use chrono::{DateTime, Utc}; use failure::Error; use handlebars_iron::{DirectorySource, HandlebarsEngine, SourceError}; @@ -117,8 +118,14 @@ impl CratesfyiHandler { chain } - fn new(pool: Pool, config: Arc, template_data: Arc) -> CratesfyiHandler { + fn new( + pool: Pool, + config: Arc, + template_data: Arc, + build_queue: Arc, + ) -> CratesfyiHandler { let inject_extensions = InjectExtensions { + build_queue, pool, config, template_data, @@ -382,6 +389,7 @@ impl Server { reload_templates: bool, db: Pool, config: Arc, + build_queue: Arc, ) -> Result { // Initialize templates let template_data = Arc::new(TemplateData::new(&*db.get()?)?); @@ -389,7 +397,13 @@ impl Server { TemplateData::start_template_reloading(template_data.clone(), db.clone()); } - let server = Self::start_inner(addr.unwrap_or(DEFAULT_BIND), db, config, template_data); + let server = Self::start_inner( + addr.unwrap_or(DEFAULT_BIND), + db, + config, + template_data, + build_queue, + ); info!("Running docs.rs web server on http://{}", server.addr()); Ok(server) } @@ -399,6 +413,7 @@ impl Server { pool: Pool, config: Arc, template_data: Arc, + build_queue: Arc, ) -> Self { // poke all the metrics counters to instantiate and register them metrics::TOTAL_BUILDS.inc_by(0); @@ -408,7 +423,7 @@ impl Server { metrics::UPLOADED_FILES_TOTAL.inc_by(0); metrics::FAILED_DB_CONNECTIONS.inc_by(0); - let cratesfyi = CratesfyiHandler::new(pool, config, template_data); + let cratesfyi = CratesfyiHandler::new(pool, config, template_data, build_queue); let inner = Iron::new(cratesfyi) .http(addr) .unwrap_or_else(|_| panic!("Failed to bind to socket on {}", addr)); diff --git a/src/web/releases.rs b/src/web/releases.rs index 15464dd9e..e25099f09 100644 --- a/src/web/releases.rs +++ b/src/web/releases.rs @@ -4,6 +4,7 @@ use super::error::Nope; use super::page::Page; use super::{duration_to_str, match_version, redirect_base}; use crate::db::Pool; +use crate::BuildQueue; use chrono::{DateTime, NaiveDateTime, Utc}; use iron::prelude::*; use iron::status; @@ -643,30 +644,15 @@ pub fn activity_handler(req: &mut Request) -> IronResult { } pub fn build_queue_handler(req: &mut Request) -> IronResult { - let conn = extension!(req, Pool).get()?; - let query = conn - .query( - "SELECT name, version, priority - FROM queue - WHERE attempt < 5 - ORDER BY priority ASC, attempt ASC, id ASC", - &[], - ) - .unwrap(); - - let crates: Vec<(String, String, i32)> = query - .into_iter() - .map(|krate| { - ( - krate.get("name"), - krate.get("version"), - // The priority here is inverted: in the database if a crate has a higher priority it - // will be built after everything else, which is counter-intuitive for people not - // familiar with docs.rs's inner workings. - -krate.get::<_, i32>("priority"), - ) - }) - .collect(); + let queue = extension!(req, BuildQueue); + + let mut crates = ctry!(queue.queued_crates()); + for krate in &mut crates { + // The priority here is inverted: in the database if a crate has a higher priority it + // will be built after everything else, which is counter-intuitive for people not + // familiar with docs.rs's inner workings. + krate.priority = -krate.priority; + } let is_empty = crates.is_empty(); Page::new(crates) @@ -683,6 +669,7 @@ mod tests { use super::*; use crate::test::{assert_success, wrapper}; use chrono::TimeZone; + use kuchiki::traits::TendrilSink; use serde_json::json; #[test] @@ -1079,4 +1066,48 @@ mod tests { assert_success("/releases/feed", web) }) } + + #[test] + fn test_releases_queue() { + wrapper(|env| { + let queue = env.build_queue(); + let web = env.frontend(); + + let empty = kuchiki::parse_html().one(web.get("/releases/queue").send()?.text()?); + assert!(empty + .select(".release > strong") + .expect("missing heading") + .any(|el| el.text_contents().contains("nothing"))); + + queue.add_crate("foo", "1.0.0", 0)?; + queue.add_crate("bar", "0.1.0", -10)?; + queue.add_crate("baz", "0.0.1", 10)?; + + let full = kuchiki::parse_html().one(web.get("/releases/queue").send()?.text()?); + let items = full + .select(".queue-list > li") + .expect("missing list items") + .collect::>(); + + assert_eq!(items.len(), 3); + let expected = [ + ("bar", "0.1.0", Some(10)), + ("foo", "1.0.0", None), + ("baz", "0.0.1", Some(-10)), + ]; + for (li, expected) in items.iter().zip(&expected) { + let a = li.as_node().select_first("a").expect("missing link"); + assert!(a.text_contents().contains(expected.0)); + assert!(a.text_contents().contains(expected.1)); + + if let Some(priority) = expected.2 { + assert!(li + .text_contents() + .contains(&format!("priority: {}", priority))); + } + } + + Ok(()) + }); + } } diff --git a/templates/releases_queue.hbs b/templates/releases_queue.hbs index 1d988eed7..438f997e3 100644 --- a/templates/releases_queue.hbs +++ b/templates/releases_queue.hbs @@ -14,11 +14,11 @@
    {{#each content}}
  1. - - {{this.[0]}} {{this.[1]}} + + {{this.name}} {{this.version}} - {{#if this.[2]}} - (priority: {{this.[2]}}) + {{#if this.priority}} + (priority: {{this.priority}}) {{/if}}
  2. {{/each}}