
[SUPPORT] Flink inline clustering "Metadata is empty for partition: ~~" #13439

Open
@SML0127

Description

Tips before filing an issue

  • Have you gone through our FAQs?

  • Join the mailing list to engage in conversations and get faster support at [email protected].

  • If you have triaged this as a bug, then file an issue directly.

Describe the problem you faced

  • Loading data from Flink into a Hudi table with inline clustering enabled.
  • When inline clustering is performed, the job fails with "IllegalArgumentException: Metadata is empty for partition: log_datetime_id=2025-06-16+17" (full stacktrace below).

To Reproduce

  • Run a streaming Flink insert into a MERGE_ON_READ table with the consistent-hashing bucket index and inline clustering enabled, using the write options listed under "Additional context".

Expected behavior

  • Inline clustering is scheduled and completes without failing the job.

Environment Description

  • Hudi version : 1.0.2

  • Flink version : 1.20.0

  • Hive version : 2.3.7

  • Hadoop version : 2.10.2

  • Storage (HDFS/S3/GCS..) : HDFS

  • Running on Docker? (yes/no) : yes

Additional context

My write options:

        hudiOptions.put(FlinkOptions.PATH.key(), tablePath);
        hudiOptions.put(FlinkOptions.DATABASE_NAME.key(), targetDatabase);
        hudiOptions.put(FlinkOptions.TABLE_NAME.key(), targetTable);

        hudiOptions.put(HoodieMetadataConfig.METRICS_ENABLE.key(), "true");
        hudiOptions.put(HoodieMetadataConfig.ASYNC_INDEX_ENABLE.key(), "true");
        hudiOptions.put(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE.key(), "true");

        hudiOptions.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());

        hudiOptions.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.name());
        hudiOptions.put(FlinkOptions.RECORD_KEY_FIELD.key(), "log_unique_id");
        hudiOptions.put(FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.NO_PRE_COMBINE);
        hudiOptions.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "log_datetime_id");
        hudiOptions.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "log_datetime_id");
        hudiOptions.put(
                FlinkOptions.KEYGEN_CLASS_NAME.key(), "org.apache.hudi.keygen.ComplexKeyGenerator");
        hudiOptions.put(
                FlinkOptions.PAYLOAD_CLASS_NAME.key(),
                "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload");
        hudiOptions.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true");

        // hudiOptions.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.FLINK_STATE.name());
        hudiOptions.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
        hudiOptions.put(
                FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(),
                HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name());
        hudiOptions.put(
                FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "10"); 


        // Cleaning Service
        hudiOptions.put(FlinkOptions.CLEAN_ASYNC_ENABLED.key(), "true");

        // KEEP_LATEST_FILE_VERSIONS: keeps the last N versions of the file slices written;
        //   used only when "hoodie.clean.fileversions.retained" is explicitly set.
        // KEEP_LATEST_COMMITS: keeps the file slices written by the last N commits;
        //   used only when "hoodie.clean.commits.retained" is explicitly set.
        // KEEP_LATEST_BY_HOURS: keeps the file slices written in the last N hours
        //   based on the commit time; used only when "hoodie.clean.hours.retained"
        //   is explicitly set.
        hudiOptions.put(
                FlinkOptions.CLEAN_POLICY.key(), HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.name());
        hudiOptions.put(FlinkOptions.CLEAN_RETAIN_HOURS.key(), "24"); 
        // hudiOptions.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "480"); 

        hudiOptions.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
        hudiOptions.put(FlinkOptions.HIVE_SYNC_TABLE.key(), targetTable);
        hudiOptions.put(FlinkOptions.HIVE_SYNC_DB.key(), targetDatabase);
        hudiOptions.put(FlinkOptions.HIVE_SYNC_MODE.key(), HiveSyncMode.HMS.name());
        // The Hive metastore Thrift URI option is set elsewhere (separate config file / env).
        

        // clustering service
        hudiOptions.put(FlinkOptions.CLUSTERING_TASKS.key(), "10");
        hudiOptions.put(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
        hudiOptions.put(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), "2"); 
        hudiOptions.put(
                FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS.key(),
                HoodieClusteringConfig.FLINK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);

        hudiOptions.put(
                FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME.key(), RECENT_DAYS.name());
        hudiOptions.put(
                FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST.key(), "0");
        hudiOptions.put(FlinkOptions.CLUSTERING_TARGET_PARTITIONS.key(), "3");
        hudiOptions.put(
                FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES.key(),
                String.valueOf(256 * 1024 * 1024L));
        hudiOptions.put(
                FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT.key(),
                String.valueOf(180 * 1024 * 1024L));

        hudiOptions.put(FlinkOptions.WRITE_TASKS.key(), "10"); 
        // Only for COW tables in insert mode: merge small files on write.
        // hudiOptions.put(FlinkOptions.INSERT_CLUSTER.key(), "true");
        hudiOptions.put(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE.key(), "256");
        hudiOptions.put(
                FlinkOptions.WRITE_BATCH_SIZE.key(), "64"); 
        hudiOptions.put(FlinkOptions.WRITE_LOG_BLOCK_SIZE.key(), "128");
        hudiOptions.put(FlinkOptions.WRITE_LOG_MAX_SIZE.key(), "256");

        hudiOptions.put(TIMELINE_LAYOUT_VERSION_NUM.key(), "2");
        hudiOptions.put(ARCHIVE_MIN_COMMITS.key(), "480"); 
        hudiOptions.put(ARCHIVE_MAX_COMMITS.key(), "580");
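
For reference, these options are attached to the write pipeline roughly as in the sketch below (this is an illustration only, not the actual job code: the column definitions and the rowDataStream variable are placeholders, and the HoodiePipeline API is org.apache.hudi.util.HoodiePipeline from the hudi-flink bundle):

        // Sketch only: how the hudiOptions map above is wired into the Hudi sink.
        // Assumes a DataStream<RowData> rowDataStream produced by the Kafka source
        // and a StreamExecutionEnvironment env; column types are placeholders.
        HoodiePipeline.Builder pipelineBuilder = HoodiePipeline.builder(targetTable)
                .column("log_unique_id VARCHAR(64)")
                .column("log_datetime_id VARCHAR(16)")
                .pk("log_unique_id")
                .partition("log_datetime_id")
                .options(hudiOptions);
        // bounded = false for a continuous streaming write
        pipelineBuilder.sink(rowDataStream, false);
        env.execute("kafka-to-hudi");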

Stacktrace

2025-06-16 17:38:15
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: KAFKA (test-topic) -> Transform: RowData -> hoodie_append_write: default_database.table_06161722' (operator 59cdd8a1cddabde67ee500c4f33ff1ab).
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:651)
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:201)
	at org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:142)
	at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:133)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [commits the instant 20250616083812901] error
	... 6 more
Caused by: org.apache.hudi.exception.HoodieException: Error occurs when executing map
	at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
	at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
	at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
	at java.base/java.util.stream.AbstractTask.compute(Unknown Source)
	at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
	at java.base/java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
	at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
	at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
	at org.apache.hudi.client.common.HoodieFlinkEngineContext.map(HoodieFlinkEngineContext.java:114)
	at org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy.generateClusteringPlan(PartitionAwareClusteringPlanStrategy.java:168)
	at org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor.createClusteringPlan(ClusteringPlanActionExecutor.java:89)
	at org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor.execute(ClusteringPlanActionExecutor.java:94)
	at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.scheduleClustering(HoodieFlinkCopyOnWriteTable.java:332)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableServiceInternal(BaseHoodieTableServiceClient.java:666)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableService(BaseHoodieTableServiceClient.java:644)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleClusteringAtInstant(BaseHoodieTableServiceClient.java:462)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleClustering(BaseHoodieTableServiceClient.java:452)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.inlineScheduleClustering(BaseHoodieTableServiceClient.java:745)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.inlineClustering(BaseHoodieTableServiceClient.java:721)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.inlineClustering(BaseHoodieTableServiceClient.java:734)
	at org.apache.hudi.client.BaseHoodieTableServiceClient.runTableServicesInline(BaseHoodieTableServiceClient.java:607)
	at org.apache.hudi.client.BaseHoodieWriteClient.runTableServicesInlineInternal(BaseHoodieWriteClient.java:603)
	at org.apache.hudi.client.BaseHoodieWriteClient.runTableServicesInline(BaseHoodieWriteClient.java:590)
	at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:259)
	at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:111)
	at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:74)
	at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:207)
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doCommit(StreamWriteOperatorCoordinator.java:588)
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.commitInstant(StreamWriteOperatorCoordinator.java:564)
	at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$notifyCheckpointComplete$2(StreamWriteOperatorCoordinator.java:263)
	at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
	... 3 more
Caused by: java.lang.IllegalArgumentException: Metadata is empty for partition: log_datetime_id=2025-06-16+17
	at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:42)
	at org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy.buildClusteringGroupsForPartition(BaseConsistentHashingBucketClusteringPlanStrategy.java:101)
	at org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy.lambda$generateClusteringPlan$d5a31cf4$1(PartitionAwareClusteringPlanStrategy.java:170)
	at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
	... 40 more
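
The failing call is the argument check in BaseConsistentHashingBucketClusteringPlanStrategy, which expects consistent-hashing metadata to exist for every partition included in the clustering plan. For debugging, the presence of that metadata for the failing partition can be checked directly on HDFS; a minimal sketch, assuming the metadata lives under .hoodie/.bucket_index/consistent_hashing_metadata/<partition> (this path is an assumption and may differ across Hudi versions):

        // Hypothetical debugging snippet: list the consistent-hashing metadata files
        // for the partition named in the exception. The folder layout below is an
        // assumption; verify it against your Hudi version.
        org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
        org.apache.hadoop.fs.FileSystem fs =
                org.apache.hadoop.fs.FileSystem.get(java.net.URI.create(tablePath), hadoopConf);
        org.apache.hadoop.fs.Path hashingMetaDir = new org.apache.hadoop.fs.Path(
                tablePath,
                ".hoodie/.bucket_index/consistent_hashing_metadata/log_datetime_id=2025-06-16+17");
        if (!fs.exists(hashingMetaDir) || fs.listStatus(hashingMetaDir).length == 0) {
            System.out.println("No consistent-hashing metadata found for this partition");
        } else {
            for (org.apache.hadoop.fs.FileStatus status : fs.listStatus(hashingMetaDir)) {
                System.out.println(status.getPath());
            }
        }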
