@@ -45,6 +45,8 @@ def __init__(
45
45
results_queue = None ,
46
46
description = None ,
47
47
is_staging_operation : bool = False ,
48
+ lz4_compressed : bool = False ,
49
+ arrow_schema_bytes : Optional [bytes ] = b"" ,
48
50
):
49
51
"""
50
52
A ResultSet manages the results of a single command.
@@ -75,6 +77,8 @@ def __init__(
75
77
self .is_direct_results = is_direct_results
76
78
self .results = results_queue
77
79
self ._is_staging_operation = is_staging_operation
80
+ self .lz4_compressed = lz4_compressed
81
+ self ._arrow_schema_bytes = arrow_schema_bytes
78
82
79
83
def __iter__ (self ):
80
84
while True :
@@ -177,10 +181,10 @@ def __init__(
177
181
:param ssl_options: SSL options for cloud fetch
178
182
:param is_direct_results: Whether there are more rows to fetch
179
183
"""
184
+
180
185
# Initialize ThriftResultSet-specific attributes
181
- self ._arrow_schema_bytes = execute_response .arrow_schema_bytes
182
186
self ._use_cloud_fetch = use_cloud_fetch
183
- self .lz4_compressed = execute_response . lz4_compressed
187
+ self .is_direct_results = is_direct_results
184
188
185
189
# Build the results queue if t_row_set is provided
186
190
results_queue = None
@@ -211,6 +215,8 @@ def __init__(
211
215
results_queue = results_queue ,
212
216
description = execute_response .description ,
213
217
is_staging_operation = execute_response .is_staging_operation ,
218
+ lz4_compressed = execute_response .lz4_compressed ,
219
+ arrow_schema_bytes = execute_response .arrow_schema_bytes ,
214
220
)
215
221
216
222
# Initialize results queue if not provided
@@ -438,3 +444,82 @@ def map_col_type(type_):
438
444
(column .name , map_col_type (column .datatype ), None , None , None , None , None )
439
445
for column in table_schema_message .columns
440
446
]
447
+
448
+
449
+ class SeaResultSet (ResultSet ):
450
+ """ResultSet implementation for SEA backend."""
451
+
452
+ def __init__ (
453
+ self ,
454
+ connection : "Connection" ,
455
+ execute_response : "ExecuteResponse" ,
456
+ sea_client : "SeaDatabricksClient" ,
457
+ buffer_size_bytes : int = 104857600 ,
458
+ arraysize : int = 10000 ,
459
+ result_data = None ,
460
+ manifest = None ,
461
+ ):
462
+ """
463
+ Initialize a SeaResultSet with the response from a SEA query execution.
464
+
465
+ Args:
466
+ connection: The parent connection
467
+ execute_response: Response from the execute command
468
+ sea_client: The SeaDatabricksClient instance for direct access
469
+ buffer_size_bytes: Buffer size for fetching results
470
+ arraysize: Default number of rows to fetch
471
+ result_data: Result data from SEA response (optional)
472
+ manifest: Manifest from SEA response (optional)
473
+ """
474
+
475
+ super ().__init__ (
476
+ connection = connection ,
477
+ backend = sea_client ,
478
+ arraysize = arraysize ,
479
+ buffer_size_bytes = buffer_size_bytes ,
480
+ command_id = execute_response .command_id ,
481
+ status = execute_response .status ,
482
+ has_been_closed_server_side = execute_response .has_been_closed_server_side ,
483
+ description = execute_response .description ,
484
+ is_staging_operation = execute_response .is_staging_operation ,
485
+ lz4_compressed = execute_response .lz4_compressed ,
486
+ arrow_schema_bytes = execute_response .arrow_schema_bytes ,
487
+ )
488
+
489
+ def _fill_results_buffer (self ):
490
+ """Fill the results buffer from the backend."""
491
+ raise NotImplementedError (
492
+ "_fill_results_buffer is not implemented for SEA backend"
493
+ )
494
+
495
+ def fetchone (self ) -> Optional [Row ]:
496
+ """
497
+ Fetch the next row of a query result set, returning a single sequence,
498
+ or None when no more data is available.
499
+ """
500
+
501
+ raise NotImplementedError ("fetchone is not implemented for SEA backend" )
502
+
503
+ def fetchmany (self , size : Optional [int ] = None ) -> List [Row ]:
504
+ """
505
+ Fetch the next set of rows of a query result, returning a list of rows.
506
+
507
+ An empty sequence is returned when no more rows are available.
508
+ """
509
+
510
+ raise NotImplementedError ("fetchmany is not implemented for SEA backend" )
511
+
512
+ def fetchall (self ) -> List [Row ]:
513
+ """
514
+ Fetch all (remaining) rows of a query result, returning them as a list of rows.
515
+ """
516
+
517
+ raise NotImplementedError ("fetchall is not implemented for SEA backend" )
518
+
519
+ def fetchmany_arrow (self , size : int ) -> Any :
520
+ """Fetch the next set of rows as an Arrow table."""
521
+ raise NotImplementedError ("fetchmany_arrow is not implemented for SEA backend" )
522
+
523
+ def fetchall_arrow (self ) -> Any :
524
+ """Fetch all remaining rows as an Arrow table."""
525
+ raise NotImplementedError ("fetchall_arrow is not implemented for SEA backend" )
0 commit comments