-
Notifications
You must be signed in to change notification settings - Fork 114
Add e2e tests #12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Add e2e tests #12
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
d764c40
Re-introduce e2e tests
susodapop 9a6ebf2
Update workflow unit test invocation so we don't fire e2e tests too
susodapop f7da497
Add core tests workflow
susodapop a9d7a6d
Update contributing guide
susodapop c5ca014
Remove Databricks-specific references
susodapop cdcc109
Revert "Add core tests workflow"
susodapop 6d00003
Simplify wording in CONTRIBUTING doc
susodapop 91677cc
Fix chopped-off sentence
susodapop 63f1510
Cleaning up formatting
susodapop File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
import decimal
import datetime
from collections import namedtuple

# All three failure records share the same base fields.  Declare the field
# list once so the three definitions cannot silently drift apart (previously
# the same string literal was repeated for each namedtuple).
_FAILURE_FIELDS = ("query,columnType,resultType,resultValue,"
                   "actualValue,actualType,description,conf")

# The result's column or row-value type did not match what was expected.
TypeFailure = namedtuple("TypeFailure", _FAILURE_FIELDS)
# The type was acceptable but the returned value differed from the expected one.
ResultFailure = namedtuple("ResultFailure", _FAILURE_FIELDS)
# Executing the query raised; the extra `error` field carries the exception.
ExecFailure = namedtuple("ExecFailure", _FAILURE_FIELDS + ",error")
|
||
|
||
class SmokeTestMixin:
    """Minimal connectivity check.

    Expects to be mixed into a test case that provides ``self.cursor()`` as a
    context manager yielding a DB-API cursor.
    """

    def test_smoke_test(self):
        # A single constant-row query proves connect/execute/fetch all work.
        with self.cursor() as cursor:
            cursor.execute("select 0")
            all_rows = cursor.fetchall()
            self.assertEqual(len(all_rows), 1)
            self.assertEqual(all_rows[0][0], 0)
|
||
|
||
class CoreTestMixin:
    """
    Runs a battery of type/value round-trip queries and collects failures.

    This mixin expects to be mixed with a CursorTest-like class with the following extra attributes:
        validate_row_value_type: bool
        validate_result: bool

    The mixing class must also provide ``self.cursor(conf)`` (a context manager
    yielding a DB-API cursor) and ``self.expected_column_types(column_type)``
    (returns the acceptable DB-API description type(s) for a column type).
    """

    # A list of (subquery, column_type, python_type, expected_result)
    # To be executed as "SELECT {} FROM RANGE(...)" and "SELECT {}"
    range_queries = [
        ("TRUE", 'boolean', bool, True),
        ("cast(1 AS TINYINT)", 'byte', int, 1),
        ("cast(1000 AS SMALLINT)", 'short', int, 1000),
        ("cast(100000 AS INTEGER)", 'integer', int, 100000),
        ("cast(10000000000000 AS BIGINT)", 'long', int, 10000000000000),
        ("cast(100.001 AS DECIMAL(6, 3))", 'decimal', decimal.Decimal, 100.001),
        ("date '2020-02-20'", 'date', datetime.date, datetime.date(2020, 2, 20)),
        ("unhex('f000')", 'binary', bytes, b'\xf0\x00'),  # pyodbc internal mismatch
        ("'foo'", 'string', str, 'foo'),
        # SPARK-32130: 6.x: "4 weeks 2 days" vs 7.x: "30 days"
        # ("interval 30 days", str, str, "interval 4 weeks 2 days"),
        # ("interval 3 days", str, str, "interval 3 days"),
        ("CAST(NULL AS DOUBLE)", 'double', type(None), None),
    ]

    # Full queries, only the first column of the first row is checked
    queries = [("NULL UNION (SELECT 1) order by 1", 'integer', type(None), None)]

    def run_tests_on_queries(self, default_conf):
        """Run every configured query, collecting failures, and fail the test
        with a combined report if any query misbehaved."""
        failures = []
        for (query, columnType, rowValueType, answer) in self.range_queries:
            with self.cursor(default_conf) as cursor:
                failures.extend(
                    self.run_query(cursor, query, columnType, rowValueType, answer, default_conf))
                failures.extend(
                    self.run_range_query(cursor, query, columnType, rowValueType, answer,
                                         default_conf))

        for (query, columnType, rowValueType, answer) in self.queries:
            with self.cursor(default_conf) as cursor:
                failures.extend(
                    self.run_query(cursor, query, columnType, rowValueType, answer, default_conf))

        if failures:
            self.fail("Failed testing result set with Arrow. "
                      "Failed queries: {}".format("\n\n".join([str(f) for f in failures])))

    def run_query(self, cursor, query, columnType, rowValueType, answer, conf):
        """Execute ``SELECT {query}`` and validate the single result value.

        Returns a (possibly empty) list of failure records; never raises —
        execution errors are captured as ExecFailure entries.
        """
        full_query = "SELECT {}".format(query)
        expected_column_types = self.expected_column_types(columnType)
        try:
            cursor.execute(full_query)
            (result, ) = cursor.fetchone()
            # `expected` instead of `type` so the builtin isn't shadowed inside
            # the generator expression.
            if not all(cursor.description[0][1] == expected for expected in expected_column_types):
                return [
                    TypeFailure(full_query, expected_column_types, rowValueType, answer, result,
                                type(result), cursor.description, conf)
                ]
            if self.validate_row_value_type and type(result) is not rowValueType:
                return [
                    TypeFailure(full_query, expected_column_types, rowValueType, answer, result,
                                type(result), cursor.description, conf)
                ]
            if self.validate_result and str(answer) != str(result):
                # BUG FIX: the original also passed the raw `query` here, giving
                # ResultFailure nine arguments for its eight fields; the TypeError
                # that raised was swallowed by the `except` below and the failure
                # was misreported as an ExecFailure.  Now matches run_range_query.
                return [
                    ResultFailure(full_query, expected_column_types, rowValueType, answer,
                                  result, type(result), cursor.description, conf)
                ]
            return []
        except Exception as e:
            return [
                ExecFailure(full_query, columnType, rowValueType, None, None, None,
                            cursor.description, conf, e)
            ]

    def run_range_query(self, cursor, query, columnType, rowValueType, expected, conf):
        """Execute ``SELECT {query}, id FROM RANGE(5000)`` and validate every
        row, fetching in batches of 1000.

        Returns a (possibly empty) list of failure records; never raises.
        """
        full_query = "SELECT {}, id FROM RANGE({})".format(query, 5000)
        expected_column_types = self.expected_column_types(columnType)
        try:
            cursor.execute(full_query)
            while True:
                rows = cursor.fetchmany(1000)
                if len(rows) <= 0:
                    break
                for index, (result, id) in enumerate(rows):
                    # `expected_type` avoids shadowing the builtin `type`.
                    if not all(cursor.description[0][1] == expected_type
                               for expected_type in expected_column_types):
                        return [
                            TypeFailure(full_query, expected_column_types, rowValueType, expected,
                                        result, type(result), cursor.description, conf)
                        ]
                    if self.validate_row_value_type and type(result) \
                            is not rowValueType:
                        return [
                            TypeFailure(full_query, expected_column_types, rowValueType, expected,
                                        result, type(result), cursor.description, conf)
                        ]
                    if self.validate_result and str(expected) != str(result):
                        return [
                            ResultFailure(full_query, expected_column_types, rowValueType, expected,
                                          result, type(result), cursor.description, conf)
                        ]
            return []
        except Exception as e:
            return [
                ExecFailure(full_query, columnType, rowValueType, None, None, None,
                            cursor.description, conf, e)
            ]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
from decimal import Decimal | ||
|
||
import pyarrow | ||
|
||
|
||
class DecimalTestsMixin:
    """Exercises DECIMAL round-trips through the Arrow result path.

    Expects to be mixed into a test case providing ``self.cursor(conf)`` whose
    cursors expose ``fetchmany_arrow`` / ``fetchall_arrow`` returning Arrow tables.
    """

    # (SQL cast expression, expected Python value, expected Arrow column type)
    decimal_and_expected_results = [
        ("100.001 AS DECIMAL(6, 3)", Decimal("100.001"), pyarrow.decimal128(6, 3)),
        ("1000000.0000 AS DECIMAL(11, 4)", Decimal("1000000.0000"), pyarrow.decimal128(11, 4)),
        ("-10.2343 AS DECIMAL(10, 6)", Decimal("-10.234300"), pyarrow.decimal128(10, 6)),
        # TODO(SC-90767): Re-enable this test after we have a way of passing `ansi_mode` = False
        #("-13872347.2343 AS DECIMAL(10, 10)", None, pyarrow.decimal128(10, 10)),
        ("NULL AS DECIMAL(1, 1)", None, pyarrow.decimal128(1, 1)),
        ("1 AS DECIMAL(1, 0)", Decimal("1"), pyarrow.decimal128(1, 0)),
        ("0.00000 AS DECIMAL(5, 3)", Decimal("0.000"), pyarrow.decimal128(5, 3)),
        ("1e-3 AS DECIMAL(38, 3)", Decimal("0.001"), pyarrow.decimal128(38, 3)),
    ]

    # (list of cast expressions UNIONed together, expected ordered values,
    #  expected common Arrow column type)
    multi_decimals_and_expected_results = [
        (["1 AS DECIMAL(6, 3)", "100.001 AS DECIMAL(6, 3)", "NULL AS DECIMAL(6, 3)"],
         [Decimal("1.00"), Decimal("100.001"), None], pyarrow.decimal128(6, 3)),
        (["1 AS DECIMAL(6, 3)", "2 AS DECIMAL(5, 2)"],
         [Decimal('1.000'), Decimal('2.000')], pyarrow.decimal128(6, 3)),
    ]

    def test_decimals(self):
        """Each single-value cast comes back with the expected Arrow type and value."""
        with self.cursor({}) as cursor:
            for (cast_expr, want_value, want_type) in self.decimal_and_expected_results:
                query = "SELECT CAST ({})".format(cast_expr)
                with self.subTest(query=query):
                    cursor.execute(query)
                    arrow_table = cursor.fetchmany_arrow(1)
                    self.assertEqual(arrow_table.field(0).type, want_type)
                    self.assertEqual(arrow_table.to_pydict().popitem()[1][0], want_value)

    def test_multi_decimals(self):
        """A UNION of casts is coerced to one decimal type, values in sorted order."""
        with self.cursor({}) as cursor:
            for (cast_exprs, want_values, want_type) in self.multi_decimals_and_expected_results:
                union_str = " UNION ".join("(SELECT CAST ({}))".format(expr) for expr in cast_exprs)
                query = "SELECT * FROM ({}) ORDER BY 1 NULLS LAST".format(union_str)

                with self.subTest(query=query):
                    cursor.execute(query)
                    arrow_table = cursor.fetchall_arrow()
                    self.assertEqual(arrow_table.field(0).type, want_type)
                    self.assertEqual(arrow_table.to_pydict().popitem()[1], want_values)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
import logging | ||
import math | ||
import time | ||
|
||
# Module-level logger; used by LargeQueriesMixin to report fetch timeouts.
log = logging.getLogger(__name__)
|
||
|
||
class LargeQueriesMixin:
    """
    This mixin expects to be mixed with a CursorTest-like class

    It additionally relies on the mixing class providing
    ``self.get_some_rows(cursor, fetchmany_size)`` and ``self.cursor()``.
    """

    def fetch_rows(self, cursor, row_count, fetchmany_size):
        """
        A generator for rows. Fetches until the end or up to 5 minutes.
        """
        # TODO: Remove fetchmany_size when we have fixed the performance issues with fetchone
        # in the Python client
        max_fetch_time = 5 * 60  # Fetch for at most 5 minutes

        rows = self.get_some_rows(cursor, fetchmany_size)
        start_time = time.time()
        n = 0
        timed_out = False
        while rows and not timed_out:
            for row in rows:
                n += 1
                yield row
                if time.time() - start_time >= max_fetch_time:
                    # BUG FIX: the original `break` only left the inner `for`
                    # and the `while` immediately refetched, so the 5-minute
                    # cap promised by the docstring never took effect.
                    log.warning("Fetching rows timed out")
                    timed_out = True
                    break
            if timed_out:
                break
            rows = self.get_some_rows(cursor, fetchmany_size)
            if not rows:
                # Read all the rows, row_count should match
                self.assertEqual(n, row_count)

        num_fetches = max(math.ceil(n / 10000), 1)
        # BUG FIX: the original read `int(...), 1`, which bound latency_ms to a
        # tuple; clamp to at least 1 ms instead (the intended `max(..., 1)`).
        latency_ms = max(int((time.time() - start_time) * 1000 / num_fetches), 1)
        print('Fetched {} rows with an avg latency of {} per fetch, '.format(n, latency_ms) +
              'assuming 10K fetch size.')

    def test_query_with_large_wide_result_set(self):
        """Stream ~300 MB of wide (8 KiB) rows; verify ordering and cell width."""
        resultSize = 300 * 1000 * 1000  # 300 MB
        width = 8192  # B
        rows = resultSize // width
        cols = width // 36

        # Set the fetchmany_size to get 10MB of data a go
        fetchmany_size = 10 * 1024 * 1024 // width
        # This is used by PyHive tests to determine the buffer size
        self.arraysize = 1000
        with self.cursor() as cursor:
            uuids = ", ".join(["uuid() uuid{}".format(i) for i in range(cols)])
            cursor.execute("SELECT id, {uuids} FROM RANGE({rows})".format(uuids=uuids, rows=rows))
            for row_id, row in enumerate(self.fetch_rows(cursor, rows, fetchmany_size)):
                self.assertEqual(row[0], row_id)  # Verify no rows are dropped in the middle.
                self.assertEqual(len(row[1]), 36)

    def test_query_with_large_narrow_result_set(self):
        """Stream ~300 MB of narrow (8 B) rows; verify ordering."""
        resultSize = 300 * 1000 * 1000  # 300 MB
        width = 8  # sizeof(long)
        # BUG FIX: use floor division like the wide test above; true division
        # produced a float that leaked into the SQL text ("RANGE(37500000.0)")
        # and into the row-count comparison.
        rows = resultSize // width

        # Set the fetchmany_size to get 10MB of data a go
        fetchmany_size = 10 * 1024 * 1024 // width
        # This is used by PyHive tests to determine the buffer size
        self.arraysize = 10000000
        with self.cursor() as cursor:
            cursor.execute("SELECT * FROM RANGE({rows})".format(rows=rows))
            for row_id, row in enumerate(self.fetch_rows(cursor, rows, fetchmany_size)):
                self.assertEqual(row[0], row_id)

    def test_long_running_query(self):
        """ Incrementally increase query size until it takes at least 5 minutes,
        and asserts that the query completes successfully.
        """
        minutes = 60
        min_duration = 5 * minutes

        duration = -1
        scale0 = 10000
        scale_factor = 1
        with self.cursor() as cursor:
            while duration < min_duration:
                self.assertLess(scale_factor, 512, msg="Detected infinite loop")
                start = time.time()

                # The LIKE predicate never matches, so the join must be fully
                # evaluated and count(*) is always 0.
                cursor.execute("""SELECT count(*)
                        FROM RANGE({scale}) x
                        JOIN RANGE({scale0}) y
                        ON from_unixtime(x.id * y.id, "yyyy-MM-dd") LIKE "%not%a%date%"
                        """.format(scale=scale_factor * scale0, scale0=scale0))

                n, = cursor.fetchone()
                self.assertEqual(n, 0)

                duration = time.time() - start
                current_fraction = duration / min_duration
                print('Took {} s with scale factor={}'.format(duration, scale_factor))
                # Extrapolate linearly to reach 5 min and add 50% padding to push over the limit
                scale_factor = math.ceil(1.5 * scale_factor / current_fraction)
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does unit test mean in this context?
Is this mock unit test without the need for databricks account?
or is it integration test e2e which require databricks account
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`unit` means no databricks account is required. `e2e` means a databricks account is required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm updating the CONTRIBUTING doc with this info.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks Jesse.