Commit 1266863

Revert "SEA: Decouple Link Fetching (#632)"
This reverts commit 806e5f5.
1 parent 24332c8 · commit 1266863

File tree: 2 files changed (+89, -371 lines changed)

Lines changed: 58 additions & 201 deletions
@@ -1,8 +1,7 @@
 from __future__ import annotations
 
 from abc import ABC
-import threading
-from typing import Dict, List, Optional, Tuple, Union, TYPE_CHECKING
+from typing import List, Optional, Tuple, Union, TYPE_CHECKING
 
 from databricks.sql.cloudfetch.download_manager import ResultFileDownloadManager
 from databricks.sql.telemetry.models.enums import StatementType
@@ -123,179 +122,6 @@ def close(self):
         return
 
 
-class LinkFetcher:
-    """
-    Background helper that incrementally retrieves *external links* for a
-    result set produced by the SEA backend and feeds them to a
-    :class:`databricks.sql.cloudfetch.download_manager.ResultFileDownloadManager`.
-
-    The SEA backend splits large result sets into *chunks*. Each chunk is
-    stored remotely (e.g., in object storage) and exposed via a signed URL
-    encapsulated by an :class:`ExternalLink`. Only the first batch of links is
-    returned with the initial query response. The remaining links must be
-    pulled on demand using the *next-chunk* token embedded in each
-    :pyattr:`ExternalLink.next_chunk_index`.
-
-    LinkFetcher takes care of this choreography so callers (primarily
-    ``SeaCloudFetchQueue``) can simply ask for the link of a specific
-    ``chunk_index`` and block until it becomes available.
-
-    Key responsibilities:
-
-    • Maintain an in-memory mapping from ``chunk_index`` → ``ExternalLink``.
-    • Launch a background worker thread that continuously requests the next
-      batch of links from the backend until all chunks have been discovered or
-      an unrecoverable error occurs.
-    • Bridge SEA link objects to the Thrift representation expected by the
-      existing download manager.
-    • Provide a synchronous API (`get_chunk_link`) that blocks until the desired
-      link is present in the cache.
-    """
-
-    def __init__(
-        self,
-        download_manager: ResultFileDownloadManager,
-        backend: SeaDatabricksClient,
-        statement_id: str,
-        initial_links: List[ExternalLink],
-        total_chunk_count: int,
-    ):
-        self.download_manager = download_manager
-        self.backend = backend
-        self._statement_id = statement_id
-
-        self._shutdown_event = threading.Event()
-
-        self._link_data_update = threading.Condition()
-        self._error: Optional[Exception] = None
-        self.chunk_index_to_link: Dict[int, ExternalLink] = {}
-
-        self._add_links(initial_links)
-        self.total_chunk_count = total_chunk_count
-
-        # DEBUG: capture initial state for observability
-        logger.debug(
-            "LinkFetcher[%s]: initialized with %d initial link(s); expecting %d total chunk(s)",
-            statement_id,
-            len(initial_links),
-            total_chunk_count,
-        )
-
-    def _add_links(self, links: List[ExternalLink]):
-        """Cache *links* locally and enqueue them with the download manager."""
-        logger.debug(
-            "LinkFetcher[%s]: caching %d link(s) – chunks %s",
-            self._statement_id,
-            len(links),
-            ", ".join(str(l.chunk_index) for l in links) if links else "<none>",
-        )
-        for link in links:
-            self.chunk_index_to_link[link.chunk_index] = link
-            self.download_manager.add_link(LinkFetcher._convert_to_thrift_link(link))
-
-    def _get_next_chunk_index(self) -> Optional[int]:
-        """Return the next *chunk_index* that should be requested from the backend, or ``None`` if we have them all."""
-        with self._link_data_update:
-            max_chunk_index = max(self.chunk_index_to_link.keys(), default=None)
-            if max_chunk_index is None:
-                return 0
-            max_link = self.chunk_index_to_link[max_chunk_index]
-            return max_link.next_chunk_index
-
-    def _trigger_next_batch_download(self) -> bool:
-        """Fetch the next batch of links from the backend and return *True* on success."""
-        logger.debug(
-            "LinkFetcher[%s]: requesting next batch of links", self._statement_id
-        )
-        next_chunk_index = self._get_next_chunk_index()
-        if next_chunk_index is None:
-            return False
-
-        try:
-            links = self.backend.get_chunk_links(self._statement_id, next_chunk_index)
-            with self._link_data_update:
-                self._add_links(links)
-                self._link_data_update.notify_all()
-        except Exception as e:
-            logger.error(
-                f"LinkFetcher: Error fetching links for chunk {next_chunk_index}: {e}"
-            )
-            with self._link_data_update:
-                self._error = e
-                self._link_data_update.notify_all()
-            return False
-
-        logger.debug(
-            "LinkFetcher[%s]: received %d new link(s)",
-            self._statement_id,
-            len(links),
-        )
-        return True
-
-    def get_chunk_link(self, chunk_index: int) -> Optional[ExternalLink]:
-        """Return (blocking) the :class:`ExternalLink` associated with *chunk_index*."""
-        logger.debug(
-            "LinkFetcher[%s]: waiting for link of chunk %d",
-            self._statement_id,
-            chunk_index,
-        )
-        if chunk_index >= self.total_chunk_count:
-            return None
-
-        with self._link_data_update:
-            while chunk_index not in self.chunk_index_to_link:
-                if self._error:
-                    raise self._error
-                if self._shutdown_event.is_set():
-                    raise ProgrammingError(
-                        "LinkFetcher is shutting down without providing link for chunk index {}".format(
-                            chunk_index
-                        )
-                    )
-                self._link_data_update.wait()
-
-            return self.chunk_index_to_link[chunk_index]
-
-    @staticmethod
-    def _convert_to_thrift_link(link: ExternalLink) -> TSparkArrowResultLink:
-        """Convert SEA external links to Thrift format for compatibility with existing download manager."""
-        # Parse the ISO format expiration time
-        expiry_time = int(dateutil.parser.parse(link.expiration).timestamp())
-        return TSparkArrowResultLink(
-            fileLink=link.external_link,
-            expiryTime=expiry_time,
-            rowCount=link.row_count,
-            bytesNum=link.byte_count,
-            startRowOffset=link.row_offset,
-            httpHeaders=link.http_headers or {},
-        )
-
-    def _worker_loop(self):
-        """Entry point for the background thread."""
-        logger.debug("LinkFetcher[%s]: worker thread started", self._statement_id)
-        while not self._shutdown_event.is_set():
-            links_downloaded = self._trigger_next_batch_download()
-            if not links_downloaded:
-                self._shutdown_event.set()
-        logger.debug("LinkFetcher[%s]: worker thread exiting", self._statement_id)
-        self._link_data_update.notify_all()
-
-    def start(self):
-        """Spawn the worker thread."""
-        logger.debug("LinkFetcher[%s]: starting worker thread", self._statement_id)
-        self._worker_thread = threading.Thread(
-            target=self._worker_loop, name=f"LinkFetcher-{self._statement_id}"
-        )
-        self._worker_thread.start()
-
-    def stop(self):
-        """Signal the worker thread to stop and wait for its termination."""
-        logger.debug("LinkFetcher[%s]: stopping worker thread", self._statement_id)
-        self._shutdown_event.set()
-        self._worker_thread.join()
-        logger.debug("LinkFetcher[%s]: worker thread stopped", self._statement_id)
-
-
 class SeaCloudFetchQueue(CloudFetchQueue):
     """Queue implementation for EXTERNAL_LINKS disposition with ARROW format for SEA backend."""
 
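For context on what the revert removes: LinkFetcher is a single-producer, multi-consumer cache built on threading.Condition. A worker thread keeps adding link batches to chunk_index_to_link, while get_chunk_link blocks until the requested entry, an error, or the shutdown signal shows up. Below is a minimal, runnable sketch of that pattern; BlockingCache, fill, and the string payloads are illustrative stand-ins, not connector APIs.

import threading
from typing import Dict, List, Optional

class BlockingCache:
    """A worker thread fills the cache in batches; readers block until their key arrives."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._items: Dict[int, str] = {}
        self._done = threading.Event()

    def fill(self, batches: List[Dict[int, str]]) -> None:
        """Producer: publish each batch under the lock, waking any blocked readers."""
        for batch in batches:
            with self._cond:
                self._items.update(batch)
                self._cond.notify_all()
        self._done.set()
        with self._cond:
            self._cond.notify_all()  # let readers observe shutdown instead of waiting forever

    def get(self, key: int) -> Optional[str]:
        """Consumer: block until *key* is cached, or return None once the producer is done."""
        with self._cond:
            while key not in self._items:
                if self._done.is_set():
                    return None
                self._cond.wait()
            return self._items[key]

cache = BlockingCache()
batches = [{0: "link-0"}, {1: "link-1", 2: "link-2"}]
threading.Thread(target=cache.fill, args=(batches,), name="producer").start()
print(cache.get(2))  # blocks until the second batch lands, then prints "link-2"
print(cache.get(9))  # key never arrives; returns None once the producer finishes

The final notify_all after the producer loop is the part that matters for shutdown: without it, a reader waiting on a chunk that never arrives would block forever.
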
@@ -337,49 +163,80 @@ def __init__(
             chunk_id=0,
         )
 
+        self._sea_client = sea_client
+        self._statement_id = statement_id
+        self._total_chunk_count = total_chunk_count
+
         logger.debug(
             "SeaCloudFetchQueue: Initialize CloudFetch loader for statement {}, total chunks: {}".format(
                 statement_id, total_chunk_count
             )
         )
 
         initial_links = result_data.external_links or []
+        self._chunk_index_to_link = {link.chunk_index: link for link in initial_links}
 
         # Track the current chunk we're processing
         self._current_chunk_index = 0
+        first_link = self._chunk_index_to_link.get(self._current_chunk_index, None)
+        if not first_link:
+            # possibly an empty response
+            return None
 
-        self.link_fetcher = None  # for empty responses, we do not need a link fetcher
-        if total_chunk_count > 0:
-            self.link_fetcher = LinkFetcher(
-                download_manager=self.download_manager,
-                backend=sea_client,
-                statement_id=statement_id,
-                initial_links=initial_links,
-                total_chunk_count=total_chunk_count,
-            )
-            self.link_fetcher.start()
-
+        # Track the current chunk we're processing
+        self._current_chunk_index = 0
         # Initialize table and position
-        self.table = self._create_next_table()
+        self.table = self._create_table_from_link(first_link)
 
-    def _create_next_table(self) -> Union["pyarrow.Table", None]:
-        """Create next table by retrieving the logical next downloaded file."""
-        if self.link_fetcher is None:
-            return None
+    def _convert_to_thrift_link(self, link: ExternalLink) -> TSparkArrowResultLink:
+        """Convert SEA external links to Thrift format for compatibility with existing download manager."""
+        # Parse the ISO format expiration time
+        expiry_time = int(dateutil.parser.parse(link.expiration).timestamp())
+        return TSparkArrowResultLink(
+            fileLink=link.external_link,
+            expiryTime=expiry_time,
+            rowCount=link.row_count,
+            bytesNum=link.byte_count,
+            startRowOffset=link.row_offset,
+            httpHeaders=link.http_headers or {},
+        )
 
-        chunk_link = self.link_fetcher.get_chunk_link(self._current_chunk_index)
-        if chunk_link is None:
+    def _get_chunk_link(self, chunk_index: int) -> Optional["ExternalLink"]:
+        if chunk_index >= self._total_chunk_count:
             return None
 
-        row_offset = chunk_link.row_offset
-        # NOTE: link has already been submitted to download manager at this point
-        arrow_table = self._create_table_at_offset(row_offset)
+        if chunk_index not in self._chunk_index_to_link:
+            links = self._sea_client.get_chunk_links(self._statement_id, chunk_index)
+            self._chunk_index_to_link.update({l.chunk_index: l for l in links})
+
+        link = self._chunk_index_to_link.get(chunk_index, None)
+        if not link:
+            raise ServerOperationError(
+                f"Error fetching link for chunk {chunk_index}",
+                {
+                    "operation-id": self._statement_id,
+                    "diagnostic-info": None,
+                },
+            )
+        return link
 
-        self._current_chunk_index += 1
+    def _create_table_from_link(
+        self, link: ExternalLink
+    ) -> Union["pyarrow.Table", None]:
+        """Create a table from a link."""
+
+        thrift_link = self._convert_to_thrift_link(link)
+        self.download_manager.add_link(thrift_link)
+
+        row_offset = link.row_offset
+        arrow_table = self._create_table_at_offset(row_offset)
 
         return arrow_table
 
-    def close(self):
-        super().close()
-        if self.link_fetcher:
-            self.link_fetcher.stop()
+    def _create_next_table(self) -> Union["pyarrow.Table", None]:
+        """Create next table by retrieving the logical next downloaded file."""
+        self._current_chunk_index += 1
+        next_chunk_link = self._get_chunk_link(self._current_chunk_index)
+        if not next_chunk_link:
+            return None
+        return self._create_table_from_link(next_chunk_link)
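
The restored code path is synchronous and lazy: _create_next_table advances the chunk index, and _get_chunk_link performs a blocking round-trip to the backend only when the index misses the local cache. Here is a small self-contained sketch of that flow; FakeLink, FakeBackend, and LazyChunkReader are hypothetical stand-ins for the real SEA types.

from typing import Dict, List, Optional

class FakeLink:
    """Hypothetical stand-in for an ExternalLink; only the chunk index matters here."""
    def __init__(self, chunk_index: int) -> None:
        self.chunk_index = chunk_index

class FakeBackend:
    """Hypothetical stand-in for the SEA client: serves links in batches of two."""
    def get_chunk_links(self, statement_id: str, chunk_index: int) -> List[FakeLink]:
        return [FakeLink(chunk_index), FakeLink(chunk_index + 1)]

class LazyChunkReader:
    """Cache links from the initial response; fetch further batches only on a miss."""

    def __init__(self, backend: FakeBackend, statement_id: str,
                 total_chunk_count: int, initial_links: List[FakeLink]) -> None:
        self._backend = backend
        self._statement_id = statement_id
        self._total_chunk_count = total_chunk_count
        self._chunk_index_to_link: Dict[int, FakeLink] = {
            l.chunk_index: l for l in initial_links
        }

    def get_chunk_link(self, chunk_index: int) -> Optional[FakeLink]:
        if chunk_index >= self._total_chunk_count:
            return None  # past the last chunk: iteration is finished
        if chunk_index not in self._chunk_index_to_link:
            # Cache miss: pay for a blocking round-trip to the backend right now.
            links = self._backend.get_chunk_links(self._statement_id, chunk_index)
            self._chunk_index_to_link.update({l.chunk_index: l for l in links})
        return self._chunk_index_to_link.get(chunk_index)

reader = LazyChunkReader(FakeBackend(), "stmt-1", total_chunk_count=4,
                         initial_links=[FakeLink(0)])
print([reader.get_chunk_link(i) is not None for i in range(5)])
# -> [True, True, True, True, False]

The trade-off the revert accepts is visible here: every cache miss now pays the fetch latency inline on the consumer thread, whereas the removed LinkFetcher overlapped link discovery with the ongoing downloads.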
