Complete Fetch Phase (EXTERNAL_LINKS disposition and ARROW format) #598

Merged
merged 329 commits into sea-migration on Jul 16, 2025

Conversation

@varun-edachali-dbx (Collaborator) commented Jun 16, 2025

What type of PR is this?

  • Feature

Description

Complete the Fetch phase of the SEA implementation for EXTERNAL_LINKS disposition and ARROW_STREAM format. Introduce a new CloudFetchQueue.

Note that this is not yet feature-complete; there are known failures in the following areas (for the SEA backend only):

  • retries
  • native, complex params
  • metadata queries

These will be resolved in future PRs.

How is this tested?

  • Unit tests
  • E2E Tests
  • Manually - using the test scripts invoked in examples/experimental/sea_connector_test.py.

The coverage of the key classes by the unit tests is as follows:

| Module | Statements | Missing | Coverage | Notes |
| --- | --- | --- | --- | --- |
| backend/sea/queue.py (SeaCloudFetchQueue and SeaResultSetQueueFactory classes) | 92 | 2 | 98% | Missing lines are the pyarrow import exception handling (lines 10-11), which is not meant to be covered by runtime tests |
| backend/sea/result_set.py (SeaResultSet class) | 85 | 3 | 96% | Missing lines are the pyarrow import exception handling (lines 13-14) and the TYPE_CHECKING import (line 17), which are not meant to be covered by runtime tests |

Related Tickets & Documents

Design Doc
PECOBLR-553

Comment on lines +134 to +211
        super().__init__(
            max_download_threads=max_download_threads,
            ssl_options=ssl_options,
            schema_bytes=None,
            lz4_compressed=lz4_compressed,
            description=description,
        )

        self._sea_client = sea_client
        self._statement_id = statement_id
        self._total_chunk_count = total_chunk_count

        logger.debug(
            "SeaCloudFetchQueue: Initialize CloudFetch loader for statement {}, total chunks: {}".format(
                statement_id, total_chunk_count
            )
        )

        initial_links = result_data.external_links or []
        first_link = next((l for l in initial_links if l.chunk_index == 0), None)
        if not first_link:
            # possibly an empty response
            return None

        # Track the current chunk we're processing
        self._current_chunk_index = 0
        # Initialize table and position
        self.table = self._create_table_from_link(first_link)

    def _convert_to_thrift_link(self, link: ExternalLink) -> TSparkArrowResultLink:
        """Convert SEA external links to Thrift format for compatibility with existing download manager."""
        # Parse the ISO format expiration time
        expiry_time = int(dateutil.parser.parse(link.expiration).timestamp())
        return TSparkArrowResultLink(
            fileLink=link.external_link,
            expiryTime=expiry_time,
            rowCount=link.row_count,
            bytesNum=link.byte_count,
            startRowOffset=link.row_offset,
            httpHeaders=link.http_headers or {},
        )

    def _get_chunk_link(self, chunk_index: int) -> Optional[ExternalLink]:
        """Progress to the next chunk link."""
        if chunk_index >= self._total_chunk_count:
            return None

        try:
            return self._sea_client.get_chunk_link(self._statement_id, chunk_index)
        except Exception as e:
            raise ServerOperationError(
                f"Error fetching link for chunk {chunk_index}: {e}",
                {
                    "operation-id": self._statement_id,
                    "diagnostic-info": None,
                },
            )

    def _create_table_from_link(
        self, link: ExternalLink
    ) -> Union["pyarrow.Table", None]:
        """Create a table from a link."""

        thrift_link = self._convert_to_thrift_link(link)
        self.download_manager.add_link(thrift_link)

        row_offset = link.row_offset
        arrow_table = self._create_table_at_offset(row_offset)

        return arrow_table

    def _create_next_table(self) -> Union["pyarrow.Table", None]:
        """Create next table by retrieving the logical next downloaded file."""
        self._current_chunk_index += 1
        next_chunk_link = self._get_chunk_link(self._current_chunk_index)
        if not next_chunk_link:
            return None
        return self._create_table_from_link(next_chunk_link)
Contributor

Is this multi-threaded? _create_table_from_link adds links one by one, and _create_next_table calls _create_table_from_link. This seems to be serial execution.

Contributor

First you consume the arrow table from one link, then call _create_next_table to get the table from the next link, and so on. A link only gets added to the queue once the previous link has been consumed. Isn't this wasteful?

Contributor

Will this be solved in future PRs where you introduce a separate link downloader?

Collaborator Author
@varun-edachali-dbx Jul 16, 2025

Yes, this will be solved in the future PR where I introduce a separate link downloader. There, I use the multithreading capabilities of the ResultFileDownloadManager.

I intentionally kept this one quite simple and essentially single-threaded so that link expiry is not a concern: we download a link and create the arrow table the moment we fetch it. That way, this PR in isolation is correct.
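
For context, a rough sketch of what registering the links up front could look like, using only the methods from the excerpt above (_get_chunk_link, _convert_to_thrift_link, download_manager.add_link); the helper name is hypothetical and the actual follow-up PR may structure this differently:

    def _add_all_links_upfront(self):
        # Hypothetical helper (not part of this PR): hand every chunk link to the
        # download manager at once so ResultFileDownloadManager can fetch files on
        # its worker threads, instead of adding one link per _create_next_table call.
        for chunk_index in range(self._total_chunk_count):
            link = self._get_chunk_link(chunk_index)
            if link is None:
                break
            self.download_manager.add_link(self._convert_to_thrift_link(link))

Registering links eagerly like this is also what makes link expiry a concern again, since a link may lapse before its file is downloaded.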

Contributor
@jayantsing-db left a comment

Added some comments related to multi-threading in SEA

Comment on lines +335 to +336
if not self.schema_bytes:
    return pyarrow.Table.from_pydict({})

Contributor

why was this added to existing code? Is this just precautionary?

Collaborator Author

Thrift provides us with arrow schema bytes; SEA does not. For the Thrift (empty) table, we can augment the column schema information into the pyarrow table, but for SEA, we return a completely empty table without this information.

This code essentially augments the schema information if it is available (which is the case for Thrift, but not SEA).
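
A minimal sketch of the two paths described here, assuming the Thrift schema bytes are an Arrow IPC-serialized schema (the helper name is illustrative, not the connector's actual method):

    import pyarrow

    def _empty_arrow_table(schema_bytes):
        # Hypothetical helper illustrating the branch above.
        if schema_bytes:
            # Thrift path: deserialize the Arrow schema so the empty table
            # still carries column names and types.
            schema = pyarrow.ipc.read_schema(pyarrow.py_buffer(schema_bytes))
            return schema.empty_table()
        # SEA path: no schema bytes, so the result is a table with no columns.
        return pyarrow.Table.from_pydict({})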

Contributor

Got it. For SEA, we don't have an easy way to put the schema information from the ResultManifest into an empty table?

Collaborator Author

It is doable, but introduces some complexity: we would have to map each type given to us in the description (the 7-tuple containing name, type, etc.) to a corresponding pyarrow type (pa.int64(), pa.string(), etc.), use that to define our pyarrow table schema, and then return an empty table. I do not see a way to do this without creating a manual mapping ("STRING" -> pa.string(), etc.), so it could introduce some complexity and take some time.

If this is worth doing, I'll create a ticket and begin working on it soon.
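
For illustration, a minimal sketch of what such a manual mapping could look like; the type names and the string fallback are assumptions, not the connector's actual behaviour:

    import pyarrow as pa

    # Hypothetical mapping from SEA column type names to pyarrow types; the real
    # set of names would have to be confirmed against the SEA result manifest.
    _SEA_TYPE_TO_PYARROW = {
        "STRING": pa.string(),
        "INT": pa.int32(),
        "BIGINT": pa.int64(),
        "DOUBLE": pa.float64(),
        "BOOLEAN": pa.bool_(),
    }

    def empty_table_from_description(description):
        # description is the DB-API 7-tuple per column: (name, type_code, ...).
        fields = [
            pa.field(name, _SEA_TYPE_TO_PYARROW.get(type_code.upper(), pa.string()))
            for name, type_code, *_ in description
        ]
        return pa.schema(fields).empty_table()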

@varun-edachali-dbx merged commit 1a0575a into sea-migration on Jul 16, 2025
22 of 24 checks passed