Skip to content

refactor: consolidate snapshot expiration into MaintenanceTable #2143

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 29 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9dcb580
feat: delete orphaned files
jayceslesar Apr 29, 2025
e43505c
simpler and a test
jayceslesar Apr 29, 2025
eed5ea8
remove
jayceslesar Apr 29, 2025
8cca600
updates from review!
jayceslesar May 2, 2025
75b1240
include dry run and older than
jayceslesar May 2, 2025
6379480
add case for dry run
jayceslesar May 2, 2025
0c2822e
use .path so we get paths back
jayceslesar May 3, 2025
aaf8fc2
actually pass in iterable
jayceslesar May 3, 2025
b09641b
capture manifest_list files
jayceslesar May 3, 2025
beec233
refactor into `all_known_files`
jayceslesar May 3, 2025
b888c56
fix type in docstring
jayceslesar May 3, 2025
ff461ed
mildly more readable
jayceslesar May 3, 2025
3b3b10e
beef up tests
jayceslesar May 3, 2025
a62c8cf
make `older_than` required
jayceslesar May 4, 2025
07cbf1b
move under `optimize` namespace
jayceslesar May 4, 2025
54e1e00
add some better logging about what was/was not deleted
jayceslesar May 4, 2025
7c780d3
Merge branch 'main' into feat/orphan-files
jayceslesar May 10, 2025
9b6c9ed
Merge branch 'main' into feat/orphan-files
jayceslesar May 13, 2025
34d10b9
rename optimize -> maintenance
jayceslesar May 17, 2025
0335957
make orphaned_files private
jayceslesar May 17, 2025
9f8145c
correctly coerce list
jayceslesar May 17, 2025
fbdcbd3
add metadata files
jayceslesar May 28, 2025
85b4ab3
Merge branch 'main' into feat/orphan-files
jayceslesar May 28, 2025
c414df8
Merge branch 'main' into feat/orphan-files
jayceslesar Jun 10, 2025
aa9d536
Merge branch 'main' into feat/orphan-files
jayceslesar Jun 21, 2025
b4c14fc
fix test
jayceslesar Jun 21, 2025
f4d98d2
allow older_than to be None
jayceslesar Jun 21, 2025
0c1e4c7
refactor: consolidate snapshot expiration into MaintenanceTable
ForeverAngry Jun 23, 2025
5fee547
refactor: consolidate snapshot expiration into MaintenanceTable
ForeverAngry Jun 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
from pyiceberg.schema import Schema
from pyiceberg.table.inspect import InspectTable
from pyiceberg.table.locations import LocationProvider, load_location_provider
from pyiceberg.table.maintenance import MaintenanceTable
from pyiceberg.table.metadata import (
INITIAL_SEQUENCE_NUMBER,
TableMetadata,
Expand Down Expand Up @@ -115,7 +116,7 @@
update_table_metadata,
)
from pyiceberg.table.update.schema import UpdateSchema
from pyiceberg.table.update.snapshot import ExpireSnapshots, ManageSnapshots, UpdateSnapshot, _FastAppendFiles
from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles
from pyiceberg.table.update.spec import UpdateSpec
from pyiceberg.table.update.statistics import UpdateStatistics
from pyiceberg.transforms import IdentityTransform
Expand Down Expand Up @@ -1038,6 +1039,15 @@ def inspect(self) -> InspectTable:
"""
return InspectTable(self)

@property
def maintenance(self) -> MaintenanceTable:
    """Entry point for table maintenance operations.

    Returns:
        A MaintenanceTable bound to this Table.
    """
    maintenance_table = MaintenanceTable(self)
    return maintenance_table

def refresh(self) -> Table:
"""Refresh the current table metadata.

Expand Down Expand Up @@ -1210,10 +1220,6 @@ def manage_snapshots(self) -> ManageSnapshots:
"""
return ManageSnapshots(transaction=Transaction(self, autocommit=True))

def expire_snapshots(self) -> ExpireSnapshots:
"""Shorthand to run expire snapshots by id or by a timestamp."""
return ExpireSnapshots(transaction=Transaction(self, autocommit=True))

def update_statistics(self) -> UpdateStatistics:
"""
Shorthand to run statistics management operations like add statistics and remove statistics.
Expand Down
43 changes: 40 additions & 3 deletions pyiceberg/table/inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
from __future__ import annotations

from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple
from functools import reduce
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple, Union

from pyiceberg.conversions import from_bytes
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
Expand Down Expand Up @@ -665,10 +666,20 @@ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})

def all_manifests(self) -> "pa.Table":
def all_manifests(self, snapshots: Optional[Union[list[Snapshot], list[int]]] = None) -> "pa.Table":
import pyarrow as pa

snapshots = self.tbl.snapshots()
# coerce into snapshot objects if users passes in snapshot ids
if snapshots is not None:
if isinstance(snapshots[0], int):
snapshots = [
snapshot
for snapshot_id in snapshots
if (snapshot := self.tbl.metadata.snapshot_by_id(snapshot_id)) is not None
]
else:
snapshots = self.tbl.snapshots()

if not snapshots:
return pa.Table.from_pylist([], schema=self._get_all_manifests_schema())

Expand All @@ -678,6 +689,32 @@ def all_manifests(self) -> "pa.Table":
)
return pa.concat_tables(manifests_by_snapshots)

def _all_known_files(self) -> dict[str, set[str]]:
    """Collect every file path referenced by the table's current metadata.

    Returns:
        dict of {file_type: set of file paths} for each file type
        ("manifests", "manifest_lists", "statistics", "metadata", "datafiles").
    """
    snapshots = self.tbl.snapshots()

    metadata_files = {log_entry.metadata_file for log_entry in self.tbl.metadata.metadata_log}
    # The current metadata file is not part of the metadata log; add it explicitly.
    metadata_files.add(self.tbl.metadata_location)

    # Fan out one files() scan per snapshot and union the resulting data-file paths.
    executor = ExecutorFactory.get_or_create()
    snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots]
    per_snapshot_paths: Iterator[Set[str]] = executor.map(
        lambda sid: set(self.files(sid)["file_path"].to_pylist()), snapshot_ids
    )
    data_file_paths: Set[str] = set()
    for paths in per_snapshot_paths:
        data_file_paths |= paths

    return {
        "manifests": set(self.all_manifests(snapshots)["path"].to_pylist()),
        "manifest_lists": {snapshot.manifest_list for snapshot in snapshots},
        "statistics": {statistic.statistics_path for statistic in self.tbl.metadata.statistics},
        "metadata": metadata_files,
        "datafiles": data_file_paths,
    }

def _all_files(self, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
import pyarrow as pa

Expand Down
208 changes: 208 additions & 0 deletions pyiceberg/table/maintenance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
from datetime import datetime, timedelta, timezone
from functools import reduce
from typing import TYPE_CHECKING, List, Optional, Set

from pyiceberg.utils.concurrent import ExecutorFactory

logger = logging.getLogger(__name__)


if TYPE_CHECKING:
from pyiceberg.table import Table
from pyiceberg.table.metadata import TableMetadata


class MaintenanceTable:
    """Run maintenance operations against a Table: orphan-file removal and snapshot expiration."""

    tbl: Table

    def __init__(self, tbl: Table) -> None:
        self.tbl = tbl

        try:
            import pyarrow as pa  # noqa
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For metadata operations PyArrow needs to be installed") from e

    def _orphaned_files(self, location: str, older_than: Optional[timedelta] = None) -> Set[str]:
        """Get all files which are not referenced in any metadata files of an Iceberg table and can thus be considered "orphaned".

        Args:
            location: The location to check for orphaned files.
            older_than: Only consider files whose modification time is at least this
                far in the past. Defaults to ``timedelta(0)``, i.e. no age filter.

        Returns:
            A set of orphaned file paths.
        """
        try:
            import pyarrow as pa  # noqa: F401
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e

        from pyarrow.fs import FileSelector, FileType

        from pyiceberg.io.pyarrow import PyArrowFileIO

        all_known_files = self.tbl.inspect._all_known_files()
        flat_known_files: set[str] = reduce(set.union, all_known_files.values(), set())

        # Parse the location once; the scheme selects the filesystem, the path seeds the walk.
        pyarrow_io = PyArrowFileIO()
        scheme, _, path = pyarrow_io.parse_location(location)
        fs = pyarrow_io.fs_by_scheme(scheme, None)

        selector = FileSelector(path, recursive=True)

        # Filter to just files (the listing may include directories) and apply the age cutoff.
        if older_than is None:
            older_than = timedelta(0)
        as_of = datetime.now(timezone.utc) - older_than
        # Some filesystems report no mtime; treat those files as not old enough to
        # delete rather than raising a TypeError on the comparison.
        all_files = [
            f.path
            for f in fs.get_file_info(selector)
            if f.type == FileType.File and f.mtime is not None and f.mtime < as_of
        ]

        orphaned_files = set(all_files).difference(flat_known_files)

        return orphaned_files

    def remove_orphaned_files(self, older_than: Optional[timedelta] = None, dry_run: bool = False) -> None:
        """Remove files which are not referenced in any metadata files of an Iceberg table and can thus be considered "orphaned".

        Args:
            older_than: Only consider files whose modification time is at least this
                far in the past. Defaults to ``timedelta(0)``, i.e. no age filter.
            dry_run: If True, only log the files that would be deleted. Defaults to False.
        """
        location = self.tbl.location()
        orphaned_files = self._orphaned_files(location, older_than)
        logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!")
        deleted_files = set()
        failed_to_delete_files = set()

        def _delete(file: str) -> None:
            # Best-effort: record failures (e.g. already-gone files) instead of aborting
            # the whole run, but still let KeyboardInterrupt/SystemExit propagate.
            try:
                self.tbl.io.delete(file)
                deleted_files.add(file)
            except Exception:
                failed_to_delete_files.add(file)

        if orphaned_files:
            if dry_run:
                # Nothing is deleted in a dry run; report what would have been removed.
                logger.info(f"(Dry Run) Would delete {len(orphaned_files)} orphaned files at {location}!")
                logger.info(f"Files:\n{orphaned_files}")
            else:
                executor = ExecutorFactory.get_or_create()
                deletes = executor.map(_delete, orphaned_files)
                # exhaust the lazy map so all deletes actually run
                list(deletes)
                logger.info(f"Deleted {len(deleted_files)} orphaned files at {location}!")
                logger.info(f"Files:\n{deleted_files}")
                if failed_to_delete_files:
                    logger.warning(f"Failed to delete {len(failed_to_delete_files)} orphaned files at {location}!")
                    logger.warning(f"Files:\n{failed_to_delete_files}")
        else:
            logger.info(f"No orphaned files found at {location}!")

    def expire_snapshot_by_id(self, snapshot_id: int) -> None:
        """Expire a single snapshot by its ID.

        Args:
            snapshot_id: The ID of the snapshot to expire.

        Raises:
            ValueError: If the snapshot does not exist or is protected.
        """
        with self.tbl.transaction() as txn:
            # Check if snapshot exists
            if txn.table_metadata.snapshot_by_id(snapshot_id) is None:
                raise ValueError(f"Snapshot with ID {snapshot_id} does not exist.")

            # Check if snapshot is protected (branch head or tagged)
            protected_ids = self._get_protected_snapshot_ids(txn.table_metadata)
            if snapshot_id in protected_ids:
                raise ValueError(f"Snapshot with ID {snapshot_id} is protected and cannot be expired.")

            # Local import to avoid a circular import at module load time.
            from pyiceberg.table.update import RemoveSnapshotsUpdate

            txn._apply((RemoveSnapshotsUpdate(snapshot_ids=[snapshot_id]),))

    def expire_snapshots_by_ids(self, snapshot_ids: List[int]) -> None:
        """Expire multiple snapshots by their IDs.

        Args:
            snapshot_ids: List of snapshot IDs to expire.

        Raises:
            ValueError: If any snapshot does not exist or is protected.
        """
        with self.tbl.transaction() as txn:
            protected_ids = self._get_protected_snapshot_ids(txn.table_metadata)

            # Validate all snapshots before expiring any, so the operation is all-or-nothing.
            for snapshot_id in snapshot_ids:
                if txn.table_metadata.snapshot_by_id(snapshot_id) is None:
                    raise ValueError(f"Snapshot with ID {snapshot_id} does not exist.")
                if snapshot_id in protected_ids:
                    raise ValueError(f"Snapshot with ID {snapshot_id} is protected and cannot be expired.")

            # Local import to avoid a circular import at module load time.
            from pyiceberg.table.update import RemoveSnapshotsUpdate

            txn._apply((RemoveSnapshotsUpdate(snapshot_ids=snapshot_ids),))

    def expire_snapshots_older_than(self, timestamp_ms: int) -> None:
        """Expire all unprotected snapshots with a timestamp older than a given value.

        Args:
            timestamp_ms: Only snapshots with timestamp_ms < this value will be expired.
        """
        # First check if there are any snapshots to expire to avoid opening an
        # unnecessary transaction.
        # NOTE(review): protected ids and candidates are read from the current
        # (pre-transaction) metadata; a concurrent ref change between this read and
        # the commit would not be seen here.
        protected_ids = self._get_protected_snapshot_ids(self.tbl.metadata)
        snapshots_to_expire = [
            snapshot.snapshot_id
            for snapshot in self.tbl.metadata.snapshots
            if snapshot.timestamp_ms < timestamp_ms and snapshot.snapshot_id not in protected_ids
        ]

        if snapshots_to_expire:
            with self.tbl.transaction() as txn:
                # Local import to avoid a circular import at module load time.
                from pyiceberg.table.update import RemoveSnapshotsUpdate

                txn._apply((RemoveSnapshotsUpdate(snapshot_ids=snapshots_to_expire),))

    def _get_protected_snapshot_ids(self, table_metadata: TableMetadata) -> Set[int]:
        """Get the IDs of protected snapshots.

        These are the HEAD snapshots of all branches and all tagged snapshots.
        These ids are to be excluded from expiration.

        Args:
            table_metadata: The table metadata to check for protected snapshots.

        Returns:
            Set of protected snapshot IDs to exclude from expiration.
        """
        from pyiceberg.table.refs import SnapshotRefType

        protected_ids: Set[int] = set()
        for ref in table_metadata.refs.values():
            # TAG and BRANCH are currently the only ref types, so every ref protects
            # its snapshot; the explicit check guards against future ref types.
            if ref.snapshot_ref_type in (SnapshotRefType.TAG, SnapshotRefType.BRANCH):
                protected_ids.add(ref.snapshot_id)
        return protected_ids
Loading