diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index b8a6ea45c2..aa2556626a 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -995,701 +995,145 @@ readable_metrics: [ [6.0989]] ``` -!!! info - Content refers to type of content stored by the data file: `0` - `Data`, `1` - `Position Deletes`, `2` - `Equality Deletes` +## Table Maintenance -To show only data files or delete files in the current snapshot, use `table.inspect.data_files()` and `table.inspect.delete_files()` respectively. +PyIceberg provides a set of maintenance utilities to help keep your tables healthy, efficient, and resilient. These operations are available via the `MaintenanceTable` class and are essential for managing metadata, reclaiming space, and ensuring operational safety. -## Add Files +### Use Cases -Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them. +- **Deduplicate Data Files**: Remove duplicate references to the same physical data file, which can occur due to concurrent writes, manual file additions, or recovery from failures. +- **Snapshot Retention**: Control the number and age of snapshots retained for rollback, auditing, and space management. +- **Safe Expiration**: Ensure that protected snapshots (e.g., branch/tag heads) are never accidentally removed. -```python -# Given that these parquet files have schema consistent with the Iceberg table - -file_paths = [ - "s3a://warehouse/default/existing-1.parquet", - "s3a://warehouse/default/existing-2.parquet", -] - -# They can be added to the table without rewriting them - -tbl.add_files(file_paths=file_paths) - -# A new snapshot is committed to the table with manifests pointing to the existing parquet files -``` - - - -!!! note "Name Mapping" - Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg's schema, it requires the Iceberg's table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (The Name mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet file's metadata, and creates a new Name Mapping based on the table's current schema if the table doesn't already have one. - -!!! note "Partitions" - `add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform`, and `TruncateTransform` which preserve the order of the values after the transformation (Any Transform that has the `preserves_order` property set to True is supported). Please note that if the column statistics of the `PartitionField`'s source column are not present in the parquet metadata, the partition value is inferred as `None`. - -!!! warning "Maintenance Operations" - Because `add_files` commits the existing parquet files to the Iceberg Table as any other data file, destructive maintenance operations like expiring snapshots will remove them. - - - -## Schema evolution - -PyIceberg supports full schema evolution through the Python API. It takes care of setting the field-IDs and makes sure that only non-breaking changes are done (can be overridden). - -In the examples below, the `.update_schema()` is called from the table itself. 
- -```python -with table.update_schema() as update: - update.add_column("some_field", IntegerType(), "doc") -``` - -You can also initiate a transaction if you want to make more changes than just evolving the schema: - -```python -with table.transaction() as transaction: - with transaction.update_schema() as update_schema: - update.add_column("some_other_field", IntegerType(), "doc") - # ... Update properties etc -``` - -### Union by Name - -Using `.union_by_name()` you can merge another schema into an existing schema without having to worry about field-IDs: - -```python -from pyiceberg.catalog import load_catalog -from pyiceberg.schema import Schema -from pyiceberg.types import NestedField, StringType, DoubleType, LongType - -catalog = load_catalog() - -schema = Schema( - NestedField(1, "city", StringType(), required=False), - NestedField(2, "lat", DoubleType(), required=False), - NestedField(3, "long", DoubleType(), required=False), -) - -table = catalog.create_table("default.locations", schema) - -new_schema = Schema( - NestedField(1, "city", StringType(), required=False), - NestedField(2, "lat", DoubleType(), required=False), - NestedField(3, "long", DoubleType(), required=False), - NestedField(10, "population", LongType(), required=False), -) - -with table.update_schema() as update: - update.union_by_name(new_schema) -``` - -Now the table has the union of the two schemas `print(table.schema())`: - -```python -table { - 1: city: optional string - 2: lat: optional double - 3: long: optional double - 4: population: optional long -} -``` - -### Add column - -Using `add_column` you can add a column, without having to worry about the field-id: - -```python -with table.update_schema() as update: - update.add_column("retries", IntegerType(), "Number of retries to place the bid") - # In a struct - update.add_column("details", StructType()) - -with table.update_schema() as update: - update.add_column(("details", "confirmed_by"), StringType(), "Name of the exchange") -``` - -A complex type must exist before columns can be added to it. Fields in complex types are added in a tuple. - -### Rename column - -Renaming a field in an Iceberg table is simple: - -```python -with table.update_schema() as update: - update.rename_column("retries", "num_retries") - # This will rename `confirmed_by` to `processed_by` in the `details` struct - update.rename_column(("details", "confirmed_by"), "processed_by") -``` - -### Move column - -Move order of fields: - -```python -with table.update_schema() as update: - update.move_first("symbol") - # This will move `bid` after `ask` - update.move_after("bid", "ask") - # This will move `confirmed_by` before `exchange` in the `details` struct - update.move_before(("details", "confirmed_by"), ("details", "exchange")) -``` - -### Update column - -Update a fields' type, description or required. 
- -```python -with table.update_schema() as update: - # Promote a float to a double - update.update_column("bid", field_type=DoubleType()) - # Make a field optional - update.update_column("symbol", required=False) - # Update the documentation - update.update_column("symbol", doc="Name of the share on the exchange") -``` - -Be careful, some operations are not compatible, but can still be done at your own risk by setting `allow_incompatible_changes`: - -```python -with table.update_schema(allow_incompatible_changes=True) as update: - # Incompatible change, cannot require an optional field - update.update_column("symbol", required=True) -``` - -### Delete column - -Delete a field, careful this is a incompatible change (readers/writers might expect this field): - -```python -with table.update_schema(allow_incompatible_changes=True) as update: - update.delete_column("some_field") - # In a struct - update.delete_column(("details", "confirmed_by")) -``` - -## Partition evolution - -PyIceberg supports partition evolution. See the [partition evolution](https://iceberg.apache.org/spec/#partition-evolution) -for more details. - -The API to use when evolving partitions is the `update_spec` API on the table. - -```python -with table.update_spec() as update: - update.add_field("id", BucketTransform(16), "bucketed_id") - update.add_field("event_ts", DayTransform(), "day_ts") -``` - -Updating the partition spec can also be done as part of a transaction with other operations. - -```python -with table.transaction() as transaction: - with transaction.update_spec() as update_spec: - update_spec.add_field("id", BucketTransform(16), "bucketed_id") - update_spec.add_field("event_ts", DayTransform(), "day_ts") - # ... Update properties etc -``` - -### Add fields - -New partition fields can be added via the `add_field` API which takes in the field name to partition on, -the partition transform, and an optional partition name. If the partition name is not specified, -one will be created. - -```python -with table.update_spec() as update: - update.add_field("id", BucketTransform(16), "bucketed_id") - update.add_field("event_ts", DayTransform(), "day_ts") - # identity is a shortcut API for adding an IdentityTransform - update.identity("some_field") -``` - -### Remove fields - -Partition fields can also be removed via the `remove_field` API if it no longer makes sense to partition on those fields. - -```python -with table.update_spec() as update: - # Remove the partition field with the name - update.remove_field("some_partition_name") -``` - -### Rename fields - -Partition fields can also be renamed via the `rename_field` API. 
- -```python -with table.update_spec() as update: - # Rename the partition field with the name bucketed_id to sharded_id - update.rename_field("bucketed_id", "sharded_id") -``` - -## Table properties - -Set and remove properties through the `Transaction` API: - -```python -with table.transaction() as transaction: - transaction.set_properties(abc="def") - -assert table.properties == {"abc": "def"} - -with table.transaction() as transaction: - transaction.remove_properties("abc") - -assert table.properties == {} -``` - -Or, without context manager: - -```python -table = table.transaction().set_properties(abc="def").commit_transaction() - -assert table.properties == {"abc": "def"} - -table = table.transaction().remove_properties("abc").commit_transaction() - -assert table.properties == {} -``` - -## Snapshot properties - -Optionally, Snapshot properties can be set while writing to a table using `append` or `overwrite` API: - -```python -tbl.append(df, snapshot_properties={"abc": "def"}) - -# or - -tbl.overwrite(df, snapshot_properties={"abc": "def"}) - -assert tbl.metadata.snapshots[-1].summary["abc"] == "def" -``` - -## Snapshot Management - -Manage snapshots with operations through the `Table` API: - -```python -# To run a specific operation -table.manage_snapshots().create_tag(snapshot_id, "tag123").commit() -# To run multiple operations -table.manage_snapshots() - .create_tag(snapshot_id1, "tag123") - .create_tag(snapshot_id2, "tag456") - .commit() -# Operations are applied on commit. -``` - -You can also use context managers to make more changes: - -```python -with table.manage_snapshots() as ms: - ms.create_branch(snapshot_id1, "Branch_A").create_tag(snapshot_id2, "tag789") -``` - -## Views - -PyIceberg supports view operations. - -### Check if a view exists - -```python -from pyiceberg.catalog import load_catalog - -catalog = load_catalog("default") -catalog.view_exists("default.bar") -``` - -## Table Statistics Management - -Manage table statistics with operations through the `Table` API: - -```python -# To run a specific operation -table.update_statistics().set_statistics(statistics_file=statistics_file).commit() -# To run multiple operations -table.update_statistics() - .set_statistics(statistics_file1) - .remove_statistics(snapshot_id2) - .commit() -# Operations are applied on commit. -``` - -You can also use context managers to make more changes: +--- -```python -with table.update_statistics() as update: - update.set_statistics(statistics_file) - update.remove_statistics(snapshot_id2) -``` +### Deduplicate Data Files -## Query the data +Duplicate data file references can occur in Iceberg tables, leading to wasted storage and potential confusion. The `deduplicate_data_files` method scans the table for duplicate `DataFile` entries (i.e., multiple metadata entries pointing to the same Parquet file) and removes the extras. -To query a table, a table scan is needed. 
A table scan accepts a filter, columns, optionally a limit and a snapshot ID: +#### Example: Remove duplicate data files ```python -from pyiceberg.catalog import load_catalog -from pyiceberg.expressions import GreaterThanOrEqual - -catalog = load_catalog("default") -table = catalog.load_table("nyc.taxis") - -scan = table.scan( - row_filter=GreaterThanOrEqual("trip_distance", 10.0), - selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"), - limit=100, -) - -# Or filter using a string predicate -scan = table.scan( - row_filter="trip_distance > 10.0", -) - -[task.file.file_path for task in scan.plan_files()] -``` - -The low level API `plan_files` methods returns a set of tasks that provide the files that might contain matching rows: +from pyiceberg.table.maintenance import MaintenanceTable -```json -[ - "s3://warehouse/wh/nyc/taxis/data/00003-4-42464649-92dd-41ad-b83b-dea1a2fe4b58-00001.parquet" -] +maintenance = MaintenanceTable(table) +removed_files = maintenance.deduplicate_data_files() +print(f"Removed {len(removed_files)} duplicate data files") ``` -In this case it is up to the engine itself to filter the file itself. Below, `to_arrow()` and `to_duckdb()` that already do this for you. - -### Apache Arrow - - - -!!! note "Requirements" - This requires [`pyarrow` to be installed](index.md). - - - -Using PyIceberg it is filter out data from a huge table and pull it into a PyArrow table: +#### Use Case: Why deduplication is needed -```python -table.scan( - row_filter=GreaterThanOrEqual("trip_distance", 10.0), - selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"), -).to_arrow() -``` +- **Concurrent Writes**: Two writers may commit the same file in different snapshots. +- **Manual File Addition**: Files added via `add_files` or recovery scripts may be referenced more than once. +- **Metadata Recovery**: After a failed commit or restore, duplicate references may exist. -This will return a PyArrow table: +#### Visual Example -```python -pyarrow.Table -VendorID: int64 -tpep_pickup_datetime: timestamp[us, tz=+00:00] -tpep_dropoff_datetime: timestamp[us, tz=+00:00] ----- -VendorID: [[2,1,2,1,1,...,2,2,2,2,2],[2,1,1,1,2,...,1,1,2,1,2],...,[2,2,2,2,2,...,2,6,6,2,2],[2,2,2,2,2,...,2,2,2,2,2]] -tpep_pickup_datetime: [[2021-04-01 00:28:05.000000,...,2021-04-30 23:44:25.000000]] -tpep_dropoff_datetime: [[2021-04-01 00:47:59.000000,...,2021-05-01 00:14:47.000000]] -``` +Here are two common scenarios where deduplication is needed: -This will only pull in the files that that might contain matching rows. +```mermaid +graph TD + subgraph Iceberg Table Metadata + manifest1["ManifestFile"] + snapshot1["Snapshot"] + dataFile1["DataFile A"] + dataFile2["DataFile B"] + parquetFile["Parquet File (s3://bucket/path/to/data.parquet)"] + end -One can also return a PyArrow RecordBatchReader, if reading one record batch at a time is preferred: + snapshot1 --> manifest1 + manifest1 --> dataFile1 + manifest1 --> dataFile2 + dataFile1 --> parquetFile + dataFile2 --> parquetFile -```python -table.scan( - row_filter=GreaterThanOrEqual("trip_distance", 10.0), - selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"), -).to_arrow_batch_reader() + note1["Note: Both DataFile A and B point to the same Parquet file"] + note1 --- parquetFile ``` -### Pandas - - - -!!! note "Requirements" - This requires [`pandas` to be installed](index.md). - - - -PyIceberg makes it easy to filter out data from a huge table and pull it into a Pandas dataframe locally. 
This will only fetch the relevant Parquet files for the query and apply the filter. This will reduce IO and therefore improve performance and reduce cost. - -```python -table.scan( - row_filter="trip_distance >= 10.0", - selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"), -).to_pandas() -``` +```mermaid +graph TD + subgraph Iceberg Table Metadata + snapshot1["Snapshot"] + manifest1["ManifestFile A"] + manifest2["ManifestFile B"] + dataFile1["DataFile A (in Manifest A)"] + dataFile2["DataFile B (in Manifest B)"] + parquetFile["Parquet File (s3://bucket/path/to/data.parquet)"] + end -This will return a Pandas dataframe: + snapshot1 --> manifest1 + snapshot1 --> manifest2 + manifest1 --> dataFile1 + manifest2 --> dataFile2 + dataFile1 --> parquetFile + dataFile2 --> parquetFile -```python - VendorID tpep_pickup_datetime tpep_dropoff_datetime -0 2 2021-04-01 00:28:05+00:00 2021-04-01 00:47:59+00:00 -1 1 2021-04-01 00:39:01+00:00 2021-04-01 00:57:39+00:00 -2 2 2021-04-01 00:14:42+00:00 2021-04-01 00:42:59+00:00 -3 1 2021-04-01 00:17:17+00:00 2021-04-01 00:43:38+00:00 -4 1 2021-04-01 00:24:04+00:00 2021-04-01 00:56:20+00:00 -... ... ... ... -116976 2 2021-04-30 23:56:18+00:00 2021-05-01 00:29:13+00:00 -116977 2 2021-04-30 23:07:41+00:00 2021-04-30 23:37:18+00:00 -116978 2 2021-04-30 23:38:28+00:00 2021-05-01 00:12:04+00:00 -116979 2 2021-04-30 23:33:00+00:00 2021-04-30 23:59:00+00:00 -116980 2 2021-04-30 23:44:25+00:00 2021-05-01 00:14:47+00:00 - -[116981 rows x 3 columns] + note1["Note: Both Manifest Files refer to DataFiles that share the same physical Parquet file"] + note1 --- parquetFile ``` -It is recommended to use Pandas 2 or later, because it stores the data in an [Apache Arrow backend](https://datapythonista.me/blog/pandas-20-and-the-arrow-revolution-part-i) which avoids copies of data. +--- -### DuckDB +### Snapshot Retention and Expiration - +Iceberg tables accumulate snapshots over time. Retaining too many can waste storage, but removing too many can reduce rollback and audit capabilities. PyIceberg provides flexible retention policies: -!!! note "Requirements" - This requires [DuckDB to be installed](index.md). +- **Keep the last N snapshots** for rollback safety. +- **Expire snapshots older than a timestamp** for space reclamation. +- **Set a minimum number of snapshots to keep** as a guardrail. - - -A table scan can also be converted into a in-memory DuckDB table: +#### Example: Retain only the last 5 snapshots ```python -con = table.scan( - row_filter=GreaterThanOrEqual("trip_distance", 10.0), - selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"), -).to_duckdb(table_name="distant_taxi_trips") -``` +from pyiceberg.table.maintenance import MaintenanceTable -Using the cursor that we can run queries on the DuckDB table: - -```python -print( - con.execute( - "SELECT tpep_dropoff_datetime - tpep_pickup_datetime AS duration FROM distant_taxi_trips LIMIT 4" - ).fetchall() -) -[ - (datetime.timedelta(seconds=1194),), - (datetime.timedelta(seconds=1118),), - (datetime.timedelta(seconds=1697),), - (datetime.timedelta(seconds=1581),), -] +maintenance = MaintenanceTable(table) +maintenance.retain_last_n_snapshots(5) ``` -### Ray - - - -!!! note "Requirements" - This requires [Ray to be installed](index.md). 
- - - -A table scan can also be converted into a Ray dataset: +#### Example: Expire snapshots older than 30 days, but keep at least 3 ```python -ray_dataset = table.scan( - row_filter=GreaterThanOrEqual("trip_distance", 10.0), - selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"), -).to_ray() -``` +import time +from pyiceberg.table.maintenance import MaintenanceTable -This will return a Ray dataset: - -```python -Dataset( - num_blocks=1, - num_rows=1168798, - schema={ - VendorID: int64, - tpep_pickup_datetime: timestamp[us, tz=UTC], - tpep_dropoff_datetime: timestamp[us, tz=UTC] - } +maintenance = MaintenanceTable(table) +thirty_days_ago = int((time.time() - 30 * 24 * 60 * 60) * 1000) +maintenance.expire_snapshots_with_retention_policy( + timestamp_ms=thirty_days_ago, + min_snapshots_to_keep=3 ) ``` -Using [Ray Dataset API](https://docs.ray.io/en/latest/data/api/dataset.html) to interact with the dataset: - -```python -print(ray_dataset.take(2)) -[ - { - "VendorID": 2, - "tpep_pickup_datetime": datetime.datetime(2008, 12, 31, 23, 23, 50), - "tpep_dropoff_datetime": datetime.datetime(2009, 1, 1, 0, 34, 31), - }, - { - "VendorID": 2, - "tpep_pickup_datetime": datetime.datetime(2008, 12, 31, 23, 5, 3), - "tpep_dropoff_datetime": datetime.datetime(2009, 1, 1, 16, 10, 18), - }, -] -``` - -### Daft - -PyIceberg interfaces closely with Daft Dataframes (see also: [Daft integration with Iceberg](https://www.getdaft.io/projects/docs/en/stable/integrations/iceberg/)) which provides a full lazily optimized query engine interface on top of PyIceberg tables. - - - -!!! note "Requirements" - This requires [Daft to be installed](index.md). - - - -A table can be read easily into a Daft Dataframe: - -```python -df = table.to_daft() # equivalent to `daft.read_iceberg(table)` -df = df.where(df["trip_distance"] >= 10.0) -df = df.select("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime") -``` - -This returns a Daft Dataframe which is lazily materialized. Printing `df` will display the schema: - -```python -╭──────────┬───────────────────────────────┬───────────────────────────────╮ -│ VendorID ┆ tpep_pickup_datetime ┆ tpep_dropoff_datetime │ -│ --- ┆ --- ┆ --- │ -│ Int64 ┆ Timestamp(Microseconds, None) ┆ Timestamp(Microseconds, None) │ -╰──────────┴───────────────────────────────┴───────────────────────────────╯ - -(No data to display: Dataframe not materialized) -``` - -We can execute the Dataframe to preview the first few rows of the query with `df.show()`. - -This is correctly optimized to take advantage of Iceberg features such as hidden partitioning and file-level statistics for efficient reads. - -```python -df.show(2) -``` - -```python -╭──────────┬───────────────────────────────┬───────────────────────────────╮ -│ VendorID ┆ tpep_pickup_datetime ┆ tpep_dropoff_datetime │ -│ --- ┆ --- ┆ --- │ -│ Int64 ┆ Timestamp(Microseconds, None) ┆ Timestamp(Microseconds, None) │ -╞══════════╪═══════════════════════════════╪═══════════════════════════════╡ -│ 2 ┆ 2008-12-31T23:23:50.000000 ┆ 2009-01-01T00:34:31.000000 │ -├╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -│ 2 ┆ 2008-12-31T23:05:03.000000 ┆ 2009-01-01T16:10:18.000000 │ -╰──────────┴───────────────────────────────┴───────────────────────────────╯ - -(Showing first 2 rows) -``` - -### Polars - -PyIceberg interfaces closely with Polars Dataframes and LazyFrame which provides a full lazily optimized query engine interface on top of PyIceberg tables. - - - -!!! 
note "Requirements" - This requires [`polars` to be installed](index.md). - -```python -pip install pyiceberg['polars'] -``` - - -PyIceberg data can be analyzed and accessed through Polars using either DataFrame or LazyFrame. -If your code utilizes the Apache Iceberg data scanning and retrieval API and then analyzes the resulting DataFrame in Polars, use the `table.scan().to_polars()` API. -If the intent is to utilize Polars' high-performance filtering and retrieval functionalities, use LazyFrame exported from the Iceberg table with the `table.to_polars()` API. - -```python -# Get LazyFrame -iceberg_table.to_polars() - -# Get Data Frame -iceberg_table.scan().to_polars() -``` - -#### Working with Polars DataFrame - -PyIceberg makes it easy to filter out data from a huge table and pull it into a Polars dataframe locally. This will only fetch the relevant Parquet files for the query and apply the filter. This will reduce IO and therefore improve performance and reduce cost. +#### Example: Combined policy ```python -schema = Schema( - NestedField(field_id=1, name='ticket_id', field_type=LongType(), required=True), - NestedField(field_id=2, name='customer_id', field_type=LongType(), required=True), - NestedField(field_id=3, name='issue', field_type=StringType(), required=False), - NestedField(field_id=4, name='created_at', field_type=TimestampType(), required=True), - required=True -) - -iceberg_table = catalog.create_table( - identifier='default.product_support_issues', - schema=schema -) - -pa_table_data = pa.Table.from_pylist( - [ - {'ticket_id': 1, 'customer_id': 546, 'issue': 'User Login issue', 'created_at': 1650020000000000}, - {'ticket_id': 2, 'customer_id': 547, 'issue': 'Payment not going through', 'created_at': 1650028640000000}, - {'ticket_id': 3, 'customer_id': 548, 'issue': 'Error on checkout', 'created_at': 1650037280000000}, - {'ticket_id': 4, 'customer_id': 549, 'issue': 'Unable to reset password', 'created_at': 1650045920000000}, - {'ticket_id': 5, 'customer_id': 550, 'issue': 'Account locked', 'created_at': 1650054560000000}, - {'ticket_id': 6, 'customer_id': 551, 'issue': 'Order not received', 'created_at': 1650063200000000}, - {'ticket_id': 7, 'customer_id': 552, 'issue': 'Refund not processed', 'created_at': 1650071840000000}, - {'ticket_id': 8, 'customer_id': 553, 'issue': 'Shipping address issue', 'created_at': 1650080480000000}, - {'ticket_id': 9, 'customer_id': 554, 'issue': 'Product damaged', 'created_at': 1650089120000000}, - {'ticket_id': 10, 'customer_id': 555, 'issue': 'Unable to apply discount code', 'created_at': 1650097760000000}, - {'ticket_id': 11, 'customer_id': 556, 'issue': 'Website not loading', 'created_at': 1650106400000000}, - {'ticket_id': 12, 'customer_id': 557, 'issue': 'Incorrect order received', 'created_at': 1650115040000000}, - {'ticket_id': 13, 'customer_id': 558, 'issue': 'Unable to track order', 'created_at': 1650123680000000}, - {'ticket_id': 14, 'customer_id': 559, 'issue': 'Order delayed', 'created_at': 1650132320000000}, - {'ticket_id': 15, 'customer_id': 560, 'issue': 'Product not as described', 'created_at': 1650140960000000}, - {'ticket_id': 16, 'customer_id': 561, 'issue': 'Unable to contact support', 'created_at': 1650149600000000}, - {'ticket_id': 17, 'customer_id': 562, 'issue': 'Duplicate charge', 'created_at': 1650158240000000}, - {'ticket_id': 18, 'customer_id': 563, 'issue': 'Unable to update profile', 'created_at': 1650166880000000}, - {'ticket_id': 19, 'customer_id': 564, 'issue': 'App crashing', 'created_at': 
1650175520000000}, - {'ticket_id': 20, 'customer_id': 565, 'issue': 'Unable to download invoice', 'created_at': 1650184160000000}, - {'ticket_id': 21, 'customer_id': 566, 'issue': 'Incorrect billing amount', 'created_at': 1650192800000000}, - ], schema=iceberg_table.schema().as_arrow() +# Expire old snapshots, but always keep last 10 and at least 5 total +maintenance.expire_snapshots_with_retention_policy( + timestamp_ms=thirty_days_ago, + retain_last_n=10, + min_snapshots_to_keep=5 ) - -iceberg_table.append( - df=pa_table_data -) - -table.scan( - row_filter="ticket_id > 10", -).to_polars() ``` -This will return a Polars DataFrame: +#### Deduplication Use Cases -```python -shape: (11, 4) -┌───────────┬─────────────┬────────────────────────────┬─────────────────────┐ -│ ticket_id ┆ customer_id ┆ issue ┆ created_at │ -│ --- ┆ --- ┆ --- ┆ --- │ -│ i64 ┆ i64 ┆ str ┆ datetime[μs] │ -╞═══════════╪═════════════╪════════════════════════════╪═════════════════════╡ -│ 11 ┆ 556 ┆ Website not loading ┆ 2022-04-16 10:53:20 │ -│ 12 ┆ 557 ┆ Incorrect order received ┆ 2022-04-16 13:17:20 │ -│ 13 ┆ 558 ┆ Unable to track order ┆ 2022-04-16 15:41:20 │ -│ 14 ┆ 559 ┆ Order delayed ┆ 2022-04-16 18:05:20 │ -│ 15 ┆ 560 ┆ Product not as described ┆ 2022-04-16 20:29:20 │ -│ … ┆ … ┆ … ┆ … │ -│ 17 ┆ 562 ┆ Duplicate charge ┆ 2022-04-17 01:17:20 │ -│ 18 ┆ 563 ┆ Unable to update profile ┆ 2022-04-17 03:41:20 │ -│ 19 ┆ 564 ┆ App crashing ┆ 2022-04-17 06:05:20 │ -│ 20 ┆ 565 ┆ Unable to download invoice ┆ 2022-04-17 08:29:20 │ -│ 21 ┆ 566 ┆ Incorrect billing amount ┆ 2022-04-17 10:53:20 │ -└───────────┴─────────────┴────────────────────────────┴─────────────────────┘ -``` +- **Operational Resilience**: Always keep recent snapshots for rollback. +- **Space Reclamation**: Remove old, unneeded snapshots. +- **Safety Guardrails**: Prevent accidental removal of too many snapshots. -#### Working with Polars LazyFrame +--- -PyIceberg supports creation of a Polars LazyFrame based on an Iceberg Table. +### Best Practices -using the above code example: +- Run deduplication and snapshot retention as part of regular table maintenance. +- Always review which snapshots are protected (branches/tags) before expiring. +- Use guardrails (`min_snapshots_to_keep`) in production to avoid accidental data loss. 
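
A minimal scheduled routine combining these practices might look like the sketch below. The catalog name, table identifier, and thresholds are illustrative only; adjust them to your deployment.

```python
import time

from pyiceberg.catalog import load_catalog
from pyiceberg.table.maintenance import MaintenanceTable

# Illustrative catalog and table names
table = load_catalog("default").load_table("default.events")
maintenance = MaintenanceTable(table)

# 1. Drop duplicate references to the same physical data file
removed = maintenance.deduplicate_data_files()
print(f"Removed {len(removed)} duplicate data file references")

# 2. Expire snapshots older than 30 days, keeping the last 10 and at least 5 in total
thirty_days_ago = int((time.time() - 30 * 24 * 60 * 60) * 1000)
maintenance.expire_snapshots_with_retention_policy(
    timestamp_ms=thirty_days_ago,
    retain_last_n=10,
    min_snapshots_to_keep=5,
)
```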
-```python -lf = iceberg_table.to_polars().filter(pl.col("ticket_id") > 10) -print(lf.collect()) -``` +--- -This above code snippet returns a Polars LazyFrame and defines a filter to be executed by Polars: +======= ```python shape: (11, 4) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 1dc7a29cc1..79972cc71e 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -80,6 +80,7 @@ from pyiceberg.schema import Schema from pyiceberg.table.inspect import InspectTable from pyiceberg.table.locations import LocationProvider, load_location_provider +from pyiceberg.table.maintenance import MaintenanceTable from pyiceberg.table.metadata import ( INITIAL_SEQUENCE_NUMBER, TableMetadata, @@ -115,7 +116,7 @@ update_table_metadata, ) from pyiceberg.table.update.schema import UpdateSchema -from pyiceberg.table.update.snapshot import ExpireSnapshots, ManageSnapshots, UpdateSnapshot, _FastAppendFiles +from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles from pyiceberg.table.update.spec import UpdateSpec from pyiceberg.table.update.statistics import UpdateStatistics from pyiceberg.transforms import IdentityTransform @@ -1069,6 +1070,15 @@ def inspect(self) -> InspectTable: """ return InspectTable(self) + @property + def maintenance(self) -> MaintenanceTable: + """Return the MaintenanceTable object for maintenance. + + Returns: + MaintenanceTable object based on this Table. + """ + return MaintenanceTable(self) + def refresh(self) -> Table: """Refresh the current table metadata. @@ -1241,10 +1251,6 @@ def manage_snapshots(self) -> ManageSnapshots: """ return ManageSnapshots(transaction=Transaction(self, autocommit=True)) - def expire_snapshots(self) -> ExpireSnapshots: - """Shorthand to run expire snapshots by id or by a timestamp.""" - return ExpireSnapshots(transaction=Transaction(self, autocommit=True)) - def update_statistics(self) -> UpdateStatistics: """ Shorthand to run statistics management operations like add statistics and remove statistics. 
diff --git a/pyiceberg/table/inspect.py b/pyiceberg/table/inspect.py index 3bb0268a05..d0062f4537 100644 --- a/pyiceberg/table/inspect.py +++ b/pyiceberg/table/inspect.py @@ -17,7 +17,8 @@ from __future__ import annotations from datetime import datetime, timezone -from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple +from functools import reduce +from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple, Union from pyiceberg.conversions import from_bytes from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary @@ -650,14 +651,11 @@ def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[S snapshot = self._get_snapshot(snapshot_id) io = self.tbl.io + files_table: list[pa.Table] = [] + for manifest_list in snapshot.manifests(io): + files_table.append(self._get_files_from_manifest(manifest_list, data_file_filter)) - executor = ExecutorFactory.get_or_create() - results = list( - executor.map( - lambda manifest_list: self._get_files_from_manifest(manifest_list, data_file_filter), snapshot.manifests(io) - ) - ) - return pa.concat_tables(results) + return pa.concat_tables(files_table) def files(self, snapshot_id: Optional[int] = None) -> "pa.Table": return self._files(snapshot_id) @@ -668,10 +666,20 @@ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table": def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table": return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) - def all_manifests(self) -> "pa.Table": + def all_manifests(self, snapshots: Optional[Union[list[Snapshot], list[int]]] = None) -> "pa.Table": import pyarrow as pa - snapshots = self.tbl.snapshots() + # coerce into snapshot objects if users passes in snapshot ids + if snapshots is not None: + if isinstance(snapshots[0], int): + snapshots = [ + snapshot + for snapshot_id in snapshots + if (snapshot := self.tbl.metadata.snapshot_by_id(snapshot_id)) is not None + ] + else: + snapshots = self.tbl.snapshots() + if not snapshots: return pa.Table.from_pylist([], schema=self._get_all_manifests_schema()) @@ -681,6 +689,32 @@ def all_manifests(self) -> "pa.Table": ) return pa.concat_tables(manifests_by_snapshots) + def _all_known_files(self) -> dict[str, set[str]]: + """Get all the known files in the table. + + Returns: + dict of {file_type: set of file paths} for each file type. 
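+
+        Example:
+            The keys match the categories collected below; the paths are illustrative only:
+
+            {
+                "manifests": {"s3://bucket/metadata/manifest-1.avro"},
+                "manifest_lists": {"s3://bucket/metadata/snap-123.avro"},
+                "statistics": {"s3://bucket/metadata/stats-1.puffin"},
+                "metadata": {"s3://bucket/metadata/v1.metadata.json"},
+                "datafiles": {"s3://bucket/data/file-1.parquet"},
+            }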
+ """ + snapshots = self.tbl.snapshots() + + _all_known_files = {} + _all_known_files["manifests"] = set(self.all_manifests(snapshots)["path"].to_pylist()) + _all_known_files["manifest_lists"] = {snapshot.manifest_list for snapshot in snapshots} + _all_known_files["statistics"] = {statistic.statistics_path for statistic in self.tbl.metadata.statistics} + + metadata_files = {entry.metadata_file for entry in self.tbl.metadata.metadata_log} + metadata_files.add(self.tbl.metadata_location) # Include current metadata file + _all_known_files["metadata"] = metadata_files + + executor = ExecutorFactory.get_or_create() + snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] + files_by_snapshots: Iterator[Set[str]] = executor.map( + lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()), snapshot_ids + ) + _all_known_files["datafiles"] = reduce(set.union, files_by_snapshots, set()) + + return _all_known_files + def _all_files(self, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table": import pyarrow as pa diff --git a/pyiceberg/table/maintenance.py b/pyiceberg/table/maintenance.py new file mode 100644 index 0000000000..1d71598cf3 --- /dev/null +++ b/pyiceberg/table/maintenance.py @@ -0,0 +1,372 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, List, Optional, Set + +from pyiceberg.manifest import DataFile, ManifestFile +from pyiceberg.utils.concurrent import ThreadPoolExecutor # type: ignore[attr-defined] + +logger = logging.getLogger(__name__) + + +if TYPE_CHECKING: + from pyiceberg.table import Table + from pyiceberg.table.metadata import TableMetadata + + +class MaintenanceTable: + tbl: Table + + def __init__(self, tbl: Table) -> None: + self.tbl = tbl + + try: + import pyarrow as pa # noqa + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For metadata operations PyArrow needs to be installed") from e + + def expire_snapshot_by_id(self, snapshot_id: int) -> None: + """Expire a single snapshot by its ID. + + Args: + snapshot_id: The ID of the snapshot to expire. + + Raises: + ValueError: If the snapshot does not exist or is protected. 
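+
+        Example:
+            # The snapshot ID is illustrative; pick one from the table's snapshot list
+            maintenance.expire_snapshot_by_id(3051729675574597004)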
+ """ + with self.tbl.transaction() as txn: + # Check if snapshot exists + if not any(snapshot.snapshot_id == snapshot_id for snapshot in txn.table_metadata.snapshots): + raise ValueError(f"Snapshot with ID {snapshot_id} does not exist.") + + # Check if snapshot is protected + protected_ids = self._get_protected_snapshot_ids(txn.table_metadata) + if snapshot_id in protected_ids: + raise ValueError(f"Snapshot with ID {snapshot_id} is protected and cannot be expired.") + + # Remove the snapshot + from pyiceberg.table.update import RemoveSnapshotsUpdate + + txn._apply((RemoveSnapshotsUpdate(snapshot_ids=[snapshot_id]),)) + + def expire_snapshots_by_ids(self, snapshot_ids: List[int]) -> None: + """Expire multiple snapshots by their IDs. + + Args: + snapshot_ids: List of snapshot IDs to expire. + + Raises: + ValueError: If any snapshot does not exist or is protected. + """ + with self.tbl.transaction() as txn: + protected_ids = self._get_protected_snapshot_ids(txn.table_metadata) + + # Validate all snapshots before expiring any + for snapshot_id in snapshot_ids: + if txn.table_metadata.snapshot_by_id(snapshot_id) is None: + raise ValueError(f"Snapshot with ID {snapshot_id} does not exist.") + if snapshot_id in protected_ids: + raise ValueError(f"Snapshot with ID {snapshot_id} is protected and cannot be expired.") + + # Remove all snapshots + from pyiceberg.table.update import RemoveSnapshotsUpdate + + txn._apply((RemoveSnapshotsUpdate(snapshot_ids=snapshot_ids),)) + + def expire_snapshots_older_than(self, timestamp_ms: int) -> None: + """Expire all unprotected snapshots with a timestamp older than a given value. + + Args: + timestamp_ms: Only snapshots with timestamp_ms < this value will be expired. + """ + # First check if there are any snapshots to expire to avoid unnecessary transactions + protected_ids = self._get_protected_snapshot_ids(self.tbl.metadata) + snapshots_to_expire: List[int] = [] + + for snapshot in self.tbl.metadata.snapshots: + if snapshot.timestamp_ms < timestamp_ms and snapshot.snapshot_id not in protected_ids: + snapshots_to_expire.append(snapshot.snapshot_id) + + if snapshots_to_expire: + self.expire_snapshots_by_ids(snapshots_to_expire) + + def expire_snapshots_older_than_with_retention( + self, timestamp_ms: int, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None + ) -> None: + """Expire all unprotected snapshots with a timestamp older than a given value, with retention strategies. + + Args: + timestamp_ms: Only snapshots with timestamp_ms < this value will be expired. + retain_last_n: Always keep the last N snapshots regardless of age. + min_snapshots_to_keep: Minimum number of snapshots to keep in total. + """ + snapshots_to_expire = self._get_snapshots_to_expire_with_retention( + timestamp_ms=timestamp_ms, retain_last_n=retain_last_n, min_snapshots_to_keep=min_snapshots_to_keep + ) + + if snapshots_to_expire: + self.expire_snapshots_by_ids(snapshots_to_expire) + + def retain_last_n_snapshots(self, n: int) -> None: + """Keep only the last N snapshots, expiring all others. + + Args: + n: Number of most recent snapshots to keep. + + Raises: + ValueError: If n is less than 1. 
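+
+        Example:
+            # Keep only the three most recent snapshots; protected branch/tag heads are always kept
+            maintenance.retain_last_n_snapshots(3)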
+ """ + if n < 1: + raise ValueError("Number of snapshots to retain must be at least 1") + + protected_ids = self._get_protected_snapshot_ids(self.tbl.metadata) + + # Sort snapshots by timestamp (most recent first) + sorted_snapshots = sorted(self.tbl.metadata.snapshots, key=lambda s: s.timestamp_ms, reverse=True) + + # Keep the last N snapshots and all protected ones + snapshots_to_keep = set() + snapshots_to_keep.update(protected_ids) + + # Add the N most recent snapshots + for i, snapshot in enumerate(sorted_snapshots): + if i < n: + snapshots_to_keep.add(snapshot.snapshot_id) + + # Find snapshots to expire + snapshots_to_expire: List[int] = [] + for snapshot in self.tbl.metadata.snapshots: + if snapshot.snapshot_id not in snapshots_to_keep: + snapshots_to_expire.append(snapshot.snapshot_id) + + if snapshots_to_expire: + self.expire_snapshots_by_ids(snapshots_to_expire) + + def _get_snapshots_to_expire_with_retention( + self, timestamp_ms: Optional[int] = None, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None + ) -> List[int]: + """Get snapshots to expire considering retention strategies. + + Args: + timestamp_ms: Only snapshots with timestamp_ms < this value will be considered for expiration. + retain_last_n: Always keep the last N snapshots regardless of age. + min_snapshots_to_keep: Minimum number of snapshots to keep in total. + + Returns: + List of snapshot IDs to expire. + """ + protected_ids = self._get_protected_snapshot_ids(self.tbl.metadata) + + # Sort snapshots by timestamp (most recent first) + sorted_snapshots = sorted(self.tbl.metadata.snapshots, key=lambda s: s.timestamp_ms, reverse=True) + + # Start with all snapshots that could be expired + candidates_for_expiration = [] + snapshots_to_keep = set(protected_ids) + + # Apply retain_last_n constraint + if retain_last_n is not None: + for i, snapshot in enumerate(sorted_snapshots): + if i < retain_last_n: + snapshots_to_keep.add(snapshot.snapshot_id) + + # Apply timestamp constraint + for snapshot in self.tbl.metadata.snapshots: + if snapshot.snapshot_id not in snapshots_to_keep and (timestamp_ms is None or snapshot.timestamp_ms < timestamp_ms): + candidates_for_expiration.append(snapshot) + + # Sort candidates by timestamp (oldest first) for potential expiration + candidates_for_expiration.sort(key=lambda s: s.timestamp_ms) + + # Apply min_snapshots_to_keep constraint + total_snapshots = len(self.tbl.metadata.snapshots) + snapshots_to_expire: List[int] = [] + + for candidate in candidates_for_expiration: + # Check if expiring this snapshot would violate min_snapshots_to_keep + remaining_after_expiration = total_snapshots - len(snapshots_to_expire) - 1 + + if min_snapshots_to_keep is None or remaining_after_expiration >= min_snapshots_to_keep: + snapshots_to_expire.append(candidate.snapshot_id) + else: + # Stop expiring to maintain minimum count + break + + return snapshots_to_expire + + def expire_snapshots_with_retention_policy( + self, timestamp_ms: Optional[int] = None, retain_last_n: Optional[int] = None, min_snapshots_to_keep: Optional[int] = None + ) -> None: + """Comprehensive snapshot expiration with multiple retention strategies. + + This method provides a unified interface for snapshot expiration with various + retention policies to ensure operational resilience while allowing space reclamation. + + Args: + timestamp_ms: Only snapshots with timestamp_ms < this value will be considered for expiration. + If None, all snapshots are candidates (subject to other constraints). 
+ retain_last_n: Always keep the last N snapshots regardless of age. + Useful when regular snapshot creation occurs and users want to keep + the last few for rollback purposes. + min_snapshots_to_keep: Minimum number of snapshots to keep in total. + Acts as a guardrail to prevent aggressive expiration logic + from removing too many snapshots. + + Returns: + List of snapshot IDs that were expired. + + Raises: + ValueError: If retain_last_n or min_snapshots_to_keep is less than 1. + + Examples: + # Keep last 5 snapshots regardless of age + maintenance.expire_snapshots_with_retention_policy(retain_last_n=5) + + # Expire snapshots older than timestamp but keep at least 3 total + maintenance.expire_snapshots_with_retention_policy( + timestamp_ms=1234567890000, + min_snapshots_to_keep=3 + ) + + # Combined policy: expire old snapshots but keep last 10 and at least 5 total + maintenance.expire_snapshots_with_retention_policy( + timestamp_ms=1234567890000, + retain_last_n=10, + min_snapshots_to_keep=5 + ) + """ + if retain_last_n is not None and retain_last_n < 1: + raise ValueError("retain_last_n must be at least 1") + + if min_snapshots_to_keep is not None and min_snapshots_to_keep < 1: + raise ValueError("min_snapshots_to_keep must be at least 1") + + snapshots_to_expire = self._get_snapshots_to_expire_with_retention( + timestamp_ms=timestamp_ms, retain_last_n=retain_last_n, min_snapshots_to_keep=min_snapshots_to_keep + ) + + if snapshots_to_expire: + self.expire_snapshots_by_ids(snapshots_to_expire) + + def _get_protected_snapshot_ids(self, table_metadata: TableMetadata) -> Set[int]: + """Get the IDs of protected snapshots. + + These are the HEAD snapshots of all branches and all tagged snapshots. + These ids are to be excluded from expiration. + + Args: + table_metadata: The table metadata to check for protected snapshots. + + Returns: + Set of protected snapshot IDs to exclude from expiration. + """ + from pyiceberg.table.refs import SnapshotRefType + + protected_ids: Set[int] = set() + for ref in table_metadata.refs.values(): + if ref.snapshot_ref_type in [SnapshotRefType.TAG, SnapshotRefType.BRANCH]: + protected_ids.add(ref.snapshot_id) + return protected_ids + + def _get_all_datafiles(self) -> List[DataFile]: + """Collect all DataFiles in the current snapshot only.""" + datafiles: List[DataFile] = [] + + current_snapshot = self.tbl.current_snapshot() + if not current_snapshot: + return datafiles + + def process_manifest(manifest: ManifestFile) -> list[DataFile]: + found: list[DataFile] = [] + for entry in manifest.fetch_manifest_entry(io=self.tbl.io, discard_deleted=True): + if hasattr(entry, "data_file"): + found.append(entry.data_file) + return found + + # Scan only the current snapshot's manifests + manifests = current_snapshot.manifests(io=self.tbl.io) + with ThreadPoolExecutor() as executor: + results = executor.map(process_manifest, manifests) + for res in results: + datafiles.extend(res) + + return datafiles + + def deduplicate_data_files(self) -> List[DataFile]: + """ + Remove duplicate data files from an Iceberg table. + + Returns: + List of removed DataFile objects. 
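+
+        Example:
+            # Rewrites the current snapshot so each physical file is referenced only once
+            removed = maintenance.deduplicate_data_files()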
+ """ + import os + from collections import defaultdict + + removed: List[DataFile] = [] + + # Get the current snapshot + current_snapshot = self.tbl.current_snapshot() + if not current_snapshot: + return removed + + # Collect all manifest entries from the current snapshot + all_entries = [] + for manifest in current_snapshot.manifests(io=self.tbl.io): + entries = list(manifest.fetch_manifest_entry(io=self.tbl.io, discard_deleted=True)) + all_entries.extend(entries) + + # Group entries by file name + file_groups = defaultdict(list) + for entry in all_entries: + file_name = os.path.basename(entry.data_file.file_path) + file_groups[file_name].append(entry) + + # Find duplicate entries to remove + has_duplicates = False + files_to_remove = [] + files_to_keep = [] + + for _file_name, entries in file_groups.items(): + if len(entries) > 1: + # Keep the first entry, remove the rest + files_to_keep.append(entries[0].data_file) + for duplicate_entry in entries[1:]: + files_to_remove.append(duplicate_entry.data_file) + removed.append(duplicate_entry.data_file) + has_duplicates = True + else: + # No duplicates, keep the entry + files_to_keep.append(entries[0].data_file) + + # Only create a new snapshot if we actually have duplicates to remove + if has_duplicates: + with self.tbl.transaction() as txn: + with txn.update_snapshot().overwrite() as overwrite_snapshot: + # First, explicitly delete all the duplicate files + for file_to_remove in files_to_remove: + overwrite_snapshot.delete_data_file(file_to_remove) + + # Then add back only the files that should be kept + for file_to_keep in files_to_keep: + overwrite_snapshot.append_data_file(file_to_keep) + + # Refresh the table to reflect the changes + self.tbl = self.tbl.refresh() + + return removed diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index 3ffb275ded..fa1a7715b1 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -68,7 +68,6 @@ AddSnapshotUpdate, AssertRefSnapshotId, RemoveSnapshotRefUpdate, - RemoveSnapshotsUpdate, SetSnapshotRefUpdate, TableRequirement, TableUpdate, @@ -904,103 +903,3 @@ def remove_branch(self, branch_name: str) -> ManageSnapshots: This for method chaining """ return self._remove_ref_snapshot(ref_name=branch_name) - - -class ExpireSnapshots(UpdateTableMetadata["ExpireSnapshots"]): - """ - Expire snapshots by ID. - - Use table.expire_snapshots().().commit() to run a specific operation. - Use table.expire_snapshots().().().commit() to run multiple operations. - Pending changes are applied on commit. - """ - - _snapshot_ids_to_expire: Set[int] = set() - _updates: Tuple[TableUpdate, ...] = () - _requirements: Tuple[TableRequirement, ...] = () - - def _commit(self) -> UpdatesAndRequirements: - """ - Commit the staged updates and requirements. - - This will remove the snapshots with the given IDs, but will always skip protected snapshots (branch/tag heads). - - Returns: - Tuple of updates and requirements to be committed, - as required by the calling parent apply functions. - """ - # Remove any protected snapshot IDs from the set to expire, just in case - protected_ids = self._get_protected_snapshot_ids() - self._snapshot_ids_to_expire -= protected_ids - update = RemoveSnapshotsUpdate(snapshot_ids=self._snapshot_ids_to_expire) - self._updates += (update,) - return self._updates, self._requirements - - def _get_protected_snapshot_ids(self) -> Set[int]: - """ - Get the IDs of protected snapshots. 
- - These are the HEAD snapshots of all branches and all tagged snapshots. These ids are to be excluded from expiration. - - Returns: - Set of protected snapshot IDs to exclude from expiration. - """ - protected_ids: Set[int] = set() - - for ref in self._transaction.table_metadata.refs.values(): - if ref.snapshot_ref_type in [SnapshotRefType.TAG, SnapshotRefType.BRANCH]: - protected_ids.add(ref.snapshot_id) - - return protected_ids - - def expire_snapshot_by_id(self, snapshot_id: int) -> ExpireSnapshots: - """ - Expire a snapshot by its ID. - - This will mark the snapshot for expiration. - - Args: - snapshot_id (int): The ID of the snapshot to expire. - Returns: - This for method chaining. - """ - if self._transaction.table_metadata.snapshot_by_id(snapshot_id) is None: - raise ValueError(f"Snapshot with ID {snapshot_id} does not exist.") - - if snapshot_id in self._get_protected_snapshot_ids(): - raise ValueError(f"Snapshot with ID {snapshot_id} is protected and cannot be expired.") - - self._snapshot_ids_to_expire.add(snapshot_id) - - return self - - def expire_snapshots_by_ids(self, snapshot_ids: List[int]) -> "ExpireSnapshots": - """ - Expire multiple snapshots by their IDs. - - This will mark the snapshots for expiration. - - Args: - snapshot_ids (List[int]): List of snapshot IDs to expire. - Returns: - This for method chaining. - """ - for snapshot_id in snapshot_ids: - self.expire_snapshot_by_id(snapshot_id) - return self - - def expire_snapshots_older_than(self, timestamp_ms: int) -> "ExpireSnapshots": - """ - Expire all unprotected snapshots with a timestamp older than a given value. - - Args: - timestamp_ms (int): Only snapshots with timestamp_ms < this value will be expired. - - Returns: - This for method chaining. - """ - protected_ids = self._get_protected_snapshot_ids() - for snapshot in self._transaction.table_metadata.snapshots: - if snapshot.timestamp_ms < timestamp_ms and snapshot.snapshot_id not in protected_ids: - self._snapshot_ids_to_expire.add(snapshot.snapshot_id) - return self diff --git a/ruff.toml b/ruff.toml index 11fd2a957b..bd5c015ea5 100644 --- a/ruff.toml +++ b/ruff.toml @@ -34,13 +34,7 @@ exclude = [ ".svn", ".tox", ".venv", - "__pypackages__", - "_build", - "buck-out", - "build", - "dist", - "node_modules", - "venv", + "vendor", ] # Ignore _all_ violations. diff --git a/tests/expressions/test_literals.py b/tests/expressions/test_literals.py index 4d8f5557f6..b58a202483 100644 --- a/tests/expressions/test_literals.py +++ b/tests/expressions/test_literals.py @@ -760,7 +760,21 @@ def test_invalid_decimal_conversions() -> None: def test_invalid_string_conversions() -> None: assert_invalid_conversions( literal("abc"), - [FixedType(1), BinaryType()], + [ + BooleanType(), + IntegerType(), + LongType(), + FloatType(), + DoubleType(), + DateType(), + TimeType(), + TimestampType(), + TimestamptzType(), + DecimalType(9, 2), + UUIDType(), + FixedType(1), + BinaryType(), + ], ) diff --git a/tests/table/test_dedup_data_file_filepaths.py b/tests/table/test_dedup_data_file_filepaths.py new file mode 100644 index 0000000000..67fb57e6fe --- /dev/null +++ b/tests/table/test_dedup_data_file_filepaths.py @@ -0,0 +1,139 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import os +import uuid +from pathlib import Path +from typing import List, Set + +import pyarrow as pa +import pyarrow.parquet as pq +import pytest + +from pyiceberg.manifest import DataFile +from pyiceberg.table import Table +from pyiceberg.table.maintenance import MaintenanceTable +from tests.catalog.test_base import InMemoryCatalog + + +@pytest.fixture +def iceberg_catalog(tmp_path: Path) -> InMemoryCatalog: + catalog = InMemoryCatalog("test.in_memory.catalog", warehouse=tmp_path.absolute().as_posix()) + catalog.create_namespace("default") + return catalog + + +@pytest.fixture +def dupe_data_file_path(tmp_path: Path) -> Path: + unique_id = uuid.uuid4() + return tmp_path / f"{unique_id}" / "file1.parquet" + + +@pytest.fixture +def prepopulated_table(iceberg_catalog: InMemoryCatalog, dupe_data_file_path: Path) -> Table: + identifier = "default.test_table" + try: + iceberg_catalog.drop_table(identifier) + except Exception: + pass + + arrow_schema = pa.schema( + [ + pa.field("id", pa.int32(), nullable=False), + pa.field("value", pa.string(), nullable=True), + ] + ) + + df_a = pa.Table.from_pylist( + [ + {"id": 1, "value": "A"}, + ], + schema=arrow_schema, + ) + df_b = pa.Table.from_pylist( + [ + {"id": 2, "value": "B"}, + ], + schema=arrow_schema, + ) + + # Ensure the parent directory exists + dupe_data_file_path.parent.mkdir(parents=True, exist_ok=True) + + pq.write_table(df_a, str(dupe_data_file_path)) + pq.write_table(df_b, str(dupe_data_file_path)) + + table: Table = iceberg_catalog.create_table(identifier, arrow_schema) + + tx = table.transaction() + tx.add_files([str(dupe_data_file_path)], check_duplicate_files=False) + tx.commit_transaction() + tx2 = table.transaction() + tx2.add_files([str(dupe_data_file_path)], check_duplicate_files=False) + tx2.commit_transaction() + + return table + + +def test_overwrite_removes_only_selected_datafile(prepopulated_table: Table, dupe_data_file_path: Path) -> None: + mt = MaintenanceTable(tbl=prepopulated_table) + + removed_files: List[DataFile] = mt.deduplicate_data_files() + + file_names_after: Set[str] = {df.file_path.split("/")[-1] for df in mt._get_all_datafiles()} + # Only one file with the same name should remain after deduplication + assert dupe_data_file_path.name in file_names_after, f"Expected {dupe_data_file_path.name} to remain in the table" + assert len(file_names_after) == 1, "Expected only one unique file name to remain after deduplication" + # All removed files should have the same file name + assert all(df.file_path.split("/")[-1] == dupe_data_file_path.name for df in removed_files), ( + "All removed files should be duplicates by name" + ) + + +def test_get_all_datafiles_current_snapshot(prepopulated_table: Table, dupe_data_file_path: Path) -> None: + mt = MaintenanceTable(tbl=prepopulated_table) + + datafiles: List[DataFile] = mt._get_all_datafiles() + file_paths: Set[str] = {df.file_path.split("/")[-1] for df in datafiles} + assert dupe_data_file_path.name in 
file_paths + + +def test_get_all_datafiles_all_snapshots(prepopulated_table: Table, dupe_data_file_path: Path) -> None: + mt = MaintenanceTable(tbl=prepopulated_table) + + datafiles: List[DataFile] = mt._get_all_datafiles() + file_paths: Set[str] = {df.file_path.split("/")[-1] for df in datafiles} + assert dupe_data_file_path.name in file_paths + + +def test_deduplicate_data_files_removes_duplicates_in_current_snapshot( + prepopulated_table: Table, dupe_data_file_path: Path +) -> None: + mt = MaintenanceTable(tbl=prepopulated_table) + + all_datafiles: List[DataFile] = mt._get_all_datafiles() + file_names: List[str] = [os.path.basename(df.file_path) for df in all_datafiles] + # There should be more than one reference before deduplication + assert file_names.count(dupe_data_file_path.name) > 1, ( + f"Expected multiple references to {dupe_data_file_path.name} before deduplication" + ) + removed: List[DataFile] = mt.deduplicate_data_files() + + all_datafiles_after: List[DataFile] = mt._get_all_datafiles() + file_names_after: List[str] = [os.path.basename(df.file_path) for df in all_datafiles_after] + # Only one reference should remain after deduplication + assert file_names_after.count(dupe_data_file_path.name) == 1 + assert all(isinstance(df, DataFile) for df in removed) diff --git a/tests/table/test_overwrite_files.py b/tests/table/test_overwrite_files.py new file mode 100644 index 0000000000..13a83393a9 --- /dev/null +++ b/tests/table/test_overwrite_files.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/table/test_expire_snapshots.py b/tests/table/test_retention_strategies.py similarity index 64% rename from tests/table/test_expire_snapshots.py rename to tests/table/test_retention_strategies.py index 82ecb9e493..9427e04ebf 100644 --- a/tests/table/test_expire_snapshots.py +++ b/tests/table/test_retention_strategies.py @@ -14,12 +14,99 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
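+# These tests drive MaintenanceTable retention policies against the table_v2 fixture,
+# using SimpleNamespace objects as lightweight snapshot stand-ins and a mocked catalog commit.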
+from types import SimpleNamespace
 from unittest.mock import MagicMock
 from uuid import uuid4
 
 import pytest
 
-from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table import CommitTableResponse, Table  # noqa: F401
+
+
+def _make_snapshots(ids_and_timestamps: list[tuple[int, int]]) -> list[SimpleNamespace]:
+    return [SimpleNamespace(snapshot_id=sid, timestamp_ms=ts, parent_snapshot_id=None) for sid, ts in ids_and_timestamps]
+
+
+def test_retain_last_n_snapshots(table_v2: Table) -> None:
+    # Setup: 5 snapshots, keep last 3
+    ids_and_ts = [
+        (1, 1000),
+        (2, 2000),
+        (3, 3000),
+        (4, 4000),
+        (5, 5000),
+    ]
+    snapshots = _make_snapshots(ids_and_ts)
+    table_v2.metadata = table_v2.metadata.model_copy(update={"snapshots": snapshots, "refs": {}})
+    table_v2.catalog = MagicMock()
+    # Simulate commit response with only last 3 snapshots
+    keep_ids = [3, 4, 5]
+    mock_response = CommitTableResponse(
+        metadata=table_v2.metadata.model_copy(update={"snapshots": [s for s in snapshots if s.snapshot_id in keep_ids]}),
+        metadata_location="mock://metadata/location",
+        uuid=uuid4(),
+    )
+    table_v2.catalog.commit_table.return_value = mock_response
+    table_v2.maintenance.retain_last_n_snapshots(3)
+    table_v2.catalog.commit_table.assert_called_once()
+    # Update metadata to reflect commit
+    table_v2.metadata = mock_response.metadata
+    remaining_ids = {s.snapshot_id for s in table_v2.metadata.snapshots}
+    assert remaining_ids == set(keep_ids)
+
+
+def test_min_snapshots_to_keep(table_v2: Table) -> None:
+    # Setup: 5 snapshots, expire all older than 4500, but keep at least 3
+    ids_and_ts = [
+        (1, 1000),
+        (2, 2000),
+        (3, 3000),
+        (4, 4000),
+        (5, 5000),
+    ]
+    snapshots = _make_snapshots(ids_and_ts)
+    table_v2.metadata = table_v2.metadata.model_copy(update={"snapshots": snapshots, "refs": {}})
+    table_v2.catalog = MagicMock()
+    # Only 1,2 should be expired (to keep 3 total)
+    keep_ids = [3, 4, 5]
+    mock_response = CommitTableResponse(
+        metadata=table_v2.metadata.model_copy(update={"snapshots": [s for s in snapshots if s.snapshot_id in keep_ids]}),
+        metadata_location="mock://metadata/location",
+        uuid=uuid4(),
+    )
+    table_v2.catalog.commit_table.return_value = mock_response
+    table_v2.maintenance.expire_snapshots_older_than_with_retention(timestamp_ms=4500, min_snapshots_to_keep=3)
+    table_v2.catalog.commit_table.assert_called_once()
+    table_v2.metadata = mock_response.metadata
+    remaining_ids = {s.snapshot_id for s in table_v2.metadata.snapshots}
+    assert remaining_ids == set(keep_ids)
+
+
+def test_combined_constraints(table_v2: Table) -> None:
+    # Setup: 5 snapshots, expire all older than 3500, keep last 2, min 4 total
+    ids_and_ts = [
+        (1, 1000),
+        (2, 2000),
+        (3, 3000),
+        (4, 4000),
+        (5, 5000),
+    ]
+    snapshots = _make_snapshots(ids_and_ts)
+    table_v2.metadata = table_v2.metadata.model_copy(update={"snapshots": snapshots, "refs": {}})
+    table_v2.catalog = MagicMock()
+    # Only 1 should be expired (to keep last 2 and min 4 total)
+    keep_ids = [2, 3, 4, 5]
+    mock_response = CommitTableResponse(
+        metadata=table_v2.metadata.model_copy(update={"snapshots": [s for s in snapshots if s.snapshot_id in keep_ids]}),
+        metadata_location="mock://metadata/location",
+        uuid=uuid4(),
+    )
+    table_v2.catalog.commit_table.return_value = mock_response
+    table_v2.maintenance.expire_snapshots_with_retention_policy(timestamp_ms=3500, retain_last_n=2, min_snapshots_to_keep=4)
+    table_v2.catalog.commit_table.assert_called_once()
+    table_v2.metadata = mock_response.metadata
+    remaining_ids = {s.snapshot_id for s in table_v2.metadata.snapshots}
+    assert remaining_ids == set(keep_ids)
 
 
 def test_cannot_expire_protected_head_snapshot(table_v2: Table) -> None:
@@ -43,7 +130,7 @@ def test_cannot_expire_protected_head_snapshot(table_v2: Table) -> None:
 
     # Attempt to expire the HEAD snapshot and expect a ValueError
     with pytest.raises(ValueError, match=f"Snapshot with ID {HEAD_SNAPSHOT} is protected and cannot be expired."):
-        table_v2.expire_snapshots().expire_snapshot_by_id(HEAD_SNAPSHOT).commit()
+        table_v2.maintenance.expire_snapshot_by_id(HEAD_SNAPSHOT)
 
     table_v2.catalog.commit_table.assert_not_called()
 
@@ -66,7 +153,7 @@ def test_cannot_expire_tagged_snapshot(table_v2: Table) -> None:
     assert any(ref.snapshot_id == TAGGED_SNAPSHOT for ref in table_v2.metadata.refs.values())
 
     with pytest.raises(ValueError, match=f"Snapshot with ID {TAGGED_SNAPSHOT} is protected and cannot be expired."):
-        table_v2.expire_snapshots().expire_snapshot_by_id(TAGGED_SNAPSHOT).commit()
+        table_v2.maintenance.expire_snapshot_by_id(TAGGED_SNAPSHOT)
 
     table_v2.catalog.commit_table.assert_not_called()
 
@@ -98,7 +185,7 @@ def test_expire_unprotected_snapshot(table_v2: Table) -> None:
     assert all(ref.snapshot_id != EXPIRE_SNAPSHOT for ref in table_v2.metadata.refs.values())
 
     # Expire the snapshot
-    table_v2.expire_snapshots().expire_snapshot_by_id(EXPIRE_SNAPSHOT).commit()
+    table_v2.maintenance.expire_snapshot_by_id(EXPIRE_SNAPSHOT)
 
     table_v2.catalog.commit_table.assert_called_once()
     remaining_snapshots = table_v2.metadata.snapshots
@@ -114,7 +201,7 @@ def test_expire_nonexistent_snapshot_raises(table_v2: Table) -> None:
     table_v2.metadata = table_v2.metadata.model_copy(update={"refs": {}})
 
     with pytest.raises(ValueError, match=f"Snapshot with ID {NONEXISTENT_SNAPSHOT} does not exist."):
-        table_v2.expire_snapshots().expire_snapshot_by_id(NONEXISTENT_SNAPSHOT).commit()
+        table_v2.maintenance.expire_snapshot_by_id(NONEXISTENT_SNAPSHOT)
 
     table_v2.catalog.commit_table.assert_not_called()
 
@@ -131,7 +218,7 @@ def test_expire_snapshots_by_timestamp_skips_protected(table_v2: Table) -> None:
         update={
             "refs": {
                 "main": MagicMock(snapshot_id=HEAD_SNAPSHOT, snapshot_ref_type="branch"),
-                "mytag": MagicMock(snapshot_id=TAGGED_SNAPSHOT, snapshot_ref_type="tag"),
+                "my_tag": MagicMock(snapshot_id=TAGGED_SNAPSHOT, snapshot_ref_type="tag"),
             },
             "snapshots": [
                 SimpleNamespace(snapshot_id=HEAD_SNAPSHOT, timestamp_ms=1, parent_snapshot_id=None),
@@ -152,22 +239,16 @@ def test_expire_snapshots_by_timestamp_skips_protected(table_v2: Table) -> None:
     )
     table_v2.catalog.commit_table.return_value = mock_response
 
-    table_v2.expire_snapshots().expire_snapshots_older_than(future_timestamp).commit()
-    # Update metadata to reflect the commit (as in other tests)
-    table_v2.metadata = mock_response.metadata
+    table_v2.maintenance.expire_snapshots_older_than(future_timestamp)
 
     # Both protected snapshots should remain
     remaining_ids = {s.snapshot_id for s in table_v2.metadata.snapshots}
    assert HEAD_SNAPSHOT in remaining_ids
     assert TAGGED_SNAPSHOT in remaining_ids
 
-    # No snapshots should have been expired (commit_table called, but with empty snapshot_ids)
-    args, kwargs = table_v2.catalog.commit_table.call_args
-    updates = args[2] if len(args) > 2 else ()
-    # Find RemoveSnapshotsUpdate in updates
-    remove_update = next((u for u in updates if getattr(u, "action", None) == "remove-snapshots"), None)
-    assert remove_update is not None
-    assert remove_update.snapshot_ids == []
+    # No snapshots should have been expired, so commit_table should not have been called
+    # This is the correct behavior - don't create unnecessary transactions when there's nothing to do
+    table_v2.catalog.commit_table.assert_not_called()
 
 
 def test_expire_snapshots_by_ids(table_v2: Table) -> None:
@@ -215,7 +296,7 @@ def test_expire_snapshots_by_ids(table_v2: Table) -> None:
     assert all(ref.snapshot_id not in (EXPIRE_SNAPSHOT_1, EXPIRE_SNAPSHOT_2) for ref in table_v2.metadata.refs.values())
 
     # Expire the snapshots
-    table_v2.expire_snapshots().expire_snapshots_by_ids([EXPIRE_SNAPSHOT_1, EXPIRE_SNAPSHOT_2]).commit()
+    table_v2.maintenance.expire_snapshots_by_ids([EXPIRE_SNAPSHOT_1, EXPIRE_SNAPSHOT_2])
 
     table_v2.catalog.commit_table.assert_called_once()
     remaining_snapshots = table_v2.metadata.snapshots