diff --git a/ci/requirements-2.6.txt b/ci/requirements-2.6.txt
index 60a8b57e72907..751d034ef97f5 100644
--- a/ci/requirements-2.6.txt
+++ b/ci/requirements-2.6.txt
@@ -4,4 +4,4 @@ python-dateutil==1.5
 pytz==2013b
 http://www.crummy.com/software/BeautifulSoup/bs4/download/4.2/beautifulsoup4-4.2.0.tar.gz
 html5lib==1.0b2
-bigquery==2.0.15
+bigquery==2.0.17
diff --git a/ci/requirements-2.7.txt b/ci/requirements-2.7.txt
index fe27fe10f7c04..3b786152cd653 100644
--- a/ci/requirements-2.7.txt
+++ b/ci/requirements-2.7.txt
@@ -18,4 +18,4 @@ MySQL-python==1.2.4
 scipy==0.10.0
 beautifulsoup4==4.2.1
 statsmodels==0.5.0
-bigquery==2.0.15
+bigquery==2.0.17
diff --git a/ci/requirements-2.7_LOCALE.txt b/ci/requirements-2.7_LOCALE.txt
index f037cbed15160..b18bff6797840 100644
--- a/ci/requirements-2.7_LOCALE.txt
+++ b/ci/requirements-2.7_LOCALE.txt
@@ -16,4 +16,4 @@ lxml==3.2.1
 scipy==0.10.0
 beautifulsoup4==4.2.1
 statsmodels==0.5.0
-bigquery==2.0.15
+bigquery==2.0.17
diff --git a/pandas/io/gbq.py b/pandas/io/gbq.py
index 931aa732d5286..2d490ec071b4e 100644
--- a/pandas/io/gbq.py
+++ b/pandas/io/gbq.py
@@ -7,6 +7,8 @@
 import csv
 import logging
 from datetime import datetime
+import pkg_resources
+from distutils.version import LooseVersion
 
 import pandas as pd
 import numpy as np
@@ -19,6 +21,13 @@
     import bigquery_client
     import gflags as flags
     _BQ_INSTALLED = True
+
+    _BQ_VERSION = pkg_resources.get_distribution('bigquery').version
+    if LooseVersion(_BQ_VERSION) >= '2.0.17':
+        _BQ_VALID_VERSION = True
+    else:
+        _BQ_VALID_VERSION = False
+
 except ImportError:
     _BQ_INSTALLED = False
 
@@ -102,6 +111,9 @@ def _parse_entry(field_value, field_type):
         field_value = np.datetime64(timestamp)
     elif field_type == 'BOOLEAN':
         field_value = field_value == 'true'
+    # Note that results are unicode, so this will
+    # fail for non-ASCII characters.. this probably
+    # functions differently in Python 3
     else:
         field_value = str(field_value)
     return field_value
@@ -228,68 +240,76 @@ def _parse_data(client, job, index_col=None, col_order=None):
     # Iterate over the result rows.
     # Since Google's API now requires pagination of results,
     # we do that here. The following is repurposed from
-    # bigquery_client.py :: Client.ReadTableRows()
+    # bigquery_client.py :: Client._JobTableReader._ReadOnePage
+
+    # TODO: Enable Reading From Table, see Client._TableTableReader._ReadOnePage
 
     # Initially, no page token is set
     page_token = None
 
-    # Most of Google's client API's allow one to set total_rows in case
-    # the user only wants the first 'n' results from a query. Typically
-    # they set this to sys.maxint by default, but this caused problems
-    # during testing - specifically on OS X. It appears that at some
-    # point in bigquery_client.py, there is an attempt to cast this value
-    # to an unsigned integer. Depending on the python install,
-    # sys.maxint may exceed the limitations of unsigned integers.
-    #
-    # See:
-    # https://code.google.com/p/google-bigquery-tools/issues/detail?id=14
-
-    # This is hardcoded value for 32bit sys.maxint per
-    # the above note. Theoretically, we could simply use
-    # 100,000 (or whatever the current max page size is),
-    # but this is more flexible in the event of an API change
-    total_rows = 2147483647
-
-    # Keep track of rows read
-    row_count = 0
+    # This number is the current max results per page
+    max_rows = bigquery_client._MAX_ROWS_PER_REQUEST
+
+    # How many rows in result set? Initialize to max_rows
+    total_rows = max_rows
+
+    # This is the starting row for a particular page...
+    # is ignored if page_token is present, though
+    # it may be useful if we wish to implement SQL like LIMITs
+    # with minimums
+    start_row = 0
 
     # Keep our page DataFrames until the end when we
     # concatentate them
     dataframe_list = list()
 
-    # Iterate over all rows
-    while row_count < total_rows:
-        data = client.apiclient.tabledata().list(maxResults=total_rows - row_count,
-                                                 pageToken=page_token,
-                                                 **table_dict).execute()
+    current_job = job['jobReference']
 
-        # If there are more results than will fit on a page,
-        # you will recieve a token for the next page.
-        page_token = data.get('pageToken', None)
+    # Iterate over all rows
+    while start_row < total_rows:
+        # Setup the parameters for getQueryResults() API Call
+        kwds = dict(current_job)
+        kwds['maxResults'] = max_rows
+        # Sets the timeout to 0 because we assume the table is already ready.
+        # This is because our previous call to Query() is synchronous
+        # and will block until it's actually done
+        kwds['timeoutMs'] = 0
+        # Use start row if there's no page_token ... in other words, the
+        # user requested to start somewhere other than the beginning...
+        # presently this is not a parameter to read_gbq(), but it will be
+        # added eventually.
+        if page_token:
+            kwds['pageToken'] = page_token
+        else:
+            kwds['startIndex'] = start_row
+        data = client.apiclient.jobs().getQueryResults(**kwds).execute()
+        if not data['jobComplete']:
+            raise BigqueryError('Job was not completed, or was invalid')
 
         # How many rows are there across all pages?
-        total_rows = min(total_rows, int(data['totalRows'])) # Changed to use get(data[rows],0)
+        # Note: This is presently the only reason we don't just use
+        # _ReadOnePage() directly
+        total_rows = int(data['totalRows'])
+
+        page_token = data.get('pageToken', None)
         raw_page = data.get('rows', [])
         page_array = _parse_page(raw_page, col_names, col_types, col_dtypes)
 
-        row_count += len(page_array)
+        start_row += len(raw_page)
         if total_rows > 0:
-            completed = (100 * row_count) / total_rows
-            logger.info('Remaining Rows: ' + str(total_rows - row_count) + '(' + str(completed) + '% Complete)')
+            completed = (100 * start_row) / total_rows
+            logger.info('Remaining Rows: ' + str(total_rows - start_row) + '(' + str(completed) + '% Complete)')
         else:
             logger.info('No Rows')
 
         dataframe_list.append(DataFrame(page_array))
 
-        # Handle any exceptions that might have occured
-        if not page_token and row_count != total_rows:
+        # Did we get enough rows? Note: gbq.py stopped checking for this
+        # but we felt it was still a good idea.
+        if not page_token and not raw_page and start_row != total_rows:
             raise bigquery_client.BigqueryInterfaceError(
-                'PageToken missing for %r' % (
-                    bigquery_client.ApiClientHelper.TableReference.Create(**table_dict),))
-        if not raw_page and row_count != total_rows:
-            raise bigquery_client.BigqueryInterfaceError(
-                'Not enough rows returned by server for %r' % (
-                    bigquery_client.ApiClientHelper.TableReference.Create(**table_dict),))
+                ("Not enough rows returned by server. Expected: {0}" + \
+                 " Rows, But Recieved {1}").format(total_rows, start_row))
 
     # Build final dataframe
     final_df = concat(dataframe_list, ignore_index=True)
@@ -355,6 +375,10 @@ def to_gbq(dataframe, destination_table, schema=None, col_order=None, if_exists=
     else:
         raise ImportError('Could not import Google BigQuery Client.')
 
+    if not _BQ_VALID_VERSION:
+        raise ImportError("pandas requires bigquery >= 2.0.17 for Google BigQuery "
+                          "support, current version " + _BQ_VERSION)
+
     ALLOWED_TYPES = ['STRING', 'INTEGER', 'FLOAT', 'BOOLEAN', 'TIMESTAMP', 'RECORD']
 
     if if_exists == 'replace' and schema is None:
@@ -456,6 +480,10 @@ def read_gbq(query, project_id = None, destination_table = None, index_col=None,
     else:
         raise ImportError('Could not import Google BigQuery Client.')
 
+    if not _BQ_VALID_VERSION:
+        raise ImportError("pandas requires bigquery >= 2.0.17 for Google BigQuery "
+                          "support, current version " + _BQ_VERSION)
+
     query_args = kwargs
     query_args['project_id'] = project_id
     query_args['query'] = query
diff --git a/pandas/io/tests/test_gbq.py b/pandas/io/tests/test_gbq.py
index 89b048d472d5f..f56c1aa042421 100644
--- a/pandas/io/tests/test_gbq.py
+++ b/pandas/io/tests/test_gbq.py
@@ -40,20 +40,21 @@ def GetTableSchema(self,table_dict):
 # Fake Google BigQuery API Client
 class FakeApiClient:
     def __init__(self):
-        self._tabledata = FakeTableData()
+        self._fakejobs = FakeJobs()
 
-    def tabledata(self):
-        return self._tabledata
+    def jobs(self):
+        return self._fakejobs
 
-class FakeTableData:
+class FakeJobs:
     def __init__(self):
-        self._list = FakeList()
+        self._fakequeryresults = FakeResults()
 
-    def list(self,maxResults = None, pageToken = None, **table_dict):
-        return self._list
+    def getQueryResults(self, job_id=None, project_id=None,
+                        max_results=None, timeout_ms=None, **kwargs):
+        return self._fakequeryresults
 
-class FakeList:
+class FakeResults:
     def execute(self):
         return {'rows': [ {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'brave'}, {'v': '3'}]},
                   {'f': [{'v': 'othello'}, {'v': '1603'}, {'v': 'attended'}, {'v': '1'}]},
@@ -68,7 +69,8 @@ def execute(self):
               ],
             'kind': 'bigquery#tableDataList',
            'etag': '"4PTsVxg68bQkQs1RJ1Ndewqkgg4/hoRHzb4qfhJAIa2mEewC-jhs9Bg"',
-            'totalRows': '10'}
+            'totalRows': '10',
+            'jobComplete' : True}
 
 ####################################################################################
 
@@ -225,16 +227,16 @@ def test_column_order_plus_index(self):
         correct_frame_small = DataFrame(correct_frame_small)[col_order]
         tm.assert_index_equal(result_frame.columns, correct_frame_small.columns)
 
-    # @with_connectivity_check
-    # def test_download_dataset_larger_than_100k_rows(self):
-    #     # Test for known BigQuery bug in datasets larger than 100k rows
-    #     # http://stackoverflow.com/questions/19145587/bq-py-not-paging-results
-    #     if not os.path.exists(self.bq_token):
-    #         raise nose.SkipTest('Skipped because authentication information is not available.')
+    @with_connectivity_check
+    def test_download_dataset_larger_than_100k_rows(self):
+        # Test for known BigQuery bug in datasets larger than 100k rows
+        # http://stackoverflow.com/questions/19145587/bq-py-not-paging-results
+        if not os.path.exists(self.bq_token):
+            raise nose.SkipTest('Skipped because authentication information is not available.')
 
-    #     client = gbq._authenticate()
-    #     a = gbq.read_gbq("SELECT id, FROM [publicdata:samples.wikipedia] LIMIT 100005")
-    #     self.assertTrue(len(a) == 100005)
+        client = gbq._authenticate()
+        a = gbq.read_gbq("SELECT id, FROM [publicdata:samples.wikipedia] LIMIT 100005")
+        self.assertTrue(len(a) == 100005)
 
     @with_connectivity_check
     def test_download_all_data_types(self):
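Note for reviewers: the following is a minimal standalone sketch of the paginated jobs().getQueryResults() loop that this patch adopts in _parse_data(), shown in one place for readability. It is illustrative only and not part of the patch; fetch_all_rows, service, job_ref and max_rows_per_page are hypothetical names, and service is assumed to be an authenticated google-api-python-client BigQuery service object.

    def fetch_all_rows(service, job_ref, max_rows_per_page=100000):
        """Collect every raw result row for a finished query job (sketch)."""
        rows = []
        page_token = None
        start_row = 0
        # Start with a positive guess; the first response reports the real total.
        total_rows = max_rows_per_page

        while start_row < total_rows:
            kwds = dict(job_ref)               # e.g. {'projectId': ..., 'jobId': ...}
            kwds['maxResults'] = max_rows_per_page
            kwds['timeoutMs'] = 0              # the query is assumed to be complete
            if page_token:
                kwds['pageToken'] = page_token
            else:
                kwds['startIndex'] = start_row

            data = service.jobs().getQueryResults(**kwds).execute()
            if not data['jobComplete']:
                raise RuntimeError('Job was not completed, or was invalid')

            total_rows = int(data['totalRows'])
            page_token = data.get('pageToken', None)
            raw_page = data.get('rows', [])
            rows.extend(raw_page)
            start_row += len(raw_page)

        return rows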