SEA: Fetch Phase #650

Merged · 76 commits · Jul 28, 2025

Commits
5bf5d4c
Separate Session related functionality from Connection class (#571)
varun-edachali-dbx May 28, 2025
400a8bd
Introduce Backend Interface (DatabricksClient) (#573)
varun-edachali-dbx May 30, 2025
3c78ed7
Implement ResultSet Abstraction (backend interfaces for fetch phase) …
varun-edachali-dbx Jun 3, 2025
9625229
Introduce Sea HTTP Client and test script (#583)
varun-edachali-dbx Jun 4, 2025
0887bc1
Introduce `SeaDatabricksClient` (Session Implementation) (#582)
varun-edachali-dbx Jun 9, 2025
6d63df0
Normalise Execution Response (clean backend interfaces) (#587)
varun-edachali-dbx Jun 11, 2025
ba8d9fd
Introduce models for `SeaDatabricksClient` (#595)
varun-edachali-dbx Jun 12, 2025
bb3f15a
Introduce preliminary SEA Result Set (#588)
varun-edachali-dbx Jun 12, 2025
19f1fae
Merge branch 'main' into sea-migration
varun-edachali-dbx Jun 17, 2025
6c5ba6d
remove invalid ExecuteResponse import
varun-edachali-dbx Jun 17, 2025
5e5147b
Separate Session related functionality from Connection class (#571)
varun-edachali-dbx May 28, 2025
57370b3
Introduce Backend Interface (DatabricksClient) (#573)
varun-edachali-dbx May 30, 2025
75752bf
Implement ResultSet Abstraction (backend interfaces for fetch phase) …
varun-edachali-dbx Jun 3, 2025
450b80d
remove un-necessary initialisation assertions
varun-edachali-dbx Jun 18, 2025
a926f02
remove un-necessary line break s
varun-edachali-dbx Jun 18, 2025
55ad001
more un-necessary line breaks
varun-edachali-dbx Jun 18, 2025
fa15730
constrain diff of test_closing_connection_closes_commands
varun-edachali-dbx Jun 18, 2025
019c7fb
reduce diff of test_closing_connection_closes_commands
varun-edachali-dbx Jun 18, 2025
726abe7
use pytest-like assertions for test_closing_connection_closes_commands
varun-edachali-dbx Jun 18, 2025
bf6d41c
ensure command_id is not None
varun-edachali-dbx Jun 18, 2025
5afa733
line breaks after multi-line pyfocs
varun-edachali-dbx Jun 18, 2025
e3dfd36
ensure non null operationHandle for commandId creation
varun-edachali-dbx Jun 18, 2025
63360b3
use command_id methods instead of explicit guid_to_hex_id conversion
varun-edachali-dbx Jun 18, 2025
13ffb8d
remove un-necessary artifacts in test_session, add back assertion
varun-edachali-dbx Jun 18, 2025
a74d279
Implement SeaDatabricksClient (Complete Execution Spec) (#590)
varun-edachali-dbx Jun 18, 2025
d759050
add from __future__ import annotations to remove string literals arou…
varun-edachali-dbx Jun 19, 2025
1e21434
move docstring of DatabricksClient within class
varun-edachali-dbx Jun 24, 2025
cd4015b
move ThriftResultSet import to top of file
varun-edachali-dbx Jun 24, 2025
ed8b610
make backend/utils __init__ file empty
varun-edachali-dbx Jun 24, 2025
94d951e
use from __future__ import annotations to remove string literals arou…
varun-edachali-dbx Jun 24, 2025
c20058e
use lazy logging
varun-edachali-dbx Jun 24, 2025
fe3acb1
replace getters with property tag
varun-edachali-dbx Jun 24, 2025
9fb6a76
Merge branch 'main' into backend-refactors
varun-edachali-dbx Jun 24, 2025
61dfc4d
set active_command_id to None, not active_op_handle
varun-edachali-dbx Jun 24, 2025
64fb9b2
align test_session with pytest instead of unittest
varun-edachali-dbx Jun 24, 2025
cbf63f9
Merge branch 'main' into sea-migration
varun-edachali-dbx Jun 26, 2025
59b4825
remove duplicate test, correct active_command_id attribute
varun-edachali-dbx Jun 26, 2025
e380654
SeaDatabricksClient: Add Metadata Commands (#593)
varun-edachali-dbx Jun 26, 2025
677a7b0
SEA volume operations fix: assign `manifest.is_volume_operation` to `…
varun-edachali-dbx Jun 26, 2025
45585d4
Introduce manual SEA test scripts for Exec Phase (#589)
varun-edachali-dbx Jun 27, 2025
70c7dc8
Complete Fetch Phase (for `INLINE` disposition and `JSON_ARRAY` forma…
varun-edachali-dbx Jul 2, 2025
abf9aab
Merge branch 'main' into sea-migration
varun-edachali-dbx Jul 3, 2025
9b4b606
Merge branch 'main' into backend-refactors
varun-edachali-dbx Jul 3, 2025
4f11ff0
Introduce `row_limit` param (#607)
varun-edachali-dbx Jul 7, 2025
45f5c26
Merge branch 'main' into backend-refactors
varun-edachali-dbx Jul 10, 2025
2c9368a
formatting (black)
varun-edachali-dbx Jul 10, 2025
9b1b1f5
remove repetition from Session.__init__
varun-edachali-dbx Jul 10, 2025
77e23d3
Merge branch 'backend-refactors' into sea-migration
varun-edachali-dbx Jul 11, 2025
3bd3aef
fix merge artifacts
varun-edachali-dbx Jul 11, 2025
6d4701f
correct patch paths
varun-edachali-dbx Jul 11, 2025
dc1cb6d
fix type issues
varun-edachali-dbx Jul 14, 2025
5d04cd0
Merge branch 'main' into sea-migration
varun-edachali-dbx Jul 15, 2025
922c448
explicitly close result queue
varun-edachali-dbx Jul 15, 2025
1a0575a
Complete Fetch Phase (`EXTERNAL_LINKS` disposition and `ARROW` format…
varun-edachali-dbx Jul 16, 2025
c07beb1
SEA Session Configuration Fix: Explicitly convert values to `str` (#…
varun-edachali-dbx Jul 16, 2025
640cc82
SEA: add support for `Hybrid` disposition (#631)
varun-edachali-dbx Jul 17, 2025
8fbca9d
SEA: Reduce network calls for synchronous commands (#633)
varun-edachali-dbx Jul 19, 2025
806e5f5
SEA: Decouple Link Fetching (#632)
varun-edachali-dbx Jul 21, 2025
b57c3f3
Chunk download latency (#634)
saishreeeee Jul 21, 2025
ef5836b
acquire lock before notif + formatting (black)
varun-edachali-dbx Jul 21, 2025
4fd2a3f
Merge branch 'main' into sea-migration
varun-edachali-dbx Jul 23, 2025
26f8947
fix imports
varun-edachali-dbx Jul 23, 2025
2d44596
add get_chunk_link s
varun-edachali-dbx Jul 23, 2025
99e7435
simplify description extraction
varun-edachali-dbx Jul 23, 2025
54ec080
pass session_id_hex to ThriftResultSet
varun-edachali-dbx Jul 23, 2025
f9f9f31
revert to main's extract description
varun-edachali-dbx Jul 23, 2025
51cef2b
validate row count for sync query tests as well
varun-edachali-dbx Jul 23, 2025
387102d
guid_hex -> hex_guid
varun-edachali-dbx Jul 23, 2025
d53d1ea
reduce diff
varun-edachali-dbx Jul 23, 2025
c7810aa
reduce diff
varun-edachali-dbx Jul 23, 2025
b3072bd
reduce diff
varun-edachali-dbx Jul 23, 2025
8be5264
set .value in compression
varun-edachali-dbx Jul 23, 2025
80692e3
reduce diff
varun-edachali-dbx Jul 23, 2025
83e45ae
is_direct_results -> has_more_rows
varun-edachali-dbx Jul 25, 2025
ddfae38
ensure result set initialised
varun-edachali-dbx Jul 28, 2025
0cd17c8
minor telemetry changes
saishreeeee Jul 28, 2025
65 changes: 57 additions & 8 deletions examples/experimental/tests/test_sea_async_query.py
@@ -52,12 +52,20 @@ def test_sea_async_query_with_cloud_fetch():
f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}"
)

# Execute a simple query asynchronously
# Execute a query that generates large rows to force multiple chunks
requested_row_count = 5000
cursor = connection.cursor()
query = f"""
SELECT
id,
concat('value_', repeat('a', 10000)) as test_value
FROM range(1, {requested_row_count} + 1) AS t(id)
"""

logger.info(
"Executing asynchronous query with cloud fetch: SELECT 1 as test_value"
f"Executing asynchronous query with cloud fetch to generate {requested_row_count} rows"
)
cursor.execute_async("SELECT 1 as test_value")
cursor.execute_async(query)
logger.info(
"Asynchronous query submitted successfully with cloud fetch enabled"
)
@@ -70,8 +78,25 @@ def test_sea_async_query_with_cloud_fetch():

logger.info("Query is no longer pending, getting results...")
cursor.get_async_execution_result()

results = [cursor.fetchone()]
results.extend(cursor.fetchmany(10))
results.extend(cursor.fetchall())
actual_row_count = len(results)

logger.info(
f"Requested {requested_row_count} rows, received {actual_row_count} rows"
)

# Verify total row count
if actual_row_count != requested_row_count:
logger.error(
f"FAIL: Row count mismatch. Expected {requested_row_count}, got {actual_row_count}"
)
return False

logger.info(
"Successfully retrieved asynchronous query results with cloud fetch enabled"
"PASS: Received correct number of rows with cloud fetch and all fetch methods work correctly"
)

# Close resources
@@ -131,12 +156,20 @@ def test_sea_async_query_without_cloud_fetch():
f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}"
)

# Execute a simple query asynchronously
# For non-cloud fetch, use a smaller row count to avoid exceeding inline limits
requested_row_count = 100
cursor = connection.cursor()
query = f"""
SELECT
id,
concat('value_', repeat('a', 100)) as test_value
FROM range(1, {requested_row_count} + 1) AS t(id)
"""

logger.info(
"Executing asynchronous query without cloud fetch: SELECT 1 as test_value"
f"Executing asynchronous query without cloud fetch to generate {requested_row_count} rows"
)
cursor.execute_async("SELECT 1 as test_value")
cursor.execute_async(query)
logger.info(
"Asynchronous query submitted successfully with cloud fetch disabled"
)
@@ -149,8 +182,24 @@ def test_sea_async_query_without_cloud_fetch():

logger.info("Query is no longer pending, getting results...")
cursor.get_async_execution_result()
results = [cursor.fetchone()]
results.extend(cursor.fetchmany(10))
results.extend(cursor.fetchall())
actual_row_count = len(results)

logger.info(
f"Requested {requested_row_count} rows, received {actual_row_count} rows"
)

# Verify total row count
if actual_row_count != requested_row_count:
logger.error(
f"FAIL: Row count mismatch. Expected {requested_row_count}, got {actual_row_count}"
)
return False

logger.info(
"Successfully retrieved asynchronous query results with cloud fetch disabled"
"PASS: Received correct number of rows without cloud fetch and all fetch methods work correctly"
)

# Close resources
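
Note: in the full scripts, submission and retrieval are separated by a polling loop that the collapsed context above hides. A minimal sketch of that async flow, assuming the connector's `execute_async`, `is_query_pending`, and `get_async_execution_result` cursor helpers behave as the test's log messages suggest (the one-second poll interval is illustrative):

    import time

    def run_async(cursor, query: str):
        # Submit without blocking, then poll until the query leaves the
        # pending state before asking for results.
        cursor.execute_async(query)
        while cursor.is_query_pending():
            time.sleep(1)  # illustrative interval; tune for real workloads
        cursor.get_async_execution_result()
        return cursor.fetchall()
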
54 changes: 46 additions & 8 deletions examples/experimental/tests/test_sea_sync_query.py
@@ -50,13 +50,34 @@ def test_sea_sync_query_with_cloud_fetch():
f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}"
)

# Execute a simple query
# Execute a query that generates large rows to force multiple chunks
requested_row_count = 10000
cursor = connection.cursor()
query = f"""
SELECT
id,
concat('value_', repeat('a', 10000)) as test_value
FROM range(1, {requested_row_count} + 1) AS t(id)
"""

logger.info(
f"Executing synchronous query with cloud fetch to generate {requested_row_count} rows"
)
cursor.execute(query)
results = [cursor.fetchone()]
results.extend(cursor.fetchmany(10))
results.extend(cursor.fetchall())
actual_row_count = len(results)
logger.info(
"Executing synchronous query with cloud fetch: SELECT 1 as test_value"
f"{actual_row_count} rows retrieved against {requested_row_count} requested"
)
cursor.execute("SELECT 1 as test_value")
logger.info("Query executed successfully with cloud fetch enabled")

# Verify total row count
if actual_row_count != requested_row_count:
logger.error(
f"FAIL: Row count mismatch. Expected {requested_row_count}, got {actual_row_count}"
)
return False

# Close resources
cursor.close()
@@ -115,13 +136,30 @@ def test_sea_sync_query_without_cloud_fetch():
f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}"
)

# Execute a simple query
# For non-cloud fetch, use a smaller row count to avoid exceeding inline limits
requested_row_count = 100
cursor = connection.cursor()
logger.info(
"Executing synchronous query without cloud fetch: SELECT 1 as test_value"
f"Executing synchronous query without cloud fetch: SELECT {requested_row_count} rows"
)
cursor.execute(
"SELECT id, 'test_value_' || CAST(id as STRING) as test_value FROM range(1, 101)"
)
cursor.execute("SELECT 1 as test_value")
logger.info("Query executed successfully with cloud fetch disabled")

results = [cursor.fetchone()]
results.extend(cursor.fetchmany(10))
results.extend(cursor.fetchall())
actual_row_count = len(results)
logger.info(
f"{actual_row_count} rows retrieved against {requested_row_count} requested"
)

# Verify total row count
if actual_row_count != requested_row_count:
logger.error(
f"FAIL: Row count mismatch. Expected {requested_row_count}, got {actual_row_count}"
)
return False

# Close resources
cursor.close()
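
All four scripts (async and sync, with and without cloud fetch) share the same verification pattern: mix the three PEP 249 fetch methods and check that together they drain exactly the requested number of rows. A condensed sketch of that pattern, assuming an open `databricks.sql` connection; the helper name and default row count are illustrative:

    def verify_fetch_methods(connection, requested_row_count: int = 5000) -> bool:
        cursor = connection.cursor()
        cursor.execute(
            f"""
            SELECT id, concat('value_', repeat('a', 10000)) as test_value
            FROM range(1, {requested_row_count} + 1) AS t(id)
            """
        )
        results = [cursor.fetchone()]         # 1 row
        results.extend(cursor.fetchmany(10))  # 10 more rows
        results.extend(cursor.fetchall())     # everything that remains
        cursor.close()
        # All three methods together must account for every requested row.
        return len(results) == requested_row_count
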
40 changes: 32 additions & 8 deletions src/databricks/sql/backend/sea/backend.py
@@ -5,7 +5,12 @@
import re
from typing import Any, Dict, Tuple, List, Optional, Union, TYPE_CHECKING, Set

from databricks.sql.backend.sea.models.base import ResultManifest, StatementStatus
from databricks.sql.backend.sea.models.base import (
ExternalLink,
ResultManifest,
StatementStatus,
)
from databricks.sql.backend.sea.models.responses import GetChunksResponse
from databricks.sql.backend.sea.utils.constants import (
ALLOWED_SESSION_CONF_TO_DEFAULT_VALUES_MAP,
ResultFormat,
@@ -19,7 +24,7 @@
if TYPE_CHECKING:
from databricks.sql.client import Cursor

from databricks.sql.result_set import SeaResultSet
from databricks.sql.backend.sea.result_set import SeaResultSet

from databricks.sql.backend.databricks_client import DatabricksClient
from databricks.sql.backend.types import (
@@ -110,6 +115,7 @@ class SeaDatabricksClient(DatabricksClient):
STATEMENT_PATH = BASE_PATH + "statements"
STATEMENT_PATH_WITH_ID = STATEMENT_PATH + "/{}"
CANCEL_STATEMENT_PATH_WITH_ID = STATEMENT_PATH + "/{}/cancel"
CHUNK_PATH_WITH_ID_AND_INDEX = STATEMENT_PATH + "/{}/result/chunks/{}"

# SEA constants
POLL_INTERVAL_SECONDS = 0.2
@@ -296,7 +302,7 @@ def close_session(self, session_id: SessionId) -> None:

def _extract_description_from_manifest(
self, manifest: ResultManifest
) -> Optional[List]:
) -> List[Tuple]:
"""
Extract column description from a manifest object, in the format defined by
the spec: https://peps.python.org/pep-0249/#description
@@ -311,9 +317,6 @@ def _extract_description_from_manifest(
schema_data = manifest.schema
columns_data = schema_data.get("columns", [])

if not columns_data:
return None

columns = []
for col_data in columns_data:
# Format: (name, type_code, display_size, internal_size, precision, scale, null_ok)
@@ -337,7 +340,7 @@
)
)

return columns if columns else None
return columns

def _results_message_to_execute_response(
self, response: Union[ExecuteStatementResponse, GetStatementResponse]
@@ -358,7 +361,7 @@

# Check for compression
lz4_compressed = (
response.manifest.result_compression == ResultCompression.LZ4_FRAME
response.manifest.result_compression == ResultCompression.LZ4_FRAME.value
)

execute_response = ExecuteResponse(
@@ -647,6 +650,27 @@ def get_execution_result(
response = self._poll_query(command_id)
return self._response_to_result_set(response, cursor)

def get_chunk_links(
self, statement_id: str, chunk_index: int
) -> List[ExternalLink]:
"""
Get links for chunks starting from the specified index.
Args:
statement_id: The statement ID
chunk_index: The starting chunk index
Returns:
List[ExternalLink]: External links for the requested chunk
"""

response_data = self._http_client._make_request(
method="GET",
path=self.CHUNK_PATH_WITH_ID_AND_INDEX.format(statement_id, chunk_index),
)
response = GetChunksResponse.from_dict(response_data)

links = response.external_links or []
return links

# == Metadata Operations ==

def get_catalogs(
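
A sketch of how a caller might walk chunks with the new `get_chunk_links` endpoint. It assumes, per the SEA models, that `ExternalLink` exposes `external_link` (a presigned URL) and `next_chunk_index` (None on the last chunk); the loop itself is illustrative, since the PR drives this from inside the SEA result-set machinery rather than from user code:

    def iter_chunk_urls(client, statement_id):
        # `client` is a SeaDatabricksClient; yields presigned URLs chunk by chunk.
        chunk_index = 0
        while chunk_index is not None:
            links = client.get_chunk_links(statement_id, chunk_index)
            if not links:
                break
            for link in links:
                yield link.external_link  # presigned URL for the chunk's data
            # Follow the server-reported pointer to the next chunk, if any.
            chunk_index = links[-1].next_chunk_index
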
2 changes: 2 additions & 0 deletions src/databricks/sql/backend/sea/models/__init__.py
@@ -26,6 +26,7 @@
ExecuteStatementResponse,
GetStatementResponse,
CreateSessionResponse,
GetChunksResponse,
)

__all__ = [
@@ -47,4 +48,5 @@
"ExecuteStatementResponse",
"GetStatementResponse",
"CreateSessionResponse",
"GetChunksResponse",
]
34 changes: 34 additions & 0 deletions src/databricks/sql/backend/sea/models/responses.py
@@ -160,3 +160,37 @@ class CreateSessionResponse:
def from_dict(cls, data: Dict[str, Any]) -> "CreateSessionResponse":
"""Create a CreateSessionResponse from a dictionary."""
return cls(session_id=data.get("session_id", ""))


@dataclass
class GetChunksResponse:
"""
Response from getting chunks for a statement.

The response model can be found in the docs, here:
https://docs.databricks.com/api/workspace/statementexecution/getstatementresultchunkn
"""

data: Optional[List[List[Any]]] = None
external_links: Optional[List[ExternalLink]] = None
byte_count: Optional[int] = None
chunk_index: Optional[int] = None
next_chunk_index: Optional[int] = None
next_chunk_internal_link: Optional[str] = None
row_count: Optional[int] = None
row_offset: Optional[int] = None

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "GetChunksResponse":
"""Create a GetChunksResponse from a dictionary."""
result = _parse_result({"result": data})
return cls(
data=result.data,
external_links=result.external_links,
byte_count=result.byte_count,
chunk_index=result.chunk_index,
next_chunk_index=result.next_chunk_index,
next_chunk_internal_link=result.next_chunk_internal_link,
row_count=result.row_count,
row_offset=result.row_offset,
)
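
For reference, a round trip through the new model with a hypothetical payload shaped after the Statement Execution API docs linked above (all values invented; the nested link's field names follow the same docs and are not guaranteed by this PR):

    from databricks.sql.backend.sea.models import GetChunksResponse

    payload = {
        "external_links": [
            {
                "external_link": "https://example-bucket.s3.amazonaws.com/chunk-0",
                "expiration": "2025-07-28T00:00:00Z",
                "chunk_index": 0,
                "byte_count": 1024,
                "row_count": 100,
                "row_offset": 0,
                "next_chunk_index": 1,
            }
        ],
        "chunk_index": 0,
        "row_count": 100,
        "row_offset": 0,
        "next_chunk_index": 1,
    }

    response = GetChunksResponse.from_dict(payload)
    assert response.external_links is not None
    print(response.external_links[0].external_link, response.next_chunk_index)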