Skip to content

to_timestamp_* scalar functions family #1259

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jul 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

80 changes: 77 additions & 3 deletions crates/core-executor/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ use datafusion_expr::logical_plan::dml::{DmlStatement, InsertOp, WriteOp};
use datafusion_expr::planner::ContextProvider;
use datafusion_expr::{
BinaryExpr, CreateMemoryTable, DdlStatement, Expr as DFExpr, Extension, JoinType,
LogicalPlanBuilder, Operator, Projection, SubqueryAlias, TryCast, and, build_join_schema,
is_null, lit, or, when,
LogicalPlanBuilder, Operator, Projection, ScalarUDF, SubqueryAlias, TryCast, and,
build_join_schema, is_null, lit, or, when,
};
use datafusion_iceberg::DataFusionTable;
use datafusion_iceberg::catalog::catalog::IcebergCatalog;
Expand All @@ -65,11 +65,12 @@ use df_catalog::catalog::CachingCatalog;
use df_catalog::catalog_list::CachedEntity;
use df_catalog::information_schema::session_params::SessionProperty;
use df_catalog::table::CachingTable;
use embucket_functions::conversion::to_timestamp::ToTimestampFunc;
use embucket_functions::semi_structured::variant::visitors::visit_all;
use embucket_functions::visitors::{
copy_into_identifiers, fetch_to_limit, functions_rewriter, inline_aliases_in_query,
json_element, qualify_in_query, select_expr_aliases, table_functions,
table_functions_cte_relation, top_limit,
table_functions_cte_relation, timestamp, top_limit,
unimplemented::functions_checker::visit as unimplemented_functions_checker,
};
use iceberg_rust::catalog::Catalog;
Expand Down Expand Up @@ -249,6 +250,77 @@ impl UserQuery {
}
}

fn register_session_udfs(&self) {
// TO_TIMESTAMP
let format = self
.session
.get_session_variable("timestamp_input_format")
.unwrap_or("YYYY-MM-DD HH24:MI:SS.FF3 TZHTZM".to_string());
let tz = self
.session
.get_session_variable("timezone")
.unwrap_or("America/Los_Angeles".to_string());

let mapping = self
.session
.get_session_variable("timestamp_input_mapping")
.unwrap_or("timestamp_ntz".to_string());

let funcs = [
(
if mapping != "timestamp_ntz" {
Some(Arc::from(tz.clone()))
} else {
None
},
false,
"to_timestamp".to_string(),
),
(
if mapping != "timestamp_ntz" {
Some(Arc::from(tz.clone()))
} else {
None
},
true,
"try_to_timestamp".to_string(),
),
(None, false, "to_timestamp_ntz".to_string()),
(None, true, "try_to_timestamp_ntz".to_string()),
(
Some(Arc::from(tz.clone())),
false,
"to_timestamp_tz".to_string(),
),
(
Some(Arc::from(tz.clone())),
true,
"try_to_timestamp_tz".to_string(),
),
(
Some(Arc::from(tz.clone())),
false,
"to_timestamp_ltz".to_string(),
),
(
Some(Arc::from(tz.clone())),
true,
"try_to_timestamp_ltz".to_string(),
),
];

for (tz, r#try, name) in funcs {
self.session
.ctx
.register_udf(ScalarUDF::from(ToTimestampFunc::new(
tz,
format.clone(),
r#try,
name,
)));
}
}

#[instrument(name = "UserQuery::postprocess_query_statement", level = "trace", err)]
pub fn postprocess_query_statement_with_validation(statement: &mut DFStatement) -> Result<()> {
if let DFStatement::Statement(value) = statement {
Expand All @@ -262,6 +334,7 @@ impl UserQuery {
fetch_to_limit::visit(value).context(ex_error::SqlParserSnafu)?;
table_functions::visit(value);
qualify_in_query::visit(value);
timestamp::visit(value);
table_functions_cte_relation::visit(value);
visit_all(value);
}
Expand All @@ -279,6 +352,7 @@ impl UserQuery {
pub async fn execute(&mut self) -> Result<QueryResult> {
let statement = self.parse_query().context(ex_error::DataFusionSnafu)?;
self.query = statement.to_string();
self.register_session_udfs();

// Record the result as part of the current span.
tracing::Span::current().record("statement", format!("{statement:#?}"));
Expand Down
69 changes: 69 additions & 0 deletions crates/core-executor/src/tests/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -797,3 +797,72 @@ test_query!(
"MERGE INTO t1 USING (SELECT * FROM t2) AS t2 ON t1.a = t2.a WHEN MATCHED THEN UPDATE SET t1.c = t2.c WHEN NOT MATCHED THEN INSERT (a,c) VALUES(t2.a,t2.c)",
]
);

test_query!(
timestamp_scale,
r#"SELECT
TO_TIMESTAMP(1000000000, 0) AS "Scale in seconds",
TO_TIMESTAMP(1000000000, 3) AS "Scale in milliseconds",
TO_TIMESTAMP(1000000000, 6) AS "Scale in microseconds",
TO_TIMESTAMP(1000000000, 9) AS "Scale in nanoseconds";"#
);

test_query!(
timestamp_scaled,
r#"SELECT
TO_TIMESTAMP(1000000000) AS "Scale in seconds",
TO_TIMESTAMP(1000000000000, 3) AS "Scale in milliseconds",
TO_TIMESTAMP(1000000000000000, 6) AS "Scale in microseconds",
TO_TIMESTAMP(1000000000000000000, 9) AS "Scale in nanoseconds";"#
);

test_query!(
timestamp_scale_decimal,
r#"SELECT
TO_TIMESTAMP(1000000000::DECIMAL, 0) AS "Scale in seconds",
TO_TIMESTAMP(1000000000::DECIMAL, 3) AS "Scale in milliseconds",
TO_TIMESTAMP(1000000000::DECIMAL, 6) AS "Scale in microseconds",
TO_TIMESTAMP(1000000000::DECIMAL, 9) AS "Scale in nanoseconds";"#
);

test_query!(
timestamp_scale_decimal_scaled,
r#"SELECT
TO_TIMESTAMP(1000000000::DECIMAL, 0) AS "Scale in seconds",
TO_TIMESTAMP(1000000000000::DECIMAL, 3) AS "Scale in milliseconds",
TO_TIMESTAMP(1000000000000000::DECIMAL, 6) AS "Scale in microseconds",
TO_TIMESTAMP(1000000000000000000::DECIMAL, 9) AS "Scale in nanoseconds";"#
);

test_query!(
timestamp_scale_int_str,
r#"SELECT
TO_TIMESTAMP('1000000000') AS "Scale in seconds",
TO_TIMESTAMP('1000000000000') AS "Scale in milliseconds",
TO_TIMESTAMP('1000000000000000') AS "Scale in microseconds",
TO_TIMESTAMP('1000000000000000000') AS "Scale in nanoseconds";"#
);

test_query!(
timestamp_str_format,
"SELECT
TO_TIMESTAMP('04/05/2024 01:02:03', 'mm/dd/yyyy hh24:mi:ss') as a,
TO_TIMESTAMP('04/05/2024 01:02:03') as b",
setup_queries = ["SET timestamp_input_format = 'mm/dd/yyyy hh24:mi:ss'"]
);

test_query!(
timestamp_timestamp,
"SELECT TO_TIMESTAMP(1000000000::TIMESTAMP)"
);

test_query!(
timestamp_date,
"SELECT TO_TIMESTAMP('2022-01-01 11:30:00'::date)"
);

test_query!(
timestamp_timezone,
"SELECT TO_TIMESTAMP(1000000000)",
setup_queries = ["ALTER SESSION SET timestamp_input_mapping = 'timestamp_tz'"]
);
13 changes: 13 additions & 0 deletions crates/core-executor/src/tests/snapshots/query_timestamp_date.snap
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
source: crates/core-executor/src/tests/query.rs
description: "\"SELECT TO_TIMESTAMP('2022-01-01 11:30:00'::date)\""
---
Ok(
[
"+-------------------------------------------+",
"| to_timestamp(Utf8(\"2022-01-01 11:30:00\")) |",
"+-------------------------------------------+",
"| 2022-01-01T00:00:00 |",
"+-------------------------------------------+",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
source: crates/core-executor/src/tests/query.rs
description: "r#\"SELECT\n TO_TIMESTAMP(1000000000, 0) AS \"Scale in seconds\",\n TO_TIMESTAMP(1000000000, 3) AS \"Scale in milliseconds\",\n TO_TIMESTAMP(1000000000, 6) AS \"Scale in microseconds\",\n TO_TIMESTAMP(1000000000, 9) AS \"Scale in nanoseconds\";\"#"
---
Ok(
[
"+---------------------+-----------------------+-----------------------+----------------------+",
"| Scale in seconds | Scale in milliseconds | Scale in microseconds | Scale in nanoseconds |",
"+---------------------+-----------------------+-----------------------+----------------------+",
"| 2001-09-09T01:46:40 | 1970-01-12T13:46:40 | 1970-01-01T00:16:40 | 1970-01-01T00:00:01 |",
"+---------------------+-----------------------+-----------------------+----------------------+",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
source: crates/core-executor/src/tests/query.rs
description: "r#\"SELECT\n TO_TIMESTAMP(1000000000::DECIMAL, 0) AS \"Scale in seconds\",\n TO_TIMESTAMP(1000000000::DECIMAL, 3) AS \"Scale in milliseconds\",\n TO_TIMESTAMP(1000000000::DECIMAL, 6) AS \"Scale in microseconds\",\n TO_TIMESTAMP(1000000000::DECIMAL, 9) AS \"Scale in nanoseconds\";\"#"
---
Ok(
[
"+---------------------+-----------------------+-----------------------+----------------------+",
"| Scale in seconds | Scale in milliseconds | Scale in microseconds | Scale in nanoseconds |",
"+---------------------+-----------------------+-----------------------+----------------------+",
"| 2001-09-09T01:46:40 | 1970-01-12T13:46:40 | 1970-01-01T00:16:40 | 1970-01-01T00:00:01 |",
"+---------------------+-----------------------+-----------------------+----------------------+",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
source: crates/core-executor/src/tests/query.rs
description: "r#\"SELECT\n TO_TIMESTAMP(1000000000::DECIMAL, 0) AS \"Scale in seconds\",\n TO_TIMESTAMP(1000000000000::DECIMAL, 3) AS \"Scale in milliseconds\",\n TO_TIMESTAMP(1000000000000000::DECIMAL, 6) AS \"Scale in microseconds\",\n TO_TIMESTAMP(1000000000000000000::DECIMAL, 9) AS \"Scale in nanoseconds\";\"#"
---
Ok(
[
"+---------------------+-----------------------+-----------------------+----------------------+",
"| Scale in seconds | Scale in milliseconds | Scale in microseconds | Scale in nanoseconds |",
"+---------------------+-----------------------+-----------------------+----------------------+",
"| 2001-09-09T01:46:40 | 2001-09-09T01:46:40 | 2001-09-09T01:46:40 | 2001-09-09T01:46:40 |",
"+---------------------+-----------------------+-----------------------+----------------------+",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
source: crates/core-executor/src/tests/query.rs
description: "r#\"SELECT\n TO_TIMESTAMP('1000000000') AS \"Scale in seconds\",\n TO_TIMESTAMP('1000000000000') AS \"Scale in milliseconds\",\n TO_TIMESTAMP('1000000000000000') AS \"Scale in microseconds\",\n TO_TIMESTAMP('1000000000000000000') AS \"Scale in nanoseconds\";\"#"
---
Ok(
[
"+---------------------+-----------------------+-----------------------+----------------------+",
"| Scale in seconds | Scale in milliseconds | Scale in microseconds | Scale in nanoseconds |",
"+---------------------+-----------------------+-----------------------+----------------------+",
"| 2001-09-09T01:46:40 | 2001-09-09T01:46:40 | 2001-09-09T01:46:40 | 2001-09-09T01:46:40 |",
"+---------------------+-----------------------+-----------------------+----------------------+",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
source: crates/core-executor/src/tests/query.rs
description: "r#\"SELECT\n TO_TIMESTAMP(1000000000) AS \"Scale in seconds\",\n TO_TIMESTAMP(1000000000000, 3) AS \"Scale in milliseconds\",\n TO_TIMESTAMP(1000000000000000, 6) AS \"Scale in microseconds\",\n TO_TIMESTAMP(1000000000000000000, 9) AS \"Scale in nanoseconds\";\"#"
---
Ok(
[
"+---------------------+-----------------------+-----------------------+----------------------+",
"| Scale in seconds | Scale in milliseconds | Scale in microseconds | Scale in nanoseconds |",
"+---------------------+-----------------------+-----------------------+----------------------+",
"| 2001-09-09T01:46:40 | 2001-09-09T01:46:40 | 2001-09-09T01:46:40 | 2001-09-09T01:46:40 |",
"+---------------------+-----------------------+-----------------------+----------------------+",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
source: crates/core-executor/src/tests/query.rs
description: "\"SELECT\n TO_TIMESTAMP('04/05/2024 01:02:03', 'mm/dd/yyyy hh24:mi:ss') as a,\n TO_TIMESTAMP('04/05/2024 01:02:03') as b\""
info: "Setup queries: SET timestamp_input_format = 'mm/dd/yyyy hh24:mi:ss'"
---
Ok(
[
"+---------------------+---------------------+",
"| a | b |",
"+---------------------+---------------------+",
"| 2024-04-05T01:02:03 | 2024-04-05T01:02:03 |",
"+---------------------+---------------------+",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
source: crates/core-executor/src/tests/query.rs
description: "\"SELECT TO_TIMESTAMP(1000000000::TIMESTAMP)\""
---
Ok(
[
"+-----------------------------------------------+",
"| to_timestamp(to_timestamp(Int64(1000000000))) |",
"+-----------------------------------------------+",
"| 2001-09-09T01:46:40 |",
"+-----------------------------------------------+",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
source: crates/core-executor/src/tests/query.rs
description: "\"SELECT TO_TIMESTAMP(1000000000)\""
info: "Setup queries: ALTER SESSION SET timestamp_input_mapping = 'timestamp_tz'"
---
Ok(
[
"+---------------------------------+",
"| to_timestamp(Int64(1000000000)) |",
"+---------------------------------+",
"| 2001-09-08T18:46:40-07:00 |",
"+---------------------------------+",
],
)
3 changes: 3 additions & 0 deletions crates/embucket-functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ async-trait = { workspace = true }
aes-gcm = "0.10"
aes = "0.8"
rand = "0.8"
lazy_static = "1.5.0"
regex = "1.11.1"
chrono-tz = "0.10.3"

[dev-dependencies]
bytes = { workspace = true }
Expand Down
Loading