From b3e8d304d21e386060eab31e961017e36188731b Mon Sep 17 00:00:00 2001 From: James Barford-Evans Date: Thu, 1 May 2025 12:32:01 +0100 Subject: [PATCH 1/4] feat - database schema & enqueue/dequeue logic --- database/src/lib.rs | 269 ++++++++++++++++++++++- database/src/pool.rs | 13 +- database/src/pool/postgres.rs | 394 +++++++++++++++++++++++++++++++++- database/src/pool/sqlite.rs | 384 ++++++++++++++++++++++++++++++++- 4 files changed, 1052 insertions(+), 8 deletions(-) diff --git a/database/src/lib.rs b/database/src/lib.rs index 41d664776..056b82a76 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -3,9 +3,9 @@ use chrono::{DateTime, Utc}; use hashbrown::HashMap; use intern::intern; use serde::{Deserialize, Serialize}; -use std::fmt; +use std::fmt::{self, Display, Formatter}; use std::hash; -use std::ops::{Add, Sub}; +use std::ops::{Add, Deref, Sub}; use std::sync::Arc; use std::time::Duration; @@ -155,6 +155,15 @@ impl FromStr for CommitType { } } +impl Display for CommitType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + CommitType::Try => f.write_str("try"), + CommitType::Master => f.write_str("master"), + } + } +} + #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub struct Commit { pub sha: String, @@ -794,3 +803,259 @@ pub struct ArtifactCollection { pub duration: Duration, pub end_time: DateTime, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum CommitJobType { + Try(u32), + Master(u32), + Release(String), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommitJobEntity { + pub sha: String, + pub parent_sha: String, + pub commit_time: Date, + pub target: Target, + pub include: Option, + pub exclude: Option, + pub runs: Option, + pub backends: Option, + pub job_type: CommitJobType, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommitJobInProgress { + pub commit_job: CommitJobEntity, + pub machine_id: String, + pub started_at: Date, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommitJobFinished { + pub commit_job: CommitJobEntity, + pub machine_id: String, + pub started_at: Date, + pub finished_at: Date, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommitJobFailed { + pub commit_job: CommitJobEntity, + pub machine_id: String, + pub started_at: Date, + pub finished_at: Date, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum CommitJob { + Queued(CommitJobEntity), + InProgress(CommitJobInProgress), + Finished(CommitJobFinished), + Failed(CommitJobFailed), +} + +impl CommitJob { + /// Returns `Some(&CommitJobEntity)` only if the job is still queued. + pub fn as_queued(&self) -> Option<&CommitJobEntity> { + match self { + CommitJob::Queued(e) => Some(e), + _ => None, + } + } + + /// Returns `Some(&CommitJobInProgress)` while the job is running. + pub fn as_in_progress(&self) -> Option<&CommitJobInProgress> { + match self { + CommitJob::InProgress(ip) => Some(ip), + _ => None, + } + } + + /// Returns `Some(&CommitJobFinished)` once the job is done. + pub fn as_finished(&self) -> Option<&CommitJobFinished> { + match self { + CommitJob::Finished(fin) => Some(fin), + _ => None, + } + } + + /// Get the status as a string + pub fn status(&self) -> &'static str { + match self { + CommitJob::Queued(_) => "queued", + CommitJob::InProgress(_) => "in_progress", + CommitJob::Finished(_) => "finished", + CommitJob::Failed(_) => "failed", + } + } + + /// True when `status == "finished"`. 
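+    ///
+    /// A minimal caller sketch (hypothetical; nothing in this patch calls it
+    /// yet):
+    ///
+    /// ```ignore
+    /// if commit_job.is_finished() {
+    ///     println!("job for {} is done", commit_job.sha);
+    /// }
+    /// ```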
+ pub fn is_finished(&self) -> bool { + matches!(self, CommitJob::Finished(_)) + } + + /// Will compose the column names for the job type + pub fn get_enqueue_column_names(&self) -> Vec { + let mut base_columns = vec![ + String::from("sha"), + String::from("parent_sha"), + String::from("commit_type"), + String::from("commit_time"), + String::from("status"), + String::from("target"), + String::from("include"), + String::from("exclude"), + String::from("runs"), + String::from("backends"), + ]; + + /* This is the last column */ + match self.job_type { + CommitJobType::Try(_) => base_columns.push("pr".into()), + CommitJobType::Master(_) => base_columns.push("pr".into()), + CommitJobType::Release(_) => base_columns.push("release_tag".into()), + }; + + base_columns + } +} + +impl Deref for CommitJob { + type Target = CommitJobEntity; + fn deref(&self) -> &Self::Target { + match self { + CommitJob::Queued(e) => e, + CommitJob::InProgress(ip) => &ip.commit_job, + CommitJob::Finished(fin) => &fin.commit_job, + CommitJob::Failed(fail) => &fail.commit_job, + } + } +} + +/// Maps from the database to a Rust struct +#[allow(clippy::too_many_arguments)] +fn commit_job_create( + sha: String, + parent_sha: String, + commit_type: &str, + pr: Option, + release_tag: Option, + commit_time: Date, + target: Target, + machine_id: Option, + started_at: Option, + finished_at: Option, + status: &str, + include: Option, + exclude: Option, + runs: Option, + backends: Option, +) -> CommitJob { + let job_type = match commit_type { + "try" => CommitJobType::Try(pr.expect("`pr` cannot be `None` for a Commit of type `try`")), + "master" => { + CommitJobType::Master(pr.expect("`pr` cannot be `None` for a Commit of type `master`")) + } + "release" => CommitJobType::Release( + release_tag.expect("`release_tag` cannot be `None` for a Commit of type `release`"), + ), + _ => panic!("Unhandled commit_type {}", commit_type), + }; + + let commit_job = CommitJobEntity { + sha, + parent_sha, + commit_time, + target, + include, + exclude, + runs, + backends, + job_type, + }; + + match status { + "queued" => CommitJob::Queued(commit_job), + + "in_progress" => { + let started_at = + started_at.expect("`started_at` must be Some for an `in_progress` job"); + let machine_id = + machine_id.expect("`machine_id` must be Some for an `in_progress` job"); + + CommitJob::InProgress(CommitJobInProgress { + commit_job, + started_at, + machine_id, + }) + } + + "finished" | "failed" => { + let started_at = + started_at.expect("`started_at` must be Some for finished or failed job"); + let finished_at = + finished_at.expect("`finished_at` must be Some for finished or failed"); + let machine_id = + machine_id.expect("`machine_id` must be Some for finished or failed a job"); + + if status == "finished" { + CommitJob::Finished(CommitJobFinished { + commit_job, + started_at, + finished_at, + machine_id, + }) + } else { + CommitJob::Failed(CommitJobFailed { + commit_job, + started_at, + finished_at, + machine_id, + }) + } + } + + other => { + panic!("unknown status `{other}` (expected `queued`, `in_progress`, `finished` or `failed`)") + } + } +} + +pub struct CommitsByType<'a> { + pub r#try: Vec<(&'a CommitJob, u32)>, + pub master: Vec<(&'a CommitJob, u32)>, + pub release: Vec<(&'a CommitJob, String)>, +} + +/// Given a vector of `CommitJobs` bucket them out into; +/// `try`, `master` and `release` (in that order) +pub fn split_queued_commit_jobs(commit_jobs: &[CommitJob]) -> CommitsByType<'_> { + // Split jobs by type as that determines what we enter into 
the database, + // `ToSql` is quite finiky about lifetimes. Moreover the column names + // change depending on the commit job type. `master` and `try` have + // a `pr` column whereas `release` has a `release_rag` column + let (try_commits, master_commits, release_commits) = commit_jobs.iter().fold( + (vec![], vec![], vec![]), + |(mut try_commits, mut master_commits, mut release_commits), job| { + let entity = job + .as_queued() + .expect("Can only enqueue jobs with a status of `queued`"); + + match &entity.job_type { + crate::CommitJobType::Try(pr) => try_commits.push((job, *pr)), + crate::CommitJobType::Master(pr) => master_commits.push((job, *pr)), + crate::CommitJobType::Release(release_tag) => { + release_commits.push((job, release_tag.clone())) + } + } + (try_commits, master_commits, release_commits) + }, + ); + + CommitsByType { + r#try: try_commits, + master: master_commits, + release: release_commits, + } +} diff --git a/database/src/pool.rs b/database/src/pool.rs index 71c0855a7..0147d73c5 100644 --- a/database/src/pool.rs +++ b/database/src/pool.rs @@ -1,5 +1,6 @@ use crate::{ - ArtifactCollection, ArtifactId, ArtifactIdNumber, CodegenBackend, CompileBenchmark, Target, + ArtifactCollection, ArtifactId, ArtifactIdNumber, CodegenBackend, CommitJob, CompileBenchmark, + Target, }; use crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step}; use chrono::{DateTime, Utc}; @@ -178,6 +179,16 @@ pub trait Connection: Send + Sync { /// Removes all data associated with the given artifact. async fn purge_artifact(&self, aid: &ArtifactId); + + /// Add a jobs to the queue + async fn enqueue_commit_jobs(&self, jobs: &[CommitJob]); + + /// Dequeue jobs, we pass `machine_id` and `target` in case there are jobs + /// the machine was previously doing and can pick up again + async fn dequeue_commit_job(&self, machine_id: &str, target: Target) -> Option; + + /// Mark the job as finished + async fn finish_commit_job(&self, machine_id: &str, target: Target, sha: String) -> bool; } #[async_trait::async_trait] diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs index 374b4904f..8d5c7a7cc 100644 --- a/database/src/pool/postgres.rs +++ b/database/src/pool/postgres.rs @@ -1,7 +1,8 @@ use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction}; use crate::{ - ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, CodegenBackend, CollectionId, - Commit, CommitType, CompileBenchmark, Date, Index, Profile, QueuedCommit, Scenario, Target, + commit_job_create, split_queued_commit_jobs, ArtifactCollection, ArtifactId, ArtifactIdNumber, + Benchmark, CodegenBackend, CollectionId, Commit, CommitJob, CommitType, CompileBenchmark, Date, + Index, Profile, QueuedCommit, Scenario, Target, }; use anyhow::Context as _; use chrono::{DateTime, TimeZone, Utc}; @@ -12,11 +13,14 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex; +use tokio_postgres::types::{FromSql, ToSql}; use tokio_postgres::GenericClient; use tokio_postgres::Statement; pub struct Postgres(String, std::sync::Once); +type PgParam<'a> = &'a (dyn tokio_postgres::types::ToSql + Sync); + impl Postgres { pub fn new(url: String) -> Self { Postgres(url, std::sync::Once::new()) @@ -285,6 +289,30 @@ static MIGRATIONS: &[&str] = &[ alter table pstat_series drop constraint test_case; alter table pstat_series add constraint test_case UNIQUE(crate, profile, scenario, backend, target, metric); "#, + r#" + CREATE TABLE IF NOT EXISTS commit_queue ( + sha TEXT, + parent_sha 
TEXT, + commit_type TEXT, + pr INTEGER, + release_tag TEXT, + commit_time TIMESTAMP, + target TEXT, + include TEXT, + exclude TEXT, + runs INTEGER DEFAULT 0, + backends TEXT, + machine_id TEXT, + started_at TIMESTAMP, + finished_at TIMESTAMP, + status TEXT, + retries INTEGER DEFAULT 0, + PRIMARY KEY (sha, target) + ); + CREATE INDEX IF NOT EXISTS sha_idx ON commit_queue (sha); + CREATE INDEX IF NOT EXISTS machine_id_idx ON commit_queue (machine_id); + CREATE INDEX IF NOT EXISTS sha_machine_id_idx ON commit_queue (sha, machine_id); + "#, ]; #[async_trait::async_trait] @@ -1365,6 +1393,368 @@ where .await .unwrap(); } + + async fn enqueue_commit_jobs(&self, jobs: &[CommitJob]) { + if jobs.is_empty() { + return; + } + + let commits_by_type = split_queued_commit_jobs(jobs); + + // Create a bulk insert statment for a specific commit job type i.e; + // ``` + // INSERT INTO + // commit_queue(sha, parent_sha... ) + // VALUES + // ($1, $2, ...), + // ($3, $4, ...), + // ($5, $6, ...) + // ON CONFLICT DO NOTHING; + // ``` + fn make_insert_sql(commit: &CommitJob, rows: usize) -> String { + /* Get the column names we are interested in inserting */ + let cols = commit.get_enqueue_column_names(); + let col_cnt = cols.len(); + let col_sql = cols.join(", "); + + /* ($1,$2,...), ($k,$k+1, etc...) */ + let values = (0..rows) + .map(|r| { + let base = r * col_cnt; + let group = (1..=col_cnt) + .map(|i| format!("${}", base + i)) + .collect::>() + .join(", "); + format!("({})", group) + }) + .collect::>() + .join(", "); + + format!( + "INSERT INTO commit_queue ({}) VALUES {} ON CONFLICT DO NOTHING", + col_sql, values + ) + } + + /* Add the commits to the database in their serialised format */ + async fn add_to_database(client: &P, kind: &str, commit_jobs: Vec<(&CommitJob, T)>) + where + P: Send + Sync + PClient, + T: tokio_postgres::types::ToSql + Sync, + { + if let Some(head) = commit_jobs.first() { + let sql = make_insert_sql(head.0, commit_jobs.len()); + let params = commit_jobs + .iter() + .flat_map(|(commit_job, pr_or_release)| { + [ + &commit_job.sha as PgParam, + &commit_job.parent_sha, + &kind, + &commit_job.commit_time, + &"queued", /* status is always queued */ + &commit_job.target, + &commit_job.include, + &commit_job.exclude, + &commit_job.runs, + &commit_job.backends, + pr_or_release, /* This will either be a `pr` or `relase_tag`*/ + ] + }) + .collect::>(); + client.conn().execute(&sql, ¶ms).await.unwrap(); + } + } + + /* Fire out in parallel */ + tokio::join!( + add_to_database(self, "try", commits_by_type.r#try), + add_to_database(self, "master", commits_by_type.master), + add_to_database(self, "release", commits_by_type.release) + ); + } + + async fn dequeue_commit_job(&self, machine_id: &str, target: Target) -> Option { + /* Check to see if this machine possibly went offline while doing + * a previous job - if it did we'll take that job + * + * `FOR UPDATE SKIP LOCKED`prevents multiple machines of the same + * architecture taking the same job. 
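+     * As a sketch, the shape of the pattern is (illustrative only, not the
+     * exact query used below):
+     *
+     *   WITH candidate AS (
+     *       SELECT sha FROM commit_queue
+     *       WHERE status = 'queued'
+     *       LIMIT 1
+     *       FOR UPDATE SKIP LOCKED
+     *   )
+     *   UPDATE commit_queue SET status = 'in_progress'
+     *   WHERE sha = (SELECT sha FROM candidate)
+     *   RETURNING *;
+     *
+     * A concurrent worker running the same statement skips the locked row
+     * rather than blocking on it, so no two workers can claim the same job.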
See here for more information; + * https://www.postgresql.org/docs/17/sql-select.html#SQL-FOR-UPDATE-SHARE */ + let maybe_previous_job = self + .conn() + .query_opt( + " + WITH job_to_update AS ( + SELECT + sha, + parent_sha, + commit_type, + pr, + release_tag, + commit_time, + target, + include, + exclude, + runs, + backends, + machine_id, + started_at, + finished_at, + status, + retries + FROM commit_queue + WHERE machine_id = $1 + AND target = $2 + AND status = 'in_progress' + AND retries < 3 + ORDER BY started_at + LIMIT 1 + + FOR UPDATE SKIP LOCKED + + ) + UPDATE commit_queue AS cq + SET started_at = NOW(), + status = 'in_progress', + retries = cq.retries + 1 + WHERE cq.sha = (SELECT sha FROM job_to_update) + RETURNING cq.*; + ", + &[&machine_id, &target], + ) + .await + .unwrap(); + + /* If it was we will take that job */ + if let Some(row) = maybe_previous_job { + return Some(commit_queue_row_to_commit_job(&row)); + } + + let maybe_drift_job = self + .conn() + .query_opt( + " + WITH job_to_update AS ( + SELECT + sha, + parent_sha, + commit_type, + pr, + release_tag, + commit_time, + target, + include, + exclude, + runs, + backends, + machine_id, + started_at, + finished_at, + status, + retries + FROM commit_queue + WHERE target != $1 + AND status IN ('finished', 'in_progress') + AND sha NOT IN ( + SELECT sha + FROM commit_queue + WHERE target != $1 + AND status = 'finished' + ) + ORDER BY started_at + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) + UPDATE commit_queue + SET started_at = NOW(), + status = 'in_progress', + machine_id = $2 + WHERE + target = $1 + AND sha = (SELECT sha FROM job_to_update) + RETURNING *; + ", + &[&target, &machine_id], + ) + .await + .unwrap(); + + /* If we are, we will take that job */ + if let Some(row) = maybe_drift_job { + return Some(commit_queue_row_to_commit_job(&row)); + } + + /* See if there are any jobs that need taking care of */ + let job = self + .conn() + .query_opt( + " + WITH job_to_update AS ( + SELECT + sha, + parent_sha, + commit_type, + pr, + release_tag, + commit_time, + target, + include, + exclude, + runs, + backends, + machine_id, + started_at, + finished_at, + status, + retries + FROM commit_queue + WHERE target = $1 + AND status = 'queued' + ORDER BY pr ASC, commit_type, sha + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) + UPDATE commit_queue + SET started_at = NOW(), + status = 'in_progress', + machine_id = $2 + WHERE + sha = (SELECT sha FROM job_to_update) + AND target = $1 + RETURNING *; + ", + &[&target, &machine_id], + ) + .await + .unwrap(); + + /* If there is one, we will take that job */ + if let Some(row) = job { + return Some(commit_queue_row_to_commit_job(&row)); + } + + /* There are no jobs in the queue */ + return None; + } + + /// Mark a job in the database as done + async fn finish_commit_job(&self, machine_id: &str, target: Target, sha: String) -> bool { + let jobs = self + .conn() + .query_opt( + " + UPDATE commit_queue + SET finished_at = DATETIME('now'), + status = 'finished', + WHERE + sha = $1 + AND machine_id = $1 + AND target = $1; + ", + &[&sha, &machine_id, &target], + ) + .await + .unwrap(); + return jobs.is_some(); + } +} + +/// Map a database row from the commit queue to a `CommitJob` +fn commit_queue_row_to_commit_job(row: &tokio_postgres::Row) -> CommitJob { + let sha = row.get::<_, String>(0); + let parent_sha = row.get::<_, String>(1); + let commit_type = row.get::<_, String>(2); + let pr = row.get::<_, Option>(3); + let release_tag = row.get::<_, Option>(4); + let commit_time = row.get::<_, 
String>(5).parse::().unwrap(); + let target = Target::from_str(&row.get::<_, String>(6)).unwrap(); + let include = row.get::<_, Option>(7); + let exclude = row.get::<_, Option>(8); + let runs = row.get::<_, Option>(9); + let backends = row.get::<_, Option>(10); + let machine_id = row.get::<_, Option>(11); + let started_at = row + .get::<_, Option>(12) + .map(|ts| ts.parse::().unwrap()); + + let finished_at = row + .get::<_, Option>(13) + .map(|ts| ts.parse::().unwrap()); + let status = row.get::<_, String>(14); + + commit_job_create( + sha, + parent_sha, + &commit_type, + pr, + release_tag, + commit_time, + target, + machine_id, + started_at, + finished_at, + &status, + include, + exclude, + runs, + backends, + ) +} + +#[macro_export] +macro_rules! impl_to_postgresql_via_to_string { + ($t:ty) => { + impl tokio_postgres::types::ToSql for $t { + fn to_sql( + &self, + ty: &tokio_postgres::types::Type, + out: &mut bytes::BytesMut, + ) -> Result> + { + self.to_string().to_sql(ty, out) + } + + fn accepts(ty: &tokio_postgres::types::Type) -> bool { + ::accepts(ty) + } + + // Only compile if the type is acceptable + tokio_postgres::types::to_sql_checked!(); + } + }; +} + +impl_to_postgresql_via_to_string!(Target); + +impl ToSql for Date { + fn to_sql( + &self, + ty: &tokio_postgres::types::Type, + out: &mut bytes::BytesMut, + ) -> Result> { + self.0.to_sql(ty, out) + } + + fn accepts(ty: &tokio_postgres::types::Type) -> bool { + as ToSql>::accepts(ty) + } + + tokio_postgres::types::to_sql_checked!(); +} + +impl<'a> FromSql<'a> for Date { + fn from_sql( + ty: &tokio_postgres::types::Type, + raw: &'a [u8], + ) -> Result> { + let dt = DateTime::::from_sql(ty, raw)?; + Ok(Date(dt)) + } + + fn accepts(ty: &tokio_postgres::types::Type) -> bool { + as FromSql>::accepts(ty) + } } fn parse_artifact_id(ty: &str, sha: &str, date: Option>) -> ArtifactId { diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs index 27d6b46de..d6d5802d7 100644 --- a/database/src/pool/sqlite.rs +++ b/database/src/pool/sqlite.rs @@ -1,13 +1,15 @@ use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction}; use crate::{ - ArtifactCollection, ArtifactId, Benchmark, CodegenBackend, CollectionId, Commit, CommitType, - CompileBenchmark, Date, Profile, Target, + commit_job_create, split_queued_commit_jobs, ArtifactCollection, ArtifactId, Benchmark, + CodegenBackend, CollectionId, Commit, CommitJob, CommitType, CompileBenchmark, Date, Profile, + Target, }; use crate::{ArtifactIdNumber, Index, QueuedCommit}; use chrono::{DateTime, TimeZone, Utc}; use hashbrown::HashMap; -use rusqlite::params; +use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSql, ValueRef}; use rusqlite::OptionalExtension; +use rusqlite::{params, params_from_iter}; use std::path::PathBuf; use std::str::FromStr; use std::sync::Mutex; @@ -404,6 +406,32 @@ static MIGRATIONS: &[Migration] = &[ alter table pstat_series_with_target rename to pstat_series; "#, ), + Migration::without_foreign_key_constraints( + r#" + CREATE TABLE IF NOT EXISTS commit_queue ( + sha TEXT, + parent_sha TEXT, + commit_type TEXT, + pr INTEGER, + release_tag TEXT, + commit_time TIMESTAMP, + target TEXT, + include TEXT, + exclude TEXT, + runs INTEGER DEFAULT 0, + backends TEXT, + machine_id TEXT, + started_at TIMESTAMP, + finished_at TIMESTAMP, + status TEXT, + retries INTEGER DEFAULT 0, + PRIMARY KEY (sha, target) + ); + CREATE INDEX IF NOT EXISTS sha_idx ON commit_queue (sha); + CREATE INDEX IF NOT EXISTS machine_id_idx ON commit_queue (machine_id); 
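+    -- Note: the composite index below is an assumption about the hot path;
+    -- `finish_commit_job` filters on sha and machine_id together, so a single
+    -- two-column index can serve that lookup directly.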
+ CREATE INDEX IF NOT EXISTS sha_machine_id_idx ON commit_queue (sha, machine_id); + "#, + ), ]; #[async_trait::async_trait] @@ -1252,6 +1280,356 @@ impl Connection for SqliteConnection { ) .unwrap(); } + + /// For this to work we need a central database + async fn enqueue_commit_jobs(&self, jobs: &[CommitJob]) { + if jobs.is_empty() { + return; + } + + let commits_by_type = split_queued_commit_jobs(jobs); + // Create a bulk insert statment for a specific commit job type i.e; + // ``` + // INSERT OR IGNORE INTO + // commit_queue(sha, parent_sha... ) + // VALUES + // (?, ?, ...), + // (?, ?, ...), + // (?, ?, ...); + // ``` + fn make_insert_sql(commit: &CommitJob, rows: usize) -> String { + let column_names = commit.get_enqueue_column_names(); + let column_string_names = column_names.join(", "); + let query_params = std::iter::repeat("?") + .take(column_names.len()) + .collect::>() + .join(", "); + let placeholders = (0..rows) + .map(|_| format!("({})", query_params)) + .collect::>() + .join(", "); + + format!( + "INSERT OR IGNORE INTO commit_queue ({}) VALUES {};", + column_string_names, placeholders + ) + } + + /* Add the commits to the database in their serialised format */ + fn add_to_database( + client: &SqliteConnection, + kind: &str, + commit_jobs: Vec<(&CommitJob, T)>, + ) where + T: rusqlite::ToSql, + { + if let Some(head) = commit_jobs.first() { + let sql = make_insert_sql(head.0, commit_jobs.len()); + let params = commit_jobs + .iter() + .flat_map(|(commit_job, pr_or_release)| { + [ + &commit_job.sha as &dyn ToSql, + &commit_job.parent_sha, + &kind, + &commit_job.commit_time, + &"queued", /* status is always queued */ + &commit_job.target, + &commit_job.include, + &commit_job.exclude, + &commit_job.runs, + &commit_job.backends, + pr_or_release, /* This will either be a `pr` or `relase_tag`*/ + ] + }) + .collect::>(); + client + .raw_ref() + .execute(&sql, params_from_iter(params)) + .unwrap(); + } + } + + add_to_database(self, "try", commits_by_type.r#try); + add_to_database(self, "master", commits_by_type.master); + add_to_database(self, "release", commits_by_type.release); + } + + /// For this to work we need a central database + async fn dequeue_commit_job(&self, machine_id: &str, target: Target) -> Option { + /* Check to see if this machine possibly went offline while doing + * a previous job - if it did we'll take that job */ + let maybe_previous_job = self + .raw_ref() + .prepare( + " + WITH job_to_update AS ( + SELECT + sha, + parent_sha, + commit_type, + pr, + release_tag, + commit_time, + target, + include, + exclude, + runs, + backends, + machine_id, + started_at, + finished_at, + status, + retries + FROM commit_queue + WHERE machine_id = ? + AND target = ? 
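+                    -- resume only a job this collector had already claimed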
+ AND status = 'in_progress' + AND retries < 3 + ORDER BY started_at + LIMIT 1 + ) + UPDATE commit_queue AS cq + SET started_at = DATETIME('now'), + status = 'in_progress', + retries = cq.retries + 1 + WHERE cq.sha = (SELECT sha FROM job_to_update) + RETURNING *; + ", + ) + .unwrap() + .query_map(params![machine_id, &target], |row| { + Ok(commit_queue_row_to_commit_job(row)) + }) + .unwrap() + .map(|row| row.unwrap()) + .collect::>(); + + if let Some(previous_job) = maybe_previous_job.first() { + return Some(previous_job.clone()); + } + + /* Check to see if we are out of sync with other collectors of + * different architectures, if we are we will update the row and + * return this `sha` */ + let maybe_drift_job = self + .raw_ref() + .prepare( + " + WITH job_to_update AS ( + SELECT + sha, + parent_sha, + commit_type, + pr, + release_tag, + commit_time, + target, + include, + exclude, + runs, + backends, + machine_id, + started_at, + finished_at, + status, + retries + FROM commit_queue + WHERE target != ? + AND status IN ('finished', 'in_progress') + AND sha NOT IN ( + SELECT sha + FROM commit_queue + WHERE target != ? + AND status = 'finished' + ) + ORDER BY started_at + LIMIT 1 + ) + UPDATE commit_queue + SET started_at = DATETIME('now'), + status = 'in_progress', + machine_id = ? + WHERE + target = ? + AND sha = (SELECT sha FROM job_to_update) + RETURNING *; + ", + ) + .unwrap() + .query_map(params![&target, &target, machine_id, &target], |row| { + Ok(commit_queue_row_to_commit_job(row)) + }) + .unwrap() + .map(|sha| sha.unwrap()) + .collect::>(); + + if let Some(drift_job) = maybe_drift_job.first() { + return Some(drift_job.clone()); + } + + /* See if there are any jobs that need taking care of */ + let jobs = self + .raw_ref() + .prepare( + " + WITH job_to_update AS ( + SELECT + sha, + parent_sha, + commit_type, + pr, + release_tag, + commit_time, + target, + include, + exclude, + runs, + backends, + machine_id, + started_at, + finished_at, + status, + retries + FROM commit_queue + WHERE target = ? + AND status = 'queued' + ORDER BY + pr ASC, + commit_type, + sha + LIMIT 1 + ) + UPDATE commit_queue + SET started_at = DATETIME('now'), + status = 'in_progress', + machine_id = ? + WHERE + sha = (SELECT sha FROM job_to_update) + AND target = ? + RETURNING *; + ", + ) + .unwrap() + .query_map(params![&target, machine_id, &target], |row| { + Ok(commit_queue_row_to_commit_job(row)) + }) + .unwrap() + .map(|r| r.unwrap()) + .collect::>(); + + /* If there is one, we will take that job */ + if let Some(commit_job) = jobs.first() { + return Some(commit_job.clone()); + } + + /* There are no jobs in the queue */ + return None; + } + + /// For this to work we need a central database + async fn finish_commit_job(&self, machine_id: &str, target: Target, sha: String) -> bool { + let jobs = self + .raw_ref() + .execute( + " + UPDATE commit_queue + SET finished_at = DATETIME('now'), + status = 'finished', + WHERE + sha = ? + AND machine_id = ? + AND target = ?; + ", + params![&sha, machine_id, &target], + ) + .unwrap(); + return jobs == 1; + } +} + +#[macro_export] +macro_rules! 
impl_to_sqlite_via_to_string {
+    ($t:ty) => {
+        impl ToSql for $t {
+            fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
+                Ok(self.to_string().into())
+            }
+        }
+    };
+}
+
+impl_to_sqlite_via_to_string!(Target);
+
+impl ToSql for Date {
+    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
+        Ok(self.0.to_rfc3339().into())
+    }
+}
+
+impl FromSql for Date {
+    fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
+        match value {
+            ValueRef::Text(text) => {
+                let s = std::str::from_utf8(text).map_err(|e| FromSqlError::Other(Box::new(e)))?;
+                DateTime::parse_from_rfc3339(s)
+                    .map(|dt| Date(dt.with_timezone(&Utc)))
+                    .map_err(|e| FromSqlError::Other(Box::new(e)))
+            }
+            ValueRef::Integer(i) => Ok(Date(Utc.timestamp_opt(i, 0).unwrap())),
+            ValueRef::Real(f) => {
+                let secs = f.trunc() as i64;
+                let nanos = ((f - f.trunc()) * 1e9) as u32;
+                Ok(Date(Utc.timestamp_opt(secs, nanos).unwrap()))
+            }
+            _ => Err(FromSqlError::InvalidType),
+        }
+    }
+}
+
+/* This is the same order as the `SELECT ...` for above, which is also the
+ * table creation order */
+fn commit_queue_row_to_commit_job(row: &rusqlite::Row) -> CommitJob {
+    let sha = row.get::<_, String>(0).unwrap();
+    let parent_sha = row.get::<_, String>(1).unwrap();
+    let commit_type = row.get::<_, String>(2).unwrap();
+    let pr = row.get::<_, Option>(3).unwrap();
+    let release_tag = row.get::<_, Option>(4).unwrap();
+    let commit_time = row.get::<_, String>(5).unwrap().parse::().unwrap();
+    let target = Target::from_str(&row.get::<_, String>(6).unwrap()).unwrap();
+    let include = row.get::<_, Option>(7).unwrap();
+    let exclude = row.get::<_, Option>(8).unwrap();
+    let runs = row.get::<_, Option>(9).unwrap();
+    let backends = row.get::<_, Option>(10).unwrap();
+    let machine_id = row.get::<_, Option>(11).unwrap();
+    let started_at = row
+        .get::<_, Option>(12)
+        .unwrap()
+        .map(|ts| ts.parse::().unwrap());
+
+    let finished_at = row
+        .get::<_, Option>(13)
+        .unwrap()
+        .map(|ts| ts.parse::().unwrap());
+    let status = row.get::<_, String>(14).unwrap();
+
+    commit_job_create(
+        sha,
+        parent_sha,
+        &commit_type,
+        pr,
+        release_tag,
+        commit_time,
+        target,
+        machine_id,
+        started_at,
+        finished_at,
+        &status,
+        include,
+        exclude,
+        runs,
+        backends,
+    )
+}
 
 fn parse_artifact_id(ty: &str, sha: &str, date: Option) -> ArtifactId {
     match ty {
         "master" => ArtifactId::Commit(Commit {

From 49e24b794729a738c36eb98fb40ebc7d83055db9 Mon Sep 17 00:00:00 2001
From: Jamesbarford
Date: Tue, 6 May 2025 12:20:05 +0100
Subject: [PATCH 2/4] PR feedback - simplify requeuing, remove C-like OOP and
 correct queries

---
 database/src/lib.rs           | 208 +++++++-------------------
 database/src/pool.rs          |   8 +-
 database/src/pool/postgres.rs | 272 ++++++++++++++++------------------
 database/src/pool/sqlite.rs   | 265 ++++++++++++++++-----------------
 4 files changed, 318 insertions(+), 435 deletions(-)

diff --git a/database/src/lib.rs b/database/src/lib.rs
index 056b82a76..9f7e8b6d9 100644
--- a/database/src/lib.rs
+++ b/database/src/lib.rs
@@ -5,7 +5,7 @@ use intern::intern;
 use serde::{Deserialize, Serialize};
 use std::fmt::{self, Display, Formatter};
 use std::hash;
-use std::ops::{Add, Deref, Sub};
+use std::ops::{Add, Sub};
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -806,13 +806,24 @@ pub struct ArtifactCollection {
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum CommitJobType {
-    Try(u32),
-    Master(u32),
-    Release(String),
+    Try { pr: u32 },
+    Master { pr: u32 },
+    Release { tag: String },
+}
+
+impl CommitJobType {
+    /// Get the name of the type as a `str`
+    pub fn name(&self) -> &'static str {
+        match self {
+            CommitJobType::Try { pr: _ } =>
"try", + CommitJobType::Master { pr: _ } => "master", + CommitJobType::Release { tag: _ } => "release", + } + } } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CommitJobEntity { +pub struct CommitJob { pub sha: String, pub parent_sha: String, pub commit_time: Date, @@ -822,18 +833,25 @@ pub struct CommitJobEntity { pub runs: Option, pub backends: Option, pub job_type: CommitJobType, + pub state: CommitJobState, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum CommitJobState { + Queued, + Finished(CommitJobFinished), + Failed(CommitJobFailed), + InProgress(CommitJobInProgress), } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CommitJobInProgress { - pub commit_job: CommitJobEntity, pub machine_id: String, pub started_at: Date, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CommitJobFinished { - pub commit_job: CommitJobEntity, pub machine_id: String, pub started_at: Date, pub finished_at: Date, @@ -841,94 +859,19 @@ pub struct CommitJobFinished { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CommitJobFailed { - pub commit_job: CommitJobEntity, pub machine_id: String, pub started_at: Date, pub finished_at: Date, } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum CommitJob { - Queued(CommitJobEntity), - InProgress(CommitJobInProgress), - Finished(CommitJobFinished), - Failed(CommitJobFailed), -} - impl CommitJob { - /// Returns `Some(&CommitJobEntity)` only if the job is still queued. - pub fn as_queued(&self) -> Option<&CommitJobEntity> { - match self { - CommitJob::Queued(e) => Some(e), - _ => None, - } - } - - /// Returns `Some(&CommitJobInProgress)` while the job is running. - pub fn as_in_progress(&self) -> Option<&CommitJobInProgress> { - match self { - CommitJob::InProgress(ip) => Some(ip), - _ => None, - } - } - - /// Returns `Some(&CommitJobFinished)` once the job is done. - pub fn as_finished(&self) -> Option<&CommitJobFinished> { - match self { - CommitJob::Finished(fin) => Some(fin), - _ => None, - } - } - /// Get the status as a string pub fn status(&self) -> &'static str { - match self { - CommitJob::Queued(_) => "queued", - CommitJob::InProgress(_) => "in_progress", - CommitJob::Finished(_) => "finished", - CommitJob::Failed(_) => "failed", - } - } - - /// True when `status == "finished"`. 
- pub fn is_finished(&self) -> bool { - matches!(self, CommitJob::Finished(_)) - } - - /// Will compose the column names for the job type - pub fn get_enqueue_column_names(&self) -> Vec { - let mut base_columns = vec![ - String::from("sha"), - String::from("parent_sha"), - String::from("commit_type"), - String::from("commit_time"), - String::from("status"), - String::from("target"), - String::from("include"), - String::from("exclude"), - String::from("runs"), - String::from("backends"), - ]; - - /* This is the last column */ - match self.job_type { - CommitJobType::Try(_) => base_columns.push("pr".into()), - CommitJobType::Master(_) => base_columns.push("pr".into()), - CommitJobType::Release(_) => base_columns.push("release_tag".into()), - }; - - base_columns - } -} - -impl Deref for CommitJob { - type Target = CommitJobEntity; - fn deref(&self) -> &Self::Target { - match self { - CommitJob::Queued(e) => e, - CommitJob::InProgress(ip) => &ip.commit_job, - CommitJob::Finished(fin) => &fin.commit_job, - CommitJob::Failed(fail) => &fail.commit_job, + match self.state { + CommitJobState::Queued => "queued", + CommitJobState::InProgress(_) => "in_progress", + CommitJobState::Finished(_) => "finished", + CommitJobState::Failed(_) => "failed", } } } @@ -953,30 +896,21 @@ fn commit_job_create( backends: Option, ) -> CommitJob { let job_type = match commit_type { - "try" => CommitJobType::Try(pr.expect("`pr` cannot be `None` for a Commit of type `try`")), - "master" => { - CommitJobType::Master(pr.expect("`pr` cannot be `None` for a Commit of type `master`")) - } - "release" => CommitJobType::Release( - release_tag.expect("`release_tag` cannot be `None` for a Commit of type `release`"), - ), + "try" => CommitJobType::Try { + pr: pr.expect("`pr` cannot be `None` for a Commit of type `try`"), + }, + "master" => CommitJobType::Master { + pr: pr.expect("`pr` cannot be `None` for a Commit of type `master`"), + }, + "release" => CommitJobType::Release { + tag: release_tag + .expect("`release_tag` cannot be `None` for a Commit of type `release`"), + }, _ => panic!("Unhandled commit_type {}", commit_type), }; - let commit_job = CommitJobEntity { - sha, - parent_sha, - commit_time, - target, - include, - exclude, - runs, - backends, - job_type, - }; - - match status { - "queued" => CommitJob::Queued(commit_job), + let state = match status { + "queued" => CommitJobState::Queued, "in_progress" => { let started_at = @@ -984,8 +918,7 @@ fn commit_job_create( let machine_id = machine_id.expect("`machine_id` must be Some for an `in_progress` job"); - CommitJob::InProgress(CommitJobInProgress { - commit_job, + CommitJobState::InProgress(CommitJobInProgress { started_at, machine_id, }) @@ -1000,15 +933,13 @@ fn commit_job_create( machine_id.expect("`machine_id` must be Some for finished or failed a job"); if status == "finished" { - CommitJob::Finished(CommitJobFinished { - commit_job, + CommitJobState::Finished(CommitJobFinished { started_at, finished_at, machine_id, }) } else { - CommitJob::Failed(CommitJobFailed { - commit_job, + CommitJobState::Failed(CommitJobFailed { started_at, finished_at, machine_id, @@ -1019,43 +950,18 @@ fn commit_job_create( other => { panic!("unknown status `{other}` (expected `queued`, `in_progress`, `finished` or `failed`)") } - } -} - -pub struct CommitsByType<'a> { - pub r#try: Vec<(&'a CommitJob, u32)>, - pub master: Vec<(&'a CommitJob, u32)>, - pub release: Vec<(&'a CommitJob, String)>, -} - -/// Given a vector of `CommitJobs` bucket them out into; -/// `try`, `master` and 
`release` (in that order) -pub fn split_queued_commit_jobs(commit_jobs: &[CommitJob]) -> CommitsByType<'_> { - // Split jobs by type as that determines what we enter into the database, - // `ToSql` is quite finiky about lifetimes. Moreover the column names - // change depending on the commit job type. `master` and `try` have - // a `pr` column whereas `release` has a `release_rag` column - let (try_commits, master_commits, release_commits) = commit_jobs.iter().fold( - (vec![], vec![], vec![]), - |(mut try_commits, mut master_commits, mut release_commits), job| { - let entity = job - .as_queued() - .expect("Can only enqueue jobs with a status of `queued`"); - - match &entity.job_type { - crate::CommitJobType::Try(pr) => try_commits.push((job, *pr)), - crate::CommitJobType::Master(pr) => master_commits.push((job, *pr)), - crate::CommitJobType::Release(release_tag) => { - release_commits.push((job, release_tag.clone())) - } - } - (try_commits, master_commits, release_commits) - }, - ); + }; - CommitsByType { - r#try: try_commits, - master: master_commits, - release: release_commits, + CommitJob { + sha, + parent_sha, + commit_time, + target, + include, + exclude, + runs, + backends, + job_type, + state, } } diff --git a/database/src/pool.rs b/database/src/pool.rs index 0147d73c5..bc02624c2 100644 --- a/database/src/pool.rs +++ b/database/src/pool.rs @@ -180,15 +180,15 @@ pub trait Connection: Send + Sync { /// Removes all data associated with the given artifact. async fn purge_artifact(&self, aid: &ArtifactId); - /// Add a jobs to the queue - async fn enqueue_commit_jobs(&self, jobs: &[CommitJob]); + /// Add a job to the queue + async fn enqueue_commit_job(&self, jobs: &CommitJob); /// Dequeue jobs, we pass `machine_id` and `target` in case there are jobs /// the machine was previously doing and can pick up again - async fn dequeue_commit_job(&self, machine_id: &str, target: Target) -> Option; + async fn take_commit_job(&self, machine_id: &str, target: Target) -> Option; /// Mark the job as finished - async fn finish_commit_job(&self, machine_id: &str, target: Target, sha: String) -> bool; + async fn finish_commit_job(&self, machine_id: &str, target: Target, sha: String); } #[async_trait::async_trait] diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs index 8d5c7a7cc..bbfe35630 100644 --- a/database/src/pool/postgres.rs +++ b/database/src/pool/postgres.rs @@ -1,8 +1,8 @@ use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction}; use crate::{ - commit_job_create, split_queued_commit_jobs, ArtifactCollection, ArtifactId, ArtifactIdNumber, - Benchmark, CodegenBackend, CollectionId, Commit, CommitJob, CommitType, CompileBenchmark, Date, - Index, Profile, QueuedCommit, Scenario, Target, + commit_job_create, ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, CodegenBackend, + CollectionId, Commit, CommitJob, CommitJobType, CommitType, CompileBenchmark, Date, Index, + Profile, QueuedCommit, Scenario, Target, }; use anyhow::Context as _; use chrono::{DateTime, TimeZone, Utc}; @@ -19,8 +19,6 @@ use tokio_postgres::Statement; pub struct Postgres(String, std::sync::Once); -type PgParam<'a> = &'a (dyn tokio_postgres::types::ToSql + Sync); - impl Postgres { pub fn new(url: String) -> Self { Postgres(url, std::sync::Once::new()) @@ -309,9 +307,6 @@ static MIGRATIONS: &[&str] = &[ retries INTEGER DEFAULT 0, PRIMARY KEY (sha, target) ); - CREATE INDEX IF NOT EXISTS sha_idx ON commit_queue (sha); - CREATE INDEX IF NOT EXISTS machine_id_idx ON 
commit_queue (machine_id); - CREATE INDEX IF NOT EXISTS sha_machine_id_idx ON commit_queue (sha, machine_id); "#, ]; @@ -1394,87 +1389,123 @@ where .unwrap(); } - async fn enqueue_commit_jobs(&self, jobs: &[CommitJob]) { - if jobs.is_empty() { - return; - } - - let commits_by_type = split_queued_commit_jobs(jobs); - - // Create a bulk insert statment for a specific commit job type i.e; - // ``` - // INSERT INTO - // commit_queue(sha, parent_sha... ) - // VALUES - // ($1, $2, ...), - // ($3, $4, ...), - // ($5, $6, ...) - // ON CONFLICT DO NOTHING; - // ``` - fn make_insert_sql(commit: &CommitJob, rows: usize) -> String { - /* Get the column names we are interested in inserting */ - let cols = commit.get_enqueue_column_names(); - let col_cnt = cols.len(); - let col_sql = cols.join(", "); - - /* ($1,$2,...), ($k,$k+1, etc...) */ - let values = (0..rows) - .map(|r| { - let base = r * col_cnt; - let group = (1..=col_cnt) - .map(|i| format!("${}", base + i)) - .collect::>() - .join(", "); - format!("({})", group) - }) - .collect::>() - .join(", "); + /// Add a job to the queue + async fn enqueue_commit_job(&self, job: &CommitJob) { + match &job.job_type { + CommitJobType::Try { pr } | CommitJobType::Master { pr } => self + .conn() + .execute( + "INSERT INTO commit_queue ( + sha, + parent_sha, + commit_type, + commit_time, + status, + target, + include, + exclude, + runs, + backends, + pr + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + ON CONFLICT DO NOTHING", + &[ + &job.sha, + &job.parent_sha, + &job.job_type.name(), + &job.commit_time, + &"queued", + &job.target, + &job.include, + &job.exclude, + &job.runs, + &job.backends, + &pr, + ], + ) + .await + .unwrap(), + CommitJobType::Release { tag } => self + .conn() + .execute( + "INSERT INTO commit_queue ( + sha, + parent_sha, + commit_type, + commit_time, + status, + target, + include, + exclude, + runs, + backends, + release_tag + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + ON CONFLICT DO NOTHING", + &[ + &job.sha, + &job.parent_sha, + &job.job_type.name(), + &job.commit_time, + &"queued", + &job.target, + &job.include, + &job.exclude, + &job.runs, + &job.backends, + &tag, + ], + ) + .await + .unwrap(), + }; + } - format!( - "INSERT INTO commit_queue ({}) VALUES {} ON CONFLICT DO NOTHING", - col_sql, values + async fn take_commit_job(&self, machine_id: &str, target: Target) -> Option { + /// Map a database row from the commit queue to a `CommitJob` + fn commit_queue_row_to_commit_job(row: &tokio_postgres::Row) -> CommitJob { + let sha = row.get::<_, String>(0); + let parent_sha = row.get::<_, String>(1); + let commit_type = row.get::<_, String>(2); + let pr = row.get::<_, Option>(3); + let release_tag = row.get::<_, Option>(4); + let commit_time = row.get::<_, String>(5).parse::().unwrap(); + let target = Target::from_str(&row.get::<_, String>(6)).unwrap(); + let include = row.get::<_, Option>(7); + let exclude = row.get::<_, Option>(8); + let runs = row.get::<_, Option>(9); + let backends = row.get::<_, Option>(10); + let machine_id = row.get::<_, Option>(11); + let started_at = row + .get::<_, Option>(12) + .map(|ts| ts.parse::().unwrap()); + + let finished_at = row + .get::<_, Option>(13) + .map(|ts| ts.parse::().unwrap()); + let status = row.get::<_, String>(14); + + commit_job_create( + sha, + parent_sha, + &commit_type, + pr, + release_tag, + commit_time, + target, + machine_id, + started_at, + finished_at, + &status, + include, + exclude, + runs, + backends, ) } - /* Add the commits to the database in their 
serialised format */ - async fn add_to_database(client: &P, kind: &str, commit_jobs: Vec<(&CommitJob, T)>) - where - P: Send + Sync + PClient, - T: tokio_postgres::types::ToSql + Sync, - { - if let Some(head) = commit_jobs.first() { - let sql = make_insert_sql(head.0, commit_jobs.len()); - let params = commit_jobs - .iter() - .flat_map(|(commit_job, pr_or_release)| { - [ - &commit_job.sha as PgParam, - &commit_job.parent_sha, - &kind, - &commit_job.commit_time, - &"queued", /* status is always queued */ - &commit_job.target, - &commit_job.include, - &commit_job.exclude, - &commit_job.runs, - &commit_job.backends, - pr_or_release, /* This will either be a `pr` or `relase_tag`*/ - ] - }) - .collect::>(); - client.conn().execute(&sql, ¶ms).await.unwrap(); - } - } - - /* Fire out in parallel */ - tokio::join!( - add_to_database(self, "try", commits_by_type.r#try), - add_to_database(self, "master", commits_by_type.master), - add_to_database(self, "release", commits_by_type.release) - ); - } - - async fn dequeue_commit_job(&self, machine_id: &str, target: Target) -> Option { /* Check to see if this machine possibly went offline while doing * a previous job - if it did we'll take that job * @@ -1554,14 +1585,7 @@ where status, retries FROM commit_queue - WHERE target != $1 - AND status IN ('finished', 'in_progress') - AND sha NOT IN ( - SELECT sha - FROM commit_queue - WHERE target != $1 - AND status = 'finished' - ) + WHERE target != $1 AND status IN ('finished', 'in_progress') ORDER BY started_at LIMIT 1 FOR UPDATE SKIP LOCKED @@ -1607,11 +1631,17 @@ where started_at, finished_at, status, - retries + retries, + CASE + WHEN commit_type = 'release' THEN 0 + WHEN commit_type = 'master' THEN 1 + WHEN commit_type = 'try' THEN 2 + ELSE -1 + END AS type_rank FROM commit_queue WHERE target = $1 AND status = 'queued' - ORDER BY pr ASC, commit_type, sha + ORDER BY type_rank, pr ASC, sha LIMIT 1 FOR UPDATE SKIP LOCKED ) @@ -1639,9 +1669,8 @@ where } /// Mark a job in the database as done - async fn finish_commit_job(&self, machine_id: &str, target: Target, sha: String) -> bool { - let jobs = self - .conn() + async fn finish_commit_job(&self, machine_id: &str, target: Target, sha: String) { + self.conn() .query_opt( " UPDATE commit_queue @@ -1649,59 +1678,16 @@ where status = 'finished', WHERE sha = $1 - AND machine_id = $1 - AND target = $1; + AND machine_id = $2 + AND target = $3; ", &[&sha, &machine_id, &target], ) .await .unwrap(); - return jobs.is_some(); } } -/// Map a database row from the commit queue to a `CommitJob` -fn commit_queue_row_to_commit_job(row: &tokio_postgres::Row) -> CommitJob { - let sha = row.get::<_, String>(0); - let parent_sha = row.get::<_, String>(1); - let commit_type = row.get::<_, String>(2); - let pr = row.get::<_, Option>(3); - let release_tag = row.get::<_, Option>(4); - let commit_time = row.get::<_, String>(5).parse::().unwrap(); - let target = Target::from_str(&row.get::<_, String>(6)).unwrap(); - let include = row.get::<_, Option>(7); - let exclude = row.get::<_, Option>(8); - let runs = row.get::<_, Option>(9); - let backends = row.get::<_, Option>(10); - let machine_id = row.get::<_, Option>(11); - let started_at = row - .get::<_, Option>(12) - .map(|ts| ts.parse::().unwrap()); - - let finished_at = row - .get::<_, Option>(13) - .map(|ts| ts.parse::().unwrap()); - let status = row.get::<_, String>(14); - - commit_job_create( - sha, - parent_sha, - &commit_type, - pr, - release_tag, - commit_time, - target, - machine_id, - started_at, - finished_at, - &status, - 
include,
-        exclude,
-        runs,
-        backends,
-    )
-}
-
 #[macro_export]
 macro_rules! impl_to_postgresql_via_to_string {
     ($t:ty) => {
diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs
index d6d5802d7..791b3bd11 100644
--- a/database/src/pool/sqlite.rs
+++ b/database/src/pool/sqlite.rs
@@ -1,15 +1,14 @@
 use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction};
 use crate::{
-    commit_job_create, split_queued_commit_jobs, ArtifactCollection, ArtifactId, Benchmark,
-    CodegenBackend, CollectionId, Commit, CommitJob, CommitType, CompileBenchmark, Date, Profile,
-    Target,
+    commit_job_create, ArtifactCollection, ArtifactId, Benchmark, CodegenBackend, CollectionId,
+    Commit, CommitJob, CommitJobType, CommitType, CompileBenchmark, Date, Profile, Target,
 };
 use crate::{ArtifactIdNumber, Index, QueuedCommit};
 use chrono::{DateTime, TimeZone, Utc};
 use hashbrown::HashMap;
+use rusqlite::params;
 use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSql, ValueRef};
 use rusqlite::OptionalExtension;
-use rusqlite::{params, params_from_iter};
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Mutex;
@@ -427,9 +426,6 @@ static MIGRATIONS: &[Migration] = &[
         retries INTEGER DEFAULT 0,
         PRIMARY KEY (sha, target)
     );
-    CREATE INDEX IF NOT EXISTS sha_idx ON commit_queue (sha);
-    CREATE INDEX IF NOT EXISTS machine_id_idx ON commit_queue (machine_id);
-    CREATE INDEX IF NOT EXISTS sha_machine_id_idx ON commit_queue (sha, machine_id);
     "#,
     ),
 ];
@@ -1281,82 +1277,125 @@ impl Connection for SqliteConnection {
         .unwrap();
     }
 
+    /// Add a job to the queue
+    async fn enqueue_commit_job(&self, job: &CommitJob) {
+        match &job.job_type {
+            CommitJobType::Try { pr } | CommitJobType::Master { pr } => self
+                .raw_ref()
+                .execute(
+                    "INSERT OR IGNORE INTO commit_queue (
+                        sha,
+                        parent_sha,
+                        commit_type,
+                        commit_time,
+                        status,
+                        target,
+                        include,
+                        exclude,
+                        runs,
+                        backends,
+                        pr
+                    )
+                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ ON CONFLICT DO NOTHING", + params![ + &job.sha, + &job.parent_sha, + &job.job_type.name(), + &job.commit_time, + &"queued", + &job.target, + &job.include, + &job.exclude, + &job.runs, + &job.backends, + &tag, + ], + ) + .unwrap(), + }; + } + /// For this to work we need a central database - async fn enqueue_commit_jobs(&self, jobs: &[CommitJob]) { - if jobs.is_empty() { - return; - } + async fn take_commit_job(&self, machine_id: &str, target: Target) -> Option { + /* This is the same order as the `SELECT ...` below, which is also the + * table creation order */ + fn commit_queue_row_to_commit_job(row: &rusqlite::Row) -> CommitJob { + let sha = row.get::<_, String>(0).unwrap(); + let parent_sha = row.get::<_, String>(1).unwrap(); + let commit_type = row.get::<_, String>(2).unwrap(); + let pr = row.get::<_, Option>(3).unwrap(); + let release_tag = row.get::<_, Option>(4).unwrap(); + let commit_time = row.get::<_, String>(5).unwrap().parse::().unwrap(); + let target = Target::from_str(&row.get::<_, String>(6).unwrap()).unwrap(); + let include = row.get::<_, Option>(7).unwrap(); + let exclude = row.get::<_, Option>(8).unwrap(); + let runs = row.get::<_, Option>(9).unwrap(); + let backends = row.get::<_, Option>(10).unwrap(); + let machine_id = row.get::<_, Option>(11).unwrap(); + let started_at = row + .get::<_, Option>(12) + .unwrap() + .map(|ts| ts.parse::().unwrap()); - let commits_by_type = split_queued_commit_jobs(jobs); - // Create a bulk insert statment for a specific commit job type i.e; - // ``` - // INSERT OR IGNORE INTO - // commit_queue(sha, parent_sha... ) - // VALUES - // (?, ?, ...), - // (?, ?, ...), - // (?, ?, ...); - // ``` - fn make_insert_sql(commit: &CommitJob, rows: usize) -> String { - let column_names = commit.get_enqueue_column_names(); - let column_string_names = column_names.join(", "); - let query_params = std::iter::repeat("?") - .take(column_names.len()) - .collect::>() - .join(", "); - let placeholders = (0..rows) - .map(|_| format!("({})", query_params)) - .collect::>() - .join(", "); - - format!( - "INSERT OR IGNORE INTO commit_queue ({}) VALUES {};", - column_string_names, placeholders + let finished_at = row + .get::<_, Option>(13) + .unwrap() + .map(|ts| ts.parse::().unwrap()); + let status = row.get::<_, String>(14).unwrap(); + + commit_job_create( + sha, + parent_sha, + &commit_type, + pr, + release_tag, + commit_time, + target, + machine_id, + started_at, + finished_at, + &status, + include, + exclude, + runs, + backends, ) } - /* Add the commits to the database in their serialised format */ - fn add_to_database( - client: &SqliteConnection, - kind: &str, - commit_jobs: Vec<(&CommitJob, T)>, - ) where - T: rusqlite::ToSql, - { - if let Some(head) = commit_jobs.first() { - let sql = make_insert_sql(head.0, commit_jobs.len()); - let params = commit_jobs - .iter() - .flat_map(|(commit_job, pr_or_release)| { - [ - &commit_job.sha as &dyn ToSql, - &commit_job.parent_sha, - &kind, - &commit_job.commit_time, - &"queued", /* status is always queued */ - &commit_job.target, - &commit_job.include, - &commit_job.exclude, - &commit_job.runs, - &commit_job.backends, - pr_or_release, /* This will either be a `pr` or `relase_tag`*/ - ] - }) - .collect::>(); - client - .raw_ref() - .execute(&sql, params_from_iter(params)) - .unwrap(); - } - } - - add_to_database(self, "try", commits_by_type.r#try); - add_to_database(self, "master", commits_by_type.master); - add_to_database(self, "release", commits_by_type.release); - } - - /// For this to work we need a central 
database - async fn dequeue_commit_job(&self, machine_id: &str, target: Target) -> Option { /* Check to see if this machine possibly went offline while doing * a previous job - if it did we'll take that job */ let maybe_previous_job = self @@ -1435,14 +1474,7 @@ impl Connection for SqliteConnection { status, retries FROM commit_queue - WHERE target != ? - AND status IN ('finished', 'in_progress') - AND sha NOT IN ( - SELECT sha - FROM commit_queue - WHERE target != ? - AND status = 'finished' - ) + WHERE target != ? AND status IN ('finished', 'in_progress') ORDER BY started_at LIMIT 1 ) @@ -1490,13 +1522,19 @@ impl Connection for SqliteConnection { started_at, finished_at, status, - retries + retries, + CASE + WHEN commit_type = 'release' THEN 0 + WHEN commit_type = 'master' THEN 1 + WHEN commit_type = 'try' THEN 2 + ELSE -1 + END AS type_rank FROM commit_queue WHERE target = ? AND status = 'queued' ORDER BY + type_rank, pr ASC, - commit_type, sha LIMIT 1 ) @@ -1528,9 +1566,8 @@ impl Connection for SqliteConnection { } /// For this to work we need a central database - async fn finish_commit_job(&self, machine_id: &str, target: Target, sha: String) -> bool { - let jobs = self - .raw_ref() + async fn finish_commit_job(&self, machine_id: &str, target: Target, sha: String) { + self.raw_ref() .execute( " UPDATE commit_queue @@ -1544,7 +1581,6 @@ impl Connection for SqliteConnection { params![&sha, machine_id, &target], ) .unwrap(); - return jobs == 1; } } @@ -1587,51 +1623,6 @@ impl FromSql for Date { } } -/* This is the same order as the `SELECT ...` for above, which is also the - * table creation order */ -fn commit_queue_row_to_commit_job(row: &rusqlite::Row) -> CommitJob { - let sha = row.get::<_, String>(0).unwrap(); - let parent_sha = row.get::<_, String>(1).unwrap(); - let commit_type = row.get::<_, String>(2).unwrap(); - let pr = row.get::<_, Option>(3).unwrap(); - let release_tag = row.get::<_, Option>(4).unwrap(); - let commit_time = row.get::<_, String>(5).unwrap().parse::().unwrap(); - let target = Target::from_str(&row.get::<_, String>(6).unwrap()).unwrap(); - let include = row.get::<_, Option>(7).unwrap(); - let exclude = row.get::<_, Option>(8).unwrap(); - let runs = row.get::<_, Option>(9).unwrap(); - let backends = row.get::<_, Option>(10).unwrap(); - let machine_id = row.get::<_, Option>(11).unwrap(); - let started_at = row - .get::<_, Option>(12) - .unwrap() - .map(|ts| ts.parse::().unwrap()); - - let finished_at = row - .get::<_, Option>(13) - .unwrap() - .map(|ts| ts.parse::().unwrap()); - let status = row.get::<_, String>(14).unwrap(); - - commit_job_create( - sha, - parent_sha, - &commit_type, - pr, - release_tag, - commit_time, - target, - machine_id, - started_at, - finished_at, - &status, - include, - exclude, - runs, - backends, - ) -} - fn parse_artifact_id(ty: &str, sha: &str, date: Option) -> ArtifactId { match ty { "master" => ArtifactId::Commit(Commit { From 7cc1e95c9cba00a79b69e93270a0f52676270654 Mon Sep 17 00:00:00 2001 From: Jamesbarford Date: Tue, 20 May 2025 16:18:01 +0100 Subject: [PATCH 3/4] Add tests --- database/src/pool.rs | 143 +++++++++++++++++++++++++++++++- database/src/pool/postgres.rs | 151 ++++++++++------------------------ 2 files changed, 185 insertions(+), 109 deletions(-) diff --git a/database/src/pool.rs b/database/src/pool.rs index bc02624c2..e353546aa 100644 --- a/database/src/pool.rs +++ b/database/src/pool.rs @@ -312,7 +312,7 @@ mod tests { use std::str::FromStr; use super::*; - use crate::{tests::run_db_test, Commit, CommitType, Date}; 
+ use crate::{tests::run_db_test, Commit, CommitJobState, CommitJobType, CommitType, Date}; /// Create a Commit fn create_commit(commit_sha: &str, time: chrono::DateTime, r#type: CommitType) -> Commit { @@ -323,6 +323,29 @@ mod tests { } } + /// Create a CommitJob + fn create_commit_job( + sha: &str, + parent_sha: &str, + commit_time: chrono::DateTime, + target: Target, + job_type: CommitJobType, + state: CommitJobState, + ) -> CommitJob { + CommitJob { + sha: sha.to_string(), + parent_sha: parent_sha.to_string(), + commit_time: Date(commit_time), + target, + include: None, + exclude: None, + runs: None, + backends: None, + job_type, + state, + } + } + #[tokio::test] async fn pstat_returns_empty_vector_when_empty() { run_db_test(|ctx| async { @@ -381,4 +404,122 @@ mod tests { }) .await; } + + #[tokio::test] + async fn take_commit_job() { + run_db_test(|ctx| async { + // ORDER: + // Releases first + // Master commits second, order by oldest PR ascending + // Try commits last, order by oldest PR ascending + + let db = ctx.db_client().connection().await; + let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap(); + + // Try commits + let try_job_1 = create_commit_job( + "sha1", + "p1", + time, + Target::X86_64UnknownLinuxGnu, + CommitJobType::Try { pr: 1 }, + CommitJobState::Queued, + ); + let try_job_2 = create_commit_job( + "sha2", + "p2", + time, + Target::X86_64UnknownLinuxGnu, + CommitJobType::Try { pr: 2 }, + CommitJobState::Queued, + ); + + // Master commits + let master_job_1 = create_commit_job( + "sha3", + "p3", + time, + Target::X86_64UnknownLinuxGnu, + CommitJobType::Master { pr: 3 }, + CommitJobState::Queued, + ); + let master_job_2 = create_commit_job( + "sha4", + "p4", + time, + Target::X86_64UnknownLinuxGnu, + CommitJobType::Master { pr: 4 }, + CommitJobState::Queued, + ); + + // Release commits + let release_job_1 = create_commit_job( + "sha5", + "p5", + time, + Target::X86_64UnknownLinuxGnu, + CommitJobType::Release { tag: "tag1".into() }, + CommitJobState::Queued, + ); + let release_job_2 = create_commit_job( + "sha6", + "p6", + time, + Target::X86_64UnknownLinuxGnu, + CommitJobType::Release { tag: "tag2".into() }, + CommitJobState::Queued, + ); + + // Shuffle the insert order a bit + let all_commits = vec![ + release_job_1, + master_job_2, + try_job_1, + release_job_2, + master_job_1, + try_job_2, + ]; + + // queue all the jobs + for commit in all_commits { + db.enqueue_commit_job(&commit).await; + } + + // Now we test the ordering: after each dequeue we immediately mark + // the job as finished for the sake of testing so it can't be + // returned again in the test. + // + // The priority should be; + // + // 1. Release commits (oldest tag first) + // 2. Master commits (oldest PR first) + // 3. Try commits (oldest PR first) + // + // Given the data we inserted above the expected SHA order is: + // sha5, sha6, sha3, sha4, sha1, sha2. + + let machine = "machine-1"; + let target = Target::X86_64UnknownLinuxGnu; + let expected = ["sha5", "sha6", "sha3", "sha4", "sha1", "sha2"]; + + for &sha in &expected { + let job = db.take_commit_job(machine, target).await; + assert!(job.is_some(), "expected a job for sha {sha}"); + let job = job.unwrap(); + assert_eq!(job.sha, sha, "jobs dequeued out of priority order"); + + // Mark the job finished so it is not returned again. + db.finish_commit_job(machine, target, sha.to_string()).await; + } + + // After all six jobs have been taken, the queue should be empty. 
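+        // `take_commit_job` signals an empty queue by returning `None`,
+        // which is exactly what we assert on below.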
diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs
index bbfe35630..5ea3687c4 100644
--- a/database/src/pool/postgres.rs
+++ b/database/src/pool/postgres.rs
@@ -13,7 +13,6 @@ use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::Mutex;
-use tokio_postgres::types::{FromSql, ToSql};
 use tokio_postgres::GenericClient;
 use tokio_postgres::Statement;

@@ -294,17 +293,16 @@ static MIGRATIONS: &[&str] = &[
         commit_type TEXT,
         pr INTEGER,
         release_tag TEXT,
-        commit_time TIMESTAMP,
+        commit_time TIMESTAMPTZ,
         target TEXT,
         include TEXT,
         exclude TEXT,
         runs INTEGER DEFAULT 0,
         backends TEXT,
         machine_id TEXT,
-        started_at TIMESTAMP,
-        finished_at TIMESTAMP,
+        started_at TIMESTAMPTZ,
+        finished_at TIMESTAMPTZ,
         status TEXT,
-        retries INTEGER DEFAULT 0,
         PRIMARY KEY (sha, target)
     );
 "#,
@@ -1414,14 +1412,14 @@ where
                     &job.sha,
                     &job.parent_sha,
                     &job.job_type.name(),
-                    &job.commit_time,
+                    &job.commit_time.0,
                     &"queued",
                     &job.target,
                     &job.include,
                     &job.exclude,
                     &job.runs,
                     &job.backends,
-                    &pr,
+                    &(*pr as i32),
                 ],
             )
             .await
@@ -1448,7 +1446,7 @@ where
                     &job.sha,
                     &job.parent_sha,
                     &job.job_type.name(),
-                    &job.commit_time,
+                    &job.commit_time.0,
                     &"queued",
                     &job.target,
                     &job.include,
@@ -1469,22 +1467,17 @@ where
     let sha = row.get::<_, String>(0);
     let parent_sha = row.get::<_, String>(1);
     let commit_type = row.get::<_, String>(2);
-    let pr = row.get::<_, Option<i32>>(3);
+    let pr = row.get::<_, Option<i32>>(3).map(|it| it as u32);
     let release_tag = row.get::<_, Option<String>>(4);
-    let commit_time = row.get::<_, String>(5).parse::<Date>().unwrap();
+    let commit_time = row.get::<_, DateTime<Utc>>(5);
     let target = Target::from_str(&row.get::<_, String>(6)).unwrap();
     let include = row.get::<_, Option<String>>(7);
     let exclude = row.get::<_, Option<String>>(8);
     let runs = row.get::<_, Option<i32>>(9);
     let backends = row.get::<_, Option<String>>(10);
     let machine_id = row.get::<_, Option<String>>(11);
-    let started_at = row
-        .get::<_, Option<String>>(12)
-        .map(|ts| ts.parse::<Date>().unwrap());
-
-    let finished_at = row
-        .get::<_, Option<String>>(13)
-        .map(|ts| ts.parse::<Date>().unwrap());
+    let started_at = row.get::<_, Option<DateTime<Utc>>>(12).map(Date);
+    let finished_at = row.get::<_, Option<DateTime<Utc>>>(13).map(Date);
     let status = row.get::<_, String>(14);

     commit_job_create(
         sha,
         parent_sha,
         &commit_type,
         pr,
         release_tag,
-        commit_time,
+        Date(commit_time),
         target,
         machine_id,
         started_at,
         finished_at,
     )
 }

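With the columns now TIMESTAMPTZ, chrono values come straight off the row, and the `Date` newtype is applied afterwards via `.map(Date)`; this is what allows the hand-written trait impls to be commented out below. A minimal sketch of the pattern, assuming a newtype shaped like the one in database/src/lib.rs:

    use chrono::{DateTime, Utc};

    // Assumed stand-in for the `Date` newtype in database/src/lib.rs.
    #[derive(Debug, Clone)]
    struct Date(DateTime<Utc>);

    // Shape of `row.get::<_, Option<DateTime<Utc>>>(12).map(Date)`:
    // decode the plain chrono value, then wrap it, so the newtype
    // itself needs no ToSql/FromSql implementation.
    fn wrap(raw: Option<DateTime<Utc>>) -> Option<Date> {
        raw.map(Date)
    }

    fn main() {
        println!("{:?}", wrap(Some(Utc::now())));
    }
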
-    /* Check to see if this machine possibly went offline while doing
-     * a previous job - if it did we'll take that job
-     *
-     * `FOR UPDATE SKIP LOCKED` prevents multiple machines of the same
-     * architecture taking the same job. See here for more information:
-     * https://www.postgresql.org/docs/17/sql-select.html#SQL-FOR-UPDATE-SHARE */
-    let maybe_previous_job = self
-        .conn()
-        .query_opt(
-            "
-            WITH job_to_update AS (
-                SELECT
-                    sha,
-                    parent_sha,
-                    commit_type,
-                    pr,
-                    release_tag,
-                    commit_time,
-                    target,
-                    include,
-                    exclude,
-                    runs,
-                    backends,
-                    machine_id,
-                    started_at,
-                    finished_at,
-                    status,
-                    retries
-                FROM commit_queue
-                WHERE machine_id = $1
-                  AND target = $2
-                  AND status = 'in_progress'
-                  AND retries < 3
-                ORDER BY started_at
-                LIMIT 1
-
-                FOR UPDATE SKIP LOCKED
-
-            )
-            UPDATE commit_queue AS cq
-            SET started_at = NOW(),
-                status = 'in_progress',
-                retries = cq.retries + 1
-            WHERE cq.sha = (SELECT sha FROM job_to_update)
-            RETURNING cq.*;
-            ",
-            &[&machine_id, &target],
-        )
-        .await
-        .unwrap();
-
-    /* If it was, we will take that job */
-    if let Some(row) = maybe_previous_job {
-        return Some(commit_queue_row_to_commit_job(&row));
-    }
-
     let maybe_drift_job = self
         .conn()
         .query_opt(
             "
                 SELECT
                     sha,
                     parent_sha,
                     commit_type,
                     pr,
                     release_tag,
                     commit_time,
                     target,
                     include,
                     exclude,
                     runs,
                     backends,
                     machine_id,
                     started_at,
                     finished_at,
-                    status,
-                    retries
+                    status
                 FROM commit_queue
                 WHERE target != $1 AND status IN ('finished', 'in_progress')
                 ORDER BY started_at
@@ -1631,7 +1567,6 @@ where
                     started_at,
                     finished_at,
                     status,
-                    retries,
                     CASE
                         WHEN commit_type = 'release' THEN 0
                         WHEN commit_type = 'master' THEN 1
@@ -1674,8 +1609,8 @@ where
             .query_opt(
                 "
                 UPDATE commit_queue
-                SET finished_at = DATETIME('now'),
-                    status = 'finished',
+                SET finished_at = NOW(),
+                    status = 'finished'
                 WHERE sha = $1
                   AND machine_id = $2
@@ -1713,35 +1648,35 @@ macro_rules! impl_to_postgresql_via_to_string {

 impl_to_postgresql_via_to_string!(Target);

-impl ToSql for Date {
-    fn to_sql(
-        &self,
-        ty: &tokio_postgres::types::Type,
-        out: &mut bytes::BytesMut,
-    ) -> Result<tokio_postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
-        self.0.to_sql(ty, out)
-    }
-
-    fn accepts(ty: &tokio_postgres::types::Type) -> bool {
-        <DateTime<Utc> as ToSql>::accepts(ty)
-    }
-
-    tokio_postgres::types::to_sql_checked!();
-}
-
-impl<'a> FromSql<'a> for Date {
-    fn from_sql(
-        ty: &tokio_postgres::types::Type,
-        raw: &'a [u8],
-    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
-        let dt = DateTime::<Utc>::from_sql(ty, raw)?;
-        Ok(Date(dt))
-    }
-
-    fn accepts(ty: &tokio_postgres::types::Type) -> bool {
-        <DateTime<Utc> as FromSql>::accepts(ty)
-    }
-}
+//impl ToSql for Date {
+//    fn to_sql(
+//        &self,
+//        ty: &tokio_postgres::types::Type,
+//        out: &mut bytes::BytesMut,
+//    ) -> Result<tokio_postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
+//        self.0.to_sql(ty, out)
+//    }
+//
+//    fn accepts(ty: &tokio_postgres::types::Type) -> bool {
+//        <DateTime<Utc> as tokio_postgres::types::ToSql>::accepts(ty)
+//    }
+//
+//    tokio_postgres::types::to_sql_checked!();
+//}
+//
+//impl<'a> FromSql<'a> for Date {
+//    fn from_sql(
+//        ty: &tokio_postgres::types::Type,
+//        raw: &'a [u8],
+//    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
+//        let dt = DateTime::<Utc>::from_sql(ty, raw)?;
+//        Ok(Date(dt))
+//    }
+//
+//    fn accepts(ty: &tokio_postgres::types::Type) -> bool {
+//        <DateTime<Utc> as FromSql>::accepts(ty)
+//    }
+//}

 fn parse_artifact_id(ty: &str, sha: &str, date: Option<DateTime<Utc>>) -> ArtifactId {
     match ty {
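For context on the query deleted above: `FOR UPDATE SKIP LOCKED` makes competing transactions skip rows that another dequeuer has already locked, so two collectors can never claim the same row (see the linked PostgreSQL docs). A minimal, hypothetical claim query using the same pattern; the table and column names follow the migration above, but this exact statement appears nowhere in the patch:

    // Hypothetical minimal claim query; not the patch's actual SQL.
    // The subquery locks one candidate row, skipping rows locked by
    // competing transactions, and the UPDATE marks it in progress.
    const CLAIM_ONE_JOB: &str = "
        UPDATE commit_queue AS cq
        SET status = 'in_progress', started_at = NOW()
        WHERE cq.sha = (
            SELECT sha
            FROM commit_queue
            WHERE status = 'queued'
            ORDER BY commit_time
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING cq.sha;
    ";
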
From 44c34057146015f5a7e3cebe6c0df05fa2bf9519 Mon Sep 17 00:00:00 2001
From: Jamesbarford
Date: Tue, 20 May 2025 16:24:55 +0100
Subject: [PATCH 4/4] remove retry logic from the sqlite database too and
 remove commented-out code

---
 database/src/pool/postgres.rs | 30 ------------------
 database/src/pool/sqlite.rs   | 57 +----------------------------------
 2 files changed, 1 insertion(+), 86 deletions(-)

diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs
index 5ea3687c4..26a35ed31 100644
--- a/database/src/pool/postgres.rs
+++ b/database/src/pool/postgres.rs
@@ -1648,36 +1648,6 @@ macro_rules! impl_to_postgresql_via_to_string {

 impl_to_postgresql_via_to_string!(Target);

-//impl ToSql for Date {
-//    fn to_sql(
-//        &self,
-//        ty: &tokio_postgres::types::Type,
-//        out: &mut bytes::BytesMut,
-//    ) -> Result<tokio_postgres::types::IsNull, Box<dyn std::error::Error + Sync + Send>> {
-//        self.0.to_sql(ty, out)
-//    }
-//
-//    fn accepts(ty: &tokio_postgres::types::Type) -> bool {
-//        <DateTime<Utc> as tokio_postgres::types::ToSql>::accepts(ty)
-//    }
-//
-//    tokio_postgres::types::to_sql_checked!();
-//}
-//
-//impl<'a> FromSql<'a> for Date {
-//    fn from_sql(
-//        ty: &tokio_postgres::types::Type,
-//        raw: &'a [u8],
-//    ) -> Result<Self, Box<dyn std::error::Error + Sync + Send>> {
-//        let dt = DateTime::<Utc>::from_sql(ty, raw)?;
-//        Ok(Date(dt))
-//    }
-//
-//    fn accepts(ty: &tokio_postgres::types::Type) -> bool {
-//        <DateTime<Utc> as FromSql>::accepts(ty)
-//    }
-//}
-
 fn parse_artifact_id(ty: &str, sha: &str, date: Option<DateTime<Utc>>) -> ArtifactId {
     match ty {
         "master" => ArtifactId::Commit(Commit {
diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs
index 791b3bd11..49b637cd4 100644
--- a/database/src/pool/sqlite.rs
+++ b/database/src/pool/sqlite.rs
@@ -423,7 +423,6 @@ static MIGRATIONS: &[Migration] = &[
         started_at TIMESTAMP,
         finished_at TIMESTAMP,
         status TEXT,
-        retries INTEGER DEFAULT 0,
         PRIMARY KEY (sha, target)
     );
 "#,
@@ -1396,58 +1395,6 @@ impl Connection for SqliteConnection {
         )
     }

-    /* Check to see if this machine possibly went offline while doing
-     * a previous job - if it did we'll take that job */
-    let maybe_previous_job = self
-        .raw_ref()
-        .prepare(
-            "
-            WITH job_to_update AS (
-                SELECT
-                    sha,
-                    parent_sha,
-                    commit_type,
-                    pr,
-                    release_tag,
-                    commit_time,
-                    target,
-                    include,
-                    exclude,
-                    runs,
-                    backends,
-                    machine_id,
-                    started_at,
-                    finished_at,
-                    status,
-                    retries
-                FROM commit_queue
-                WHERE machine_id = ?
-                  AND target = ?
-                  AND status = 'in_progress'
-                  AND retries < 3
-                ORDER BY started_at
-                LIMIT 1
-            )
-            UPDATE commit_queue AS cq
-            SET started_at = DATETIME('now'),
-                status = 'in_progress',
-                retries = cq.retries + 1
-            WHERE cq.sha = (SELECT sha FROM job_to_update)
-            RETURNING *;
-            ",
-        )
-        .unwrap()
-        .query_map(params![machine_id, &target], |row| {
-            Ok(commit_queue_row_to_commit_job(row))
-        })
-        .unwrap()
-        .map(|row| row.unwrap())
-        .collect::<Vec<CommitJob>>();
-
-    if let Some(previous_job) = maybe_previous_job.first() {
-        return Some(previous_job.clone());
-    }
-
     /* Check to see if we are out of sync with other collectors of
      * different architectures; if we are, we will update the row and
      * return this `sha` */
@@ -1471,8 +1418,7 @@ impl Connection for SqliteConnection {
             machine_id,
             started_at,
             finished_at,
-            status,
-            retries
+            status
         FROM commit_queue
         WHERE target != ? AND status IN ('finished', 'in_progress')
         ORDER BY started_at
@@ -1522,7 +1468,6 @@ impl Connection for SqliteConnection {
             started_at,
             finished_at,
             status,
-            retries,
             CASE
                 WHEN commit_type = 'release' THEN 0
                 WHEN commit_type = 'master' THEN 1