diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 4da116434e..8752eafdcc 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -73,6 +73,8 @@ ) from pyiceberg.utils.config import Config, merge_config from pyiceberg.utils.properties import property_as_bool +from pyiceberg.view import View +from pyiceberg.view.metadata import ViewVersion if TYPE_CHECKING: import pyarrow as pa @@ -657,6 +659,32 @@ def drop_view(self, identifier: Union[str, Identifier]) -> None: NoSuchViewError: If a view with the given name does not exist. """ + @abstractmethod + def create_view( + self, + identifier: Union[str, Identifier], + schema: Union[Schema, "pa.Schema"], + view_version: ViewVersion, + location: Optional[str] = None, + properties: Properties = EMPTY_DICT, + ) -> View: + """Create a view. + + Args: + identifier (str | Identifier): View identifier. + schema (Schema | pa.Schema): View's schema. + view_version (ViewVersion): The version of the view definition to create; it carries + the representations (e.g. the SQL text) that define the view. + location (str | None): Location for the view. Optional Argument. + properties (Properties): View properties that can be a string based dictionary. + + Returns: + View: the created view instance. + + Raises: + ViewAlreadyExistsError: If a view with the name already exists. + """ + @staticmethod def identifier_to_tuple(identifier: Union[str, Identifier]) -> Identifier: """Parse an identifier to a tuple. 
diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py index 3b37762638..e441fd9806 100644 --- a/pyiceberg/catalog/dynamodb.py +++ b/pyiceberg/catalog/dynamodb.py @@ -63,6 +63,8 @@ ) from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties from pyiceberg.utils.properties import get_first_property_value +from pyiceberg.view import View +from pyiceberg.view.metadata import ViewVersion if TYPE_CHECKING: import pyarrow as pa @@ -538,6 +540,16 @@ def update_namespace_properties( return properties_update_summary + def create_view( + self, + identifier: Union[str, Identifier], + schema: Union[Schema, "pa.Schema"], + view_version: ViewVersion, + location: Optional[str] = None, + properties: Properties = EMPTY_DICT, + ) -> View: + raise NotImplementedError # view creation is not implemented in this catalog + def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: raise NotImplementedError diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 88ad8aa433..4227dc3648 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -89,6 +89,8 @@ UUIDType, ) from pyiceberg.utils.properties import get_first_property_value, property_as_bool +from pyiceberg.view import View +from pyiceberg.view.metadata import ViewVersion if TYPE_CHECKING: import pyarrow as pa @@ -809,6 +811,16 @@ def update_namespace_properties( return properties_update_summary + def create_view( + self, + identifier: Union[str, Identifier], + schema: Union[Schema, "pa.Schema"], + view_version: ViewVersion, + location: Optional[str] = None, + properties: Properties = EMPTY_DICT, + ) -> View: + raise NotImplementedError # view creation is not implemented in this catalog + def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: raise NotImplementedError diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 09437dd1b6..8e7c059aec 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -116,6 +116,8 @@ UUIDType, ) from pyiceberg.utils.properties import property_as_bool, 
property_as_float +from pyiceberg.view import View +from pyiceberg.view.metadata import ViewVersion if TYPE_CHECKING: import pyarrow as pa @@ -439,6 +441,16 @@ def create_table( return self._convert_hive_into_iceberg(hive_table) + def create_view( + self, + identifier: Union[str, Identifier], + schema: Union[Schema, "pa.Schema"], + view_version: ViewVersion, + location: Optional[str] = None, + properties: Properties = EMPTY_DICT, + ) -> View: + raise NotImplementedError # view creation is not implemented in this catalog + def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: """Register a new table using existing metadata. diff --git a/pyiceberg/catalog/noop.py b/pyiceberg/catalog/noop.py index 39a0b382a7..fb1dc77e6a 100644 --- a/pyiceberg/catalog/noop.py +++ b/pyiceberg/catalog/noop.py @@ -37,6 +37,8 @@ TableUpdate, ) from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties +from pyiceberg.view import View +from pyiceberg.view.metadata import ViewVersion if TYPE_CHECKING: import pyarrow as pa @@ -126,5 +128,15 @@ def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: def view_exists(self, identifier: Union[str, Identifier]) -> bool: raise NotImplementedError + def create_view( + self, + identifier: Union[str, Identifier], + schema: Union[Schema, "pa.Schema"], + view_version: ViewVersion, + location: Optional[str] = None, + properties: Properties = EMPTY_DICT, + ) -> View: + raise NotImplementedError # view creation is not implemented in this catalog + def drop_view(self, identifier: Union[str, Identifier]) -> None: raise NotImplementedError diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 6215d17a4f..a65ca7bb64 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -53,6 +53,7 @@ NoSuchViewError, TableAlreadyExistsError, UnauthorizedError, + ViewAlreadyExistsError, ) from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN from pyiceberg.partitioning import 
UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids @@ -75,6 +76,8 @@ from pyiceberg.types import transform_dict_value_to_str from pyiceberg.utils.deprecated import deprecation_message from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool +from pyiceberg.view import View +from pyiceberg.view.metadata import ViewMetadata, ViewVersion if TYPE_CHECKING: import pyarrow as pa @@ -100,6 +103,7 @@ class Endpoints: get_token: str = "oauth/tokens" rename_table: str = "tables/rename" list_views: str = "namespaces/{namespace}/views" + create_view: str = "namespaces/{namespace}/views" # same path as list_views; create_view sends a POST to it drop_view: str = "namespaces/{namespace}/views/{view}" view_exists: str = "namespaces/{namespace}/views/{view}" @@ -157,6 +161,12 @@ class TableResponse(IcebergBaseModel): config: Properties = Field(default_factory=dict) +class ViewResponse(IcebergBaseModel): + metadata_location: Optional[str] = Field(alias="metadata-location", default=None) # optional; None when the response omits it + metadata: ViewMetadata + config: Properties = Field(default_factory=dict) + + class CreateTableRequest(IcebergBaseModel): name: str = Field() location: Optional[str] = Field() @@ -172,6 +182,18 @@ def transform_properties_dict_value_to_str(cls, properties: Properties) -> Dict[ return transform_dict_value_to_str(properties) +class CreateViewRequest(IcebergBaseModel): + name: str = Field() + location: Optional[str] = Field() + view_schema: Schema = Field(alias="schema") + view_version: ViewVersion = Field(alias="view-version") + properties: Properties = Field(default_factory=dict) + + @field_validator("properties", mode="before") # mode="before": runs ahead of pydantic's own validation of the field + def transform_properties_dict_value_to_str(cls, properties: Properties) -> Dict[str, str]: + return transform_dict_value_to_str(properties) + + class RegisterTableRequest(IcebergBaseModel): name: str metadata_location: str = Field(..., alias="metadata-location") @@ -454,6 +476,12 @@ def _response_to_staged_table(self, identifier_tuple: Tuple[str, ...], 
table_res catalog=self, ) + def _response_to_view(self, identifier_tuple: Tuple[str, ...], view_response: ViewResponse) -> View: + return View( + identifier=identifier_tuple, + metadata=view_response.metadata, + ) + def _refresh_token(self) -> None: # Reactive token refresh is atypical - we should proactively refresh tokens in a separate thread # instead of retrying on Auth Exceptions. Keeping refresh behavior for the LegacyOAuth2AuthManager @@ -551,6 +579,44 @@ def create_table_transaction( staged_table = self._response_to_staged_table(self.identifier_to_tuple(identifier), table_response) return CreateTableTransaction(staged_table) + @retry(**_RETRY_ARGS) + def create_view( + self, + identifier: Union[str, Identifier], + schema: Union[Schema, "pa.Schema"], + view_version: ViewVersion, + location: Optional[str] = None, + properties: Properties = EMPTY_DICT, + ) -> View: + iceberg_schema = self._convert_schema_if_needed(schema) + fresh_schema = assign_fresh_schema_ids(iceberg_schema) + + namespace_and_view = self._split_identifier_for_path(identifier, IdentifierKind.VIEW) + if location: + location = location.rstrip("/") # strip trailing slashes before sending the location + + request = CreateViewRequest( + name=namespace_and_view["view"], + location=location, + view_schema=fresh_schema, + view_version=view_version, + properties=properties, + ) + + serialized_json = request.model_dump_json().encode(UTF8) + response = self._session.post( + self.url(Endpoints.create_view, namespace=namespace_and_view["namespace"]), + data=serialized_json, + ) + + try: + response.raise_for_status() + except HTTPError as exc: + _handle_non_200_response(exc, {409: ViewAlreadyExistsError}) # HTTP 409 is mapped to ViewAlreadyExistsError + + view_response = ViewResponse.model_validate_json(response.text) + return self._response_to_view(self.identifier_to_tuple(identifier), view_response) + @retry(**_RETRY_ARGS) def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: """Register a new table using existing metadata. 
diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 880a4db481..cf7fa97780 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -72,6 +72,8 @@ ) from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties from pyiceberg.types import strtobool +from pyiceberg.view import View +from pyiceberg.view.metadata import ViewVersion if TYPE_CHECKING: import pyarrow as pa @@ -722,6 +724,16 @@ def update_namespace_properties( session.commit() return properties_update_summary + def create_view( + self, + identifier: Union[str, Identifier], + schema: Union[Schema, "pa.Schema"], + view_version: ViewVersion, + location: Optional[str] = None, + properties: Properties = EMPTY_DICT, + ) -> View: + raise NotImplementedError # view creation is not implemented in this catalog + def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: raise NotImplementedError diff --git a/pyiceberg/exceptions.py b/pyiceberg/exceptions.py index c80f104e46..cec47e5911 100644 --- a/pyiceberg/exceptions.py +++ b/pyiceberg/exceptions.py @@ -20,6 +20,10 @@ class TableAlreadyExistsError(Exception): """Raised when creating a table with a name that already exists.""" +class ViewAlreadyExistsError(Exception): + """Raised when creating a view with a name that already exists.""" + + class NamespaceNotEmptyError(Exception): """Raised when a name-space being dropped is not empty.""" diff --git a/pyiceberg/typedef.py b/pyiceberg/typedef.py index d9ace9d971..a98c0c72bd 100644 --- a/pyiceberg/typedef.py +++ b/pyiceberg/typedef.py @@ -208,3 +208,4 @@ def __hash__(self) -> int: TableVersion: TypeAlias = Literal[1, 2, 3] +ViewVersion: TypeAlias = Literal[1] # view format version; 1 is the only accepted value diff --git a/pyiceberg/view/__init__.py b/pyiceberg/view/__init__.py new file mode 100644 index 0000000000..4ddb21a112 --- /dev/null +++ b/pyiceberg/view/__init__.py @@ -0,0 +1,47 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import ( + Any, +) + +from pyiceberg.typedef import Identifier +from pyiceberg.view.metadata import ViewMetadata + + +class View: + """An Iceberg view, pairing an identifier with its view metadata.""" + + _identifier: Identifier + metadata: ViewMetadata + + def __init__( + self, + identifier: Identifier, + metadata: ViewMetadata, + ) -> None: + self._identifier = identifier + self.metadata = metadata + + def name(self) -> Identifier: + """Return the identifier of this view.""" + return self._identifier + + def __eq__(self, other: Any) -> bool: + """Return the equality of two instances of the View class.""" + return self.name() == other.name() and self.metadata == other.metadata if isinstance(other, View) else False # a non-View operand is never equal diff --git a/pyiceberg/view/metadata.py b/pyiceberg/view/metadata.py new file mode 100644 index 0000000000..3061d5c8c6 --- /dev/null +++ b/pyiceberg/view/metadata.py @@ -0,0 +1,94 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import Dict, List, Literal, Optional + +from pydantic import Field, RootModel, field_validator + +from pyiceberg.schema import Schema +from pyiceberg.typedef import IcebergBaseModel, Identifier, Properties +from pyiceberg.typedef import ViewVersion as ViewVersionLiteral +from pyiceberg.types import transform_dict_value_to_str + + +class SQLViewRepresentation(IcebergBaseModel): + """Represents the SQL query that defines the view.""" + + type: Literal["sql"] = Field() + """A string that indicates the type of representation. Must be `sql`""" + sql: str = Field() + """A string that contains the SQL text of the view definition.""" + dialect: str = Field() + """The dialect of the SQL, e.g. 
`spark`, `trino`, `presto`.""" + + +class ViewRepresentation(IcebergBaseModel, RootModel): + root: SQLViewRepresentation # SQL is the only representation type modelled here + + +class ViewVersion(IcebergBaseModel): + """A version of the view definition.""" + + version_id: int = Field(alias="version-id") + """ID for the version""" + schema_id: int = Field(alias="schema-id") + """ID of the schema for the view version""" + timestamp_ms: int = Field(alias="timestamp-ms") + """Timestamp when the version was created (ms from epoch)""" + summary: Dict[str, str] = Field() + """A string to string map of summary metadata about the version""" + representations: List[ViewRepresentation] = Field() + """A list of representations for the view definition""" + default_catalog: Optional[str] = Field(alias="default-catalog", default=None) + """Catalog name to use when a reference in the SELECT does not contain a catalog""" + default_namespace: Identifier = Field(alias="default-namespace") + """Namespace to use when a reference in the SELECT is a single identifier""" + + +class ViewHistoryEntry(IcebergBaseModel): + """A log entry of a view version change.""" + + timestamp_ms: int = Field(alias="timestamp-ms") + """Timestamp when the version was created (ms from epoch)""" + version_id: int = Field(alias="version-id") + """ID for the version""" + + +class ViewMetadata(IcebergBaseModel): + """The metadata for a view.""" + + view_uuid: str = Field(alias="view-uuid") + """A UUID that identifies the view, generated when the view is created.""" + format_version: ViewVersionLiteral = Field(alias="format-version", ge=1, le=1) + """An integer version number for the view format; must be 1""" + location: str = Field() + """The view's base location; used to create metadata file locations""" + schemas: List[Schema] = Field() + """A list of known schemas""" + current_version_id: int = Field(alias="current-version-id") + """ID of the current version of the view (version-id)""" + versions: List[ViewVersion] = Field() + """A list of known versions of 
the view""" + version_log: List[ViewHistoryEntry] = Field(alias="version-log") + """A list of version log entries""" + properties: Properties = Field(default_factory=dict) + """A string to string map of view properties""" + + @field_validator("properties", mode="before") + def transform_properties_dict_value_to_str(cls, properties: Properties) -> Dict[str, str]: + return transform_dict_value_to_str(properties) diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index ed91dd15a1..08ae47b3cc 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -36,6 +36,7 @@ OAuthError, ServerError, TableAlreadyExistsError, + ViewAlreadyExistsError, ) from pyiceberg.io import load_file_io from pyiceberg.partitioning import PartitionField, PartitionSpec @@ -46,6 +47,8 @@ from pyiceberg.transforms import IdentityTransform, TruncateTransform from pyiceberg.typedef import RecursiveDict from pyiceberg.utils.config import Config +from pyiceberg.view import View +from pyiceberg.view.metadata import ViewMetadata, ViewVersion TEST_URI = "https://iceberg-test-catalog/" TEST_CREDENTIALS = "client:secret" @@ -102,6 +105,18 @@ def example_table_metadata_no_snapshot_v1_rest_json(example_table_metadata_no_sn } +@pytest.fixture +def example_view_metadata_rest_json(example_view_metadata_v1: Dict[str, Any]) -> Dict[str, Any]: + return { + "metadata-location": "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json", + "metadata": example_view_metadata_v1, + "config": { + "client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory", + "region": "us-west-2", + }, + } + + @pytest.fixture def rest_mock(requests_mock: Mocker) -> Mocker: """Takes the default requests_mock and adds the config endpoint to it @@ -1162,6 +1177,76 @@ def test_create_table_409(rest_mock: Mocker, table_schema_simple: Schema) -> Non assert "Table already exists" in str(e.value) +def test_create_view_200(rest_mock: Mocker, 
table_schema_simple: Schema, example_view_metadata_rest_json: Dict[str, Any]) -> None: + rest_mock.post( + f"{TEST_URI}v1/namespaces/fokko/views", + json=example_view_metadata_rest_json, + status_code=200, + request_headers=TEST_HEADERS, + ) + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + actual = catalog.create_view( + identifier=("fokko", "fokko2"), + schema=table_schema_simple, + view_version=ViewVersion( + version_id=1, + timestamp_ms=12345, + schema_id=1, + summary={"engine-name": "spark", "engineVersion": "3.3"}, + representations=[ + { + "type": "sql", + "sql": "SELECT * FROM prod.db.table", + "dialect": "spark", + } + ], + default_namespace=["default"], + ), + location=None, + properties={"owner": "fokko"}, + ) + expected = View( + identifier=("fokko", "fokko2"), + metadata=ViewMetadata(**example_view_metadata_rest_json["metadata"]), + ) + assert actual == expected # the returned View carries the metadata from the mocked response + + +def test_create_view_409( + rest_mock: Mocker, + table_schema_simple: Schema, +) -> None: + rest_mock.post( + f"{TEST_URI}v1/namespaces/fokko/views", + json={ + "error": { + "message": "View already exists: fokko.already_exists in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e", + "type": "AlreadyExistsException", + "code": 409, + } + }, + status_code=409, + request_headers=TEST_HEADERS, + ) + + with pytest.raises(ViewAlreadyExistsError) as e: + RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).create_view( + identifier=("fokko", "fokko2"), + schema=table_schema_simple, + view_version=ViewVersion( + version_id=1, + timestamp_ms=12345, + schema_id=1, + summary={"engine-name": "spark", "engineVersion": "3.3"}, + representations=[], + default_namespace=[], + ), + location=None, + properties={"owner": "fokko"}, + ) + assert "View already exists" in str(e.value) + + def test_create_table_if_not_exists_200( rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_no_snapshot_v1_rest_json: Dict[str, Any] ) -> None: diff --git a/tests/conftest.py b/tests/conftest.py 
index a584f98c10..9082d845af 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1127,6 +1127,45 @@ def table_metadata_v2_with_statistics() -> Dict[str, Any]: return TABLE_METADATA_V2_WITH_STATISTICS +@pytest.fixture +def example_view_metadata_v1() -> Dict[str, Any]: + return { + "view-uuid": "a20125c8-7284-442c-9aea-15fee620737c", + "format-version": 1, + "location": "s3://bucket/test/location/test_view", + "current-version-id": 1, + "versions": [ + { + "version-id": 1, + "timestamp-ms": 1602638573874, + "schema-id": 1, + "summary": {"engine-name": "spark", "engineVersion": "3.3"}, + "representations": [ + { + "type": "sql", + "sql": "SELECT * FROM prod.db.table", + "dialect": "spark", + } + ], + "default-namespace": ["default"], + } + ], + "schemas": [ + { + "type": "struct", + "schema-id": 1, + "fields": [ + {"id": 1, "name": "x", "required": True, "type": "long"}, + {"id": 2, "name": "y", "required": True, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": True, "type": "long"}, + ], + } + ], + "version-log": [{"timestamp-ms": 1602638573874, "version-id": 1}], + "properties": {"comment": "this is a test view"}, + } + + @pytest.fixture def example_table_metadata_v3() -> Dict[str, Any]: return EXAMPLE_TABLE_METADATA_V3 diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 033a9f7c0d..3dd5f3545e 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -59,6 +59,7 @@ NestedField, StringType, ) +from pyiceberg.view.metadata import SQLViewRepresentation, ViewVersion from utils import _create_table @@ -1519,6 +1520,45 @@ def test_table_v1_with_null_nested_namespace(session_catalog: Catalog, arrow_tab session_catalog.drop_table(identifier) +@pytest.mark.integration +def test_create_view( + spark: SparkSession, + session_catalog: Catalog, +) -> None: + # Create a view using the REST Catalog. 
+ identifier = "default.some_view" + schema = pa.schema([pa.field("some_col", pa.int32())]) + view_version = ViewVersion( + version_id=1, + schema_id=1, + timestamp_ms=int(time.time() * 1000), # current time in milliseconds since the epoch + summary={}, + representations=[ + SQLViewRepresentation( + type="sql", + sql="SELECT 1 as some_col", + dialect="spark", + ) + ], + default_namespace=["default"], + ) + session_catalog.create_view( + identifier=identifier, + schema=schema, + view_version=view_version, + ) + + # Ensure the view exists. + assert session_catalog.view_exists(identifier) + + # Query the view in spark to ensure it was properly created. + df = spark.table(identifier) + assert df.count() == 1 + assert df.collect()[0].some_col == 1 + + session_catalog.drop_view(identifier) # clean up + + @pytest.mark.integration def test_view_exists( spark: SparkSession,