SEA: Execution Phase #645

Merged: 86 commits (Jul 23, 2025)

Commits
5bf5d4c
Separate Session related functionality from Connection class (#571)
varun-edachali-dbx May 28, 2025
400a8bd
Introduce Backend Interface (DatabricksClient) (#573)
varun-edachali-dbx May 30, 2025
3c78ed7
Implement ResultSet Abstraction (backend interfaces for fetch phase) …
varun-edachali-dbx Jun 3, 2025
9625229
Introduce Sea HTTP Client and test script (#583)
varun-edachali-dbx Jun 4, 2025
0887bc1
Introduce `SeaDatabricksClient` (Session Implementation) (#582)
varun-edachali-dbx Jun 9, 2025
6d63df0
Normalise Execution Response (clean backend interfaces) (#587)
varun-edachali-dbx Jun 11, 2025
ba8d9fd
Introduce models for `SeaDatabricksClient` (#595)
varun-edachali-dbx Jun 12, 2025
bb3f15a
Introduce preliminary SEA Result Set (#588)
varun-edachali-dbx Jun 12, 2025
19f1fae
Merge branch 'main' into sea-migration
varun-edachali-dbx Jun 17, 2025
6c5ba6d
remove invalid ExecuteResponse import
varun-edachali-dbx Jun 17, 2025
5e5147b
Separate Session related functionality from Connection class (#571)
varun-edachali-dbx May 28, 2025
57370b3
Introduce Backend Interface (DatabricksClient) (#573)
varun-edachali-dbx May 30, 2025
75752bf
Implement ResultSet Abstraction (backend interfaces for fetch phase) …
varun-edachali-dbx Jun 3, 2025
450b80d
remove un-necessary initialisation assertions
varun-edachali-dbx Jun 18, 2025
a926f02
remove un-necessary line break s
varun-edachali-dbx Jun 18, 2025
55ad001
more un-necessary line breaks
varun-edachali-dbx Jun 18, 2025
fa15730
constrain diff of test_closing_connection_closes_commands
varun-edachali-dbx Jun 18, 2025
019c7fb
reduce diff of test_closing_connection_closes_commands
varun-edachali-dbx Jun 18, 2025
726abe7
use pytest-like assertions for test_closing_connection_closes_commands
varun-edachali-dbx Jun 18, 2025
bf6d41c
ensure command_id is not None
varun-edachali-dbx Jun 18, 2025
5afa733
line breaks after multi-line pyfocs
varun-edachali-dbx Jun 18, 2025
e3dfd36
ensure non null operationHandle for commandId creation
varun-edachali-dbx Jun 18, 2025
63360b3
use command_id methods instead of explicit guid_to_hex_id conversion
varun-edachali-dbx Jun 18, 2025
13ffb8d
remove un-necessary artifacts in test_session, add back assertion
varun-edachali-dbx Jun 18, 2025
a74d279
Implement SeaDatabricksClient (Complete Execution Spec) (#590)
varun-edachali-dbx Jun 18, 2025
d759050
add from __future__ import annotations to remove string literals arou…
varun-edachali-dbx Jun 19, 2025
1e21434
move docstring of DatabricksClient within class
varun-edachali-dbx Jun 24, 2025
cd4015b
move ThriftResultSet import to top of file
varun-edachali-dbx Jun 24, 2025
ed8b610
make backend/utils __init__ file empty
varun-edachali-dbx Jun 24, 2025
94d951e
use from __future__ import annotations to remove string literals arou…
varun-edachali-dbx Jun 24, 2025
c20058e
use lazy logging
varun-edachali-dbx Jun 24, 2025
fe3acb1
replace getters with property tag
varun-edachali-dbx Jun 24, 2025
9fb6a76
Merge branch 'main' into backend-refactors
varun-edachali-dbx Jun 24, 2025
61dfc4d
set active_command_id to None, not active_op_handle
varun-edachali-dbx Jun 24, 2025
64fb9b2
align test_session with pytest instead of unittest
varun-edachali-dbx Jun 24, 2025
cbf63f9
Merge branch 'main' into sea-migration
varun-edachali-dbx Jun 26, 2025
59b4825
remove duplicate test, correct active_command_id attribute
varun-edachali-dbx Jun 26, 2025
e380654
SeaDatabricksClient: Add Metadata Commands (#593)
varun-edachali-dbx Jun 26, 2025
677a7b0
SEA volume operations fix: assign `manifest.is_volume_operation` to `…
varun-edachali-dbx Jun 26, 2025
45585d4
Introduce manual SEA test scripts for Exec Phase (#589)
varun-edachali-dbx Jun 27, 2025
70c7dc8
Complete Fetch Phase (for `INLINE` disposition and `JSON_ARRAY` forma…
varun-edachali-dbx Jul 2, 2025
abf9aab
Merge branch 'main' into sea-migration
varun-edachali-dbx Jul 3, 2025
9b4b606
Merge branch 'main' into backend-refactors
varun-edachali-dbx Jul 3, 2025
4f11ff0
Introduce `row_limit` param (#607)
varun-edachali-dbx Jul 7, 2025
45f5c26
Merge branch 'main' into backend-refactors
varun-edachali-dbx Jul 10, 2025
2c9368a
formatting (black)
varun-edachali-dbx Jul 10, 2025
9b1b1f5
remove repetition from Session.__init__
varun-edachali-dbx Jul 10, 2025
77e23d3
Merge branch 'backend-refactors' into sea-migration
varun-edachali-dbx Jul 11, 2025
3bd3aef
fix merge artifacts
varun-edachali-dbx Jul 11, 2025
6d4701f
correct patch paths
varun-edachali-dbx Jul 11, 2025
dc1cb6d
fix type issues
varun-edachali-dbx Jul 14, 2025
5d04cd0
Merge branch 'main' into sea-migration
varun-edachali-dbx Jul 15, 2025
922c448
explicitly close result queue
varun-edachali-dbx Jul 15, 2025
1a0575a
Complete Fetch Phase (`EXTERNAL_LINKS` disposition and `ARROW` format…
varun-edachali-dbx Jul 16, 2025
c07beb1
SEA Session Configuration Fix: Explicitly convert values to `str` (#…
varun-edachali-dbx Jul 16, 2025
640cc82
SEA: add support for `Hybrid` disposition (#631)
varun-edachali-dbx Jul 17, 2025
8fbca9d
SEA: Reduce network calls for synchronous commands (#633)
varun-edachali-dbx Jul 19, 2025
806e5f5
SEA: Decouple Link Fetching (#632)
varun-edachali-dbx Jul 21, 2025
b57c3f3
Chunk download latency (#634)
saishreeeee Jul 21, 2025
ef5836b
acquire lock before notif + formatting (black)
varun-edachali-dbx Jul 21, 2025
ad6b356
Revert "acquire lock before notif + formatting (black)"
varun-edachali-dbx Jul 22, 2025
77c0343
Revert "Chunk download latency (#634)"
varun-edachali-dbx Jul 22, 2025
0d6b53c
Revert "SEA: Decouple Link Fetching (#632)"
varun-edachali-dbx Jul 22, 2025
ab2e43d
Revert "Complete Fetch Phase (`EXTERNAL_LINKS` disposition and `ARROW…
varun-edachali-dbx Jul 22, 2025
e43c07b
Revert "Complete Fetch Phase (for `INLINE` disposition and `JSON_ARRA…
varun-edachali-dbx Jul 22, 2025
877b3b5
fix typing, errors
varun-edachali-dbx Jul 22, 2025
afdb035
Merge branch 'main' into execution-sea
varun-edachali-dbx Jul 22, 2025
809b39e
address more merge conflicts
varun-edachali-dbx Jul 22, 2025
ae5f2db
reduce changes in docstrings
varun-edachali-dbx Jul 22, 2025
01452bc
simplify param models
varun-edachali-dbx Jul 22, 2025
77e7061
align description extracted with Thrift
varun-edachali-dbx Jul 22, 2025
bc7ae81
nits: string literalrs around type defs, naming, excess changes
varun-edachali-dbx Jul 22, 2025
2fb1c95
remove excess changes
varun-edachali-dbx Jul 22, 2025
40f6ec4
remove excess changes
varun-edachali-dbx Jul 22, 2025
2485a73
remove duplicate cursor def
varun-edachali-dbx Jul 22, 2025
5db6d01
make error more descriptive on command failure
varun-edachali-dbx Jul 22, 2025
4fe5919
remove redundant ColumnInfo model
varun-edachali-dbx Jul 22, 2025
6a4faed
ensure error exists before extracting err details
varun-edachali-dbx Jul 22, 2025
3195765
demarcate error code vs message
varun-edachali-dbx Jul 22, 2025
e48a6fb
remove redundant missing statement_id check
varun-edachali-dbx Jul 22, 2025
2c0f303
docstring for _filter_session_configuration
varun-edachali-dbx Jul 22, 2025
a7f8876
remove redundant (un-used) methods
varun-edachali-dbx Jul 22, 2025
1444a67
Update src/databricks/sql/backend/sea/utils/filters.py
varun-edachali-dbx Jul 22, 2025
b3ebec5
extract status from resp instead of additional expensive call
varun-edachali-dbx Jul 22, 2025
92551b1
remove ValueError for potentially empty state
varun-edachali-dbx Jul 22, 2025
a740ece
default CommandState.RUNNING
varun-edachali-dbx Jul 23, 2025
121 changes: 121 additions & 0 deletions examples/experimental/sea_connector_test.py
@@ -0,0 +1,121 @@
"""
Main script to run all SEA connector tests.

This script runs all the individual test modules and displays
a summary of test results with visual indicators.

In order to run the script, the following environment variables need to be set:
- DATABRICKS_SERVER_HOSTNAME: The hostname of the Databricks server
- DATABRICKS_HTTP_PATH: The HTTP path of the Databricks server
- DATABRICKS_TOKEN: The token to use for authentication
"""

import os
import sys
import logging
import subprocess
from typing import List, Tuple

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

TEST_MODULES = [
"test_sea_session",
"test_sea_sync_query",
"test_sea_async_query",
"test_sea_metadata",
]


def run_test_module(module_name: str) -> bool:
"""Run a test module and return success status."""
module_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "tests", f"{module_name}.py"
)

# Simply run the module as a script - each module handles its own test execution
result = subprocess.run(
[sys.executable, module_path], capture_output=True, text=True
)

# Log the output from the test module
if result.stdout:
for line in result.stdout.strip().split("\n"):
logger.info(line)

if result.stderr:
for line in result.stderr.strip().split("\n"):
logger.error(line)

return result.returncode == 0


def run_tests() -> List[Tuple[str, bool]]:
"""Run all tests and return results."""
results = []

for module_name in TEST_MODULES:
try:
logger.info(f"\n{'=' * 50}")
logger.info(f"Running test: {module_name}")
logger.info(f"{'-' * 50}")

success = run_test_module(module_name)
results.append((module_name, success))

status = "✅ PASSED" if success else "❌ FAILED"
logger.info(f"Test {module_name}: {status}")

except Exception as e:
logger.error(f"Error loading or running test {module_name}: {str(e)}")
import traceback

logger.error(traceback.format_exc())
results.append((module_name, False))

return results


def print_summary(results: List[Tuple[str, bool]]) -> None:
"""Print a summary of test results."""
logger.info(f"\n{'=' * 50}")
logger.info("TEST SUMMARY")
logger.info(f"{'-' * 50}")

passed = sum(1 for _, success in results if success)
total = len(results)

for module_name, success in results:
status = "✅ PASSED" if success else "❌ FAILED"
logger.info(f"{status} - {module_name}")

logger.info(f"{'-' * 50}")
logger.info(f"Total: {total} | Passed: {passed} | Failed: {total - passed}")
logger.info(f"{'=' * 50}")


if __name__ == "__main__":
# Check if required environment variables are set
required_vars = [
"DATABRICKS_SERVER_HOSTNAME",
"DATABRICKS_HTTP_PATH",
"DATABRICKS_TOKEN",
]
missing_vars = [var for var in required_vars if not os.environ.get(var)]

if missing_vars:
logger.error(
f"Missing required environment variables: {', '.join(missing_vars)}"
)
logger.error("Please set these variables before running the tests.")
sys.exit(1)

# Run all tests
results = run_tests()

# Print summary
print_summary(results)

# Exit with appropriate status code
all_passed = all(success for _, success in results)
sys.exit(0 if all_passed else 1)
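
For reference, a minimal sketch of driving this runner from Python rather than the shell. run_tests() and print_summary() are the functions defined in the file above; the hostname, HTTP path, and token values are placeholders, and the import assumes the script's directory is on the path:

# Usage sketch with placeholder values -- substitute real workspace credentials.
import os

os.environ.setdefault("DATABRICKS_SERVER_HOSTNAME", "my-workspace.cloud.databricks.com")  # placeholder
os.environ.setdefault("DATABRICKS_HTTP_PATH", "/sql/1.0/warehouses/<warehouse-id>")       # placeholder
os.environ.setdefault("DATABRICKS_TOKEN", "<personal-access-token>")                      # placeholder

from sea_connector_test import run_tests, print_summary  # assumes examples/experimental is the working directory

results = run_tests()
print_summary(results)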
Empty file.
192 changes: 192 additions & 0 deletions examples/experimental/tests/test_sea_async_query.py
@@ -0,0 +1,192 @@
"""
Test for SEA asynchronous query execution functionality.
"""
import os
import sys
import logging
import time
from databricks.sql.client import Connection
from databricks.sql.backend.types import CommandState

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def test_sea_async_query_with_cloud_fetch():
"""
Test executing a query asynchronously using the SEA backend with cloud fetch enabled.

This function connects to a Databricks SQL endpoint using the SEA backend,
executes a simple query asynchronously with cloud fetch enabled, and verifies that execution completes successfully.
"""
server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
http_path = os.environ.get("DATABRICKS_HTTP_PATH")
access_token = os.environ.get("DATABRICKS_TOKEN")
catalog = os.environ.get("DATABRICKS_CATALOG")

if not all([server_hostname, http_path, access_token]):
logger.error("Missing required environment variables.")
logger.error(
"Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN."
)
return False

try:
# Create connection with cloud fetch enabled
logger.info(
"Creating connection for asynchronous query execution with cloud fetch enabled"
)
connection = Connection(
server_hostname=server_hostname,
http_path=http_path,
access_token=access_token,
catalog=catalog,
schema="default",
use_sea=True,
user_agent_entry="SEA-Test-Client",
use_cloud_fetch=True,
enable_query_result_lz4_compression=False,
)

logger.info(
f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}"
)

# Execute a simple query asynchronously
cursor = connection.cursor()
logger.info(
"Executing asynchronous query with cloud fetch: SELECT 1 as test_value"
)
cursor.execute_async("SELECT 1 as test_value")
logger.info(
"Asynchronous query submitted successfully with cloud fetch enabled"
)

# Check query state
logger.info("Checking query state...")
while cursor.is_query_pending():
logger.info("Query is still pending, waiting...")
time.sleep(1)

logger.info("Query is no longer pending, getting results...")
cursor.get_async_execution_result()
logger.info(
"Successfully retrieved asynchronous query results with cloud fetch enabled"
)

# Close resources
cursor.close()
connection.close()
logger.info("Successfully closed SEA session")

return True

except Exception as e:
logger.error(
f"Error during SEA asynchronous query execution test with cloud fetch: {str(e)}"
)
import traceback

logger.error(traceback.format_exc())
return False


def test_sea_async_query_without_cloud_fetch():
"""
Test executing a query asynchronously using the SEA backend with cloud fetch disabled.

This function connects to a Databricks SQL endpoint using the SEA backend,
executes a simple query asynchronously with cloud fetch disabled, and verifies that execution completes successfully.
"""
server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
http_path = os.environ.get("DATABRICKS_HTTP_PATH")
access_token = os.environ.get("DATABRICKS_TOKEN")
catalog = os.environ.get("DATABRICKS_CATALOG")

if not all([server_hostname, http_path, access_token]):
logger.error("Missing required environment variables.")
logger.error(
"Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN."
)
return False

try:
# Create connection with cloud fetch disabled
logger.info(
"Creating connection for asynchronous query execution with cloud fetch disabled"
)
connection = Connection(
server_hostname=server_hostname,
http_path=http_path,
access_token=access_token,
catalog=catalog,
schema="default",
use_sea=True,
user_agent_entry="SEA-Test-Client",
use_cloud_fetch=False,
enable_query_result_lz4_compression=False,
)

logger.info(
f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}"
)

# Execute a simple query asynchronously
cursor = connection.cursor()
logger.info(
"Executing asynchronous query without cloud fetch: SELECT 1 as test_value"
)
cursor.execute_async("SELECT 1 as test_value")
logger.info(
"Asynchronous query submitted successfully with cloud fetch disabled"
)

# Check query state
logger.info("Checking query state...")
while cursor.is_query_pending():
logger.info("Query is still pending, waiting...")
time.sleep(1)

logger.info("Query is no longer pending, getting results...")
cursor.get_async_execution_result()
logger.info(
"Successfully retrieved asynchronous query results with cloud fetch disabled"
)

# Close resources
cursor.close()
connection.close()
logger.info("Successfully closed SEA session")

return True

except Exception as e:
logger.error(
f"Error during SEA asynchronous query execution test without cloud fetch: {str(e)}"
)
import traceback

logger.error(traceback.format_exc())
return False


def test_sea_async_query_exec():
"""
Run both asynchronous query tests and return overall success.
"""
with_cloud_fetch_success = test_sea_async_query_with_cloud_fetch()
logger.info(
f"Asynchronous query with cloud fetch: {'✅ PASSED' if with_cloud_fetch_success else '❌ FAILED'}"
)

without_cloud_fetch_success = test_sea_async_query_without_cloud_fetch()
logger.info(
f"Asynchronous query without cloud fetch: {'✅ PASSED' if without_cloud_fetch_success else '❌ FAILED'}"
)

return with_cloud_fetch_success and without_cloud_fetch_success


if __name__ == "__main__":
success = test_sea_async_query_exec()
sys.exit(0 if success else 1)
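
The polling flow exercised by both tests above can be condensed into a short sketch. Everything here comes from the test file except the final fetchall() call, which is an assumption about how rows would normally be consumed once get_async_execution_result() has bound the finished execution to the cursor (the tests stop before fetching rows):

# Condensed sketch of the async execute/poll/fetch pattern; connection parameters are supplied by the caller.
import time
from databricks.sql.client import Connection

def run_async_query(connection: Connection, query: str):
    """Submit a query with execute_async, poll until it leaves the pending state, then return rows."""
    cursor = connection.cursor()
    cursor.execute_async(query)           # submit without blocking
    while cursor.is_query_pending():      # poll the server-side command state
        time.sleep(1)
    cursor.get_async_execution_result()   # attach the finished execution to the cursor
    rows = cursor.fetchall()              # assumed standard DB-API consumption; not part of the test above
    cursor.close()
    return rows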