From ed0f73bc784325cb0ee164bde5e5952d626fc9ff Mon Sep 17 00:00:00 2001
From: Jothi Prakash
Date: Tue, 25 Feb 2025 21:10:33 +0530
Subject: [PATCH 1/5] Fixed the async issue

---
 src/databricks/sql/thrift_backend.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py
index 9beab0371..b8c23a5cd 100644
--- a/src/databricks/sql/thrift_backend.py
+++ b/src/databricks/sql/thrift_backend.py
@@ -898,8 +898,10 @@ def execute_command(
             sessionHandle=session_handle,
             statement=operation,
             runAsync=True,
+            # For async operation we don't want the direct results
             getDirectResults=ttypes.TSparkGetDirectResults(
-                maxRows=max_rows, maxBytes=max_bytes
+                maxRows=0 if async_op else max_rows,
+                maxBytes=0 if async_op else max_bytes,
             ),
             canReadArrowResult=True if pyarrow else False,
             canDecompressLZ4Result=lz4_compression,

From 3b51b4ee36cb567998b4dd534f0b8dc265358e96 Mon Sep 17 00:00:00 2001
From: Jothi Prakash
Date: Thu, 27 Feb 2025 12:19:03 +0530
Subject: [PATCH 2/5] Added unit tests

---
 src/databricks/sql/thrift_backend.py |  8 ++--
 tests/e2e/test_driver.py             | 66 +++++++++++++++++++++++++---
 2 files changed, 64 insertions(+), 10 deletions(-)

diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py
index b8c23a5cd..7bb0ae3ee 100644
--- a/src/databricks/sql/thrift_backend.py
+++ b/src/databricks/sql/thrift_backend.py
@@ -899,9 +899,11 @@ def execute_command(
             statement=operation,
             runAsync=True,
             # For async operation we don't want the direct results
-            getDirectResults=ttypes.TSparkGetDirectResults(
-                maxRows=0 if async_op else max_rows,
-                maxBytes=0 if async_op else max_bytes,
+            getDirectResults=None
+            if async_op
+            else ttypes.TSparkGetDirectResults(
+                maxRows=max_rows,
+                maxBytes=max_bytes,
             ),
             canReadArrowResult=True if pyarrow else False,
             canDecompressLZ4Result=lz4_compression,
diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py
index 2f0881cda..45fea480b 100644
--- a/tests/e2e/test_driver.py
+++ b/tests/e2e/test_driver.py
@@ -177,19 +177,22 @@ def test_cloud_fetch(self):
         for i in range(len(cf_result)):
             assert cf_result[i] == noop_result[i]
 
-    def test_execute_async(self):
-        def isExecuting(operation_state):
-            return not operation_state or operation_state in [
-                ttypes.TOperationState.RUNNING_STATE,
-                ttypes.TOperationState.PENDING_STATE,
-            ]
+
+class TestPySQLAsyncQueriesSuite(PySQLPytestTestCase):
+    def isExecuting(self, operation_state):
+        return not operation_state or operation_state in [
+            ttypes.TOperationState.RUNNING_STATE,
+            ttypes.TOperationState.PENDING_STATE,
+        ]
+
+    def test_execute_async__long_running(self):
         long_running_query = "SELECT COUNT(*) FROM RANGE(10000 * 16) x JOIN RANGE(10000) y ON FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') LIKE '%not%a%date%'"
 
         with self.cursor() as cursor:
             cursor.execute_async(long_running_query)
 
             ## Polling after every POLLING_INTERVAL seconds
-            while isExecuting(cursor.get_query_state()):
+            while self.isExecuting(cursor.get_query_state()):
                 time.sleep(self.POLLING_INTERVAL)
                 log.info("Polling the status in test_execute_async")
 
@@ -198,6 +201,55 @@ def isExecuting(operation_state):
 
         assert result[0].asDict() == {"count(1)": 0}
 
+    def test_execute_async__small_result(self):
+        small_result_query = "SELECT 1"
+
+        with self.cursor() as cursor:
+            cursor.execute_async(small_result_query)
+
+            ## Fake sleep for 5 secs
+            time.sleep(5)
+
+            ## Polling after every POLLING_INTERVAL seconds
+            while self.isExecuting(cursor.get_query_state()):
+                time.sleep(self.POLLING_INTERVAL)
+                log.info("Polling the status in test_execute_async")
+
+            cursor.get_async_execution_result()
+            result = cursor.fetchall()
+
+        assert result[0].asDict() == {"1": 1}
+
+    def test_execute_async__large_result(self):
+        x_dimension = 1000
+        y_dimension = 1000
+        large_result_query = f"""
+        SELECT
+            x.id AS x_id,
+            y.id AS y_id,
+            FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') AS date
+        FROM
+            RANGE({x_dimension}) x
+        JOIN
+            RANGE({y_dimension}) y
+        """
+
+        with self.cursor() as cursor:
+            cursor.execute_async(large_result_query)
+
+            ## Fake sleep for 5 secs
+            time.sleep(5)
+
+            ## Polling after every POLLING_INTERVAL seconds
+            while self.isExecuting(cursor.get_query_state()):
+                time.sleep(self.POLLING_INTERVAL)
+                log.info("Polling the status in test_execute_async")
+
+            cursor.get_async_execution_result()
+            result = cursor.fetchall()
+
+        assert len(result) == x_dimension * y_dimension
 
 # Exclude Retry tests because they require specific setups, and LargeQueries too slow for core
 # tests

From d84c15bc055d733aea84bc5a695395f45ac3e973 Mon Sep 17 00:00:00 2001
From: Jothi Prakash
Date: Thu, 27 Feb 2025 17:10:33 +0530
Subject: [PATCH 3/5] Minor change

---
 tests/e2e/common/large_queries_mixin.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/e2e/common/large_queries_mixin.py b/tests/e2e/common/large_queries_mixin.py
index 41ef029bb..29231103a 100644
--- a/tests/e2e/common/large_queries_mixin.py
+++ b/tests/e2e/common/large_queries_mixin.py
@@ -83,11 +83,11 @@ def test_query_with_large_narrow_result_set(self):
             assert row[0] == row_id
 
     def test_long_running_query(self):
-        """Incrementally increase query size until it takes at least 5 minutes,
+        """Incrementally increase query size until it takes at least 4 minutes,
         and asserts that the query completes successfully.
         """
         minutes = 60
-        min_duration = 5 * minutes
+        min_duration = 4 * minutes
 
         duration = -1
         scale0 = 10000
@@ -113,5 +113,5 @@ def test_long_running_query(self):
         duration = time.time() - start
         current_fraction = duration / min_duration
         print("Took {} s with scale factor={}".format(duration, scale_factor))
-        # Extrapolate linearly to reach 5 min and add 50% padding to push over the limit
+        # Extrapolate linearly to reach 4 min and add 50% padding to push over the limit
         scale_factor = math.ceil(1.5 * scale_factor / current_fraction)

From 0c0ea0cf838303cda7ccde459eb400c1f2ce48a6 Mon Sep 17 00:00:00 2001
From: Jothi Prakash
Date: Thu, 27 Feb 2025 17:45:17 +0530
Subject: [PATCH 4/5] Changed to 5

---
 tests/e2e/common/large_queries_mixin.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/e2e/common/large_queries_mixin.py b/tests/e2e/common/large_queries_mixin.py
index 29231103a..41ef029bb 100644
--- a/tests/e2e/common/large_queries_mixin.py
+++ b/tests/e2e/common/large_queries_mixin.py
@@ -83,11 +83,11 @@ def test_query_with_large_narrow_result_set(self):
         assert row[0] == row_id
 
     def test_long_running_query(self):
-        """Incrementally increase query size until it takes at least 4 minutes,
+        """Incrementally increase query size until it takes at least 5 minutes,
         and asserts that the query completes successfully.
         """
         minutes = 60
-        min_duration = 4 * minutes
+        min_duration = 5 * minutes
 
         duration = -1
         scale0 = 10000
@@ -113,5 +113,5 @@ def test_long_running_query(self):
         duration = time.time() - start
         current_fraction = duration / min_duration
         print("Took {} s with scale factor={}".format(duration, scale_factor))
-        # Extrapolate linearly to reach 4 min and add 50% padding to push over the limit
+        # Extrapolate linearly to reach 5 min and add 50% padding to push over the limit
         scale_factor = math.ceil(1.5 * scale_factor / current_fraction)

From f2a01b1cb840d0db7a3d67cf3e4de33d471369e6 Mon Sep 17 00:00:00 2001
From: Jothi Prakash
Date: Thu, 27 Feb 2025 21:34:07 +0530
Subject: [PATCH 5/5] Increased time

---
 tests/e2e/common/large_queries_mixin.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/e2e/common/large_queries_mixin.py b/tests/e2e/common/large_queries_mixin.py
index 41ef029bb..ed8ac4574 100644
--- a/tests/e2e/common/large_queries_mixin.py
+++ b/tests/e2e/common/large_queries_mixin.py
@@ -94,7 +94,7 @@ def test_long_running_query(self):
         scale_factor = 1
         with self.cursor() as cursor:
             while duration < min_duration:
-                assert scale_factor < 512, "Detected infinite loop"
+                assert scale_factor < 1024, "Detected infinite loop"
                 start = time.time()
                 cursor.execute(