From edc1c69eefe7ec37ff4f6c749ecf7e293994748d Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Wed, 18 Jun 2025 20:23:46 +0800 Subject: [PATCH 1/9] feat(query): support pep723 scripts in python udf scripts --- .../src/pipelines/builders/builder_udf.rs | 8 +- .../transforms/transform_udf_script.rs | 161 +++++++++++++++++- .../suites/udf_native/03_0001_udf_py.test | 12 ++ 3 files changed, 173 insertions(+), 8 deletions(-) diff --git a/src/query/service/src/pipelines/builders/builder_udf.rs b/src/query/service/src/pipelines/builders/builder_udf.rs index da9b35d5559bf..de5ca3c3bb2a4 100644 --- a/src/query/service/src/pipelines/builders/builder_udf.rs +++ b/src/query/service/src/pipelines/builders/builder_udf.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use databend_common_exception::Result; use databend_common_pipeline_transforms::processors::TransformPipelineHelper; use databend_common_sql::executor::physical_plans::Udf; @@ -25,13 +27,15 @@ impl PipelineBuilder { self.build_pipeline(&udf.input)?; if udf.script_udf { - let runtimes = TransformUdfScript::init_runtime(&udf.udf_funcs)?; + let (runtimes, py_temp_dir) = TransformUdfScript::init_runtime(&udf.udf_funcs)?; + let py_temp_dir = Arc::new(py_temp_dir); self.main_pipeline.try_add_transformer(|| { Ok(TransformUdfScript::new( self.func_ctx.clone(), udf.udf_funcs.clone(), runtimes.clone(), - )) + ) + .with_py_temp_dir(py_temp_dir.clone())) }) } else { let semaphore = TransformUdfServer::init_semaphore(self.ctx.clone())?; diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index 6c2db1990d755..2b4a20f9d63d5 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -16,6 +16,7 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::sync::atomic::AtomicUsize; use std::sync::Arc; +use std::sync::LazyLock; use arrow_array::RecordBatch; use arrow_udf_runtime::javascript::FunctionOptions; @@ -38,6 +39,7 @@ use databend_common_sql::executor::physical_plans::UdfFunctionDesc; use databend_common_sql::plans::UDFLanguage; use databend_common_sql::plans::UDFScriptCode; use databend_common_sql::plans::UDFType; +use tempfile::TempDir; use super::runtime_pool::Pool; use super::runtime_pool::RuntimeBuilder; @@ -49,8 +51,11 @@ pub enum ScriptRuntime { Python(python_pool::PyRuntimePool), } +static PY_VERSION: LazyLock = + LazyLock::new(|| uv::detect_python_version().unwrap_or("3.12".to_string())); + impl ScriptRuntime { - pub fn try_create(func: &UdfFunctionDesc) -> Result { + pub fn try_create(func: &UdfFunctionDesc, temp_dir: &Option) -> Result { let UDFType::Script(UDFScriptCode { language, code, .. }) = &func.udf_type else { unreachable!() }; @@ -81,10 +86,22 @@ impl ScriptRuntime { } #[cfg(feature = "python-udf")] UDFLanguage::Python => { + let code = String::from_utf8(code.to_vec())?; + let code = if let Some(temp_dir) = temp_dir { + format!( + "import sys\nsys.path.append('{}/.venv/lib/python{}/site-packages')\n{}", + temp_dir.path().display(), + PY_VERSION.as_str(), + code + ) + } else { + code + }; + let builder = PyRuntimeBuilder { name: func.name.clone(), handler: func.func_name.clone(), - code: String::from_utf8(code.to_vec())?, + code, output_type: func.data_type.as_ref().clone(), counter: Default::default(), }; @@ -260,6 +277,7 @@ mod python_pool { pub struct TransformUdfScript { funcs: Vec, script_runtimes: BTreeMap>, + py_temp_dir: Arc>, } impl TransformUdfScript { @@ -271,8 +289,14 @@ impl TransformUdfScript { Self { funcs, script_runtimes, + py_temp_dir: Arc::new(None), } } + + pub fn with_py_temp_dir(mut self, py_temp_dir: Arc>) -> Self { + self.py_temp_dir = py_temp_dir; + self + } } impl Transform for TransformUdfScript { @@ -298,9 +322,11 @@ impl Transform for TransformUdfScript { } impl TransformUdfScript { - pub fn init_runtime(funcs: &[UdfFunctionDesc]) -> Result>> { + pub fn init_runtime( + funcs: &[UdfFunctionDesc], + ) -> Result<(BTreeMap>, Option)> { let mut script_runtimes: BTreeMap> = BTreeMap::new(); - + let temp_dir = Self::prepare_py_env(funcs)?; for func in funcs { let code = match &func.udf_type { UDFType::Script(code) => code, @@ -308,7 +334,7 @@ impl TransformUdfScript { }; if let Entry::Vacant(entry) = script_runtimes.entry(func.name.clone()) { - let runtime = ScriptRuntime::try_create(func).map_err(|err| { + let runtime = ScriptRuntime::try_create(func, &temp_dir).map_err(|err| { ErrorCode::UDFDataError(format!( "Failed to create UDF runtime for language {:?} with error: {err}", code.language @@ -318,7 +344,62 @@ impl TransformUdfScript { }; } - Ok(script_runtimes) + Ok((script_runtimes, temp_dir)) + } + + // returns the injection codes for python + fn prepare_py_env(funcs: &[UdfFunctionDesc]) -> Result> { + let mut dependencies = Vec::new(); + for func in funcs { + match &func.udf_type { + UDFType::Script(UDFScriptCode { + language: UDFLanguage::Python, + code, + .. + }) => { + let code = String::from_utf8(code.to_vec())?; + dependencies.extend_from_slice(&Self::extract_deps(&code)); + } + _ => continue, + }; + } + + // use uv to install dependencies + if !dependencies.is_empty() { + dependencies.dedup(); + + let temp_dir = uv::create_uv_env(PY_VERSION.as_str())?; + uv::install_deps(temp_dir.path(), &dependencies)?; + return Ok(Some(temp_dir)); + } + + Ok(None) + } + + fn extract_deps(script: &str) -> Vec { + let mut ss = String::new(); + let mut meta_start = false; + for line in script.lines() { + if meta_start { + if line.starts_with("# ///") { + break; + } + ss.push_str(line.trim_start_matches('#').trim()); + ss.push('\n'); + } + if !meta_start && line.starts_with("# /// script") { + meta_start = true; + } + } + + let parsed = ss.parse::().unwrap(); + if let Some(deps) = parsed["dependencies"].as_array() { + deps.iter() + .filter_map(|v| v.as_str().map(|s| s.to_string())) + .collect() + } else { + Vec::new() + } } fn prepare_block_entries( @@ -431,3 +512,71 @@ impl TransformUdfScript { Ok(()) } } + +mod uv { + use std::path::Path; + use std::process::Command; + + use tempfile::TempDir; + + pub fn install_deps(temp_dir_path: &Path, deps: &[String]) -> Result<(), String> { + let status = Command::new("uv") + .current_dir(temp_dir_path.join(".venv")) + .args(&["pip", "install"]) + .args(deps) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .status() + .map_err(|e| format!("Failed to install dependencies: {}", e))?; + + log::info!("Dependency installation success {}", deps.join(", ")); + + if status.success() { + Ok(()) + } else { + Err("Dependency installation failed".into()) + } + } + + pub fn create_uv_env(python_version: &str) -> Result { + let temp_dir = + tempfile::tempdir().map_err(|e| format!("Failed to create temp dir: {}", e))?; + let env_path = temp_dir.path().join(".venv"); + + Command::new("uv") + .args(&[ + "venv", + "--python", + python_version, + env_path.to_str().unwrap(), + ]) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .status() + .map_err(|e| format!("Failed to create UV env: {}", e))?; + + Ok(temp_dir) + } + + pub fn detect_python_version() -> Result { + let output = Command::new("python") + .arg("--version") + .output() + .map_err(|e| format!("Failed to detect python version: {}", e))?; + + if output.status.success() { + let version = String::from_utf8_lossy(&output.stdout); + let version = version + .trim() + .to_string() + .replace("Python ", "") + .split('.') + .take(2) + .collect::>() + .join("."); + Ok(version) + } else { + Err("Failed to detect python version".into()) + } + } +} diff --git a/tests/sqllogictests/suites/udf_native/03_0001_udf_py.test b/tests/sqllogictests/suites/udf_native/03_0001_udf_py.test index 059763c946573..36265872dafa6 100644 --- a/tests/sqllogictests/suites/udf_native/03_0001_udf_py.test +++ b/tests/sqllogictests/suites/udf_native/03_0001_udf_py.test @@ -1,7 +1,19 @@ ## enable it when compiled with ee feature ## statement ok ## CREATE OR REPLACE FUNCTION gcd_py (INT, INT) RETURNS BIGINT LANGUAGE python HANDLER = 'gcd' AS $$ +## # /// script +## # requires-python = ">=3.12" +## # dependencies = ["numpy", "pandas"] +## # /// +## import numpy as np +## import pandas as pd +## ## def gcd(a: int, b: int) -> int: +## x = int(pd.DataFrame(np.random.rand(3, 3)).sum().sum()) +## a += x +## b -= x +## a -= x +## b += x ## while b: ## a, b = b, a % b ## return a From 6b6040d0ddc8e2c527e16aeafe40027d37463cfe Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Wed, 18 Jun 2025 12:46:00 +0000 Subject: [PATCH 2/9] feat(query): support pep723 scripts in python udf scripts --- .../pipelines/processors/transforms/transform_udf_script.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index 2b4a20f9d63d5..13ee257f1dcc9 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -522,7 +522,7 @@ mod uv { pub fn install_deps(temp_dir_path: &Path, deps: &[String]) -> Result<(), String> { let status = Command::new("uv") .current_dir(temp_dir_path.join(".venv")) - .args(&["pip", "install"]) + .args(["pip", "install"]) .args(deps) .stdout(std::process::Stdio::null()) .stderr(std::process::Stdio::null()) @@ -544,7 +544,7 @@ mod uv { let env_path = temp_dir.path().join(".venv"); Command::new("uv") - .args(&[ + .args([ "venv", "--python", python_version, From e26d021219a1170695fbae9d42190e5d5287064d Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Wed, 18 Jun 2025 12:51:02 +0000 Subject: [PATCH 3/9] feat(query): support pep723 scripts in python udf scripts --- .../processors/transforms/transform_udf_script.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index 13ee257f1dcc9..e64e6effa2d0f 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -55,7 +55,7 @@ static PY_VERSION: LazyLock = LazyLock::new(|| uv::detect_python_version().unwrap_or("3.12".to_string())); impl ScriptRuntime { - pub fn try_create(func: &UdfFunctionDesc, temp_dir: &Option) -> Result { + pub fn try_create(func: &UdfFunctionDesc, _temp_dir: &Option) -> Result { let UDFType::Script(UDFScriptCode { language, code, .. }) = &func.udf_type else { unreachable!() }; @@ -87,7 +87,7 @@ impl ScriptRuntime { #[cfg(feature = "python-udf")] UDFLanguage::Python => { let code = String::from_utf8(code.to_vec())?; - let code = if let Some(temp_dir) = temp_dir { + let code = if let Some(temp_dir) = _temp_dir { format!( "import sys\nsys.path.append('{}/.venv/lib/python{}/site-packages')\n{}", temp_dir.path().display(), @@ -321,10 +321,9 @@ impl Transform for TransformUdfScript { } } +type RuntimeTimeRes = (BTreeMap>, Option); impl TransformUdfScript { - pub fn init_runtime( - funcs: &[UdfFunctionDesc], - ) -> Result<(BTreeMap>, Option)> { + pub fn init_runtime(funcs: &[UdfFunctionDesc]) -> Result { let mut script_runtimes: BTreeMap> = BTreeMap::new(); let temp_dir = Self::prepare_py_env(funcs)?; for func in funcs { From 5332f9142c74a8564359f4441e6f41b945f605e7 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Thu, 19 Jun 2025 11:20:38 +0000 Subject: [PATCH 4/9] chore(query): fix --- .../src/pipelines/builders/builder_udf.rs | 8 +- .../transforms/transform_udf_script.rs | 164 ++++++++++-------- .../executor/physical_plans/physical_udf.rs | 2 +- 3 files changed, 98 insertions(+), 76 deletions(-) diff --git a/src/query/service/src/pipelines/builders/builder_udf.rs b/src/query/service/src/pipelines/builders/builder_udf.rs index de5ca3c3bb2a4..da9b35d5559bf 100644 --- a/src/query/service/src/pipelines/builders/builder_udf.rs +++ b/src/query/service/src/pipelines/builders/builder_udf.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use databend_common_exception::Result; use databend_common_pipeline_transforms::processors::TransformPipelineHelper; use databend_common_sql::executor::physical_plans::Udf; @@ -27,15 +25,13 @@ impl PipelineBuilder { self.build_pipeline(&udf.input)?; if udf.script_udf { - let (runtimes, py_temp_dir) = TransformUdfScript::init_runtime(&udf.udf_funcs)?; - let py_temp_dir = Arc::new(py_temp_dir); + let runtimes = TransformUdfScript::init_runtime(&udf.udf_funcs)?; self.main_pipeline.try_add_transformer(|| { Ok(TransformUdfScript::new( self.func_ctx.clone(), udf.udf_funcs.clone(), runtimes.clone(), - ) - .with_py_temp_dir(py_temp_dir.clone())) + )) }) } else { let semaphore = TransformUdfServer::init_semaphore(self.ctx.clone())?; diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index e64e6effa2d0f..519499f055ff2 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -21,6 +21,7 @@ use std::sync::LazyLock; use arrow_array::RecordBatch; use arrow_udf_runtime::javascript::FunctionOptions; use databend_common_base::runtime::GlobalIORuntime; +use databend_common_cache::Cache; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::converts::arrow::ARROW_EXT_TYPE_VARIANT; @@ -52,10 +53,10 @@ pub enum ScriptRuntime { } static PY_VERSION: LazyLock = - LazyLock::new(|| uv::detect_python_version().unwrap_or("3.12".to_string())); + LazyLock::new(|| venv::detect_python_version().unwrap_or("3.12".to_string())); impl ScriptRuntime { - pub fn try_create(func: &UdfFunctionDesc, _temp_dir: &Option) -> Result { + pub fn try_create(func: &UdfFunctionDesc, _temp_dir: Option>) -> Result { let UDFType::Script(UDFScriptCode { language, code, .. }) = &func.udf_type else { unreachable!() }; @@ -89,9 +90,8 @@ impl ScriptRuntime { let code = String::from_utf8(code.to_vec())?; let code = if let Some(temp_dir) = _temp_dir { format!( - "import sys\nsys.path.append('{}/.venv/lib/python{}/site-packages')\n{}", + "import sys\nsys.path.append('{}')\n{}", temp_dir.path().display(), - PY_VERSION.as_str(), code ) } else { @@ -276,27 +276,20 @@ mod python_pool { pub struct TransformUdfScript { funcs: Vec, - script_runtimes: BTreeMap>, - py_temp_dir: Arc>, + script_runtimes: RuntimeTimeRes, } impl TransformUdfScript { pub fn new( _func_ctx: FunctionContext, funcs: Vec, - script_runtimes: BTreeMap>, + script_runtimes: RuntimeTimeRes, ) -> Self { Self { funcs, script_runtimes, - py_temp_dir: Arc::new(None), } } - - pub fn with_py_temp_dir(mut self, py_temp_dir: Arc>) -> Self { - self.py_temp_dir = py_temp_dir; - self - } } impl Transform for TransformUdfScript { @@ -313,7 +306,7 @@ impl Transform for TransformUdfScript { let num_rows = data_block.num_rows(); let block_entries = self.prepare_block_entries(func, &data_block)?; let input_batch = self.create_input_batch(block_entries, num_rows)?; - let runtime = self.script_runtimes.get(&func.name).unwrap(); + let (runtime, _) = self.script_runtimes.get(&func.name).unwrap(); let result_batch = runtime.handle_execution(func, &input_batch)?; self.update_datablock(func, result_batch, &mut data_block)?; } @@ -321,58 +314,61 @@ impl Transform for TransformUdfScript { } } -type RuntimeTimeRes = (BTreeMap>, Option); +type RuntimeTimeRes = BTreeMap, Option>)>; + impl TransformUdfScript { pub fn init_runtime(funcs: &[UdfFunctionDesc]) -> Result { - let mut script_runtimes: BTreeMap> = BTreeMap::new(); - let temp_dir = Self::prepare_py_env(funcs)?; + let mut script_runtimes = BTreeMap::new(); for func in funcs { - let code = match &func.udf_type { - UDFType::Script(code) => code, + let (code, code_str) = match &func.udf_type { + UDFType::Script(script_code) => { + (script_code, String::from_utf8(script_code.code.to_vec())?) + } _ => continue, }; - if let Entry::Vacant(entry) = script_runtimes.entry(func.name.clone()) { - let runtime = ScriptRuntime::try_create(func, &temp_dir).map_err(|err| { - ErrorCode::UDFDataError(format!( - "Failed to create UDF runtime for language {:?} with error: {err}", - code.language - )) - })?; - entry.insert(Arc::new(runtime)); - }; - } - - Ok((script_runtimes, temp_dir)) - } - - // returns the injection codes for python - fn prepare_py_env(funcs: &[UdfFunctionDesc]) -> Result> { - let mut dependencies = Vec::new(); - for func in funcs { - match &func.udf_type { + let temp_dir = match &func.udf_type { UDFType::Script(UDFScriptCode { language: UDFLanguage::Python, - code, .. }) => { - let code = String::from_utf8(code.to_vec())?; - dependencies.extend_from_slice(&Self::extract_deps(&code)); + let dependencies = Self::extract_deps(&code_str); + if !dependencies.is_empty() { + // try to find the temp dir from cache + let key = venv::PyVenvKeyEntry { + udf_desc: func.clone(), + }; + let mut w = venv::PY_VENV_CACHE.write(); + let entry = w.get(&key); + if let Some(entry) = entry { + Some(entry.temp_dir.clone()) + } else { + let temp_dir = Arc::new(venv::create_venv(PY_VERSION.as_str())?); + venv::install_deps(temp_dir.path(), &dependencies)?; + w.insert(key, venv::PyVenvCacheEntry { + temp_dir: temp_dir.clone(), + }); + Some(temp_dir) + } + } else { + None + } } - _ => continue, + _ => None, }; - } - // use uv to install dependencies - if !dependencies.is_empty() { - dependencies.dedup(); - - let temp_dir = uv::create_uv_env(PY_VERSION.as_str())?; - uv::install_deps(temp_dir.path(), &dependencies)?; - return Ok(Some(temp_dir)); + if let Entry::Vacant(entry) = script_runtimes.entry(func.name.clone()) { + let runtime = ScriptRuntime::try_create(func, temp_dir.clone()).map_err(|err| { + ErrorCode::UDFDataError(format!( + "Failed to create UDF runtime for language {:?} with error: {err}", + code.language + )) + })?; + entry.insert((Arc::new(runtime), temp_dir)); + }; } - Ok(None) + Ok(script_runtimes) } fn extract_deps(script: &str) -> Vec { @@ -512,17 +508,24 @@ impl TransformUdfScript { } } -mod uv { +mod venv { use std::path::Path; use std::process::Command; + use std::sync::Arc; + use std::sync::LazyLock; + use databend_common_cache::LruCache; + use databend_common_cache::MemSized; + use databend_common_sql::executor::physical_plans::UdfFunctionDesc; + use parking_lot::RwLock; use tempfile::TempDir; pub fn install_deps(temp_dir_path: &Path, deps: &[String]) -> Result<(), String> { - let status = Command::new("uv") - .current_dir(temp_dir_path.join(".venv")) - .args(["pip", "install"]) + let target_path = temp_dir_path.display().to_string(); + let status = Command::new("python") + .args(["-m", "pip", "install"]) .args(deps) + .args(["--target", &target_path]) .stdout(std::process::Stdio::null()) .stderr(std::process::Stdio::null()) .status() @@ -537,22 +540,17 @@ mod uv { } } - pub fn create_uv_env(python_version: &str) -> Result { + pub fn create_venv(_python_version: &str) -> Result { let temp_dir = tempfile::tempdir().map_err(|e| format!("Failed to create temp dir: {}", e))?; - let env_path = temp_dir.path().join(".venv"); - - Command::new("uv") - .args([ - "venv", - "--python", - python_version, - env_path.to_str().unwrap(), - ]) - .stdout(std::process::Stdio::null()) - .stderr(std::process::Stdio::null()) - .status() - .map_err(|e| format!("Failed to create UV env: {}", e))?; + + // let env_path = temp_dir.path().join(".venv"); + // Command::new("python") + // .args(["-m", "venv", env_path.to_str().unwrap()]) + // .stdout(std::process::Stdio::null()) + // .stderr(std::process::Stdio::null()) + // .status() + // .map_err(|e| format!("Failed to create venv: {}", e))?; Ok(temp_dir) } @@ -578,4 +576,32 @@ mod uv { Err("Failed to detect python version".into()) } } + + // cached temp dir for python udf + // Add this after the PY_VERSION LazyLock declaration + // A simple LRU cache for Python virtual environments + #[derive(Clone)] + pub(crate) struct PyVenvCacheEntry { + pub(crate) temp_dir: Arc, + } + + #[derive(Eq, Hash, PartialEq)] + pub(crate) struct PyVenvKeyEntry { + pub(crate) udf_desc: UdfFunctionDesc, + } + + impl MemSized for PyVenvKeyEntry { + fn mem_bytes(&self) -> usize { + std::mem::size_of::() + } + } + + impl MemSized for PyVenvCacheEntry { + fn mem_bytes(&self) -> usize { + std::mem::size_of::() + } + } + + pub static PY_VENV_CACHE: LazyLock>> = + LazyLock::new(|| RwLock::new(LruCache::with_items_capacity(64))); } diff --git a/src/query/sql/src/executor/physical_plans/physical_udf.rs b/src/query/sql/src/executor/physical_plans/physical_udf.rs index bc22c5dc1ee92..acfd628b36af3 100644 --- a/src/query/sql/src/executor/physical_plans/physical_udf.rs +++ b/src/query/sql/src/executor/physical_plans/physical_udf.rs @@ -55,7 +55,7 @@ impl Udf { } } -#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, Eq, Hash, PartialEq, serde::Serialize, serde::Deserialize)] pub struct UdfFunctionDesc { pub name: String, pub func_name: String, From 37338b629c0ac2cc309854b5e77e706ba612d681 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Fri, 20 Jun 2025 15:08:01 +0800 Subject: [PATCH 5/9] update --- .../src/principal/user_defined_function.rs | 14 +- .../src/udf_from_to_protobuf_impl.rs | 8 + src/meta/proto-conv/src/util.rs | 1 + src/meta/proto-conv/tests/it/main.rs | 2 +- ...tatype.rs => v129_vector_datatype copy.rs} | 0 .../tests/it/v130_udf_imports_packages.rs | 275 ++++++++++++++++++ src/meta/protos/proto/udf.proto | 6 +- src/query/ast/src/ast/statements/udf.rs | 33 ++- src/query/ast/src/parser/statement.rs | 20 ++ src/query/ast/src/parser/token.rs | 4 + src/query/ast/tests/it/parser.rs | 2 + .../ast/tests/it/testdata/stmt-error.txt | 2 +- src/query/ast/tests/it/testdata/stmt.txt | 23 +- src/query/expression/src/schema.rs | 5 + .../transforms/aggregator/udaf_script.rs | 3 + .../transforms/transform_udf_script.rs | 47 ++- .../physical_plans/physical_exchange.rs | 1 + .../sql/src/planner/binder/copy_into_table.rs | 26 +- src/query/sql/src/planner/binder/mod.rs | 1 + src/query/sql/src/planner/binder/udf.rs | 14 + .../sql/src/planner/plans/scalar_expr.rs | 8 +- .../sql/src/planner/semantic/type_check.rs | 28 ++ 22 files changed, 503 insertions(+), 20 deletions(-) rename src/meta/proto-conv/tests/it/{v129_vector_datatype.rs => v129_vector_datatype copy.rs} (100%) create mode 100644 src/meta/proto-conv/tests/it/v130_udf_imports_packages.rs diff --git a/src/meta/app/src/principal/user_defined_function.rs b/src/meta/app/src/principal/user_defined_function.rs index cba5326217820..208f98385a822 100644 --- a/src/meta/app/src/principal/user_defined_function.rs +++ b/src/meta/app/src/principal/user_defined_function.rs @@ -40,6 +40,8 @@ pub struct UDFServer { #[derive(Clone, Debug, Eq, PartialEq)] pub struct UDFScript { pub code: String, + pub imports: Vec, + pub packages: Vec, pub handler: String, pub language: String, pub arg_types: Vec, @@ -50,6 +52,8 @@ pub struct UDFScript { #[derive(Clone, Debug, Eq, PartialEq)] pub struct UDAFScript { pub code: String, + pub imports: Vec, + pub packages: Vec, pub language: String, // aggregate function input types pub arg_types: Vec, @@ -167,6 +171,8 @@ impl UserDefinedFunction { arg_types, return_type, runtime_version: runtime_version.to_string(), + imports: vec![], + packages: vec![], }), created_on: Utc::now(), } @@ -226,6 +232,8 @@ impl Display for UDFDefinition { handler, language, runtime_version, + imports, + packages, }) => { for (i, item) in arg_types.iter().enumerate() { if i > 0 { @@ -235,7 +243,7 @@ impl Display for UDFDefinition { } write!( f, - ") RETURNS {return_type} LANGUAGE {language} RUNTIME_VERSION = {runtime_version} HANDLER = {handler} AS $${code}$$" + ") RETURNS {return_type} LANGUAGE {language} IMPORTS = {imports:?} PACKAGES = {packages:?} RUNTIME_VERSION = {runtime_version} HANDLER = {handler} AS $${code}$$" )?; } UDFDefinition::UDAFScript(UDAFScript { @@ -245,6 +253,8 @@ impl Display for UDFDefinition { return_type, language, runtime_version, + imports, + packages, }) => { for (i, item) in arg_types.iter().enumerate() { if i > 0 { @@ -259,7 +269,7 @@ impl Display for UDFDefinition { } write!(f, "{} {}", item.name(), item.data_type())?; } - write!(f, " }} RETURNS {return_type} LANGUAGE {language} RUNTIME_VERSION = {runtime_version} AS $${code}$$")?; + write!(f, " }} RETURNS {return_type} LANGUAGE {language} IMPORTS = {imports:?} PACKAGES = {packages:?} RUNTIME_VERSION = {runtime_version} AS $${code}$$")?; } } Ok(()) diff --git a/src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs index b3b39297719aa..3a11a5218b25b 100644 --- a/src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs +++ b/src/meta/proto-conv/src/udf_from_to_protobuf_impl.rs @@ -137,6 +137,8 @@ impl FromToProto for mt::UDFScript { handler: p.handler, language: p.language, runtime_version: p.runtime_version, + imports: p.imports, + packages: p.packages, }) } @@ -171,6 +173,8 @@ impl FromToProto for mt::UDFScript { arg_types, return_type: Some(return_type), runtime_version: self.runtime_version.clone(), + imports: self.imports.clone(), + packages: self.packages.clone(), }) } } @@ -206,6 +210,8 @@ impl FromToProto for mt::UDAFScript { return_type, language: p.language, runtime_version: p.runtime_version, + imports: p.imports, + packages: p.packages, state_fields, }) } @@ -259,6 +265,8 @@ impl FromToProto for mt::UDAFScript { arg_types, state_fields, return_type: Some(return_type), + imports: self.imports.clone(), + packages: self.packages.clone(), }) } } diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs index 00be0d24f677f..6396122b8f5cd 100644 --- a/src/meta/proto-conv/src/util.rs +++ b/src/meta/proto-conv/src/util.rs @@ -159,6 +159,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[ (127, "2025-05-18: Add: UserOption::workload_group"), (128, "2025-05-22: Add: Storage Network config"), (129, "2025-05-30: Add: New DataType Vector"), + (130, "2025-06-19: Add: New UDF imports and packages in udf definition"), // Dear developer: // If you're gonna add a new metadata version, you'll have to add a test for it. // You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`) diff --git a/src/meta/proto-conv/tests/it/main.rs b/src/meta/proto-conv/tests/it/main.rs index d00f0494f499f..624bcc1f44bca 100644 --- a/src/meta/proto-conv/tests/it/main.rs +++ b/src/meta/proto-conv/tests/it/main.rs @@ -120,4 +120,4 @@ mod v125_table_index; mod v126_iceberg_storage_catalog_option; mod v127_user_option_workload_group; mod v128_storage_network_config; -mod v129_vector_datatype; +mod v130_udf_imports_packages; diff --git a/src/meta/proto-conv/tests/it/v129_vector_datatype.rs b/src/meta/proto-conv/tests/it/v129_vector_datatype copy.rs similarity index 100% rename from src/meta/proto-conv/tests/it/v129_vector_datatype.rs rename to src/meta/proto-conv/tests/it/v129_vector_datatype copy.rs diff --git a/src/meta/proto-conv/tests/it/v130_udf_imports_packages.rs b/src/meta/proto-conv/tests/it/v130_udf_imports_packages.rs new file mode 100644 index 0000000000000..cc5abd246eec6 --- /dev/null +++ b/src/meta/proto-conv/tests/it/v130_udf_imports_packages.rs @@ -0,0 +1,275 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use chrono::TimeZone; +use chrono::Utc; +use databend_common_expression::types::decimal::DecimalSize; +use databend_common_expression::types::DecimalDataType; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::types::VectorDataType; +use databend_common_expression::TableDataType; +use databend_common_expression::TableField; +use databend_common_expression::TableSchema; +use databend_common_expression::VariantDataType; +use databend_common_expression::VirtualDataField; +use databend_common_expression::VirtualDataSchema; +use databend_common_expression::VIRTUAL_COLUMN_ID_START; +use databend_common_meta_app::schema as mt; +use databend_common_meta_app::schema::TableIndex; +use databend_common_meta_app::schema::TableIndexType; +use fastrace::func_name; +use maplit::btreemap; +use maplit::btreeset; + +use crate::common; + +// These bytes are built when a new version in introduced, +// and are kept for backward compatibility test. +// +// ************************************************************* +// * These messages should never be updated, * +// * only be added when a new version is added, * +// * or be removed when an old version is no longer supported. * +// ************************************************************* +// +// The message bytes are built from the output of `test_pb_from_to()` +#[test] +fn test_decode_v129_schema() -> anyhow::Result<()> { + let table_schema_v129 = vec![ + 10, 31, 10, 1, 97, 26, 19, 154, 2, 9, 34, 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, + 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 10, 113, 10, 1, 98, 26, 99, 202, 2, 89, 10, 2, 98, + 49, 10, 2, 98, 50, 18, 51, 202, 2, 41, 10, 3, 98, 49, 49, 10, 3, 98, 49, 50, 18, 10, 138, + 2, 0, 160, 6, 129, 1, 168, 6, 24, 18, 10, 146, 2, 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, + 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 18, 19, 154, 2, 9, 66, 0, 160, 6, 129, 1, + 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, + 24, 32, 1, 160, 6, 129, 1, 168, 6, 24, 10, 33, 10, 1, 99, 26, 19, 154, 2, 9, 34, 0, 160, 6, + 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, 4, 160, 6, 129, 1, 168, 6, 24, 10, 53, + 10, 10, 100, 101, 99, 105, 109, 97, 108, 49, 50, 56, 26, 30, 218, 2, 20, 10, 11, 8, 18, 16, + 3, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, + 5, 160, 6, 129, 1, 168, 6, 24, 10, 53, 10, 10, 100, 101, 99, 105, 109, 97, 108, 50, 53, 54, + 26, 30, 218, 2, 20, 18, 11, 8, 46, 16, 6, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, + 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, 6, 160, 6, 129, 1, 168, 6, 24, 10, 32, 10, 9, 101, + 109, 112, 116, 121, 95, 109, 97, 112, 26, 10, 226, 2, 0, 160, 6, 129, 1, 168, 6, 24, 32, 7, + 160, 6, 129, 1, 168, 6, 24, 10, 29, 10, 6, 98, 105, 116, 109, 97, 112, 26, 10, 234, 2, 0, + 160, 6, 129, 1, 168, 6, 24, 32, 8, 160, 6, 129, 1, 168, 6, 24, 10, 27, 10, 4, 103, 101, + 111, 109, 26, 10, 250, 2, 0, 160, 6, 129, 1, 168, 6, 24, 32, 9, 160, 6, 129, 1, 168, 6, 24, + 10, 31, 10, 8, 105, 110, 116, 101, 114, 118, 97, 108, 26, 10, 138, 3, 0, 160, 6, 129, 1, + 168, 6, 24, 32, 10, 160, 6, 129, 1, 168, 6, 24, 10, 50, 10, 6, 118, 101, 99, 116, 111, 114, + 26, 31, 146, 3, 21, 10, 9, 74, 0, 160, 6, 129, 1, 168, 6, 24, 16, 128, 2, 160, 6, 129, 1, + 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, 11, 160, 6, 129, 1, 168, 6, 24, 24, 12, 160, 6, + 129, 1, 168, 6, 24, + ]; + + let b1 = TableDataType::Tuple { + fields_name: vec!["b11".to_string(), "b12".to_string()], + fields_type: vec![TableDataType::Boolean, TableDataType::String], + }; + let b = TableDataType::Tuple { + fields_name: vec!["b1".to_string(), "b2".to_string()], + fields_type: vec![b1, TableDataType::Number(NumberDataType::Int64)], + }; + let fields = vec![ + TableField::new("a", TableDataType::Number(NumberDataType::UInt64)), + TableField::new("b", b), + TableField::new("c", TableDataType::Number(NumberDataType::UInt64)), + TableField::new( + "decimal128", + TableDataType::Decimal(DecimalDataType::Decimal128(DecimalSize::new_unchecked( + 18, 3, + ))), + ), + TableField::new( + "decimal256", + TableDataType::Decimal(DecimalDataType::Decimal256(DecimalSize::new_unchecked( + 46, 6, + ))), + ), + TableField::new("empty_map", TableDataType::EmptyMap), + TableField::new("bitmap", TableDataType::Bitmap), + TableField::new("geom", TableDataType::Geometry), + TableField::new("interval", TableDataType::Interval), + TableField::new( + "vector", + TableDataType::Vector(VectorDataType::Float32(256)), + ), + ]; + let want = || TableSchema::new(fields.clone()); + common::test_pb_from_to(func_name!(), want())?; + // common::test_pb_from_to2(func_name!(), want())?; + common::test_load_old(func_name!(), table_schema_v129.as_slice(), 129, want())?; + Ok(()) +} + +#[test] +fn test_decode_v129_table_meta() -> anyhow::Result<()> { + let table_meta_v129 = vec![ + 10, 224, 7, 10, 55, 10, 8, 110, 117, 108, 108, 97, 98, 108, 101, 18, 5, 97, 32, 43, 32, 51, + 26, 29, 178, 2, 19, 154, 2, 9, 42, 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, + 24, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 10, 27, 10, 4, 98, 111, 111, + 108, 26, 10, 138, 2, 0, 160, 6, 129, 1, 168, 6, 24, 32, 1, 160, 6, 129, 1, 168, 6, 24, 10, + 36, 10, 4, 105, 110, 116, 56, 26, 19, 154, 2, 9, 42, 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, + 129, 1, 168, 6, 24, 32, 2, 160, 6, 129, 1, 168, 6, 24, 10, 37, 10, 5, 105, 110, 116, 49, + 54, 26, 19, 154, 2, 9, 50, 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, + 3, 160, 6, 129, 1, 168, 6, 24, 10, 37, 10, 5, 105, 110, 116, 51, 50, 26, 19, 154, 2, 9, 58, + 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, 4, 160, 6, 129, 1, 168, 6, + 24, 10, 37, 10, 5, 105, 110, 116, 54, 52, 26, 19, 154, 2, 9, 66, 0, 160, 6, 129, 1, 168, 6, + 24, 160, 6, 129, 1, 168, 6, 24, 32, 5, 160, 6, 129, 1, 168, 6, 24, 10, 37, 10, 5, 117, 105, + 110, 116, 56, 26, 19, 154, 2, 9, 10, 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, + 24, 32, 6, 160, 6, 129, 1, 168, 6, 24, 10, 38, 10, 6, 117, 105, 110, 116, 49, 54, 26, 19, + 154, 2, 9, 18, 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, 7, 160, 6, + 129, 1, 168, 6, 24, 10, 38, 10, 6, 117, 105, 110, 116, 51, 50, 26, 19, 154, 2, 9, 26, 0, + 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, 8, 160, 6, 129, 1, 168, 6, 24, + 10, 38, 10, 6, 117, 105, 110, 116, 54, 52, 26, 19, 154, 2, 9, 34, 0, 160, 6, 129, 1, 168, + 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, 9, 160, 6, 129, 1, 168, 6, 24, 10, 39, 10, 7, 102, + 108, 111, 97, 116, 51, 50, 26, 19, 154, 2, 9, 74, 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, + 129, 1, 168, 6, 24, 32, 10, 160, 6, 129, 1, 168, 6, 24, 10, 39, 10, 7, 102, 108, 111, 97, + 116, 54, 52, 26, 19, 154, 2, 9, 82, 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, + 24, 32, 11, 160, 6, 129, 1, 168, 6, 24, 10, 27, 10, 4, 100, 97, 116, 101, 26, 10, 170, 2, + 0, 160, 6, 129, 1, 168, 6, 24, 32, 12, 160, 6, 129, 1, 168, 6, 24, 10, 32, 10, 9, 116, 105, + 109, 101, 115, 116, 97, 109, 112, 26, 10, 162, 2, 0, 160, 6, 129, 1, 168, 6, 24, 32, 13, + 160, 6, 129, 1, 168, 6, 24, 10, 29, 10, 6, 115, 116, 114, 105, 110, 103, 26, 10, 146, 2, 0, + 160, 6, 129, 1, 168, 6, 24, 32, 14, 160, 6, 129, 1, 168, 6, 24, 10, 70, 10, 6, 115, 116, + 114, 117, 99, 116, 26, 51, 202, 2, 41, 10, 3, 102, 111, 111, 10, 3, 98, 97, 114, 18, 10, + 138, 2, 0, 160, 6, 129, 1, 168, 6, 24, 18, 10, 146, 2, 0, 160, 6, 129, 1, 168, 6, 24, 160, + 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, 15, 160, 6, 129, 1, 168, 6, 24, 10, + 38, 10, 5, 97, 114, 114, 97, 121, 26, 20, 186, 2, 10, 138, 2, 0, 160, 6, 129, 1, 168, 6, + 24, 160, 6, 129, 1, 168, 6, 24, 32, 17, 160, 6, 129, 1, 168, 6, 24, 10, 30, 10, 7, 118, 97, + 114, 105, 97, 110, 116, 26, 10, 210, 2, 0, 160, 6, 129, 1, 168, 6, 24, 32, 18, 160, 6, 129, + 1, 168, 6, 24, 10, 36, 10, 13, 118, 97, 114, 105, 97, 110, 116, 95, 97, 114, 114, 97, 121, + 26, 10, 210, 2, 0, 160, 6, 129, 1, 168, 6, 24, 32, 19, 160, 6, 129, 1, 168, 6, 24, 10, 37, + 10, 14, 118, 97, 114, 105, 97, 110, 116, 95, 111, 98, 106, 101, 99, 116, 26, 10, 210, 2, 0, + 160, 6, 129, 1, 168, 6, 24, 32, 20, 160, 6, 129, 1, 168, 6, 24, 10, 31, 10, 8, 105, 110, + 116, 101, 114, 118, 97, 108, 26, 10, 250, 1, 0, 160, 6, 129, 1, 168, 6, 24, 32, 21, 160, 6, + 129, 1, 168, 6, 24, 10, 29, 10, 6, 98, 105, 116, 109, 97, 112, 26, 10, 234, 2, 0, 160, 6, + 129, 1, 168, 6, 24, 32, 22, 160, 6, 129, 1, 168, 6, 24, 10, 27, 10, 4, 103, 101, 111, 109, + 26, 10, 250, 2, 0, 160, 6, 129, 1, 168, 6, 24, 32, 23, 160, 6, 129, 1, 168, 6, 24, 10, 31, + 10, 8, 105, 110, 116, 101, 114, 118, 97, 108, 26, 10, 138, 3, 0, 160, 6, 129, 1, 168, 6, + 24, 32, 24, 160, 6, 129, 1, 168, 6, 24, 10, 50, 10, 6, 118, 101, 99, 116, 111, 114, 26, 31, + 146, 3, 21, 10, 9, 74, 0, 160, 6, 129, 1, 168, 6, 24, 16, 128, 2, 160, 6, 129, 1, 168, 6, + 24, 160, 6, 129, 1, 168, 6, 24, 32, 25, 160, 6, 129, 1, 168, 6, 24, 18, 6, 10, 1, 97, 18, + 1, 98, 24, 26, 160, 6, 129, 1, 168, 6, 24, 42, 10, 10, 3, 120, 121, 122, 18, 3, 102, 111, + 111, 50, 2, 52, 52, 58, 10, 10, 3, 97, 98, 99, 18, 3, 100, 101, 102, 64, 0, 74, 10, 40, 97, + 32, 43, 32, 50, 44, 32, 98, 41, 162, 1, 23, 50, 48, 49, 52, 45, 49, 49, 45, 50, 56, 32, 49, + 50, 58, 48, 48, 58, 48, 57, 32, 85, 84, 67, 170, 1, 23, 50, 48, 49, 52, 45, 49, 49, 45, 50, + 57, 32, 49, 50, 58, 48, 48, 58, 49, 48, 32, 85, 84, 67, 178, 1, 13, 116, 97, 98, 108, 101, + 95, 99, 111, 109, 109, 101, 110, 116, 186, 1, 7, 160, 6, 129, 1, 168, 6, 24, 202, 1, 1, 99, + 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, + 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, + 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, + 202, 1, 1, 99, 202, 1, 1, 99, 226, 1, 1, 1, 234, 1, 6, 10, 1, 97, 18, 1, 98, 250, 1, 78, + 10, 1, 118, 18, 73, 10, 1, 118, 18, 1, 1, 24, 1, 34, 40, 55, 52, 101, 99, 55, 100, 51, 51, + 50, 54, 56, 48, 102, 57, 101, 54, 48, 50, 51, 52, 99, 48, 55, 102, 53, 100, 101, 102, 56, + 100, 48, 97, 53, 50, 98, 48, 102, 98, 53, 53, 42, 12, 10, 4, 116, 121, 112, 101, 18, 4, + 104, 110, 115, 119, 48, 2, 160, 6, 129, 1, 168, 6, 24, 130, 2, 56, 10, 31, 10, 7, 102, 105, + 101, 108, 100, 95, 48, 18, 2, 10, 0, 18, 2, 50, 0, 18, 4, 58, 2, 10, 0, 24, 19, 32, 128, + 188, 193, 150, 11, 18, 6, 10, 1, 97, 18, 1, 98, 24, 129, 188, 193, 150, 11, 32, 10, 160, 6, + 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, + ]; + + let want = || mt::TableMeta { + schema: Arc::new(TableSchema::new_from( + vec![ + TableField::new( + "nullable", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::Int8))), + ) + .with_default_expr(Some("a + 3".to_string())), + TableField::new("bool", TableDataType::Boolean), + TableField::new("int8", TableDataType::Number(NumberDataType::Int8)), + TableField::new("int16", TableDataType::Number(NumberDataType::Int16)), + TableField::new("int32", TableDataType::Number(NumberDataType::Int32)), + TableField::new("int64", TableDataType::Number(NumberDataType::Int64)), + TableField::new("uint8", TableDataType::Number(NumberDataType::UInt8)), + TableField::new("uint16", TableDataType::Number(NumberDataType::UInt16)), + TableField::new("uint32", TableDataType::Number(NumberDataType::UInt32)), + TableField::new("uint64", TableDataType::Number(NumberDataType::UInt64)), + TableField::new("float32", TableDataType::Number(NumberDataType::Float32)), + TableField::new("float64", TableDataType::Number(NumberDataType::Float64)), + TableField::new("date", TableDataType::Date), + TableField::new("timestamp", TableDataType::Timestamp), + TableField::new("string", TableDataType::String), + TableField::new("struct", TableDataType::Tuple { + fields_name: vec![s("foo"), s("bar")], + fields_type: vec![TableDataType::Boolean, TableDataType::String], + }), + TableField::new( + "array", + TableDataType::Array(Box::new(TableDataType::Boolean)), + ), + TableField::new("variant", TableDataType::Variant), + TableField::new("variant_array", TableDataType::Variant), + TableField::new("variant_object", TableDataType::Variant), + // NOTE: It is safe to convert Interval to NULL, because `Interval` is never really used. + TableField::new("interval", TableDataType::Null), + TableField::new("bitmap", TableDataType::Bitmap), + TableField::new("geom", TableDataType::Geometry), + TableField::new("interval", TableDataType::Interval), + TableField::new( + "vector", + TableDataType::Vector(VectorDataType::Float32(256)), + ), + ], + btreemap! {s("a") => s("b")}, + )), + engine: "44".to_string(), + storage_params: None, + part_prefix: "".to_string(), + engine_options: btreemap! {s("abc") => s("def")}, + options: btreemap! {s("xyz") => s("foo")}, + cluster_key: Some("(a + 2, b)".to_string()), + cluster_key_seq: 0, + created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), + updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), + comment: s("table_comment"), + field_comments: vec!["c".to_string(); 21], + virtual_schema: Some(VirtualDataSchema { + fields: vec![VirtualDataField { + name: "field_0".to_string(), + data_types: vec![ + VariantDataType::Jsonb, + VariantDataType::String, + VariantDataType::Array(Box::new(VariantDataType::Jsonb)), + ], + source_column_id: 19, + column_id: VIRTUAL_COLUMN_ID_START, + }], + metadata: btreemap! {s("a") => s("b")}, + next_column_id: VIRTUAL_COLUMN_ID_START + 1, + number_of_blocks: 10, + }), + drop_on: None, + statistics: Default::default(), + shared_by: btreeset! {1}, + column_mask_policy: Some(btreemap! {s("a") => s("b")}), + indexes: btreemap! {s("v") => TableIndex { + index_type: TableIndexType::Vector, + name: "v".to_string(), + column_ids: vec![1], + sync_creation: true, + version: "74ec7d332680f9e60234c07f5def8d0a52b0fb55".to_string(), + options: btreemap! {s("type") => s("hnsw")}, + }}, + }; + common::test_pb_from_to(func_name!(), want())?; + // common::test_pb_from_to2(func_name!(), want())?; + common::test_load_old(func_name!(), table_meta_v129.as_slice(), 129, want())?; + + Ok(()) +} + +fn s(ss: impl ToString) -> String { + ss.to_string() +} diff --git a/src/meta/protos/proto/udf.proto b/src/meta/protos/proto/udf.proto index 024ed8c21ecca..8365e16f98d9a 100644 --- a/src/meta/protos/proto/udf.proto +++ b/src/meta/protos/proto/udf.proto @@ -24,7 +24,7 @@ message LambdaUDF { uint64 min_reader_ver = 101; repeated string parameters = 1; - string definition = 2; + string definition = 2; } message UDFServer { @@ -49,6 +49,8 @@ message UDFScript { repeated DataType arg_types = 4; DataType return_type = 5; string runtime_version = 6; + repeated string imports = 7; + repeated string packages = 8; } message UDAFScript { @@ -61,6 +63,8 @@ message UDAFScript { DataType return_type = 4; repeated DataType arg_types = 5; repeated DataField state_fields = 6; + repeated string imports = 7; + repeated string packages = 8; } message UserDefinedFunction { diff --git a/src/query/ast/src/ast/statements/udf.rs b/src/query/ast/src/ast/statements/udf.rs index 3b324ebce4ade..3077b41a718db 100644 --- a/src/query/ast/src/ast/statements/udf.rs +++ b/src/query/ast/src/ast/statements/udf.rs @@ -18,7 +18,9 @@ use std::fmt::Formatter; use derive_visitor::Drive; use derive_visitor::DriveMut; +use itertools::Itertools; +use crate::ast::quote::QuotedString; use crate::ast::write_comma_separated_list; use crate::ast::CreateOption; use crate::ast::Expr; @@ -43,6 +45,8 @@ pub enum UDFDefinition { arg_types: Vec, return_type: TypeName, code: String, + imports: Vec, + packages: Vec, handler: String, language: String, runtime_version: String, @@ -59,6 +63,8 @@ pub enum UDFDefinition { arg_types: Vec, state_fields: Vec, return_type: TypeName, + imports: Vec, + packages: Vec, code: String, language: String, runtime_version: String, @@ -109,12 +115,23 @@ impl Display for UDFDefinition { handler, language, runtime_version: _, + imports, + packages, } => { write!(f, "( ")?; write_comma_separated_list(f, arg_types)?; + let imports = imports + .iter() + .map(|s| QuotedString(s, '\'').to_string()) + .join(","); + let packages = packages + .iter() + .map(|s| QuotedString(s, '\'').to_string()) + .join(","); write!( f, - " ) RETURNS {return_type} LANGUAGE {language} HANDLER = '{handler}' AS $$\n{code}\n$$" + " ) RETURNS {return_type} LANGUAGE {language} IMPORTS = ({}) PACKAGES = ({}) HANDLER = '{handler}' AS $$\n{code}\n$$", + imports, packages )?; } UDFDefinition::UDAFServer { @@ -149,14 +166,26 @@ impl Display for UDFDefinition { code, language, runtime_version: _, + imports, + packages, } => { + let imports = imports + .iter() + .map(|s| QuotedString(s, '\'').to_string()) + .join(","); + let packages = packages + .iter() + .map(|s| QuotedString(s, '\'').to_string()) + .join(","); + write!(f, "( ")?; write_comma_separated_list(f, arg_types)?; write!(f, " ) STATE {{ ")?; write_comma_separated_list(f, state_types)?; write!( f, - " }} RETURNS {return_type} LANGUAGE {language} AS $$\n{code}\n$$" + " }} RETURNS {return_type} LANGUAGE {language} IMPORTS = ({}) PACKAGES = ({}) AS $$\n{code}\n$$", + imports, packages )?; } } diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index ac1a7a89b34d8..c651f166ffd7f 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -4777,6 +4777,8 @@ pub fn udf_definition(i: Input) -> IResult { "(" ~ #comma_separated_list0(type_name) ~ ")" ~ RETURNS ~ #type_name ~ LANGUAGE ~ #ident + ~ ( IMPORTS ~ ^"=" ~ "(" ~ #comma_separated_list0(literal_string) ~ ")" )? + ~ ( PACKAGES ~ ^"=" ~ "(" ~ #comma_separated_list0(literal_string) ~ ")" )? ~ HANDLER ~ ^"=" ~ ^#literal_string ~ ( HEADERS ~ ^"=" ~ "(" ~ #comma_separated_list0(udf_header) ~ ")" )? ~ #udf_script_or_address @@ -4789,6 +4791,8 @@ pub fn udf_definition(i: Input) -> IResult { return_type, _, language, + imports, + packages, _, _, handler, @@ -4800,6 +4804,12 @@ pub fn udf_definition(i: Input) -> IResult { arg_types, return_type, code: address_or_code.0, + imports: imports + .map(|(_, _, _, imports, _)| imports) + .unwrap_or_default(), + packages: packages + .map(|(_, _, _, packages, _)| packages) + .unwrap_or_default(), handler, language: language.to_string(), // TODO inject runtime_version by user @@ -4827,6 +4837,8 @@ pub fn udf_definition(i: Input) -> IResult { ~ STATE ~ "{" ~ #comma_separated_list0(udaf_state_field) ~ "}" ~ RETURNS ~ #type_name ~ LANGUAGE ~ #ident + ~ ( IMPORTS ~ ^"=" ~ "(" ~ #comma_separated_list0(literal_string) ~ ")" )? + ~ ( PACKAGES ~ ^"=" ~ "(" ~ #comma_separated_list0(literal_string) ~ ")" )? ~ ( HEADERS ~ ^"=" ~ "(" ~ #comma_separated_list0(udf_header) ~ ")" )? ~ #udf_script_or_address }, @@ -4842,6 +4854,8 @@ pub fn udf_definition(i: Input) -> IResult { return_type, _, language, + imports, + packages, headers, address_or_code, )| { @@ -4852,6 +4866,12 @@ pub fn udf_definition(i: Input) -> IResult { return_type, code: address_or_code.0, language: language.to_string(), + imports: imports + .map(|(_, _, _, imports, _)| imports) + .unwrap_or_default(), + packages: packages + .map(|(_, _, _, packages, _)| packages) + .unwrap_or_default(), // TODO inject runtime_version by user // Now we use fixed runtime version runtime_version: "".to_string(), diff --git a/src/query/ast/src/parser/token.rs b/src/query/ast/src/parser/token.rs index d6b249ad24ab0..ded8e40d31a32 100644 --- a/src/query/ast/src/parser/token.rs +++ b/src/query/ast/src/parser/token.rs @@ -1342,6 +1342,10 @@ pub enum TokenKind { HEADERS, #[token("LANGUAGE", ignore(ascii_case))] LANGUAGE, + #[token("IMPORTS", ignore(ascii_case))] + IMPORTS, + #[token("PACKAGES", ignore(ascii_case))] + PACKAGES, #[token("STATE", ignore(ascii_case))] STATE, #[token("TASK", ignore(ascii_case))] diff --git a/src/query/ast/tests/it/parser.rs b/src/query/ast/tests/it/parser.rs index 7f09b694db2e8..32de2ca45ff42 100644 --- a/src/query/ast/tests/it/parser.rs +++ b/src/query/ast/tests/it/parser.rs @@ -828,6 +828,8 @@ SELECT * from s;"#, create or replace function addone(int) returns int language python + imports = ('@ss/abc') + packages = ('numpy', 'pandas') handler = 'addone_py' as '@data/abc/a.py'; "#, diff --git a/src/query/ast/tests/it/testdata/stmt-error.txt b/src/query/ast/tests/it/testdata/stmt-error.txt index 8960235081be5..da8c2a887c70b 100644 --- a/src/query/ast/tests/it/testdata/stmt-error.txt +++ b/src/query/ast/tests/it/testdata/stmt-error.txt @@ -983,7 +983,7 @@ error: --> SQL:1:85 | 1 | CREATE FUNCTION my_agg (INT) STATE { s STRING } RETURNS BOOLEAN LANGUAGE javascript HANDLER = 'my_agg' ADDRESS = 'http://0.0.0.0:8815'; - | ------ - ^^^^^^^ unexpected `HANDLER`, expecting `HEADERS`, `ADDRESS`, or `AS` + | ------ - ^^^^^^^ unexpected `HANDLER`, expecting `HEADERS`, `ADDRESS`, `PACKAGES`, `AS`, or `IMPORTS` | | | | | while parsing (, ...) STATE {, ...} RETURNS LANGUAGE { ADDRESS= | AS } | while parsing `CREATE [OR REPLACE] FUNCTION [IF NOT EXISTS] [DESC = ]` diff --git a/src/query/ast/tests/it/testdata/stmt.txt b/src/query/ast/tests/it/testdata/stmt.txt index 182b30fa46c39..386d00e806847 100644 --- a/src/query/ast/tests/it/testdata/stmt.txt +++ b/src/query/ast/tests/it/testdata/stmt.txt @@ -24965,7 +24965,7 @@ def addone_py(i): return i+1 $$; ---------- Output --------- -CREATE OR REPLACE FUNCTION addone ( Int32 ) RETURNS Int32 LANGUAGE python HANDLER = 'addone_py' AS $$ +CREATE OR REPLACE FUNCTION addone ( Int32 ) RETURNS Int32 LANGUAGE python IMPORTS = () PACKAGES = () HANDLER = 'addone_py' AS $$ def addone_py(i): return i+1 $$ @@ -24988,6 +24988,8 @@ CreateUDF( ], return_type: Int32, code: "def addone_py(i):\nreturn i+1", + imports: [], + packages: [], handler: "addone_py", language: "python", runtime_version: "", @@ -25000,10 +25002,12 @@ CreateUDF( create or replace function addone(int) returns int language python +imports = ('@ss/abc') +packages = ('numpy', 'pandas') handler = 'addone_py' as '@data/abc/a.py'; ---------- Output --------- -CREATE OR REPLACE FUNCTION addone ( Int32 ) RETURNS Int32 LANGUAGE python HANDLER = 'addone_py' AS $$ +CREATE OR REPLACE FUNCTION addone ( Int32 ) RETURNS Int32 LANGUAGE python IMPORTS = ('@ss/abc') PACKAGES = ('numpy','pandas') HANDLER = 'addone_py' AS $$ @data/abc/a.py $$ ---------- AST ------------ @@ -25025,6 +25029,13 @@ CreateUDF( ], return_type: Int32, code: "@data/abc/a.py", + imports: [ + "@ss/abc", + ], + packages: [ + "numpy", + "pandas", + ], handler: "addone_py", language: "python", runtime_version: "", @@ -25115,7 +25126,7 @@ CreateUDF( ---------- Input ---------- CREATE FUNCTION IF NOT EXISTS my_agg (INT) STATE { s STRING, i INT NOT NULL } RETURNS BOOLEAN LANGUAGE javascript AS 'some code'; ---------- Output --------- -CREATE FUNCTION IF NOT EXISTS my_agg ( Int32 ) STATE { s STRING, i Int32 NOT NULL } RETURNS BOOLEAN LANGUAGE javascript AS $$ +CREATE FUNCTION IF NOT EXISTS my_agg ( Int32 ) STATE { s STRING, i Int32 NOT NULL } RETURNS BOOLEAN LANGUAGE javascript IMPORTS = () PACKAGES = () AS $$ some code $$ ---------- AST ------------ @@ -25162,6 +25173,8 @@ CreateUDF( }, ], return_type: Boolean, + imports: [], + packages: [], code: "some code", language: "javascript", runtime_version: "", @@ -25173,7 +25186,7 @@ CreateUDF( ---------- Input ---------- ALTER FUNCTION my_agg (INT) STATE { s STRING } RETURNS BOOLEAN LANGUAGE javascript AS 'some code'; ---------- Output --------- -ALTER FUNCTION my_agg ( Int32 ) STATE { s STRING } RETURNS BOOLEAN LANGUAGE javascript AS $$ +ALTER FUNCTION my_agg ( Int32 ) STATE { s STRING } RETURNS BOOLEAN LANGUAGE javascript IMPORTS = () PACKAGES = () AS $$ some code $$ ---------- AST ------------ @@ -25206,6 +25219,8 @@ AlterUDF( }, ], return_type: Boolean, + imports: [], + packages: [], code: "some code", language: "javascript", runtime_version: "", diff --git a/src/query/expression/src/schema.rs b/src/query/expression/src/schema.rs index 34b0f865c41c4..b9cac4f93560c 100644 --- a/src/query/expression/src/schema.rs +++ b/src/query/expression/src/schema.rs @@ -365,6 +365,11 @@ impl DataSchema { } } let valid_fields: Vec = self.fields.iter().map(|f| f.name().clone()).collect(); + panic!( + "Unable to get field named \"{}\". Valid fields: {:?}", + name, valid_fields + ); + Err(ErrorCode::BadArguments(format!( "Unable to get field named \"{}\". Valid fields: {:?}", name, valid_fields diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs index feb6e2225e35d..78b6430c3c7c6 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/udaf_script.rs @@ -613,6 +613,9 @@ def finish(state): language: UDFLanguage::Python, code: code.into(), runtime_version: "3.12".to_string(), + imports: vec![], + imports_stage_info: vec![], + packages: vec![], }; let name = "test".to_string(); let display_name = "test".to_string(); diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index 519499f055ff2..a114a5b1cb733 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -40,6 +40,7 @@ use databend_common_sql::executor::physical_plans::UdfFunctionDesc; use databend_common_sql::plans::UDFLanguage; use databend_common_sql::plans::UDFScriptCode; use databend_common_sql::plans::UDFType; +use databend_common_storage::init_stage_operator; use tempfile::TempDir; use super::runtime_pool::Pool; @@ -90,7 +91,11 @@ impl ScriptRuntime { let code = String::from_utf8(code.to_vec())?; let code = if let Some(temp_dir) = _temp_dir { format!( - "import sys\nsys.path.append('{}')\n{}", + r#"import sys +sys._xoptions['databend_import_directory'] = '{}' +sys.path.append('{}') +{}"#, + temp_dir.path().display(), temp_dir.path().display(), code ) @@ -330,10 +335,14 @@ impl TransformUdfScript { let temp_dir = match &func.udf_type { UDFType::Script(UDFScriptCode { language: UDFLanguage::Python, + packages, + imports_stage_info, .. }) => { - let dependencies = Self::extract_deps(&code_str); - if !dependencies.is_empty() { + let mut dependencies = Self::extract_deps(&code_str); + dependencies.extend_from_slice(packages.as_slice()); + + let temp_dir = if !dependencies.is_empty() || !imports_stage_info.is_empty() { // try to find the temp dir from cache let key = venv::PyVenvKeyEntry { udf_desc: func.clone(), @@ -352,7 +361,31 @@ impl TransformUdfScript { } } else { None + }; + + if !imports_stage_info.is_empty() { + let imports_stage_info = imports_stage_info.clone(); + let temp_dir_path = temp_dir.as_ref().unwrap().path(); + databend_common_base::runtime::block_on(async move { + let mut fts = Vec::with_capacity(imports_stage_info.len()); + for (stage, path) in imports_stage_info.iter() { + let op = init_stage_operator(stage)?; + let name = path.trim_end_matches('/').split('/').last().unwrap(); + let temp_file = temp_dir_path.join(name); + fts.push(async move { + let buffer = op.read(&path).await?; + databend_common_base::base::tokio::fs::write( + &temp_file, + buffer.to_bytes().as_ref(), + ) + .await + }); + } + let _ = futures::future::join_all(fts).await; + Ok::<(), ErrorCode>(()) + })?; } + temp_dir } _ => None, }; @@ -388,6 +421,11 @@ impl TransformUdfScript { } let parsed = ss.parse::().unwrap(); + + if parsed.get("dependencies").is_none() { + return Vec::new(); + } + if let Some(deps) = parsed["dependencies"].as_array() { deps.iter() .filter_map(|v| v.as_str().map(|s| s.to_string())) @@ -521,6 +559,9 @@ mod venv { use tempfile::TempDir; pub fn install_deps(temp_dir_path: &Path, deps: &[String]) -> Result<(), String> { + if deps.is_empty() { + return Ok(()); + } let target_path = temp_dir_path.display().to_string(); let status = Command::new("python") .args(["-m", "pip", "install"]) diff --git a/src/query/sql/src/executor/physical_plans/physical_exchange.rs b/src/query/sql/src/executor/physical_plans/physical_exchange.rs index 1e831519c415b..df3ab03c1a008 100644 --- a/src/query/sql/src/executor/physical_plans/physical_exchange.rs +++ b/src/query/sql/src/executor/physical_plans/physical_exchange.rs @@ -64,6 +64,7 @@ impl PhysicalPlanBuilder { let mut allow_adjust_parallelism = true; let kind = match exchange { crate::plans::Exchange::Hash(scalars) => { + println!("exchange hash, scalars: {:?}", scalars); for scalar in scalars { let expr = scalar .type_check(input_schema.as_ref())? diff --git a/src/query/sql/src/planner/binder/copy_into_table.rs b/src/query/sql/src/planner/binder/copy_into_table.rs index 7604b7aa78456..b8acbcf085766 100644 --- a/src/query/sql/src/planner/binder/copy_into_table.rs +++ b/src/query/sql/src/planner/binder/copy_into_table.rs @@ -631,24 +631,28 @@ fn check_transform_query( /// copy into mytable from @my_ext_stage /// file_format = (type = csv); /// ``` -/// +/// location can be: +/// - mystage +/// - mystage/ +/// - mystage/abc +/// - ~/abc /// Returns user's stage info and relative path towards the stage's root. /// /// If input location is empty we will convert it to `/` means the root of stage /// -/// - @mystage => (mystage, "/") +/// - mystage => (mystage, "/") /// /// If input location is endswith `/`, it's a folder. /// -/// - @mystage/ => (mystage, "/") +/// - mystage/ => (mystage, "/") /// /// Otherwise, it's a file /// -/// - @mystage/abc => (mystage, "abc") +/// - mystage/abc => (mystage, "abc") /// /// For internal stage, we will also add prefix `/stage//` /// -/// - @internal/abc => (internal, "/stage/internal/abc") +/// - ~/abc => (internal, "/stage/internal/abc") #[async_backtrace::framed] pub async fn resolve_stage_location( ctx: &dyn TableContext, @@ -672,6 +676,18 @@ pub async fn resolve_stage_location( Ok((stage, path.to_string())) } +#[async_backtrace::framed] +pub async fn resolve_stage_locations( + ctx: &dyn TableContext, + locations: &[String], +) -> Result> { + let mut results = Vec::with_capacity(locations.len()); + for location in locations { + results.push(resolve_stage_location(ctx, location).await?); + } + Ok(results) +} + #[async_backtrace::framed] pub async fn resolve_file_location( ctx: &dyn TableContext, diff --git a/src/query/sql/src/planner/binder/mod.rs b/src/query/sql/src/planner/binder/mod.rs index e4f71398bf523..36bf3dec60ed6 100644 --- a/src/query/sql/src/planner/binder/mod.rs +++ b/src/query/sql/src/planner/binder/mod.rs @@ -73,6 +73,7 @@ pub use column_binding::ColumnBindingBuilder; pub use column_binding::DummyColumnType; pub use copy_into_table::resolve_file_location; pub use copy_into_table::resolve_stage_location; +pub use copy_into_table::resolve_stage_locations; pub use default_expr::DefaultExprBinder; pub use explain::ExplainConfig; pub use internal_column_factory::INTERNAL_COLUMN_FACTORY; diff --git a/src/query/sql/src/planner/binder/udf.rs b/src/query/sql/src/planner/binder/udf.rs index e2e0e970908e0..713069f6f6640 100644 --- a/src/query/sql/src/planner/binder/udf.rs +++ b/src/query/sql/src/planner/binder/udf.rs @@ -146,6 +146,8 @@ impl Binder { handler, language, runtime_version, + imports, + packages, } => { UDFValidator::is_udf_script_allowed(&language.parse()?)?; let definition = create_udf_definition_script( @@ -153,6 +155,8 @@ impl Binder { None, return_type, runtime_version, + imports, + packages, handler, language, code, @@ -171,12 +175,16 @@ impl Binder { code, language, runtime_version, + imports, + packages, } => { let definition = create_udf_definition_script( arg_types, Some(state_fields), return_type, runtime_version, + imports, + packages, "", language, code, @@ -255,6 +263,8 @@ fn create_udf_definition_script( state_fields: Option<&[UDAFStateField]>, return_type: &TypeName, runtime_version: &str, + imports: &[String], + packages: &[String], handler: &str, language: &str, code: &str, @@ -302,6 +312,8 @@ fn create_udf_definition_script( Ok(PlanUDFDefinition::UDAFScript(UDAFScript { code: code.to_string(), arg_types, + imports: imports.to_vec(), + packages: packages.to_vec(), state_fields, return_type, language: language.to_string(), @@ -312,6 +324,8 @@ fn create_udf_definition_script( code: code.to_string(), arg_types, return_type, + imports: imports.to_vec(), + packages: packages.to_vec(), handler: handler.to_string(), language: language.to_string(), runtime_version, diff --git a/src/query/sql/src/planner/plans/scalar_expr.rs b/src/query/sql/src/planner/plans/scalar_expr.rs index 0390be9715b04..288584a44c9e8 100644 --- a/src/query/sql/src/planner/plans/scalar_expr.rs +++ b/src/query/sql/src/planner/plans/scalar_expr.rs @@ -35,6 +35,7 @@ use databend_common_expression::RemoteExpr; use databend_common_expression::Scalar; use databend_common_functions::aggregates::AggregateFunctionSortDesc; use databend_common_functions::BUILTIN_FUNCTIONS; +use databend_common_meta_app::principal::StageInfo; use databend_common_meta_app::schema::GetSequenceNextValueReq; use databend_common_meta_app::schema::SequenceIdent; use databend_common_meta_app::tenant::Tenant; @@ -1101,10 +1102,15 @@ impl Display for UDFLanguage { } } -#[derive(Clone, Debug, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, Educe, Eq, PartialEq, serde::Serialize, serde::Deserialize)] +#[educe(Hash(bound = false))] pub struct UDFScriptCode { pub language: UDFLanguage, pub runtime_version: String, + #[educe(Hash(ignore))] + pub imports_stage_info: Vec<(StageInfo, String)>, + pub imports: Vec, + pub packages: Vec, pub code: Arc>, } diff --git a/src/query/sql/src/planner/semantic/type_check.rs b/src/query/sql/src/planner/semantic/type_check.rs index 6ef6c5274a5e9..4bc8dbe8bfb62 100644 --- a/src/query/sql/src/planner/semantic/type_check.rs +++ b/src/query/sql/src/planner/semantic/type_check.rs @@ -125,6 +125,7 @@ use super::name_resolution::NameResolutionContext; use super::normalize_identifier; use crate::binder::bind_values; use crate::binder::resolve_file_location; +use crate::binder::resolve_stage_locations; use crate::binder::wrap_cast; use crate::binder::Binder; use crate::binder::ExprContext; @@ -4608,6 +4609,8 @@ impl<'a> TypeChecker<'a> { arg_types, return_type, runtime_version, + imports, + packages, } = udf_definition; let language = language.parse()?; @@ -4624,10 +4627,22 @@ impl<'a> TypeChecker<'a> { let code_blob = databend_common_base::runtime::block_on(self.resolve_udf_with_stage(code))? .into_boxed_slice(); + + let imports_stage_info = databend_common_base::runtime::block_on(resolve_stage_locations( + self.ctx.as_ref(), + &imports + .iter() + .map(|s| s.trim_start_matches('@').to_string()) + .collect::>(), + ))?; + let udf_type = UDFType::Script(UDFScriptCode { language, runtime_version, code: code_blob.into(), + imports_stage_info, + imports, + packages, }); let arg_names = args.iter().map(|arg| format!("{arg}")).join(", "); @@ -4666,14 +4681,27 @@ impl<'a> TypeChecker<'a> { state_fields, return_type, runtime_version, + imports, + packages, } = udf_definition; let language = language.parse()?; let code_blob = databend_common_base::runtime::block_on(self.resolve_udf_with_stage(code))? .into_boxed_slice(); + let imports_stage_info = databend_common_base::runtime::block_on(resolve_stage_locations( + self.ctx.as_ref(), + &imports + .iter() + .map(|s| s.trim_start_matches('@').to_string()) + .collect::>(), + ))?; + let udf_type = UDFType::Script(UDFScriptCode { language, runtime_version, code: code_blob.into(), + imports, + imports_stage_info, + packages, }); let arguments = args From 1b53c428399052954242c26f522443531fcf93fe Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Fri, 20 Jun 2025 12:52:16 +0000 Subject: [PATCH 6/9] update --- .../proto-conv/tests/it/v081_udf_script.rs | 2 + .../tests/it/v115_add_udaf_script.rs | 2 + .../tests/it/v130_udf_imports_packages.rs | 269 ++---------------- src/query/expression/src/schema.rs | 5 - .../transforms/transform_udf_script.rs | 48 ++-- .../physical_plans/physical_exchange.rs | 1 - src/query/sql/src/planner/plans/aggregate.rs | 39 +-- .../sql/src/planner/plans/eval_scalar.rs | 25 ++ 8 files changed, 109 insertions(+), 282 deletions(-) diff --git a/src/meta/proto-conv/tests/it/v081_udf_script.rs b/src/meta/proto-conv/tests/it/v081_udf_script.rs index a275b15750763..e52a072f366eb 100644 --- a/src/meta/proto-conv/tests/it/v081_udf_script.rs +++ b/src/meta/proto-conv/tests/it/v081_udf_script.rs @@ -116,6 +116,8 @@ fn test_decode_udf_script() -> anyhow::Result<()> { language: "python".to_string(), arg_types: vec![DataType::Number(NumberDataType::Int32)], return_type: DataType::Number(NumberDataType::Float32), + imports: vec![], + packages: vec![], runtime_version: "3.12.2".to_string(), }), created_on: DateTime::::default(), diff --git a/src/meta/proto-conv/tests/it/v115_add_udaf_script.rs b/src/meta/proto-conv/tests/it/v115_add_udaf_script.rs index c6b49ee0f70cd..32100f6b0f135 100644 --- a/src/meta/proto-conv/tests/it/v115_add_udaf_script.rs +++ b/src/meta/proto-conv/tests/it/v115_add_udaf_script.rs @@ -61,6 +61,8 @@ fn test_decode_v115_add_udaf_script() -> anyhow::Result<()> { )], return_type: DataType::Number(NumberDataType::Float32), runtime_version: "".to_string(), + imports: vec![], + packages: vec![], }), created_on: DateTime::::default(), }; diff --git a/src/meta/proto-conv/tests/it/v130_udf_imports_packages.rs b/src/meta/proto-conv/tests/it/v130_udf_imports_packages.rs index cc5abd246eec6..f0b8b40497efa 100644 --- a/src/meta/proto-conv/tests/it/v130_udf_imports_packages.rs +++ b/src/meta/proto-conv/tests/it/v130_udf_imports_packages.rs @@ -12,27 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - -use chrono::TimeZone; +use chrono::DateTime; use chrono::Utc; -use databend_common_expression::types::decimal::DecimalSize; -use databend_common_expression::types::DecimalDataType; +use databend_common_expression::types::DataType; use databend_common_expression::types::NumberDataType; -use databend_common_expression::types::VectorDataType; -use databend_common_expression::TableDataType; -use databend_common_expression::TableField; -use databend_common_expression::TableSchema; -use databend_common_expression::VariantDataType; -use databend_common_expression::VirtualDataField; -use databend_common_expression::VirtualDataSchema; -use databend_common_expression::VIRTUAL_COLUMN_ID_START; -use databend_common_meta_app::schema as mt; -use databend_common_meta_app::schema::TableIndex; -use databend_common_meta_app::schema::TableIndexType; +use databend_common_meta_app::principal::UDFDefinition; +use databend_common_meta_app::principal::UDFScript; +use databend_common_meta_app::principal::UserDefinedFunction; use fastrace::func_name; -use maplit::btreemap; -use maplit::btreeset; use crate::common; @@ -47,229 +34,35 @@ use crate::common; // // The message bytes are built from the output of `test_pb_from_to()` #[test] -fn test_decode_v129_schema() -> anyhow::Result<()> { - let table_schema_v129 = vec![ - 10, 31, 10, 1, 97, 26, 19, 154, 2, 9, 34, 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, - 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 10, 113, 10, 1, 98, 26, 99, 202, 2, 89, 10, 2, 98, - 49, 10, 2, 98, 50, 18, 51, 202, 2, 41, 10, 3, 98, 49, 49, 10, 3, 98, 49, 50, 18, 10, 138, - 2, 0, 160, 6, 129, 1, 168, 6, 24, 18, 10, 146, 2, 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, - 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 18, 19, 154, 2, 9, 66, 0, 160, 6, 129, 1, - 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, - 24, 32, 1, 160, 6, 129, 1, 168, 6, 24, 10, 33, 10, 1, 99, 26, 19, 154, 2, 9, 34, 0, 160, 6, - 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, 4, 160, 6, 129, 1, 168, 6, 24, 10, 53, - 10, 10, 100, 101, 99, 105, 109, 97, 108, 49, 50, 56, 26, 30, 218, 2, 20, 10, 11, 8, 18, 16, - 3, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, - 5, 160, 6, 129, 1, 168, 6, 24, 10, 53, 10, 10, 100, 101, 99, 105, 109, 97, 108, 50, 53, 54, - 26, 30, 218, 2, 20, 18, 11, 8, 46, 16, 6, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, - 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, 6, 160, 6, 129, 1, 168, 6, 24, 10, 32, 10, 9, 101, - 109, 112, 116, 121, 95, 109, 97, 112, 26, 10, 226, 2, 0, 160, 6, 129, 1, 168, 6, 24, 32, 7, - 160, 6, 129, 1, 168, 6, 24, 10, 29, 10, 6, 98, 105, 116, 109, 97, 112, 26, 10, 234, 2, 0, - 160, 6, 129, 1, 168, 6, 24, 32, 8, 160, 6, 129, 1, 168, 6, 24, 10, 27, 10, 4, 103, 101, - 111, 109, 26, 10, 250, 2, 0, 160, 6, 129, 1, 168, 6, 24, 32, 9, 160, 6, 129, 1, 168, 6, 24, - 10, 31, 10, 8, 105, 110, 116, 101, 114, 118, 97, 108, 26, 10, 138, 3, 0, 160, 6, 129, 1, - 168, 6, 24, 32, 10, 160, 6, 129, 1, 168, 6, 24, 10, 50, 10, 6, 118, 101, 99, 116, 111, 114, - 26, 31, 146, 3, 21, 10, 9, 74, 0, 160, 6, 129, 1, 168, 6, 24, 16, 128, 2, 160, 6, 129, 1, - 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, 11, 160, 6, 129, 1, 168, 6, 24, 24, 12, 160, 6, - 129, 1, 168, 6, 24, - ]; - - let b1 = TableDataType::Tuple { - fields_name: vec!["b11".to_string(), "b12".to_string()], - fields_type: vec![TableDataType::Boolean, TableDataType::String], - }; - let b = TableDataType::Tuple { - fields_name: vec!["b1".to_string(), "b2".to_string()], - fields_type: vec![b1, TableDataType::Number(NumberDataType::Int64)], - }; - let fields = vec![ - TableField::new("a", TableDataType::Number(NumberDataType::UInt64)), - TableField::new("b", b), - TableField::new("c", TableDataType::Number(NumberDataType::UInt64)), - TableField::new( - "decimal128", - TableDataType::Decimal(DecimalDataType::Decimal128(DecimalSize::new_unchecked( - 18, 3, - ))), - ), - TableField::new( - "decimal256", - TableDataType::Decimal(DecimalDataType::Decimal256(DecimalSize::new_unchecked( - 46, 6, - ))), - ), - TableField::new("empty_map", TableDataType::EmptyMap), - TableField::new("bitmap", TableDataType::Bitmap), - TableField::new("geom", TableDataType::Geometry), - TableField::new("interval", TableDataType::Interval), - TableField::new( - "vector", - TableDataType::Vector(VectorDataType::Float32(256)), - ), - ]; - let want = || TableSchema::new(fields.clone()); - common::test_pb_from_to(func_name!(), want())?; - // common::test_pb_from_to2(func_name!(), want())?; - common::test_load_old(func_name!(), table_schema_v129.as_slice(), 129, want())?; - Ok(()) -} - -#[test] -fn test_decode_v129_table_meta() -> anyhow::Result<()> { - let table_meta_v129 = vec![ - 10, 224, 7, 10, 55, 10, 8, 110, 117, 108, 108, 97, 98, 108, 101, 18, 5, 97, 32, 43, 32, 51, - 26, 29, 178, 2, 19, 154, 2, 9, 42, 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, - 24, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 10, 27, 10, 4, 98, 111, 111, - 108, 26, 10, 138, 2, 0, 160, 6, 129, 1, 168, 6, 24, 32, 1, 160, 6, 129, 1, 168, 6, 24, 10, - 36, 10, 4, 105, 110, 116, 56, 26, 19, 154, 2, 9, 42, 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, - 129, 1, 168, 6, 24, 32, 2, 160, 6, 129, 1, 168, 6, 24, 10, 37, 10, 5, 105, 110, 116, 49, - 54, 26, 19, 154, 2, 9, 50, 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, - 3, 160, 6, 129, 1, 168, 6, 24, 10, 37, 10, 5, 105, 110, 116, 51, 50, 26, 19, 154, 2, 9, 58, - 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, 4, 160, 6, 129, 1, 168, 6, - 24, 10, 37, 10, 5, 105, 110, 116, 54, 52, 26, 19, 154, 2, 9, 66, 0, 160, 6, 129, 1, 168, 6, - 24, 160, 6, 129, 1, 168, 6, 24, 32, 5, 160, 6, 129, 1, 168, 6, 24, 10, 37, 10, 5, 117, 105, - 110, 116, 56, 26, 19, 154, 2, 9, 10, 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, - 24, 32, 6, 160, 6, 129, 1, 168, 6, 24, 10, 38, 10, 6, 117, 105, 110, 116, 49, 54, 26, 19, - 154, 2, 9, 18, 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, 7, 160, 6, - 129, 1, 168, 6, 24, 10, 38, 10, 6, 117, 105, 110, 116, 51, 50, 26, 19, 154, 2, 9, 26, 0, - 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, 8, 160, 6, 129, 1, 168, 6, 24, - 10, 38, 10, 6, 117, 105, 110, 116, 54, 52, 26, 19, 154, 2, 9, 34, 0, 160, 6, 129, 1, 168, - 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, 9, 160, 6, 129, 1, 168, 6, 24, 10, 39, 10, 7, 102, - 108, 111, 97, 116, 51, 50, 26, 19, 154, 2, 9, 74, 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, - 129, 1, 168, 6, 24, 32, 10, 160, 6, 129, 1, 168, 6, 24, 10, 39, 10, 7, 102, 108, 111, 97, - 116, 54, 52, 26, 19, 154, 2, 9, 82, 0, 160, 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, - 24, 32, 11, 160, 6, 129, 1, 168, 6, 24, 10, 27, 10, 4, 100, 97, 116, 101, 26, 10, 170, 2, - 0, 160, 6, 129, 1, 168, 6, 24, 32, 12, 160, 6, 129, 1, 168, 6, 24, 10, 32, 10, 9, 116, 105, - 109, 101, 115, 116, 97, 109, 112, 26, 10, 162, 2, 0, 160, 6, 129, 1, 168, 6, 24, 32, 13, - 160, 6, 129, 1, 168, 6, 24, 10, 29, 10, 6, 115, 116, 114, 105, 110, 103, 26, 10, 146, 2, 0, - 160, 6, 129, 1, 168, 6, 24, 32, 14, 160, 6, 129, 1, 168, 6, 24, 10, 70, 10, 6, 115, 116, - 114, 117, 99, 116, 26, 51, 202, 2, 41, 10, 3, 102, 111, 111, 10, 3, 98, 97, 114, 18, 10, - 138, 2, 0, 160, 6, 129, 1, 168, 6, 24, 18, 10, 146, 2, 0, 160, 6, 129, 1, 168, 6, 24, 160, - 6, 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, 32, 15, 160, 6, 129, 1, 168, 6, 24, 10, - 38, 10, 5, 97, 114, 114, 97, 121, 26, 20, 186, 2, 10, 138, 2, 0, 160, 6, 129, 1, 168, 6, - 24, 160, 6, 129, 1, 168, 6, 24, 32, 17, 160, 6, 129, 1, 168, 6, 24, 10, 30, 10, 7, 118, 97, - 114, 105, 97, 110, 116, 26, 10, 210, 2, 0, 160, 6, 129, 1, 168, 6, 24, 32, 18, 160, 6, 129, - 1, 168, 6, 24, 10, 36, 10, 13, 118, 97, 114, 105, 97, 110, 116, 95, 97, 114, 114, 97, 121, - 26, 10, 210, 2, 0, 160, 6, 129, 1, 168, 6, 24, 32, 19, 160, 6, 129, 1, 168, 6, 24, 10, 37, - 10, 14, 118, 97, 114, 105, 97, 110, 116, 95, 111, 98, 106, 101, 99, 116, 26, 10, 210, 2, 0, - 160, 6, 129, 1, 168, 6, 24, 32, 20, 160, 6, 129, 1, 168, 6, 24, 10, 31, 10, 8, 105, 110, - 116, 101, 114, 118, 97, 108, 26, 10, 250, 1, 0, 160, 6, 129, 1, 168, 6, 24, 32, 21, 160, 6, - 129, 1, 168, 6, 24, 10, 29, 10, 6, 98, 105, 116, 109, 97, 112, 26, 10, 234, 2, 0, 160, 6, - 129, 1, 168, 6, 24, 32, 22, 160, 6, 129, 1, 168, 6, 24, 10, 27, 10, 4, 103, 101, 111, 109, - 26, 10, 250, 2, 0, 160, 6, 129, 1, 168, 6, 24, 32, 23, 160, 6, 129, 1, 168, 6, 24, 10, 31, - 10, 8, 105, 110, 116, 101, 114, 118, 97, 108, 26, 10, 138, 3, 0, 160, 6, 129, 1, 168, 6, - 24, 32, 24, 160, 6, 129, 1, 168, 6, 24, 10, 50, 10, 6, 118, 101, 99, 116, 111, 114, 26, 31, - 146, 3, 21, 10, 9, 74, 0, 160, 6, 129, 1, 168, 6, 24, 16, 128, 2, 160, 6, 129, 1, 168, 6, - 24, 160, 6, 129, 1, 168, 6, 24, 32, 25, 160, 6, 129, 1, 168, 6, 24, 18, 6, 10, 1, 97, 18, - 1, 98, 24, 26, 160, 6, 129, 1, 168, 6, 24, 42, 10, 10, 3, 120, 121, 122, 18, 3, 102, 111, - 111, 50, 2, 52, 52, 58, 10, 10, 3, 97, 98, 99, 18, 3, 100, 101, 102, 64, 0, 74, 10, 40, 97, - 32, 43, 32, 50, 44, 32, 98, 41, 162, 1, 23, 50, 48, 49, 52, 45, 49, 49, 45, 50, 56, 32, 49, - 50, 58, 48, 48, 58, 48, 57, 32, 85, 84, 67, 170, 1, 23, 50, 48, 49, 52, 45, 49, 49, 45, 50, - 57, 32, 49, 50, 58, 48, 48, 58, 49, 48, 32, 85, 84, 67, 178, 1, 13, 116, 97, 98, 108, 101, - 95, 99, 111, 109, 109, 101, 110, 116, 186, 1, 7, 160, 6, 129, 1, 168, 6, 24, 202, 1, 1, 99, - 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, - 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, - 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, 202, 1, 1, 99, - 202, 1, 1, 99, 202, 1, 1, 99, 226, 1, 1, 1, 234, 1, 6, 10, 1, 97, 18, 1, 98, 250, 1, 78, - 10, 1, 118, 18, 73, 10, 1, 118, 18, 1, 1, 24, 1, 34, 40, 55, 52, 101, 99, 55, 100, 51, 51, - 50, 54, 56, 48, 102, 57, 101, 54, 48, 50, 51, 52, 99, 48, 55, 102, 53, 100, 101, 102, 56, - 100, 48, 97, 53, 50, 98, 48, 102, 98, 53, 53, 42, 12, 10, 4, 116, 121, 112, 101, 18, 4, - 104, 110, 115, 119, 48, 2, 160, 6, 129, 1, 168, 6, 24, 130, 2, 56, 10, 31, 10, 7, 102, 105, - 101, 108, 100, 95, 48, 18, 2, 10, 0, 18, 2, 50, 0, 18, 4, 58, 2, 10, 0, 24, 19, 32, 128, - 188, 193, 150, 11, 18, 6, 10, 1, 97, 18, 1, 98, 24, 129, 188, 193, 150, 11, 32, 10, 160, 6, - 129, 1, 168, 6, 24, 160, 6, 129, 1, 168, 6, 24, +fn test_decode_v130_udf_script() -> anyhow::Result<()> { + let bytes = vec![ + 10, 5, 109, 121, 95, 102, 110, 18, 21, 84, 104, 105, 115, 32, 105, 115, 32, 97, 32, 100, + 101, 115, 99, 114, 105, 112, 116, 105, 111, 110, 50, 119, 10, 9, 115, 111, 109, 101, 32, + 99, 111, 100, 101, 18, 5, 109, 121, 95, 102, 110, 26, 6, 112, 121, 116, 104, 111, 110, 34, + 19, 154, 2, 9, 58, 0, 160, 6, 130, 1, 168, 6, 24, 160, 6, 130, 1, 168, 6, 24, 42, 19, 154, + 2, 9, 74, 0, 160, 6, 130, 1, 168, 6, 24, 160, 6, 130, 1, 168, 6, 24, 50, 6, 51, 46, 49, 50, + 46, 50, 58, 9, 64, 115, 49, 47, 97, 46, 122, 105, 112, 58, 8, 64, 115, 50, 47, 98, 46, 112, + 121, 66, 5, 110, 117, 109, 112, 121, 66, 6, 112, 97, 110, 100, 97, 115, 160, 6, 130, 1, + 168, 6, 24, 42, 23, 49, 57, 55, 48, 45, 48, 49, 45, 48, 49, 32, 48, 48, 58, 48, 48, 58, 48, + 48, 32, 85, 84, 67, 160, 6, 130, 1, 168, 6, 24, ]; - let want = || mt::TableMeta { - schema: Arc::new(TableSchema::new_from( - vec![ - TableField::new( - "nullable", - TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::Int8))), - ) - .with_default_expr(Some("a + 3".to_string())), - TableField::new("bool", TableDataType::Boolean), - TableField::new("int8", TableDataType::Number(NumberDataType::Int8)), - TableField::new("int16", TableDataType::Number(NumberDataType::Int16)), - TableField::new("int32", TableDataType::Number(NumberDataType::Int32)), - TableField::new("int64", TableDataType::Number(NumberDataType::Int64)), - TableField::new("uint8", TableDataType::Number(NumberDataType::UInt8)), - TableField::new("uint16", TableDataType::Number(NumberDataType::UInt16)), - TableField::new("uint32", TableDataType::Number(NumberDataType::UInt32)), - TableField::new("uint64", TableDataType::Number(NumberDataType::UInt64)), - TableField::new("float32", TableDataType::Number(NumberDataType::Float32)), - TableField::new("float64", TableDataType::Number(NumberDataType::Float64)), - TableField::new("date", TableDataType::Date), - TableField::new("timestamp", TableDataType::Timestamp), - TableField::new("string", TableDataType::String), - TableField::new("struct", TableDataType::Tuple { - fields_name: vec![s("foo"), s("bar")], - fields_type: vec![TableDataType::Boolean, TableDataType::String], - }), - TableField::new( - "array", - TableDataType::Array(Box::new(TableDataType::Boolean)), - ), - TableField::new("variant", TableDataType::Variant), - TableField::new("variant_array", TableDataType::Variant), - TableField::new("variant_object", TableDataType::Variant), - // NOTE: It is safe to convert Interval to NULL, because `Interval` is never really used. - TableField::new("interval", TableDataType::Null), - TableField::new("bitmap", TableDataType::Bitmap), - TableField::new("geom", TableDataType::Geometry), - TableField::new("interval", TableDataType::Interval), - TableField::new( - "vector", - TableDataType::Vector(VectorDataType::Float32(256)), - ), - ], - btreemap! {s("a") => s("b")}, - )), - engine: "44".to_string(), - storage_params: None, - part_prefix: "".to_string(), - engine_options: btreemap! {s("abc") => s("def")}, - options: btreemap! {s("xyz") => s("foo")}, - cluster_key: Some("(a + 2, b)".to_string()), - cluster_key_seq: 0, - created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), - updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 10).unwrap(), - comment: s("table_comment"), - field_comments: vec!["c".to_string(); 21], - virtual_schema: Some(VirtualDataSchema { - fields: vec![VirtualDataField { - name: "field_0".to_string(), - data_types: vec![ - VariantDataType::Jsonb, - VariantDataType::String, - VariantDataType::Array(Box::new(VariantDataType::Jsonb)), - ], - source_column_id: 19, - column_id: VIRTUAL_COLUMN_ID_START, - }], - metadata: btreemap! {s("a") => s("b")}, - next_column_id: VIRTUAL_COLUMN_ID_START + 1, - number_of_blocks: 10, + let want = || UserDefinedFunction { + name: "my_fn".to_string(), + description: "This is a description".to_string(), + definition: UDFDefinition::UDFScript(UDFScript { + code: "some code".to_string(), + handler: "my_fn".to_string(), + language: "python".to_string(), + arg_types: vec![DataType::Number(NumberDataType::Int32)], + return_type: DataType::Number(NumberDataType::Float32), + imports: vec!["@s1/a.zip".to_string(), "@s2/b.py".to_string()], + packages: vec!["numpy".to_string(), "pandas".to_string()], + runtime_version: "3.12.2".to_string(), }), - drop_on: None, - statistics: Default::default(), - shared_by: btreeset! {1}, - column_mask_policy: Some(btreemap! {s("a") => s("b")}), - indexes: btreemap! {s("v") => TableIndex { - index_type: TableIndexType::Vector, - name: "v".to_string(), - column_ids: vec![1], - sync_creation: true, - version: "74ec7d332680f9e60234c07f5def8d0a52b0fb55".to_string(), - options: btreemap! {s("type") => s("hnsw")}, - }}, + created_on: DateTime::::default(), }; - common::test_pb_from_to(func_name!(), want())?; - // common::test_pb_from_to2(func_name!(), want())?; - common::test_load_old(func_name!(), table_meta_v129.as_slice(), 129, want())?; - Ok(()) -} - -fn s(ss: impl ToString) -> String { - ss.to_string() + common::test_pb_from_to(func_name!(), want())?; + common::test_load_old(func_name!(), bytes.as_slice(), 130, want()) } diff --git a/src/query/expression/src/schema.rs b/src/query/expression/src/schema.rs index b9cac4f93560c..34b0f865c41c4 100644 --- a/src/query/expression/src/schema.rs +++ b/src/query/expression/src/schema.rs @@ -365,11 +365,6 @@ impl DataSchema { } } let valid_fields: Vec = self.fields.iter().map(|f| f.name().clone()).collect(); - panic!( - "Unable to get field named \"{}\". Valid fields: {:?}", - name, valid_fields - ); - Err(ErrorCode::BadArguments(format!( "Unable to get field named \"{}\". Valid fields: {:?}", name, valid_fields diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index a114a5b1cb733..b5fa1dbd966f1 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -354,37 +354,41 @@ impl TransformUdfScript { } else { let temp_dir = Arc::new(venv::create_venv(PY_VERSION.as_str())?); venv::install_deps(temp_dir.path(), &dependencies)?; + + if !imports_stage_info.is_empty() { + let imports_stage_info = imports_stage_info.clone(); + let temp_dir_path = temp_dir.path(); + databend_common_base::runtime::block_on(async move { + let mut fts = Vec::with_capacity(imports_stage_info.len()); + for (stage, path) in imports_stage_info.iter() { + let op = init_stage_operator(stage)?; + let name = + path.trim_end_matches('/').split('/').last().unwrap(); + let temp_file = temp_dir_path.join(name); + fts.push(async move { + let buffer = op.read(&path).await?; + databend_common_base::base::tokio::fs::write( + &temp_file, + buffer.to_bytes().as_ref(), + ) + .await + }); + } + let _ = futures::future::join_all(fts).await; + Ok::<(), ErrorCode>(()) + })?; + } + w.insert(key, venv::PyVenvCacheEntry { temp_dir: temp_dir.clone(), }); + Some(temp_dir) } } else { None }; - if !imports_stage_info.is_empty() { - let imports_stage_info = imports_stage_info.clone(); - let temp_dir_path = temp_dir.as_ref().unwrap().path(); - databend_common_base::runtime::block_on(async move { - let mut fts = Vec::with_capacity(imports_stage_info.len()); - for (stage, path) in imports_stage_info.iter() { - let op = init_stage_operator(stage)?; - let name = path.trim_end_matches('/').split('/').last().unwrap(); - let temp_file = temp_dir_path.join(name); - fts.push(async move { - let buffer = op.read(&path).await?; - databend_common_base::base::tokio::fs::write( - &temp_file, - buffer.to_bytes().as_ref(), - ) - .await - }); - } - let _ = futures::future::join_all(fts).await; - Ok::<(), ErrorCode>(()) - })?; - } temp_dir } _ => None, diff --git a/src/query/sql/src/executor/physical_plans/physical_exchange.rs b/src/query/sql/src/executor/physical_plans/physical_exchange.rs index df3ab03c1a008..1e831519c415b 100644 --- a/src/query/sql/src/executor/physical_plans/physical_exchange.rs +++ b/src/query/sql/src/executor/physical_plans/physical_exchange.rs @@ -64,7 +64,6 @@ impl PhysicalPlanBuilder { let mut allow_adjust_parallelism = true; let kind = match exchange { crate::plans::Exchange::Hash(scalars) => { - println!("exchange hash, scalars: {:?}", scalars); for scalar in scalars { let expr = scalar .type_check(input_schema.as_ref())? diff --git a/src/query/sql/src/planner/plans/aggregate.rs b/src/query/sql/src/planner/plans/aggregate.rs index e1c86ff0087c7..fe3611a29f18f 100644 --- a/src/query/sql/src/planner/plans/aggregate.rs +++ b/src/query/sql/src/planner/plans/aggregate.rs @@ -32,6 +32,7 @@ use crate::plans::RelOp; use crate::plans::ScalarItem; use crate::ColumnSet; use crate::IndexType; +use crate::ScalarExpr; #[derive(Clone, Debug, PartialEq, Eq, Hash, Copy)] pub enum AggregateMode { @@ -83,6 +84,20 @@ impl Default for Aggregate { } impl Aggregate { + pub fn get_distribution_keys(&self, before_partial: bool) -> Result> { + if before_partial { + self.group_items + .iter() + .enumerate() + .map(|(index, item)| item.bound_column_expr(format!("_group_item_{}", index))) + .collect() + } else { + Ok(vec![ + self.group_items[0].bound_column_expr("_group_item_0".to_string())? + ]) + } + } + pub fn used_columns(&self) -> Result { let mut used_columns = ColumnSet::new(); for group_item in self.group_items.iter() { @@ -226,14 +241,11 @@ impl Operator for Aggregate { // Group aggregation, enforce `Hash` distribution required.distribution = match settings.get_group_by_shuffle_mode()?.as_str() { - "before_partial" => Ok(Distribution::Hash( - self.group_items - .iter() - .map(|item| item.scalar.clone()) - .collect(), - )), + "before_partial" => { + Ok(Distribution::Hash(self.get_distribution_keys(true)?)) + } "before_merge" => { - Ok(Distribution::Hash(vec![self.group_items[0].scalar.clone()])) + Ok(Distribution::Hash(self.get_distribution_keys(false)?)) } value => Err(ErrorCode::Internal(format!( "Bad settings value group_by_shuffle_mode = {:?}", @@ -324,19 +336,14 @@ impl Operator for Aggregate { match settings.get_group_by_shuffle_mode()?.as_str() { "before_partial" => { children_required.push(vec![RequiredProperty { - distribution: Distribution::Hash( - self.group_items - .iter() - .map(|item| item.scalar.clone()) - .collect(), - ), + distribution: Distribution::Hash(self.get_distribution_keys(true)?), }]); } "before_merge" => { children_required.push(vec![RequiredProperty { - distribution: Distribution::Hash(vec![self.group_items[0] - .scalar - .clone()]), + distribution: Distribution::Hash( + self.get_distribution_keys(false)?, + ), }]); } value => { diff --git a/src/query/sql/src/planner/plans/eval_scalar.rs b/src/query/sql/src/planner/plans/eval_scalar.rs index 06e394360179e..a9b3d44cb8caa 100644 --- a/src/query/sql/src/planner/plans/eval_scalar.rs +++ b/src/query/sql/src/planner/plans/eval_scalar.rs @@ -19,11 +19,15 @@ use databend_common_exception::Result; use crate::optimizer::ir::RelExpr; use crate::optimizer::ir::RelationalProperty; use crate::optimizer::ir::StatInfo; +use crate::plans::BoundColumnRef; use crate::plans::Operator; use crate::plans::RelOp; use crate::plans::ScalarExpr; +use crate::ColumnBinding; +use crate::ColumnBindingBuilder; use crate::ColumnSet; use crate::IndexType; +use crate::Visibility; /// Evaluate scalar expression #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -38,6 +42,27 @@ pub struct ScalarItem { pub index: IndexType, } +impl ScalarItem { + pub fn column_binding(&self, name: String) -> Result { + Ok(ColumnBindingBuilder::new( + name, + self.index, + Box::new(self.scalar.data_type()?), + Visibility::Visible, + ) + .build()) + } + + pub fn bound_column_expr(&self, name: String) -> Result { + let column_binding = self.column_binding(name)?; + Ok(BoundColumnRef { + span: None, + column: column_binding, + } + .into()) + } +} + impl EvalScalar { pub fn used_columns(&self) -> Result { let mut used_columns = ColumnSet::new(); From a9763d2969530069c622fe02836f1b5783747025 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Fri, 20 Jun 2025 14:13:16 +0000 Subject: [PATCH 7/9] add safe_codes in builder --- Cargo.lock | 2 +- Cargo.toml | 2 +- .../transforms/transform_udf_script.rs | 51 +++++++++++++++++-- .../sql/src/planner/plans/scalar_expr.rs | 2 +- .../sql/src/planner/semantic/type_check.rs | 8 +-- 5 files changed, 54 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d6a692df6c66d..f8ab743f75563 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -561,7 +561,7 @@ dependencies = [ [[package]] name = "arrow-udf-runtime" version = "0.8.0" -source = "git+https://github.com/datafuse-extras/arrow-udf.git?rev=92eeb3b#92eeb3b8ecf10a894b8bd861a8c118215426fa93" +source = "git+https://github.com/datafuse-extras/arrow-udf.git?rev=a442343#a44234332e9c182c247a510c3721b655572f323c" dependencies = [ "anyhow", "arrow-array", diff --git a/Cargo.toml b/Cargo.toml index ef0fd8939bf85..5ab2d02b9e640 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -642,7 +642,7 @@ overflow-checks = true rpath = false [patch.crates-io] -arrow-udf-runtime = { git = "https://github.com/datafuse-extras/arrow-udf.git", rev = "92eeb3b" } +arrow-udf-runtime = { git = "https://github.com/datafuse-extras/arrow-udf.git", rev = "a442343" } async-backtrace = { git = "https://github.com/datafuse-extras/async-backtrace.git", rev = "dea4553" } async-recursion = { git = "https://github.com/datafuse-extras/async-recursion.git", rev = "a353334" } backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "72265be" } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index b5fa1dbd966f1..ee56955780fb1 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -58,7 +58,7 @@ static PY_VERSION: LazyLock = impl ScriptRuntime { pub fn try_create(func: &UdfFunctionDesc, _temp_dir: Option>) -> Result { - let UDFType::Script(UDFScriptCode { language, code, .. }) = &func.udf_type else { + let UDFType::Script(box UDFScriptCode { language, code, .. }) = &func.udf_type else { unreachable!() }; match language { @@ -250,12 +250,55 @@ pub struct PyRuntimeBuilder { mod python_pool { use super::*; + const RESTRICTED_PYTHON_CODE: &str = r#" +import os +import sys +from pathlib import Path + +ALLOWED_BASE = Path("/tmp") + +_original_open = open +_original_os_open = os.open if hasattr(os, 'open') else None + +def safe_open(file, mode='r', **kwargs): + file_path = Path(file).resolve() + + try: + file_path.relative_to(ALLOWED_BASE) + except ValueError: + raise PermissionError(f"Access denied: {file} is outside allowed directory") + + return _original_open(file, mode, **kwargs) + +def safe_os_open(path, flags, mode=0o777): + file_path = Path(path).resolve() + try: + file_path.relative_to(ALLOWED_BASE) + except ValueError: + raise PermissionError(f"Access denied: {path} is outside allowed directory") + return _original_os_open(path, flags, mode) + +import builtins, sys +if "DATABEND_RESTRICTED_PYTHON" not in sys._xoptions: + builtins.open = safe_open + if _original_os_open: + os.open = safe_os_open + + dangerous_modules = ['subprocess', 'os.system', 'eval', 'exec', 'compile'] + for module in dangerous_modules: + if module in sys.modules: + del sys.modules[module] + sys._xoptions['DATABEND_RESTRICTED_PYTHON'] = '1' +"#; + impl RuntimeBuilder for PyRuntimeBuilder { type Error = ErrorCode; fn build(&self) -> Result { let start = std::time::Instant::now(); - let mut runtime = arrow_udf_runtime::python::Builder::default().build()?; + let mut runtime = arrow_udf_runtime::python::Builder::default() + .safe_codes(RESTRICTED_PYTHON_CODE.to_string()) + .build()?; runtime.add_function_with_handler( &self.name, arrow_field_from_data_type(&self.name, self.output_type.clone()), @@ -326,14 +369,14 @@ impl TransformUdfScript { let mut script_runtimes = BTreeMap::new(); for func in funcs { let (code, code_str) = match &func.udf_type { - UDFType::Script(script_code) => { + UDFType::Script(box script_code) => { (script_code, String::from_utf8(script_code.code.to_vec())?) } _ => continue, }; let temp_dir = match &func.udf_type { - UDFType::Script(UDFScriptCode { + UDFType::Script(box UDFScriptCode { language: UDFLanguage::Python, packages, imports_stage_info, diff --git a/src/query/sql/src/planner/plans/scalar_expr.rs b/src/query/sql/src/planner/plans/scalar_expr.rs index 288584a44c9e8..5563815cc4347 100644 --- a/src/query/sql/src/planner/plans/scalar_expr.rs +++ b/src/query/sql/src/planner/plans/scalar_expr.rs @@ -1117,7 +1117,7 @@ pub struct UDFScriptCode { #[derive(Clone, Debug, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize, EnumAsInner)] pub enum UDFType { Server(String), // server_addr - Script(UDFScriptCode), + Script(Box), } impl UDFType { diff --git a/src/query/sql/src/planner/semantic/type_check.rs b/src/query/sql/src/planner/semantic/type_check.rs index 4bc8dbe8bfb62..cc667c1914be6 100644 --- a/src/query/sql/src/planner/semantic/type_check.rs +++ b/src/query/sql/src/planner/semantic/type_check.rs @@ -4636,14 +4636,14 @@ impl<'a> TypeChecker<'a> { .collect::>(), ))?; - let udf_type = UDFType::Script(UDFScriptCode { + let udf_type = UDFType::Script(Box::new(UDFScriptCode { language, runtime_version, code: code_blob.into(), imports_stage_info, imports, packages, - }); + })); let arg_names = args.iter().map(|arg| format!("{arg}")).join(", "); let display_name = format!("{}({})", &handler, arg_names); @@ -4695,14 +4695,14 @@ impl<'a> TypeChecker<'a> { .collect::>(), ))?; - let udf_type = UDFType::Script(UDFScriptCode { + let udf_type = UDFType::Script(Box::new(UDFScriptCode { language, runtime_version, code: code_blob.into(), imports, imports_stage_info, packages, - }); + })); let arguments = args .iter() From 7da07ff6315e7f747947fdb5722e52be2eeaf1ff Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Fri, 20 Jun 2025 14:22:41 +0000 Subject: [PATCH 8/9] add safe_codes in builder --- .../processors/transforms/transform_udf_script.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs index ee56955780fb1..35dc956565b17 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_udf_script.rs @@ -405,11 +405,14 @@ impl TransformUdfScript { let mut fts = Vec::with_capacity(imports_stage_info.len()); for (stage, path) in imports_stage_info.iter() { let op = init_stage_operator(stage)?; - let name = - path.trim_end_matches('/').split('/').last().unwrap(); + let name = path + .trim_end_matches('/') + .split('/') + .next_back() + .unwrap(); let temp_file = temp_dir_path.join(name); fts.push(async move { - let buffer = op.read(&path).await?; + let buffer = op.read(path).await?; databend_common_base::base::tokio::fs::write( &temp_file, buffer.to_bytes().as_ref(), From 20f9a7c894aa8b62cc98a89e1ceb6bd821e95f09 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Sat, 21 Jun 2025 07:13:11 +0800 Subject: [PATCH 9/9] fix binder --- src/query/sql/src/planner/plans/eval_scalar.rs | 4 ++++ .../suites/base/03_common/03_0013_select_udf.test | 3 +++ tests/sqllogictests/suites/base/05_ddl/05_0036_sequence.test | 3 +++ 3 files changed, 10 insertions(+) diff --git a/src/query/sql/src/planner/plans/eval_scalar.rs b/src/query/sql/src/planner/plans/eval_scalar.rs index a9b3d44cb8caa..fc108fbbdaadc 100644 --- a/src/query/sql/src/planner/plans/eval_scalar.rs +++ b/src/query/sql/src/planner/plans/eval_scalar.rs @@ -54,6 +54,10 @@ impl ScalarItem { } pub fn bound_column_expr(&self, name: String) -> Result { + if let ScalarExpr::BoundColumnRef(_) = &self.scalar { + return Ok(self.scalar.clone()); + } + let column_binding = self.column_binding(name)?; Ok(BoundColumnRef { span: None, diff --git a/tests/sqllogictests/suites/base/03_common/03_0013_select_udf.test b/tests/sqllogictests/suites/base/03_common/03_0013_select_udf.test index a3c6a34146899..3f51e6856e566 100644 --- a/tests/sqllogictests/suites/base/03_common/03_0013_select_udf.test +++ b/tests/sqllogictests/suites/base/03_common/03_0013_select_udf.test @@ -66,6 +66,9 @@ select number, gcd(number * 3, number * 6), gcd(3, gcd(number * 3, number * 6)) 3 9 3 4 12 3 +statement ok +select gcd(number * 3, number) as c, uniq( gcd(number * 3, number * 2) ) from numbers(100) group by c; + statement ok DROP FUNCTION gcd diff --git a/tests/sqllogictests/suites/base/05_ddl/05_0036_sequence.test b/tests/sqllogictests/suites/base/05_ddl/05_0036_sequence.test index 306de0021988b..af6263b7c7b55 100644 --- a/tests/sqllogictests/suites/base/05_ddl/05_0036_sequence.test +++ b/tests/sqllogictests/suites/base/05_ddl/05_0036_sequence.test @@ -111,6 +111,9 @@ select count(*) from tmp; ---- 1000000 +statement ok +select nextval(seq) % 3 as c from numbers(1000000) group by c order by c; + statement ok DESC SEQUENCE SEQ