Description
Motivation
We use Databricks SQL to pull data regularly. We work interactively, so we sometimes tweak a query slightly and rerun it. With use_cloud_fetch=True, something (apparently the caching involved) can cause the rerun query to come back with the wrong column names when aliases are used. The basic way to trigger this is:
1. Ensure use_cloud_fetch=True in the connection.
2. Write a query with a non-trivial plan against a managed delta table.
3. Ensure the returned data is large enough to actually utilize cloud fetch (not 100% sure if this is important).
4. Have an alias in the final select statement, something like some_column as a.
5. Run the query.
6. Rename the alias, i.e. some_column as b.
7. Rerun the query.
8. Note that the query run in 7 has the schema (column names) of the query run in 5.
   - This means that some_column is still called a and not called b in query 7.
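The check in the last step can be automated. The following is a minimal sketch, not part of the connector: `column_mismatches` is a hypothetical helper name, and it only compares two lists of column names by position, so it needs no Databricks connection.

```python
from typing import List, Sequence, Tuple


def column_mismatches(
    expected: Sequence[str], actual: Sequence[str]
) -> List[Tuple[int, str, str]]:
    """Return (position, expected_name, actual_name) for every differing column."""
    if len(expected) != len(actual):
        raise ValueError(
            f"column count differs: expected {len(expected)}, got {len(actual)}"
        )
    return [
        (position, want, got)
        for position, (want, got) in enumerate(zip(expected, actual))
        if want != got
    ]


# The rerun query asked for alias "b" but came back labelled "a".
print(column_mismatches(["b"], ["a"]))  # [(0, 'b', 'a')]
```

In practice, `actual` would be whatever names the result carries, for example the column list of the pandas DataFrame built from the fetched data.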
Minimum Reproducible Example
I made an MRE for convenience using the taxi dataset. This is with version 4 of the connector. I've seen it consistently in version 3 as well; I haven't tested version 2. Here is my pip freeze for the v4 setup, from a minimal install via pip install 'databricks-sql-connector[pyarrow]':
certifi==2025.1.31
charset-normalizer==3.4.1
databricks-sql-connector==4.0.0
et_xmlfile==2.0.0
idna==3.10
lz4==4.4.3
numpy==1.26.4
oauthlib==3.2.2
openpyxl==3.1.5
pandas==2.2.3
pyarrow==19.0.1
python-dateutil==2.9.0.post0
pytz==2025.1
requests==2.32.3
six==1.17.0
thrift==0.20.0
tzdata==2025.1
urllib3==2.3.0
Here is the code. I generate an example managed table from the samples.nyctaxi.trips data so it's truly reproducible: https://docs.databricks.com/aws/en/discover/databricks-datasets#unity-catalog-datasets
from databricks.sql import connect
SERVER_HOSTNAME = '<some_server>'
HTTP_PATH = '/sql/1.0/warehouses/<some_warehouse_id>'
ACCESS_TOKEN = '<some_access_token>'
EXAMPLE_TABLE_NAME = '<some_catalog>.<some_schema>.minimum_example_table'
def run_query_via_databricks_sql(query: str, use_cloud_fetch: bool = False):
    # Open a fresh connection per query so each run is independent.
    with connect(
        server_hostname=SERVER_HOSTNAME,
        http_path=HTTP_PATH,
        access_token=ACCESS_TOKEN,
        use_cloud_fetch=use_cloud_fetch,
    ) as connection:
        with connection.cursor() as cursor:
            cursor.execute(query)
            # Fetch as an Arrow table, then convert to a pandas DataFrame.
            data = cursor.fetchall_arrow()
            data = data.to_pandas()
            return data
# Create a managed table to play with from the Delta Shared table
# We do a cross join here just to generate enough rows to trigger
# cloud fetch and hence the bug.
print("Creating the example table")
create_the_example_table = f"""
create or replace table {EXAMPLE_TABLE_NAME} as SELECT
some_trips.tpep_pickup_datetime,
some_trips.tpep_dropoff_datetime,
some_trips.trip_distance,
some_trips.fare_amount,
some_trips.pickup_zip,
some_trips.dropoff_zip
FROM
samples.nyctaxi.trips as some_trips
full outer join
samples.nyctaxi.trips
limit 20000000
"""
run_query_via_databricks_sql(create_the_example_table)
# Query A ends with an alias of "as a"
query_a = f"""
with some_trips as (
SELECT
tpep_pickup_datetime,
tpep_dropoff_datetime,
trip_distance,
fare_amount,
pickup_zip,
dropoff_zip
FROM
{EXAMPLE_TABLE_NAME}
),
grouped_trips_pickup as (
SELECT
some_trips.pickup_zip,
count(*) as trip_count
FROM
some_trips
GROUP BY
some_trips.pickup_zip
),
grouped_trips_dropoff as (
SELECT
some_trips.dropoff_zip,
count(*) as trip_count
FROM
some_trips
GROUP BY
some_trips.dropoff_zip
),
trips_with_grouped_pickup_and_dropoff as (
SELECT
some_trips.*,
grouped_trips_pickup.trip_count as pickup_trip_count,
grouped_trips_dropoff.trip_count as dropoff_trip_count
FROM
some_trips
LEFT JOIN
grouped_trips_pickup
ON
some_trips.pickup_zip = grouped_trips_pickup.pickup_zip
LEFT JOIN
grouped_trips_dropoff
ON
some_trips.dropoff_zip = grouped_trips_dropoff.dropoff_zip
)
select
tpep_pickup_datetime,
dropoff_trip_count as a
from
trips_with_grouped_pickup_and_dropoff
"""
# Query B ends with an alias of "as b"
query_b = query_a.replace(" as a", " as b")
# Without cloud fetch, correct schema in both
print("Running query a without cloud fetch")
df_a__no_cloud_fetch = run_query_via_databricks_sql(query_a, use_cloud_fetch=False)
print(f"Get back a column named 'a': {df_a__no_cloud_fetch.columns.to_list()=}")
print("Running query b without cloud fetch")
df_b__no_cloud_fetch = run_query_via_databricks_sql(query_b, use_cloud_fetch=False)
print(f"Get back a column named 'b': {df_b__no_cloud_fetch.columns.to_list()=}")
# With cloud fetch, schema is not updated in query_b
print("Running query a WITH cloud fetch")
df_a__with_cloud_fetch = run_query_via_databricks_sql(query_a, use_cloud_fetch=True)
print(f"Get back a column named 'a': {df_a__with_cloud_fetch.columns.to_list()=}")
print("Running query b WITH cloud fetch")
df_b__with_cloud_fetch = run_query_via_databricks_sql(query_b, use_cloud_fetch=True)
print(f"DO NOT back a column named 'b', still have column named 'a': {df_b__with_cloud_fetch.columns.to_list()=}")
What I get back is this:
Creating the example table
Running query a without cloud fetch
Get back a column named 'a': df_a__no_cloud_fetch.columns.to_list()=['tpep_pickup_datetime', 'a']
Running query b without cloud fetch
Get back a column named 'b': df_b__no_cloud_fetch.columns.to_list()=['tpep_pickup_datetime', 'b']
Running query a WITH cloud fetch
Get back a column named 'a': df_a__with_cloud_fetch.columns.to_list()=['tpep_pickup_datetime', 'a']
Running query b WITH cloud fetch
DO NOT back a column named 'b', still have column named 'a': df_b__with_cloud_fetch.columns.to_list()=['tpep_pickup_datetime', 'a']
Issues
- We don't get back the column names we expect.
- Potentially, if two aliases are used and swapped, columns can be mislabelled (haven't checked this case).
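The unchecked swapped-alias case could be set up along the following lines. This is only a sketch: `swap_aliases` is a hypothetical helper, the query text is illustrative, and the "stale" column list is an assumed outcome, not an observed one. It also shows why a check for this case has to compare names in order rather than as a set.

```python
import re


def swap_aliases(query: str, first: str, second: str) -> str:
    """Exchange the aliases `as <first>` and `as <second>` in the query text.

    Case-sensitive on purpose; it assumes lowercase `as`, as in the MRE.
    """
    pattern = re.compile(rf"\bas\s+({re.escape(first)}|{re.escape(second)})\b")

    def exchange(match: re.Match) -> str:
        return "as " + (second if match.group(1) == first else first)

    return pattern.sub(exchange, query)


query_ab = "select pickup_zip as a, dropoff_zip as b from some_table"
query_ba = swap_aliases(query_ab, "a", "b")
print(query_ba)  # select pickup_zip as b, dropoff_zip as a from some_table

# Assumed outcome if the schema from query_ab were reused for query_ba.
expected_columns = ["b", "a"]
stale_columns = ["a", "b"]

# A set comparison cannot see the problem; an ordered comparison can.
print(set(expected_columns) == set(stale_columns))  # True
print(expected_columns == stale_columns)            # False
```

If the stale schema really were applied here, both names would still be present, just attached to the wrong data, which is why this case would be easy to miss.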