Skip to content

SEA: add support for Hybrid disposition #631

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 18 commits into
base: ext-links-sea
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/experimental/tests/test_sea_async_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def test_sea_async_query_with_cloud_fetch():
use_sea=True,
user_agent_entry="SEA-Test-Client",
use_cloud_fetch=True,
enable_query_result_lz4_compression=False,
)

logger.info(
Expand Down
1 change: 1 addition & 0 deletions examples/experimental/tests/test_sea_sync_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def test_sea_sync_query_with_cloud_fetch():
use_sea=True,
user_agent_entry="SEA-Test-Client",
use_cloud_fetch=True,
enable_query_result_lz4_compression=False,
)

logger.info(
Expand Down
24 changes: 11 additions & 13 deletions src/databricks/sql/backend/sea/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ def __init__(
"_use_arrow_native_complex_types", True
)

self.use_hybrid_disposition = kwargs.get("use_hybrid_disposition", True)

# Extract warehouse ID from http_path
self.warehouse_id = self._extract_warehouse_id(http_path)

Expand Down Expand Up @@ -456,7 +458,11 @@ def execute_command(
ResultFormat.ARROW_STREAM if use_cloud_fetch else ResultFormat.JSON_ARRAY
).value
disposition = (
ResultDisposition.EXTERNAL_LINKS
(
ResultDisposition.HYBRID
if self.use_hybrid_disposition
else ResultDisposition.EXTERNAL_LINKS
)
if use_cloud_fetch
else ResultDisposition.INLINE
).value
Expand Down Expand Up @@ -637,7 +643,9 @@ def get_execution_result(
arraysize=cursor.arraysize,
)

def get_chunk_link(self, statement_id: str, chunk_index: int) -> ExternalLink:
def get_chunk_links(
self, statement_id: str, chunk_index: int
) -> List[ExternalLink]:
"""
Get links for chunks starting from the specified index.
Args:
Expand All @@ -654,17 +662,7 @@ def get_chunk_link(self, statement_id: str, chunk_index: int) -> ExternalLink:
response = GetChunksResponse.from_dict(response_data)

links = response.external_links or []
link = next((l for l in links if l.chunk_index == chunk_index), None)
if not link:
raise ServerOperationError(
f"No link found for chunk index {chunk_index}",
{
"operation-id": statement_id,
"diagnostic-info": None,
},
)

return link
return links

# == Metadata Operations ==

Expand Down
8 changes: 7 additions & 1 deletion src/databricks/sql/backend/sea/models/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
These models define the structures used in SEA API responses.
"""

import base64
from typing import Dict, Any, List, Optional
from dataclasses import dataclass

Expand Down Expand Up @@ -91,6 +92,11 @@ def _parse_result(data: Dict[str, Any]) -> ResultData:
)
)

# Handle attachment field - decode from base64 if present
attachment = result_data.get("attachment")
if attachment is not None:
attachment = base64.b64decode(attachment)

return ResultData(
data=result_data.get("data_array"),
external_links=external_links,
Expand All @@ -100,7 +106,7 @@ def _parse_result(data: Dict[str, Any]) -> ResultData:
next_chunk_internal_link=result_data.get("next_chunk_internal_link"),
row_count=result_data.get("row_count"),
row_offset=result_data.get("row_offset"),
attachment=result_data.get("attachment"),
attachment=attachment,
)


Expand Down
43 changes: 35 additions & 8 deletions src/databricks/sql/backend/sea/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from databricks.sql.cloudfetch.download_manager import ResultFileDownloadManager

from databricks.sql.cloudfetch.downloader import ResultSetDownloadHandler

try:
import pyarrow
except ImportError:
Expand All @@ -23,7 +25,12 @@
from databricks.sql.exc import ProgrammingError, ServerOperationError
from databricks.sql.thrift_api.TCLIService.ttypes import TSparkArrowResultLink
from databricks.sql.types import SSLOptions
from databricks.sql.utils import CloudFetchQueue, ResultSetQueue
from databricks.sql.utils import (
ArrowQueue,
CloudFetchQueue,
ResultSetQueue,
create_arrow_table_from_arrow_file,
)

import logging

Expand Down Expand Up @@ -62,6 +69,18 @@ def build_queue(
# INLINE disposition with JSON_ARRAY format
return JsonQueue(result_data.data)
elif manifest.format == ResultFormat.ARROW_STREAM.value:
if result_data.attachment is not None:
arrow_file = (
ResultSetDownloadHandler._decompress_data(result_data.attachment)
if lz4_compressed
else result_data.attachment
)
arrow_table = create_arrow_table_from_arrow_file(
arrow_file, description
)
logger.debug(f"Created arrow table with {arrow_table.num_rows} rows")
return ArrowQueue(arrow_table, manifest.total_row_count)

# EXTERNAL_LINKS disposition
return SeaCloudFetchQueue(
result_data=result_data,
Expand Down Expand Up @@ -142,6 +161,7 @@ def __init__(
self._sea_client = sea_client
self._statement_id = statement_id
self._total_chunk_count = total_chunk_count
self._total_chunk_count = total_chunk_count

logger.debug(
"SeaCloudFetchQueue: Initialize CloudFetch loader for statement {}, total chunks: {}".format(
Expand All @@ -150,7 +170,11 @@ def __init__(
)

initial_links = result_data.external_links or []
first_link = next((l for l in initial_links if l.chunk_index == 0), None)
self._chunk_index_to_link = {link.chunk_index: link for link in initial_links}

# Track the current chunk we're processing
self._current_chunk_index = 0
first_link = self._chunk_index_to_link.get(self._current_chunk_index, None)
if not first_link:
# possibly an empty response
return None
Expand All @@ -173,21 +197,24 @@ def _convert_to_thrift_link(self, link: ExternalLink) -> TSparkArrowResultLink:
httpHeaders=link.http_headers or {},
)

def _get_chunk_link(self, chunk_index: int) -> Optional[ExternalLink]:
"""Progress to the next chunk link."""
def _get_chunk_link(self, chunk_index: int) -> Optional["ExternalLink"]:
if chunk_index >= self._total_chunk_count:
return None

try:
return self._sea_client.get_chunk_link(self._statement_id, chunk_index)
except Exception as e:
if chunk_index not in self._chunk_index_to_link:
links = self._sea_client.get_chunk_links(self._statement_id, chunk_index)
self._chunk_index_to_link.update({l.chunk_index: l for l in links})

link = self._chunk_index_to_link.get(chunk_index, None)
if not link:
raise ServerOperationError(
f"Error fetching link for chunk {chunk_index}: {e}",
f"Error fetching link for chunk {chunk_index}",
{
"operation-id": self._statement_id,
"diagnostic-info": None,
},
)
return link

def _create_table_from_link(
self, link: ExternalLink
Expand Down
2 changes: 1 addition & 1 deletion src/databricks/sql/backend/sea/utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class ResultFormat(Enum):
class ResultDisposition(Enum):
"""Enum for result disposition values."""

# TODO: add support for hybrid disposition
HYBRID = "INLINE_OR_EXTERNAL_LINKS"
EXTERNAL_LINKS = "EXTERNAL_LINKS"
INLINE = "INLINE"

Expand Down
4 changes: 4 additions & 0 deletions src/databricks/sql/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ def __init__(
Connect to a Databricks SQL endpoint or a Databricks cluster.

Parameters:
:param use_sea: `bool`, optional (default is False)
Use the SEA backend instead of the Thrift backend.
:param use_hybrid_disposition: `bool`, optional (default is True)
Only applies to the SEA backend with cloud fetch enabled: use the hybrid (INLINE_OR_EXTERNAL_LINKS) disposition instead of the EXTERNAL_LINKS disposition.
:param server_hostname: Databricks instance host name.
:param http_path: Http path either to a DBSQL endpoint (e.g. /sql/1.0/endpoints/1234567890abcdef)
or to a DBR interactive cluster (e.g. /sql/protocolv1/o/1234567890123456/1234-123456-slid123)
Expand Down
41 changes: 16 additions & 25 deletions tests/unit/test_sea_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,8 +894,8 @@ def test_get_columns(self, sea_client, sea_session_id, mock_cursor):
)
assert "Catalog name is required for get_columns" in str(excinfo.value)

def test_get_chunk_link(self, sea_client, mock_http_client, sea_command_id):
"""Test get_chunk_link method."""
def test_get_chunk_links(self, sea_client, mock_http_client, sea_command_id):
"""Test get_chunk_links method when links are available."""
# Setup mock response
mock_response = {
"external_links": [
Expand All @@ -914,7 +914,7 @@ def test_get_chunk_link(self, sea_client, mock_http_client, sea_command_id):
mock_http_client._make_request.return_value = mock_response

# Call the method
result = sea_client.get_chunk_link("test-statement-123", 0)
results = sea_client.get_chunk_links("test-statement-123", 0)

# Verify the HTTP client was called correctly
mock_http_client._make_request.assert_called_once_with(
Expand All @@ -924,7 +924,10 @@ def test_get_chunk_link(self, sea_client, mock_http_client, sea_command_id):
),
)

# Verify the result
# Verify the results
assert isinstance(results, list)
assert len(results) == 1
result = results[0]
assert result.external_link == "https://example.com/data/chunk0"
assert result.expiration == "2025-07-03T05:51:18.118009"
assert result.row_count == 100
Expand All @@ -934,30 +937,14 @@ def test_get_chunk_link(self, sea_client, mock_http_client, sea_command_id):
assert result.next_chunk_index == 1
assert result.http_headers == {"Authorization": "Bearer token123"}

def test_get_chunk_link_not_found(self, sea_client, mock_http_client):
"""Test get_chunk_link when the requested chunk is not found."""
def test_get_chunk_links_empty(self, sea_client, mock_http_client):
"""Test get_chunk_links when no links are returned (empty list)."""
# Setup mock response with no external links
mock_response = {
"external_links": [
{
"external_link": "https://example.com/data/chunk1",
"expiration": "2025-07-03T05:51:18.118009",
"row_count": 100,
"byte_count": 1024,
"row_offset": 100,
"chunk_index": 1, # Different chunk index
"next_chunk_index": 2,
"http_headers": {"Authorization": "Bearer token123"},
}
]
}
mock_response = {"external_links": []}
mock_http_client._make_request.return_value = mock_response

# Call the method and expect an exception
with pytest.raises(
ServerOperationError, match="No link found for chunk index 0"
):
sea_client.get_chunk_link("test-statement-123", 0)
# Call the method
results = sea_client.get_chunk_links("test-statement-123", 0)

# Verify the HTTP client was called correctly
mock_http_client._make_request.assert_called_once_with(
Expand All @@ -966,3 +953,7 @@ def test_get_chunk_link_not_found(self, sea_client, mock_http_client):
"test-statement-123", 0
),
)

# Verify the results are empty
assert isinstance(results, list)
assert results == []
Loading