Skip to content

Add create_view to REST Catalog #2154

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
)
from pyiceberg.utils.config import Config, merge_config
from pyiceberg.utils.properties import property_as_bool
from pyiceberg.view import View
from pyiceberg.view.metadata import ViewVersion

if TYPE_CHECKING:
import pyarrow as pa
Expand Down Expand Up @@ -657,6 +659,32 @@ def drop_view(self, identifier: Union[str, Identifier]) -> None:
NoSuchViewError: If a view with the given name does not exist.
"""

@abstractmethod
def create_view(
    self,
    identifier: Union[str, Identifier],
    schema: Union[Schema, "pa.Schema"],
    view_version: ViewVersion,
    location: Optional[str] = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Create a view.

    Args:
        identifier (str | Identifier): View identifier.
        schema (Schema | pa.Schema): View's schema.
        view_version (ViewVersion): The view version to create the view with.
        location (str | None): Location for the view. Optional Argument.
        properties (Properties): View properties that can be a string based dictionary.

    Returns:
        View: the created view instance.

    Raises:
        ViewAlreadyExistsError: If a view with the name already exists.
    """

@staticmethod
def identifier_to_tuple(identifier: Union[str, Identifier]) -> Identifier:
"""Parse an identifier to a tuple.
Expand Down
12 changes: 12 additions & 0 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
)
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from pyiceberg.utils.properties import get_first_property_value
from pyiceberg.view import View
from pyiceberg.view.metadata import ViewVersion

if TYPE_CHECKING:
import pyarrow as pa
Expand Down Expand Up @@ -538,6 +540,16 @@ def update_namespace_properties(

return properties_update_summary

def create_view(
    self,
    identifier: Union[str, Identifier],
    schema: Union[Schema, "pa.Schema"],
    view_version: ViewVersion,
    location: Optional[str] = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Create a view.

    View creation is not implemented for this catalog; every call raises.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
    """List the views in a namespace.

    Listing views is not implemented for this catalog; every call raises.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

Expand Down
12 changes: 12 additions & 0 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@
UUIDType,
)
from pyiceberg.utils.properties import get_first_property_value, property_as_bool
from pyiceberg.view import View
from pyiceberg.view.metadata import ViewVersion

if TYPE_CHECKING:
import pyarrow as pa
Expand Down Expand Up @@ -809,6 +811,16 @@ def update_namespace_properties(

return properties_update_summary

def create_view(
    self,
    identifier: Union[str, Identifier],
    schema: Union[Schema, "pa.Schema"],
    view_version: ViewVersion,
    location: Optional[str] = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Create a view.

    View creation is not implemented for this catalog; every call raises.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
    """List the views in a namespace.

    Listing views is not implemented for this catalog; every call raises.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

Expand Down
12 changes: 12 additions & 0 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@
UUIDType,
)
from pyiceberg.utils.properties import property_as_bool, property_as_float
from pyiceberg.view import View
from pyiceberg.view.metadata import ViewVersion

if TYPE_CHECKING:
import pyarrow as pa
Expand Down Expand Up @@ -439,6 +441,16 @@ def create_table(

return self._convert_hive_into_iceberg(hive_table)

def create_view(
    self,
    identifier: Union[str, Identifier],
    schema: Union[Schema, "pa.Schema"],
    view_version: ViewVersion,
    location: Optional[str] = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Create a view.

    View creation is not implemented for this catalog; every call raises.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
"""Register a new table using existing metadata.

Expand Down
12 changes: 12 additions & 0 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
TableUpdate,
)
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from pyiceberg.view import View
from pyiceberg.view.metadata import ViewVersion

if TYPE_CHECKING:
import pyarrow as pa
Expand Down Expand Up @@ -126,5 +128,15 @@ def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
def view_exists(self, identifier: Union[str, Identifier]) -> bool:
    """Check whether a view exists.

    This check is not implemented for this catalog; every call raises.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

def create_view(
    self,
    identifier: Union[str, Identifier],
    schema: Union[Schema, "pa.Schema"],
    view_version: ViewVersion,
    location: Optional[str] = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Create a view.

    View creation is not implemented for this catalog; every call raises.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

def drop_view(self, identifier: Union[str, Identifier]) -> None:
    """Drop a view.

    Dropping views is not implemented for this catalog; every call raises.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
66 changes: 66 additions & 0 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
NoSuchViewError,
TableAlreadyExistsError,
UnauthorizedError,
ViewAlreadyExistsError,
)
from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec, assign_fresh_partition_spec_ids
Expand All @@ -75,6 +76,8 @@
from pyiceberg.types import transform_dict_value_to_str
from pyiceberg.utils.deprecated import deprecation_message
from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool
from pyiceberg.view import View
from pyiceberg.view.metadata import ViewMetadata, ViewVersion

if TYPE_CHECKING:
import pyarrow as pa
Expand All @@ -100,6 +103,7 @@ class Endpoints:
get_token: str = "oauth/tokens"
rename_table: str = "tables/rename"
list_views: str = "namespaces/{namespace}/views"
create_view: str = "namespaces/{namespace}/views"
drop_view: str = "namespaces/{namespace}/views/{view}"
view_exists: str = "namespaces/{namespace}/views/{view}"

Expand Down Expand Up @@ -157,6 +161,12 @@ class TableResponse(IcebergBaseModel):
config: Properties = Field(default_factory=dict)


class ViewResponse(IcebergBaseModel):
    """Parsed body of a REST catalog response that describes a view."""

    # Exposed on the wire as "metadata-location"; defaults to None when absent.
    metadata_location: Optional[str] = Field(default=None, alias="metadata-location")
    # The view metadata itself; this field has no default and is therefore required.
    metadata: ViewMetadata
    # Extra configuration from the response; an empty dict when not supplied.
    config: Properties = Field(default_factory=dict)


class CreateTableRequest(IcebergBaseModel):
name: str = Field()
location: Optional[str] = Field()
Expand All @@ -172,6 +182,18 @@ def transform_properties_dict_value_to_str(cls, properties: Properties) -> Dict[
return transform_dict_value_to_str(properties)


class CreateViewRequest(IcebergBaseModel):
    """Request body used when asking the REST catalog to create a view."""

    name: str = Field()
    location: Optional[str] = Field()
    # Serialized under the "schema" alias.
    # NOTE(review): presumably named view_schema to avoid clashing with pydantic's own `schema` attribute — confirm.
    view_schema: Schema = Field(alias="schema")
    # Serialized under the "view-version" alias.
    view_version: ViewVersion = Field(alias="view-version")
    properties: Properties = Field(default_factory=dict)

    @field_validator("properties", mode="before")
    def transform_properties_dict_value_to_str(cls, properties: Properties) -> Dict[str, str]:
        """Run the raw properties through ``transform_dict_value_to_str`` before validation."""
        stringified = transform_dict_value_to_str(properties)
        return stringified


class RegisterTableRequest(IcebergBaseModel):
name: str
metadata_location: str = Field(..., alias="metadata-location")
Expand Down Expand Up @@ -454,6 +476,12 @@ def _response_to_staged_table(self, identifier_tuple: Tuple[str, ...], table_res
catalog=self,
)

def _response_to_view(self, identifier_tuple: Tuple[str, ...], view_response: ViewResponse) -> View:
    """Build a ``View`` from an identifier and a parsed view response.

    Only the response's ``metadata`` is carried over to the returned view.

    Args:
        identifier_tuple (Tuple[str, ...]): Identifier to attach to the view.
        view_response (ViewResponse): Parsed response holding the view metadata.

    Returns:
        View: A view with the given identifier and the response's metadata.
    """
    view_metadata = view_response.metadata
    return View(identifier=identifier_tuple, metadata=view_metadata)

def _refresh_token(self) -> None:
# Reactive token refresh is atypical - we should proactively refresh tokens in a separate thread
# instead of retrying on Auth Exceptions. Keeping refresh behavior for the LegacyOAuth2AuthManager
Expand Down Expand Up @@ -551,6 +579,44 @@ def create_table_transaction(
staged_table = self._response_to_staged_table(self.identifier_to_tuple(identifier), table_response)
return CreateTableTransaction(staged_table)

@retry(**_RETRY_ARGS)
def create_view(
    self,
    identifier: Union[str, Identifier],
    schema: Union[Schema, "pa.Schema"],
    view_version: ViewVersion,
    location: Optional[str] = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Create a view by POSTing a ``CreateViewRequest`` to the catalog.

    Args:
        identifier (str | Identifier): View identifier.
        schema (Schema | pa.Schema): View's schema; it is converted if needed and
            passed through ``assign_fresh_schema_ids`` before being sent.
        view_version (ViewVersion): The view version sent with the request.
        location (str | None): Location for the view; trailing slashes are stripped.
        properties (Properties): View properties.

    Returns:
        View: The view built from the catalog's response.

    Raises:
        ViewAlreadyExistsError: Mapped from an HTTP 409 response.
    """
    # Schema preparation happens first, before the identifier is split.
    fresh_schema = assign_fresh_schema_ids(self._convert_schema_if_needed(schema))

    path_parts = self._split_identifier_for_path(identifier, IdentifierKind.VIEW)
    # Falsy locations (None or "") are forwarded untouched.
    normalized_location = location.rstrip("/") if location else location

    payload = CreateViewRequest(
        name=path_parts["view"],
        location=normalized_location,
        view_schema=fresh_schema,
        view_version=view_version,
        properties=properties,
    ).model_dump_json().encode(UTF8)

    endpoint = self.url(Endpoints.create_view, namespace=path_parts["namespace"])
    response = self._session.post(endpoint, data=payload)

    try:
        response.raise_for_status()
    except HTTPError as exc:
        # 409 is mapped to ViewAlreadyExistsError; other statuses are left to the shared handler.
        _handle_non_200_response(exc, {409: ViewAlreadyExistsError})

    parsed = ViewResponse.model_validate_json(response.text)
    return self._response_to_view(self.identifier_to_tuple(identifier), parsed)

@retry(**_RETRY_ARGS)
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
"""Register a new table using existing metadata.
Expand Down
12 changes: 12 additions & 0 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
)
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from pyiceberg.types import strtobool
from pyiceberg.view import View
from pyiceberg.view.metadata import ViewVersion

if TYPE_CHECKING:
import pyarrow as pa
Expand Down Expand Up @@ -722,6 +724,16 @@ def update_namespace_properties(
session.commit()
return properties_update_summary

def create_view(
    self,
    identifier: Union[str, Identifier],
    schema: Union[Schema, "pa.Schema"],
    view_version: ViewVersion,
    location: Optional[str] = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Create a view.

    View creation is not implemented for this catalog; every call raises.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
    """List the views in a namespace.

    Listing views is not implemented for this catalog; every call raises.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

Expand Down
4 changes: 4 additions & 0 deletions pyiceberg/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ class TableAlreadyExistsError(Exception):
"""Raised when creating a table with a name that already exists."""


class ViewAlreadyExistsError(Exception):
    """Raised when a view cannot be created because its name is already in use."""


class NamespaceNotEmptyError(Exception):
    """Raised when a namespace being dropped is not empty."""

Expand Down
1 change: 1 addition & 0 deletions pyiceberg/typedef.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,4 @@ def __hash__(self) -> int:


# Literal alias restricting table versions to 1, 2 or 3.
TableVersion: TypeAlias = Literal[1, 2, 3]
# Literal alias restricting view versions to 1 (presumably the view format version — TODO confirm).
# NOTE(review): the catalog modules import a different `ViewVersion` from
# `pyiceberg.view.metadata`; confirm the shared name is intentional and not confused at call sites.
ViewVersion: TypeAlias = Literal[1]
49 changes: 49 additions & 0 deletions pyiceberg/view/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import (
Any,
)

from pydantic import Field

from pyiceberg.typedef import Identifier
from pyiceberg.view.metadata import ViewMetadata


class View:
    """An Iceberg view: an identifier paired with its view metadata."""

    _identifier: Identifier
    metadata: ViewMetadata

    def __init__(
        self,
        identifier: Identifier,
        metadata: ViewMetadata,
    ) -> None:
        """Store the identifier and the metadata of the view."""
        self._identifier = identifier
        self.metadata = metadata

    def name(self) -> Identifier:
        """Return the identifier of this view."""
        return self._identifier

    def __eq__(self, other: Any) -> bool:
        """Return whether ``other`` is a View with the same name and metadata."""
        # Anything that is not a View never compares equal.
        if not isinstance(other, View):
            return False
        return self.name() == other.name() and self.metadata == other.metadata
Loading