Skip to content

feat(query): support imports and packages in python udf scripts #18187

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 22, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/query/service/src/pipelines/builders/builder_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_pipeline_transforms::processors::TransformPipelineHelper;
use databend_common_sql::executor::physical_plans::Udf;
Expand All @@ -25,13 +27,15 @@ impl PipelineBuilder {
self.build_pipeline(&udf.input)?;

if udf.script_udf {
let runtimes = TransformUdfScript::init_runtime(&udf.udf_funcs)?;
let (runtimes, py_temp_dir) = TransformUdfScript::init_runtime(&udf.udf_funcs)?;
let py_temp_dir = Arc::new(py_temp_dir);
self.main_pipeline.try_add_transformer(|| {
Ok(TransformUdfScript::new(
self.func_ctx.clone(),
udf.udf_funcs.clone(),
runtimes.clone(),
))
)
.with_py_temp_dir(py_temp_dir.clone()))
})
} else {
let semaphore = TransformUdfServer::init_semaphore(self.ctx.clone())?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::sync::LazyLock;

use arrow_array::RecordBatch;
use arrow_udf_runtime::javascript::FunctionOptions;
Expand All @@ -38,6 +39,7 @@ use databend_common_sql::executor::physical_plans::UdfFunctionDesc;
use databend_common_sql::plans::UDFLanguage;
use databend_common_sql::plans::UDFScriptCode;
use databend_common_sql::plans::UDFType;
use tempfile::TempDir;

use super::runtime_pool::Pool;
use super::runtime_pool::RuntimeBuilder;
Expand All @@ -49,8 +51,11 @@ pub enum ScriptRuntime {
Python(python_pool::PyRuntimePool),
}

/// Python `major.minor` version string, resolved once on first access.
///
/// It is passed to `uv::create_uv_env` and interpolated into the `site-packages` path that
/// gets prepended to Python UDF scripts. When `uv::detect_python_version` returns an error
/// the value falls back to `"3.12"`.
static PY_VERSION: LazyLock<String> = LazyLock::new(|| match uv::detect_python_version() {
    Ok(detected) => detected,
    Err(_) => "3.12".to_string(),
});

impl ScriptRuntime {
pub fn try_create(func: &UdfFunctionDesc) -> Result<Self> {
pub fn try_create(func: &UdfFunctionDesc, _temp_dir: &Option<TempDir>) -> Result<Self> {
let UDFType::Script(UDFScriptCode { language, code, .. }) = &func.udf_type else {
unreachable!()
};
Expand Down Expand Up @@ -81,10 +86,22 @@ impl ScriptRuntime {
}
#[cfg(feature = "python-udf")]
UDFLanguage::Python => {
let code = String::from_utf8(code.to_vec())?;
let code = if let Some(temp_dir) = _temp_dir {
format!(
"import sys\nsys.path.append('{}/.venv/lib/python{}/site-packages')\n{}",
temp_dir.path().display(),
PY_VERSION.as_str(),
code
)
} else {
code
};

let builder = PyRuntimeBuilder {
name: func.name.clone(),
handler: func.func_name.clone(),
code: String::from_utf8(code.to_vec())?,
code,
output_type: func.data_type.as_ref().clone(),
counter: Default::default(),
};
Expand Down Expand Up @@ -260,6 +277,7 @@ mod python_pool {
/// Transform for script-based UDFs (this type implements `Transform`).
pub struct TransformUdfScript {
// Descriptors of the UDFs handled by this transform.
funcs: Vec<UdfFunctionDesc>,
// Script runtimes keyed by UDF name, as built by `init_runtime`.
script_runtimes: BTreeMap<String, Arc<ScriptRuntime>>,
// Temporary directory of the Python environment, if `init_runtime` created one; the `Arc`
// lets several transform instances share the same handle.
// NOTE(review): presumably stored here so the directory is not removed while the runtimes
// still import packages from it — confirm against `tempfile::TempDir` drop semantics.
py_temp_dir: Arc<Option<TempDir>>,
}

impl TransformUdfScript {
Expand All @@ -271,8 +289,14 @@ impl TransformUdfScript {
Self {
funcs,
script_runtimes,
py_temp_dir: Arc::new(None),
}
}

pub fn with_py_temp_dir(mut self, py_temp_dir: Arc<Option<TempDir>>) -> Self {
self.py_temp_dir = py_temp_dir;
self
}
}

impl Transform for TransformUdfScript {
Expand All @@ -297,18 +321,19 @@ impl Transform for TransformUdfScript {
}
}

/// Value produced by `TransformUdfScript::init_runtime`: the script runtimes keyed by UDF
/// name, plus the temporary directory of the Python environment when one was created.
type RuntimeTimeRes = (BTreeMap<String, Arc<ScriptRuntime>>, Option<TempDir>);
impl TransformUdfScript {
pub fn init_runtime(funcs: &[UdfFunctionDesc]) -> Result<BTreeMap<String, Arc<ScriptRuntime>>> {
pub fn init_runtime(funcs: &[UdfFunctionDesc]) -> Result<RuntimeTimeRes> {
let mut script_runtimes: BTreeMap<String, Arc<ScriptRuntime>> = BTreeMap::new();

let temp_dir = Self::prepare_py_env(funcs)?;
for func in funcs {
let code = match &func.udf_type {
UDFType::Script(code) => code,
_ => continue,
};

if let Entry::Vacant(entry) = script_runtimes.entry(func.name.clone()) {
let runtime = ScriptRuntime::try_create(func).map_err(|err| {
let runtime = ScriptRuntime::try_create(func, &temp_dir).map_err(|err| {
ErrorCode::UDFDataError(format!(
"Failed to create UDF runtime for language {:?} with error: {err}",
code.language
Expand All @@ -318,7 +343,62 @@ impl TransformUdfScript {
};
}

Ok(script_runtimes)
Ok((script_runtimes, temp_dir))
}

/// Prepares the Python environment required by the script UDFs in `funcs`.
///
/// Gathers the dependencies declared in the metadata header of every Python script UDF
/// (non-Python and non-script functions are ignored). When at least one dependency is
/// declared, a uv virtual environment is created in a fresh temporary directory and the
/// dependencies are installed into it.
///
/// Returns `Ok(None)` when no dependency is declared, otherwise `Ok(Some(dir))` where `dir`
/// is the temporary directory that contains the `.venv`.
///
/// # Errors
/// Fails when a script is not valid UTF-8, or when creating the environment or installing
/// the dependencies fails.
fn prepare_py_env(funcs: &[UdfFunctionDesc]) -> Result<Option<TempDir>> {
    let mut dependencies = Vec::new();
    for func in funcs {
        if let UDFType::Script(UDFScriptCode {
            language: UDFLanguage::Python,
            code,
            ..
        }) = &func.udf_type
        {
            let code = String::from_utf8(code.to_vec())?;
            dependencies.extend(Self::extract_deps(&code));
        }
    }

    if dependencies.is_empty() {
        return Ok(None);
    }

    // `Vec::dedup` only removes *adjacent* duplicates, so the list has to be sorted first;
    // otherwise the same package declared by two different UDFs is passed to uv twice.
    dependencies.sort_unstable();
    dependencies.dedup();

    // use uv to install dependencies
    let temp_dir = uv::create_uv_env(PY_VERSION.as_str())?;
    uv::install_deps(temp_dir.path(), &dependencies)?;
    Ok(Some(temp_dir))
}

/// Extracts the `dependencies` list from the inline metadata header of a Python script.
///
/// The header is the run of comment lines between a line starting with `# /// script` and
/// the next line starting with `# ///`; the leading `#` markers are stripped and the
/// remainder is parsed as TOML.
///
/// Returns the string entries of the top-level `dependencies` array. An empty vector is
/// returned when the script has no metadata header, when the header is not valid TOML, or
/// when it has no `dependencies` array. The script text is user input, so none of these
/// cases may panic.
fn extract_deps(script: &str) -> Vec<String> {
    let mut metadata = String::new();
    let mut in_block = false;
    for line in script.lines() {
        if in_block {
            if line.starts_with("# ///") {
                break;
            }
            metadata.push_str(line.trim_start_matches('#').trim());
            metadata.push('\n');
        } else if line.starts_with("# /// script") {
            in_block = true;
        }
    }

    // Malformed metadata is treated as "no dependencies" instead of panicking.
    let Ok(parsed) = metadata.parse::<toml::Value>() else {
        return Vec::new();
    };

    // `Value::get` returns `None` for a missing key, whereas indexing with
    // `parsed["dependencies"]` panics when the key is absent.
    parsed
        .get("dependencies")
        .and_then(|deps| deps.as_array())
        .map(|deps| {
            deps.iter()
                .filter_map(|dep| dep.as_str().map(str::to_string))
                .collect()
        })
        .unwrap_or_default()
}

fn prepare_block_entries(
Expand Down Expand Up @@ -431,3 +511,71 @@ impl TransformUdfScript {
Ok(())
}
}

mod uv {
use std::path::Path;
use std::process::Command;

use tempfile::TempDir;

/// Installs `deps` by running `uv pip install <deps...>` with the working directory set to
/// `<temp_dir_path>/.venv`. The child's stdout and stderr are discarded.
///
/// # Errors
/// Returns an error message when the `uv` process cannot be started or when it exits with
/// a non-success status.
pub fn install_deps(temp_dir_path: &Path, deps: &[String]) -> Result<(), String> {
    let status = Command::new("uv")
        .current_dir(temp_dir_path.join(".venv"))
        .args(["pip", "install"])
        .args(deps)
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status()
        .map_err(|e| format!("Failed to install dependencies: {}", e))?;

    if !status.success() {
        return Err("Dependency installation failed".into());
    }

    // Only report success once the exit status has actually been checked.
    log::info!("Dependency installation success {}", deps.join(", "));
    Ok(())
}

/// Creates a temporary directory and runs `uv venv --python <python_version>` to create a
/// virtual environment at `<temp dir>/.venv`. The child's stdout and stderr are discarded.
///
/// Returns the temporary directory handle on success.
///
/// # Errors
/// Returns an error message when the temporary directory cannot be created, when the `uv`
/// process cannot be started, or when `uv venv` exits with a non-success status.
pub fn create_uv_env(python_version: &str) -> Result<TempDir, String> {
    let temp_dir =
        tempfile::tempdir().map_err(|e| format!("Failed to create temp dir: {}", e))?;
    let env_path = temp_dir.path().join(".venv");

    let status = Command::new("uv")
        .args(["venv", "--python", python_version])
        // Pass the path as an `OsStr` so a non-UTF-8 temp path cannot cause a panic.
        .arg(&env_path)
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status()
        .map_err(|e| format!("Failed to create UV env: {}", e))?;

    // A process that started but exited with an error must not be reported as success;
    // otherwise the failure only surfaces later as a confusing install or import error.
    if !status.success() {
        return Err(format!("Failed to create UV env: uv venv exited with {}", status));
    }

    Ok(temp_dir)
}

/// Detects the `major.minor` version of the `python` executable found on `PATH` by running
/// `python --version` and parsing its standard output.
///
/// # Errors
/// Returns an error message when the process cannot be started, when it exits with a
/// non-success status, or when its output does not contain a numeric `major.minor`
/// version (for example when stdout is empty). Returning an error in the last case lets
/// callers apply their own fallback instead of receiving an empty or malformed version.
pub fn detect_python_version() -> Result<String, String> {
    let output = Command::new("python")
        .arg("--version")
        .output()
        .map_err(|e| format!("Failed to detect python version: {}", e))?;

    if !output.status.success() {
        return Err("Failed to detect python version".into());
    }

    parse_python_version(&String::from_utf8_lossy(&output.stdout))
        .ok_or_else(|| "Failed to detect python version".to_string())
}

/// Parses text such as `"Python 3.12.4\n"` into `"3.12"`; returns `None` unless both the
/// major and the minor component are non-empty runs of ASCII digits.
fn parse_python_version(raw: &str) -> Option<String> {
    let trimmed = raw.trim();
    let version = trimmed.strip_prefix("Python ").unwrap_or(trimmed);

    let mut components = version.split('.');
    let major = components.next()?;
    let minor = components.next()?;

    let is_numeric = |s: &str| !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit());
    (is_numeric(major) && is_numeric(minor)).then(|| format!("{major}.{minor}"))
}
}
12 changes: 12 additions & 0 deletions tests/sqllogictests/suites/udf_native/03_0001_udf_py.test
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
## enable it when compiled with ee feature
## statement ok
## CREATE OR REPLACE FUNCTION gcd_py (INT, INT) RETURNS BIGINT LANGUAGE python HANDLER = 'gcd' AS $$
## # /// script
## # requires-python = ">=3.12"
## # dependencies = ["numpy", "pandas"]
## # ///
## import numpy as np
## import pandas as pd
##
## def gcd(a: int, b: int) -> int:
## x = int(pd.DataFrame(np.random.rand(3, 3)).sum().sum())
## a += x
## b -= x
## a -= x
## b += x
## while b:
## a, b = b, a % b
## return a
Expand Down
Loading