diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 4da116434e..300fdf2e70 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -28,6 +28,7 @@ TYPE_CHECKING, Callable, Dict, + Iterator, List, Optional, Set, @@ -575,42 +576,42 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None: """ @abstractmethod - def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: + def list_tables(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]: """List tables under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: list of table identifiers. + Iterator[Identifier]: iterator of table identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. """ @abstractmethod - def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: + def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> Iterator[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: a List of namespace identifiers. + Iterator[Identifier]: iterator of namespace identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. """ @abstractmethod - def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: + def list_views(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]: """List views under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: list of table identifiers. + Iterator[Identifier]: iterator of view identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. 
diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py index 3b37762638..25aa585dff 100644 --- a/pyiceberg/catalog/dynamodb.py +++ b/pyiceberg/catalog/dynamodb.py @@ -20,7 +20,7 @@ TYPE_CHECKING, Any, Dict, - List, + Iterator, Optional, Set, Tuple, @@ -385,7 +385,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None: database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) table_identifiers = self.list_tables(namespace=database_name) - if len(table_identifiers) > 0: + if len(list(table_identifiers)) > 0: raise NamespaceNotEmptyError(f"Database {database_name} is not empty") try: @@ -397,14 +397,14 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None: except ConditionalCheckFailedException as e: raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e - def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: + def list_tables(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]: """List Iceberg tables under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: list of table identifiers. + Iterator[Identifier]: iterator of table identifiers. 
""" database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) @@ -429,7 +429,6 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: ) as e: raise GenericDynamoDbError(e.message) from e - table_identifiers = [] for page in page_iterator: for item in page["Items"]: _dict = _convert_dynamo_item_to_regular_dict(item) @@ -437,21 +436,19 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: if identifier_col == DYNAMODB_NAMESPACE: continue - table_identifiers.append(self.identifier_to_tuple(identifier_col)) + yield self.identifier_to_tuple(identifier_col) - return table_identifiers - - def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: + def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> Iterator[Identifier]: """List top-level namespaces from the catalog. We do not support hierarchical namespace. Returns: - List[Identifier]: a List of namespace identifiers. + Iterator[Identifier]: iterator of namespace identifiers. """ # Hierarchical namespace is not supported. 
Return an empty list if namespace: - return [] + return paginator = self.dynamodb.get_paginator("query") @@ -474,14 +471,11 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi ) as e: raise GenericDynamoDbError(e.message) from e - database_identifiers = [] for page in page_iterator: for item in page["Items"]: _dict = _convert_dynamo_item_to_regular_dict(item) namespace_col = _dict[DYNAMODB_COL_NAMESPACE] - database_identifiers.append(self.identifier_to_tuple(namespace_col)) - - return database_identifiers + yield self.identifier_to_tuple(namespace_col) def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: """ @@ -538,7 +532,7 @@ def update_namespace_properties( return properties_update_summary - def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: + def list_views(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]: raise NotImplementedError def drop_view(self, identifier: Union[str, Identifier]) -> None: diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 4eb4164e57..108c39f734 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -20,6 +20,7 @@ TYPE_CHECKING, Any, Dict, + Iterator, List, Optional, Set, @@ -96,7 +97,6 @@ from mypy_boto3_glue.type_defs import ( ColumnTypeDef, DatabaseInputTypeDef, - DatabaseTypeDef, StorageDescriptorTypeDef, TableInputTypeDef, TableTypeDef, @@ -710,20 +710,19 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None: ) self.glue.delete_database(Name=database_name) - def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: + def list_tables(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]: """List Iceberg tables under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: list of table identifiers. + Iterator[Identifier]: iterator of table identifiers. 
Raises: NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid. """ database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) - table_list: List["TableTypeDef"] = [] next_token: Optional[str] = None try: while True: @@ -732,37 +731,36 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: if not next_token else self.glue.get_tables(DatabaseName=database_name, NextToken=next_token) ) - table_list.extend(table_list_response["TableList"]) + for table in table_list_response["TableList"]: + if self.__is_iceberg_table(table): + yield (database_name, table["Name"]) next_token = table_list_response.get("NextToken") if not next_token: break except self.glue.exceptions.EntityNotFoundException as e: raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e - return [(database_name, table["Name"]) for table in table_list if self.__is_iceberg_table(table)] - def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: + def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> Iterator[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. Returns: - List[Identifier]: a List of namespace identifiers. + Iterator[Identifier]: iterator of namespace identifiers. """ # Hierarchical namespace is not supported. 
Return an empty list if namespace: - return [] + return - database_list: List["DatabaseTypeDef"] = [] next_token: Optional[str] = None while True: databases_response = self.glue.get_databases() if not next_token else self.glue.get_databases(NextToken=next_token) - database_list.extend(databases_response["DatabaseList"]) + for database in databases_response["DatabaseList"]: + yield self.identifier_to_tuple(database["Name"]) next_token = databases_response.get("NextToken") if not next_token: break - return [self.identifier_to_tuple(database["Name"]) for database in database_list] - def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: """Get properties for a namespace. @@ -817,7 +815,7 @@ def update_namespace_properties( return properties_update_summary - def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: + def list_views(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]: raise NotImplementedError def drop_view(self, identifier: Union[str, Identifier]) -> None: diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index cc9cd028c4..5b8a2f6bbf 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -23,6 +23,7 @@ TYPE_CHECKING, Any, Dict, + Iterator, List, Optional, Set, @@ -469,7 +470,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: return self._convert_hive_into_iceberg(hive_table) - def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: + def list_views(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]: raise NotImplementedError def view_exists(self, identifier: Union[str, Identifier]) -> bool: @@ -710,7 +711,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None: except MetaException as e: raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e - def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: + def list_tables(self, 
namespace: Union[str, Identifier]) -> Iterator[Identifier]: """List Iceberg tables under the given namespace in the catalog. When the database doesn't exist, it will just return an empty list. @@ -719,14 +720,14 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: namespace: Database to list. Returns: - List[Identifier]: list of table identifiers. + Iterator[Identifier]: iterator of table identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid. """ database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) with self._client as open_client: - return [ + yield from [ (database_name, table.tableName) for table in open_client.get_table_objects_by_name( dbname=database_name, tbl_names=open_client.get_all_tables(db_name=database_name) @@ -734,18 +735,18 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: if table.parameters.get(TABLE_TYPE, "").lower() == ICEBERG ] - def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: + def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> Iterator[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. Returns: - List[Identifier]: a List of namespace identifiers. + Iterator[Identifier]: an iterator of namespace identifiers. """ # Hierarchical namespace is not supported. Return an empty list if namespace: - return [] + return iter([]) with self._client as open_client: - return list(map(self.identifier_to_tuple, open_client.get_all_databases())) + return iter(map(self.identifier_to_tuple, open_client.get_all_databases())) def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: """Get properties for a namespace. 
diff --git a/pyiceberg/catalog/noop.py b/pyiceberg/catalog/noop.py index 39a0b382a7..260003c626 100644 --- a/pyiceberg/catalog/noop.py +++ b/pyiceberg/catalog/noop.py @@ -16,7 +16,7 @@ # under the License. from typing import ( TYPE_CHECKING, - List, + Iterator, Optional, Set, Tuple, @@ -106,10 +106,10 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper def drop_namespace(self, namespace: Union[str, Identifier]) -> None: raise NotImplementedError - def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: + def list_tables(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]: raise NotImplementedError - def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: + def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> Iterator[Identifier]: raise NotImplementedError def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: @@ -120,7 +120,7 @@ def update_namespace_properties( ) -> PropertiesUpdateSummary: raise NotImplementedError - def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: + def list_views(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]: raise NotImplementedError def view_exists(self, identifier: Union[str, Identifier]) -> bool: diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 6215d17a4f..aa250e6632 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -19,6 +19,7 @@ TYPE_CHECKING, Any, Dict, + Iterator, List, Optional, Set, @@ -184,6 +185,7 @@ class ConfigResponse(IcebergBaseModel): class ListNamespaceResponse(IcebergBaseModel): namespaces: List[Identifier] = Field() + next_page_token: Optional[str] = Field(alias="next-page-token", default=None) class NamespaceResponse(IcebergBaseModel): @@ -209,10 +211,12 @@ class ListViewResponseEntry(IcebergBaseModel): class ListTablesResponse(IcebergBaseModel): 
identifiers: List[ListTableResponseEntry] = Field() + next_page_token: Optional[str] = Field(alias="next-page-token", default=None) class ListViewsResponse(IcebergBaseModel): identifiers: List[ListViewResponseEntry] = Field() + next_page_token: Optional[str] = Field(alias="next-page-token", default=None) class RestCatalog(Catalog): @@ -584,15 +588,43 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: return self._response_to_table(self.identifier_to_tuple(identifier), table_response) @retry(**_RETRY_ARGS) - def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: - namespace_tuple = self._check_valid_namespace_identifier(namespace) + def list_tables_raw( + self, namespace: Union[str, Identifier], page_size: Optional[int] = None, next_page_token: Optional[str] = None + ) -> ListTablesResponse: + """List Tables, optionally provide page size and next page token. + + Args: + namespace (Union[str, Identifier]): Namespace to list against + page_size (Optional[int]): Number of tables to return per request. + next_page_token (Optional[str]): Token for pagination. + + Returns: + ListTablesResponse: List of tables. 
+ """ + namespace_tuple = self.identifier_to_tuple(namespace) namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple) - response = self._session.get(self.url(Endpoints.list_tables, namespace=namespace_concat)) + params: Dict[str, Any] = {} + if next_page_token is not None: + params["pageToken"] = next_page_token + if page_size is not None: + params["pageSize"] = page_size + response = self._session.get(self.url(Endpoints.list_tables, namespace=namespace_concat), params=params) try: response.raise_for_status() except HTTPError as exc: _handle_non_200_response(exc, {404: NoSuchNamespaceError}) - return [(*table.namespace, table.name) for table in ListTablesResponse.model_validate_json(response.text).identifiers] + return ListTablesResponse.model_validate_json(response.text) + + def list_tables(self, namespace: Union[str, Identifier], page_size: Optional[int] = None) -> Iterator[Identifier]: + """Lazily iterate over tables in a namespace.""" + next_page_token = None + while True: + list_tables_response = self.list_tables_raw(namespace=namespace, page_size=page_size, next_page_token=next_page_token) + yield from ((*table.namespace, table.name) for table in list_tables_response.identifiers) + if list_tables_response.next_page_token is None: + break + else: + next_page_token = list_tables_response.next_page_token @retry(**_RETRY_ARGS) def load_table(self, identifier: Union[str, Identifier]) -> Table: @@ -655,15 +687,44 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm return table_request @retry(**_RETRY_ARGS) - def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: + def list_views_raw( + self, namespace: Union[str, Identifier], page_size: Optional[int] = None, next_page_token: Optional[str] = None + ) -> ListViewsResponse: + """List Views, optionally provide page size and next page token. 
+ + Args: + namespace (Union[str, Identifier]): Namespace to list against + page_size (Optional[int]): Number of views to return per request. + next_page_token (Optional[str]): Token for pagination. + + Returns: + ListViewsResponse: List of views. + """ namespace_tuple = self._check_valid_namespace_identifier(namespace) namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple) - response = self._session.get(self.url(Endpoints.list_views, namespace=namespace_concat)) + params: Dict[str, Any] = {} + if next_page_token is not None: + params["pageToken"] = next_page_token + if page_size is not None: + params["pageSize"] = page_size + response = self._session.get(self.url(Endpoints.list_views, namespace=namespace_concat), params=params) try: response.raise_for_status() except HTTPError as exc: _handle_non_200_response(exc, {404: NoSuchNamespaceError}) - return [(*view.namespace, view.name) for view in ListViewsResponse.model_validate_json(response.text).identifiers] + + return ListViewsResponse.model_validate_json(response.text) + + def list_views(self, namespace: Union[str, Identifier], page_size: Optional[int] = None) -> Iterator[Identifier]: + """Lazily iterate over views in a namespace.""" + next_page_token = None + while True: + list_views_response = self.list_views_raw(namespace=namespace, page_size=page_size, next_page_token=next_page_token) + yield from ((*view.namespace, view.name) for view in list_views_response.identifiers) + if list_views_response.next_page_token is None: + break + else: + next_page_token = list_views_response.next_page_token @retry(**_RETRY_ARGS) def commit_table( @@ -732,21 +793,53 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None: _handle_non_200_response(exc, {404: NoSuchNamespaceError, 409: NamespaceNotEmptyError}) @retry(**_RETRY_ARGS) - def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: + def list_namespaces_raw( + self, namespace: Union[str, Identifier] = (), page_size: 
Optional[int] = None, next_page_token: Optional[str] = None + ) -> ListNamespaceResponse: + """List namespaces, optionally provide page size and next page token. + + Args: + namespace (Union[str, Identifier]): Namespace to list sub-namespaces for. + page_size (int): Number of namespaces to return per request. + next_page_token (Optional[str]): Token for pagination. + + Returns: + ListNamespaceResponse: List of namespaces. + """ namespace_tuple = self.identifier_to_tuple(namespace) + params: Dict[str, Any] = {} + if next_page_token is not None: + params["pageToken"] = next_page_token + if page_size is not None: + params["pageSize"] = page_size + response = self._session.get( self.url( f"{Endpoints.list_namespaces}?parent={NAMESPACE_SEPARATOR.join(namespace_tuple)}" if namespace_tuple else Endpoints.list_namespaces ), + params=params, ) try: response.raise_for_status() except HTTPError as exc: _handle_non_200_response(exc, {404: NoSuchNamespaceError}) - return ListNamespaceResponse.model_validate_json(response.text).namespaces + return ListNamespaceResponse.model_validate_json(response.text) + + def list_namespaces(self, namespace: Union[str, Identifier] = (), page_size: Optional[int] = None) -> Iterator[Identifier]: + """Lazily iterate over namespaces.""" + next_page_token = None + while True: + list_namespace_response = self.list_namespaces_raw( + namespace=namespace, page_size=page_size, next_page_token=next_page_token + ) + yield from list_namespace_response.namespaces + if list_namespace_response.next_page_token is None: + break + else: + next_page_token = list_namespace_response.next_page_token @retry(**_RETRY_ARGS) def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 880a4db481..8b425b897a 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -17,7 +17,7 @@ from typing import ( TYPE_CHECKING, - List, + Iterator, Optional, Set, Tuple, @@ 
-569,8 +569,8 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None: raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") namespace_str = Catalog.namespace_to_string(namespace) - if tables := self.list_tables(namespace): - raise NamespaceNotEmptyError(f"Namespace {namespace_str} is not empty. {len(tables)} tables exist.") + if tables := list(self.list_tables(namespace)): + raise NamespaceNotEmptyError(f"Namespace {namespace_str} is not empty. {len(list(tables))} tables exist.") with Session(self.engine) as session: session.execute( @@ -581,14 +581,14 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None: ) session.commit() - def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: + def list_tables(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]: """List tables under the given namespace in the catalog. Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: list of table identifiers. + Iterator[Identifier]: iterator of table identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. @@ -600,16 +600,18 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: stmt = select(IcebergTables).where(IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == namespace) with Session(self.engine) as session: result = session.scalars(stmt) - return [(Catalog.identifier_to_tuple(table.table_namespace) + (table.table_name,)) for table in result] + identifiers = [(Catalog.identifier_to_tuple(table.table_namespace) + (table.table_name,)) for table in result] - def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: + yield from identifiers + + def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> Iterator[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. 
Args: namespace (str | Identifier): Namespace identifier to search. Returns: - List[Identifier]: a List of namespace identifiers. + Iterator[Identifier]: iterator of namespace identifiers. Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. @@ -630,7 +632,6 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi with Session(self.engine) as session: namespace_tuple = Catalog.identifier_to_tuple(namespace) sub_namespaces_level_length = len(namespace_tuple) + 1 - namespaces = list( { # only get distinct namespaces ns[:sub_namespaces_level_length] # truncate to the required level @@ -641,7 +642,7 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi } ) - return namespaces + yield from namespaces def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: """Get properties for a namespace. @@ -722,7 +723,7 @@ def update_namespace_properties( session.commit() return properties_update_summary - def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: + def list_views(self, namespace: Union[str, Identifier]) -> Iterator[Identifier]: raise NotImplementedError def view_exists(self, identifier: Union[str, Identifier]) -> bool: diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py index 3fbd9c9fc9..dc53b4caf3 100644 --- a/pyiceberg/cli/console.py +++ b/pyiceberg/cli/console.py @@ -111,13 +111,20 @@ def list(ctx: Context, parent: Optional[str]) -> None: # pylint: disable=redefi """List tables or namespaces.""" catalog, output = _catalog_and_output(ctx) - identifiers = [] + identifiers = None if parent: - # Do we have tables under parent namespace? identifiers = catalog.list_tables(parent) - if not identifiers: - # List hierarchical namespaces if parent, root namespaces otherwise. 
+ try: + first_item = next(identifiers) + from itertools import chain + + identifiers = chain([first_item], identifiers) + except StopIteration: + identifiers = None + + if identifiers is None: identifiers = catalog.list_namespaces(parent or ()) + output.identifiers(identifiers) diff --git a/pyiceberg/cli/output.py b/pyiceberg/cli/output.py index 0eb85841bf..347eb24b88 100644 --- a/pyiceberg/cli/output.py +++ b/pyiceberg/cli/output.py @@ -19,6 +19,7 @@ from typing import ( Any, Dict, + Iterator, List, Optional, Tuple, @@ -44,7 +45,7 @@ class Output(ABC): def exception(self, ex: Exception) -> None: ... @abstractmethod - def identifiers(self, identifiers: List[Identifier]) -> None: ... + def identifiers(self, identifiers: Iterator[Identifier]) -> None: ... @abstractmethod def describe_table(self, table: Table) -> None: ... @@ -92,7 +93,7 @@ def exception(self, ex: Exception) -> None: else: Console(stderr=True).print(ex) - def identifiers(self, identifiers: List[Identifier]) -> None: + def identifiers(self, identifiers: Iterator[Identifier]) -> None: table = self._table for identifier in identifiers: table.add_row(".".join(identifier)) @@ -203,7 +204,7 @@ def _out(self, d: Any) -> None: def exception(self, ex: Exception) -> None: self._out({"type": ex.__class__.__name__, "message": str(ex)}) - def identifiers(self, identifiers: List[Identifier]) -> None: + def identifiers(self, identifiers: Iterator[Identifier]) -> None: self._out([".".join(identifier) for identifier in identifiers]) def describe_table(self, table: Table) -> None: diff --git a/tests/catalog/integration_test_dynamodb.py b/tests/catalog/integration_test_dynamodb.py index 895f233c45..4332ad0c64 100644 --- a/tests/catalog/integration_test_dynamodb.py +++ b/tests/catalog/integration_test_dynamodb.py @@ -119,7 +119,7 @@ def test_list_tables(test_catalog: Catalog, table_schema_nested: Schema, databas test_catalog.create_namespace(database_name) for table_name in table_list: 
test_catalog.create_table((database_name, table_name), table_schema_nested) - identifier_list = test_catalog.list_tables(database_name) + identifier_list = list(test_catalog.list_tables(database_name)) assert len(identifier_list) == LIST_TEST_NUMBER for table_name in table_list: assert (database_name, table_name) in identifier_list @@ -207,10 +207,10 @@ def test_create_namespace_with_comment_and_location(test_catalog: Catalog, datab def test_list_namespaces(test_catalog: Catalog, database_list: List[str]) -> None: for database_name in database_list: test_catalog.create_namespace(database_name) - db_list = test_catalog.list_namespaces() + db_list = list(test_catalog.list_namespaces()) for database_name in database_list: assert (database_name,) in db_list - assert len(test_catalog.list_namespaces(list(database_list)[0])) == 0 + assert len(list(test_catalog.list_namespaces(list(database_list)[0]))) == 0 def test_drop_namespace(test_catalog: Catalog, table_schema_nested: Schema, table_name: str, database_name: str) -> None: diff --git a/tests/catalog/integration_test_glue.py b/tests/catalog/integration_test_glue.py index 475fc07ead..abb09ddbbb 100644 --- a/tests/catalog/integration_test_glue.py +++ b/tests/catalog/integration_test_glue.py @@ -226,7 +226,7 @@ def test_list_tables(test_catalog: Catalog, table_schema_nested: Schema, databas test_catalog.create_namespace(database_name) for table_name in table_list: test_catalog.create_table((database_name, table_name), table_schema_nested) - identifier_list = test_catalog.list_tables(database_name) + identifier_list = list(test_catalog.list_tables(database_name)) assert len(identifier_list) == LIST_TEST_NUMBER for table_name in table_list: assert (database_name, table_name) in identifier_list @@ -315,10 +315,10 @@ def test_create_namespace_with_comment_and_location(test_catalog: Catalog, datab def test_list_namespaces(test_catalog: Catalog, database_list: List[str]) -> None: for database_name in database_list: 
test_catalog.create_namespace(database_name) - db_list = test_catalog.list_namespaces() + db_list = list(test_catalog.list_namespaces()) for database_name in database_list: assert (database_name,) in db_list - assert len(test_catalog.list_namespaces(list(database_list)[0])) == 0 + assert len(list(test_catalog.list_namespaces(list(database_list)[0]))) == 0 def test_drop_namespace(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None: diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 2ab97b4285..e66e18be7f 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -377,12 +377,12 @@ def test_list_namespaces(catalog: InMemoryCatalog) -> None: # Given catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES) # When - namespaces = catalog.list_namespaces() + namespaces = list(catalog.list_namespaces()) # Then assert TEST_TABLE_NAMESPACE[:1] in namespaces # When - namespaces = catalog.list_namespaces(TEST_TABLE_NAMESPACE) + namespaces = list(catalog.list_namespaces(TEST_TABLE_NAMESPACE)) # Then assert not namespaces @@ -426,7 +426,7 @@ def test_list_tables_under_a_namespace(catalog: InMemoryCatalog) -> None: catalog.create_namespace(new_namespace) # When all_tables = catalog.list_tables(namespace=TEST_TABLE_NAMESPACE) - new_namespace_tables = catalog.list_tables(new_namespace) + new_namespace_tables = list(catalog.list_tables(new_namespace)) # Then assert all_tables assert TEST_TABLE_IDENTIFIER in all_tables diff --git a/tests/catalog/test_dynamodb.py b/tests/catalog/test_dynamodb.py index c7c39a600d..f65f629775 100644 --- a/tests/catalog/test_dynamodb.py +++ b/tests/catalog/test_dynamodb.py @@ -418,7 +418,7 @@ def test_list_namespaces(_bucket_initialize: None, database_list: List[str]) -> def test_create_namespace_no_properties(_bucket_initialize: None, database_name: str) -> None: test_catalog = DynamoDbCatalog("test_ddb_catalog") 
test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list properties = test_catalog.load_namespace_properties(database_name) @@ -434,7 +434,7 @@ def test_create_namespace_with_comment_and_location(_bucket_initialize: None, da } test_catalog = DynamoDbCatalog("test_ddb_catalog") test_catalog.create_namespace(namespace=database_name, properties=test_properties) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list properties = test_catalog.load_namespace_properties(database_name) @@ -446,7 +446,7 @@ def test_create_namespace_with_comment_and_location(_bucket_initialize: None, da def test_create_duplicated_namespace(_bucket_initialize: None, database_name: str) -> None: test_catalog = DynamoDbCatalog("test_ddb_catalog") test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list with pytest.raises(NamespaceAlreadyExistsError): @@ -457,11 +457,11 @@ def test_create_duplicated_namespace(_bucket_initialize: None, database_name: st def test_drop_namespace(_bucket_initialize: None, database_name: str) -> None: test_catalog = DynamoDbCatalog("test_ddb_catalog") test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list test_catalog.drop_namespace(database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list 
= list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 0 @@ -473,7 +473,7 @@ def test_drop_non_empty_namespace( test_catalog = DynamoDbCatalog("test_ddb_catalog", **{"warehouse": f"s3://{BUCKET_NAME}", "s3.endpoint": moto_endpoint_url}) test_catalog.create_namespace(namespace=database_name) test_catalog.create_table(identifier, table_schema_nested) - assert len(test_catalog.list_tables(database_name)) == 1 + assert len(list(test_catalog.list_tables(database_name))) == 1 with pytest.raises(NamespaceNotEmptyError): test_catalog.drop_namespace(database_name) diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py index 0ff43cd52b..5915e47331 100644 --- a/tests/catalog/test_glue.py +++ b/tests/catalog/test_glue.py @@ -466,7 +466,7 @@ def test_list_tables( for table_name in table_list: test_catalog.create_table((database_name, table_name), table_schema_nested) - loaded_table_list = test_catalog.list_tables(database_name) + loaded_table_list = list(test_catalog.list_tables(database_name)) assert (database_name, non_iceberg_table_name) not in loaded_table_list assert (database_name, non_table_type_table_name) not in loaded_table_list @@ -488,7 +488,7 @@ def test_list_namespaces(_bucket_initialize: None, moto_endpoint_url: str, datab def test_create_namespace_no_properties(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None: test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url}) test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list properties = test_catalog.load_namespace_properties(database_name) @@ -504,7 +504,7 @@ def test_create_namespace_with_comment_and_location(_bucket_initialize: None, mo } test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url}) 
test_catalog.create_namespace(namespace=database_name, properties=test_properties) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list properties = test_catalog.load_namespace_properties(database_name) @@ -516,7 +516,7 @@ def test_create_namespace_with_comment_and_location(_bucket_initialize: None, mo def test_create_duplicated_namespace(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None: test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url}) test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list with pytest.raises(NamespaceAlreadyExistsError): @@ -527,11 +527,11 @@ def test_create_duplicated_namespace(_bucket_initialize: None, moto_endpoint_url def test_drop_namespace(_bucket_initialize: None, moto_endpoint_url: str, database_name: str) -> None: test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url}) test_catalog.create_namespace(namespace=database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 1 assert (database_name,) in loaded_database_list test_catalog.drop_namespace(database_name) - loaded_database_list = test_catalog.list_namespaces() + loaded_database_list = list(test_catalog.list_namespaces()) assert len(loaded_database_list) == 0 @@ -543,7 +543,7 @@ def test_drop_non_empty_namespace( test_catalog = GlueCatalog("glue", **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}/"}) test_catalog.create_namespace(namespace=database_name) test_catalog.create_table(identifier, table_schema_nested) - assert 
len(test_catalog.list_tables(database_name)) == 1 + assert len(list(test_catalog.list_tables(database_name))) == 1 with pytest.raises(NamespaceNotEmptyError): test_catalog.drop_namespace(database_name) diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index a36425ebea..cd117e72fd 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -1015,7 +1015,7 @@ def test_list_tables(hive_table: HiveTable) -> None: catalog._client.__enter__().get_all_tables.return_value = ["table1", "table2", "table3", "table4"] catalog._client.__enter__().get_table_objects_by_name.return_value = [tbl1, tbl2, tbl3, tbl4] - got_tables = catalog.list_tables("database") + got_tables = list(catalog.list_tables("database")) assert got_tables == [("database", "table1"), ("database", "table2")] catalog._client.__enter__().get_all_tables.assert_called_with(db_name="database") catalog._client.__enter__().get_table_objects_by_name.assert_called_with( @@ -1029,7 +1029,7 @@ def test_list_namespaces() -> None: catalog._client = MagicMock() catalog._client.__enter__().get_all_databases.return_value = ["namespace1", "namespace2"] - assert catalog.list_namespaces() == [("namespace1",), ("namespace2",)] + assert list(catalog.list_namespaces()) == [("namespace1",), ("namespace2",)] catalog._client.__enter__().get_all_databases.assert_called() diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index ed91dd15a1..f687db5b24 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -409,7 +409,29 @@ def test_list_tables_200(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace) == [("examples", "fooshare")] + assert list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace)) == [("examples", "fooshare")] + + +def test_list_tables_paginated_200(rest_mock: Mocker) -> None: + namespace = "examples" + rest_mock.get( + 
f"{TEST_URI}v1/namespaces/{namespace}/tables", + json={"identifiers": [{"namespace": ["examples"], "name": "fooshare"}], "next-page-token": "page2"}, + status_code=200, + request_headers=TEST_HEADERS, + ) + rest_mock.get( + f"{TEST_URI}v1/namespaces/{namespace}/tables?pageToken=page2", + json={"identifiers": [{"namespace": ["examples"], "name": "fooshare2"}]}, + status_code=200, + request_headers=TEST_HEADERS, + ) + + assert list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace)) == [ + ("examples", "fooshare"), + ("examples", "fooshare2"), + ] + assert rest_mock.call_count == 3 def test_list_tables_200_sigv4(rest_mock: Mocker) -> None: @@ -421,9 +443,9 @@ def test_list_tables_200_sigv4(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", **{"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true"}).list_tables(namespace) == [ - ("examples", "fooshare") - ] + assert list( + RestCatalog("rest", **{"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true"}).list_tables(namespace) + ) == [("examples", "fooshare")] assert rest_mock.called @@ -442,7 +464,7 @@ def test_list_tables_404(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) with pytest.raises(NoSuchNamespaceError) as e: - RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace) + list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_tables(namespace)) assert "Namespace does not exist" in str(e.value) @@ -455,7 +477,31 @@ def test_list_views_200(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace) == [("examples", "fooshare")] + assert list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace)) == [("examples", "fooshare")] + + +def test_list_views_paginated_200(rest_mock: Mocker) -> None: + namespace = "examples" + rest_mock.get( + f"{TEST_URI}v1/namespaces/{namespace}/views", + 
json={"identifiers": [{"namespace": ["examples"], "name": "fooshare"}], "next-page-token": "page2"}, + status_code=200, + request_headers=TEST_HEADERS, + ) + + rest_mock.get( + f"{TEST_URI}v1/namespaces/{namespace}/views?pageToken=page2", + json={"identifiers": [{"namespace": ["examples"], "name": "fooshare2"}]}, + status_code=200, + request_headers=TEST_HEADERS, + ) + + assert list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace)) == [ + ("examples", "fooshare"), + ("examples", "fooshare2"), + ] + + assert rest_mock.call_count == 3 def test_list_views_200_sigv4(rest_mock: Mocker) -> None: @@ -467,9 +513,9 @@ def test_list_views_200_sigv4(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", **{"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true"}).list_views(namespace) == [ - ("examples", "fooshare") - ] + assert list( + RestCatalog("rest", **{"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true"}).list_views(namespace) + ) == [("examples", "fooshare")] assert rest_mock.called @@ -488,7 +534,7 @@ def test_list_views_404(rest_mock: Mocker) -> None: request_headers=TEST_HEADERS, ) with pytest.raises(NoSuchNamespaceError) as e: - RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace) + list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace)) assert "Namespace does not exist" in str(e.value) @@ -535,13 +581,40 @@ def test_list_namespaces_200(rest_mock: Mocker) -> None: status_code=200, request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces() == [ + assert list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces()) == [ + ("default",), + ("examples",), + ("fokko",), + ("system",), + ] + + +def test_list_namespaces_paginated_200(rest_mock: Mocker) -> None: + rest_mock.get( + f"{TEST_URI}v1/namespaces", + json={"namespaces": [["default"], ["examples"], ["fokko"], ["system"]], 
"next-page-token": "page2"}, + status_code=200, + request_headers=TEST_HEADERS, + ) + rest_mock.get( + f"{TEST_URI}v1/namespaces?pageToken=page2", + json={"namespaces": [["default2"], ["examples2"], ["jayce"], ["system2"]]}, + status_code=200, + request_headers=TEST_HEADERS, + ) + assert list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces()) == [ ("default",), ("examples",), ("fokko",), ("system",), + ("default2",), + ("examples2",), + ("jayce",), + ("system2",), ] + assert rest_mock.call_count == 3 + def test_list_namespace_with_parent_200(rest_mock: Mocker) -> None: rest_mock.get( @@ -550,7 +623,7 @@ def test_list_namespace_with_parent_200(rest_mock: Mocker) -> None: status_code=200, request_headers=TEST_HEADERS, ) - assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces(("accounting",)) == [ + assert list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces(("accounting",))) == [ ("accounting", "tax"), ] @@ -570,7 +643,7 @@ def test_list_namespace_with_parent_404(rest_mock: Mocker) -> None: ) with pytest.raises(NoSuchNamespaceError): - RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces(("some_namespace",)) + list(RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces(("some_namespace",))) @pytest.mark.filterwarnings( @@ -624,7 +697,7 @@ def test_list_namespaces_token_expired_success_on_retries(rest_mock: Mocker, sta # which results in the token being refreshed twice when the RestCatalog is initialized. 
assert tokens.call_count == 2 - assert catalog.list_namespaces() == [ + assert list(catalog.list_namespaces()) == [ ("default",), ("examples",), ("fokko",), @@ -633,7 +706,7 @@ def test_list_namespaces_token_expired_success_on_retries(rest_mock: Mocker, sta assert namespaces.call_count == 2 assert tokens.call_count == 3 - assert catalog.list_namespaces() == [ + assert list(catalog.list_namespaces()) == [ ("default",), ("examples",), ("fokko",), diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index 235951484f..f7d85730ae 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -981,11 +981,11 @@ def test_list_tables( catalog.create_namespace(namespace_2) catalog.create_table(table_identifier_1, table_schema_nested) catalog.create_table(table_identifier_2, table_schema_nested) - identifier_list = catalog.list_tables(namespace_1) + identifier_list = list(catalog.list_tables(namespace_1)) assert len(identifier_list) == 1 assert table_identifier_1 in identifier_list - identifier_list = catalog.list_tables(namespace_2) + identifier_list = list(catalog.list_tables(namespace_2)) assert len(identifier_list) == 1 assert table_identifier_2 in identifier_list @@ -1000,7 +1000,7 @@ def test_list_tables( @pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) def test_list_tables_when_missing_namespace(catalog: SqlCatalog, namespace: str) -> None: with pytest.raises(NoSuchNamespaceError): - catalog.list_tables(namespace) + list(catalog.list_tables(namespace)) @pytest.mark.parametrize( @@ -1141,17 +1141,17 @@ def test_list_namespaces(catalog: SqlCatalog) -> None: if not catalog._namespace_exists(namespace): catalog.create_namespace(namespace) - ns_list = catalog.list_namespaces() + ns_list = list(catalog.list_namespaces()) for ns in [("db",), ("db%",), ("db2",)]: assert ns in ns_list - ns_list = catalog.list_namespaces("db") + ns_list = list(catalog.list_namespaces("db")) assert 
sorted(ns_list) == [("db", "ns1"), ("db", "ns2")] - ns_list = catalog.list_namespaces("db.ns1") + ns_list = list(catalog.list_namespaces("db.ns1")) assert sorted(ns_list) == [("db", "ns1", "ns2")] - ns_list = catalog.list_namespaces("db.ns1.ns2") + ns_list = list(catalog.list_namespaces("db.ns1.ns2")) assert len(ns_list) == 0 @@ -1168,9 +1168,9 @@ def test_list_namespaces_fuzzy_match(catalog: SqlCatalog) -> None: if not catalog._namespace_exists(namespace): catalog.create_namespace(namespace) - assert catalog.list_namespaces("db.ns1") == [("db", "ns1", "ns2")] + assert list(catalog.list_namespaces("db.ns1")) == [("db", "ns1", "ns2")] - assert catalog.list_namespaces("db_.ns1") == [("db_", "ns1", "ns2")] + assert list(catalog.list_namespaces("db_.ns1")) == [("db_", "ns1", "ns2")] @pytest.mark.parametrize( @@ -1182,7 +1182,7 @@ def test_list_namespaces_fuzzy_match(catalog: SqlCatalog) -> None: ) def test_list_non_existing_namespaces(catalog: SqlCatalog) -> None: with pytest.raises(NoSuchNamespaceError): - catalog.list_namespaces("does_not_exist") + list(catalog.list_namespaces("does_not_exist")) @pytest.mark.parametrize(