Tips before filing an issue

- Have you gone through our FAQs?
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an issue directly.
Describe the problem you faced

Loading data from Flink into a Hudi table with inline clustering enabled. The following error occurs when clustering is performed.

To Reproduce

Steps to reproduce the behavior:

1. Write data from Flink into a MERGE_ON_READ table using the consistent-hashing BUCKET index, with inline clustering enabled (full write options under Additional context).
2. Wait for inline clustering to be scheduled after a commit; the job then fails with the stacktrace below.

Expected behavior

Inline clustering is scheduled and completes without failing the job.
Environment Description

- Hudi version : 1.0.2
- Flink version : 1.20.0
- Hive version : 2.3.7
- Hadoop version : 2.10.2
- Storage (HDFS/S3/GCS..) : HDFS
- Running on Docker? (yes/no) : yes
Additional context

My write options:
hudiOptions.put(FlinkOptions.PATH.key(), tablePath);
hudiOptions.put(FlinkOptions.DATABASE_NAME.key(), targetDatabase);
hudiOptions.put(FlinkOptions.TABLE_NAME.key(), targetTable);
hudiOptions.put(HoodieMetadataConfig.METRICS_ENABLE.key(), "true");
hudiOptions.put(HoodieMetadataConfig.ASYNC_INDEX_ENABLE.key(), "true");
hudiOptions.put(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE.key(), "true");
hudiOptions.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
hudiOptions.put(FlinkOptions.OPERATION.key(), WriteOperationType.INSERT.name());
hudiOptions.put(FlinkOptions.RECORD_KEY_FIELD.key(), "log_unique_id");
hudiOptions.put(FlinkOptions.PRECOMBINE_FIELD.key(), FlinkOptions.NO_PRE_COMBINE);
hudiOptions.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "log_datetime_id");
hudiOptions.put(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS.key(), "log_datetime_id");
hudiOptions.put(FlinkOptions.KEYGEN_CLASS_NAME.key(), "org.apache.hudi.keygen.ComplexKeyGenerator");
hudiOptions.put(FlinkOptions.PAYLOAD_CLASS_NAME.key(),
    "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload");
hudiOptions.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true");
// hudiOptions.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.FLINK_STATE.name());
hudiOptions.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.BUCKET.name());
hudiOptions.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(),
    HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name());
hudiOptions.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "10");
// Cleaning service
hudiOptions.put(FlinkOptions.CLEAN_ASYNC_ENABLED.key(), "true");
// KEEP_LATEST_FILE_VERSIONS: keeps the last N versions of the file slices written;
//   used only when "hoodie.clean.fileversions.retained" is explicitly set.
// KEEP_LATEST_COMMITS: keeps the file slices written by the last N commits;
//   used only when "hoodie.clean.commits.retained" is explicitly set.
// KEEP_LATEST_BY_HOURS: keeps the file slices written in the last N hours, based on commit time;
//   used only when "hoodie.clean.hours.retained" is explicitly set.
hudiOptions.put(FlinkOptions.CLEAN_POLICY.key(), HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.name());
hudiOptions.put(FlinkOptions.CLEAN_RETAIN_HOURS.key(), "24");
// hudiOptions.put(FlinkOptions.CLEAN_RETAIN_COMMITS.key(), "480");
hudiOptions.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true");
hudiOptions.put(FlinkOptions.HIVE_SYNC_TABLE.key(), targetTable);
hudiOptions.put(FlinkOptions.HIVE_SYNC_DB.key(), targetDatabase);
hudiOptions.put(FlinkOptions.HIVE_SYNC_MODE.key(), HiveSyncMode.HMS.name());
// The Thrift (metastore) option is configured in another file / via the environment.
// Clustering service
hudiOptions.put(FlinkOptions.CLUSTERING_TASKS.key(), "10");
hudiOptions.put(HoodieClusteringConfig.INLINE_CLUSTERING.key(), "true");
hudiOptions.put(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key(), "2");
hudiOptions.put(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS.key(),
    HoodieClusteringConfig.FLINK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);
hudiOptions.put(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME.key(), RECENT_DAYS.name());
hudiOptions.put(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST.key(), "0");
hudiOptions.put(FlinkOptions.CLUSTERING_TARGET_PARTITIONS.key(), "3");
hudiOptions.put(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES.key(),
    String.valueOf(256 * 1024 * 1024L));
hudiOptions.put(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT.key(),
    String.valueOf(180 * 1024 * 1024L));
hudiOptions.put(FlinkOptions.WRITE_TASKS.key(), "10");
// Only for COW tables in insert mode; merges small files:
// hudiOptions.put(FlinkOptions.INSERT_CLUSTER.key(), "true");
hudiOptions.put(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE.key(), "256");
hudiOptions.put(FlinkOptions.WRITE_BATCH_SIZE.key(), "64");
hudiOptions.put(FlinkOptions.WRITE_LOG_BLOCK_SIZE.key(), "128");
hudiOptions.put(FlinkOptions.WRITE_LOG_MAX_SIZE.key(), "256");
hudiOptions.put(TIMELINE_LAYOUT_VERSION_NUM.key(), "2");
hudiOptions.put(ARCHIVE_MIN_COMMITS.key(), "480");
hudiOptions.put(ARCHIVE_MAX_COMMITS.key(), "580");
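For context, the options map is applied to the stream roughly as below. This is a minimal sketch, not the actual job code: the column types and the kafkaSource helper are placeholders, and the Kafka source / RowData transform are elided.

// Minimal wiring sketch (placeholders, not the real job); hudiOptions is the map built above.
DataStream<RowData> rowDataStream = kafkaSource(env); // placeholder for Kafka -> RowData
HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
    .column("log_unique_id VARCHAR(64)")   // column types are illustrative
    .column("log_datetime_id VARCHAR(16)")
    .pk("log_unique_id")
    .partition("log_datetime_id")
    .options(hudiOptions);
builder.sink(rowDataStream, false); // bounded = false for the streaming write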
Stacktrace
2025-06-16 17:38:15
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: KAFKA (test-topic) -> Transform: RowData -> hoodie_append_write: default_database.table_06161722' (operator 59cdd8a1cddabde67ee500c4f33ff1ab).
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:651)
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$start$0(StreamWriteOperatorCoordinator.java:201)
at org.apache.hudi.sink.utils.NonThrownExecutor.handleException(NonThrownExecutor.java:142)
at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:133)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hudi.exception.HoodieException: Executor executes action [commits the instant 20250616083812901] error
... 6 more
Caused by: org.apache.hudi.exception.HoodieException: Error occurs when executing map
at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:40)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceTask.doLeaf(Unknown Source)
at java.base/java.util.stream.AbstractTask.compute(Unknown Source)
at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
at org.apache.hudi.client.common.HoodieFlinkEngineContext.map(HoodieFlinkEngineContext.java:114)
at org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy.generateClusteringPlan(PartitionAwareClusteringPlanStrategy.java:168)
at org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor.createClusteringPlan(ClusteringPlanActionExecutor.java:89)
at org.apache.hudi.table.action.cluster.ClusteringPlanActionExecutor.execute(ClusteringPlanActionExecutor.java:94)
at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.scheduleClustering(HoodieFlinkCopyOnWriteTable.java:332)
at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableServiceInternal(BaseHoodieTableServiceClient.java:666)
at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleTableService(BaseHoodieTableServiceClient.java:644)
at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleClusteringAtInstant(BaseHoodieTableServiceClient.java:462)
at org.apache.hudi.client.BaseHoodieTableServiceClient.scheduleClustering(BaseHoodieTableServiceClient.java:452)
at org.apache.hudi.client.BaseHoodieTableServiceClient.inlineScheduleClustering(BaseHoodieTableServiceClient.java:745)
at org.apache.hudi.client.BaseHoodieTableServiceClient.inlineClustering(BaseHoodieTableServiceClient.java:721)
at org.apache.hudi.client.BaseHoodieTableServiceClient.inlineClustering(BaseHoodieTableServiceClient.java:734)
at org.apache.hudi.client.BaseHoodieTableServiceClient.runTableServicesInline(BaseHoodieTableServiceClient.java:607)
at org.apache.hudi.client.BaseHoodieWriteClient.runTableServicesInlineInternal(BaseHoodieWriteClient.java:603)
at org.apache.hudi.client.BaseHoodieWriteClient.runTableServicesInline(BaseHoodieWriteClient.java:590)
at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:259)
at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:111)
at org.apache.hudi.client.HoodieFlinkWriteClient.commit(HoodieFlinkWriteClient.java:74)
at org.apache.hudi.client.BaseHoodieWriteClient.commit(BaseHoodieWriteClient.java:207)
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.doCommit(StreamWriteOperatorCoordinator.java:588)
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.commitInstant(StreamWriteOperatorCoordinator.java:564)
at org.apache.hudi.sink.StreamWriteOperatorCoordinator.lambda$notifyCheckpointComplete$2(StreamWriteOperatorCoordinator.java:263)
at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130)
... 3 more
Caused by: java.lang.IllegalArgumentException: Metadata is empty for partition: log_datetime_id=2025-06-16+17
at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:42)
at org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy.buildClusteringGroupsForPartition(BaseConsistentHashingBucketClusteringPlanStrategy.java:101)
at org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy.lambda$generateClusteringPlan$d5a31cf4$1(PartitionAwareClusteringPlanStrategy.java:170)
at org.apache.hudi.common.function.FunctionWrapper.lambda$throwingMapWrapper$0(FunctionWrapper.java:38)
... 40 more
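The check that fails is in BaseConsistentHashingBucketClusteringPlanStrategy.buildClusteringGroupsForPartition, which requires consistent-hashing metadata to exist for every partition being planned. Below is a sketch of how one could check whether the hashing metadata files exist on HDFS for the failing partition; the folder layout under .hoodie is my assumption from reading the Hudi source, not a documented contract.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Diagnostic sketch: list the consistent-hashing metadata for the partition named
// in the exception. The ".hoodie/.bucket_index/consistent_hashing_metadata" path
// is an assumption based on the Hudi source for this version.
Path metaDir = new Path(tablePath,
    ".hoodie/.bucket_index/consistent_hashing_metadata/log_datetime_id=2025-06-16+17");
FileSystem fs = metaDir.getFileSystem(new Configuration());
if (!fs.exists(metaDir)) {
  System.out.println("No hashing metadata directory for this partition");
} else {
  for (FileStatus status : fs.listStatus(metaDir)) {
    System.out.println(status.getPath() + " (" + status.getLen() + " bytes)");
  }
}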