Skip to content

Conversation

@luoyuxia
Copy link
Contributor

@luoyuxia luoyuxia commented Dec 16, 2025

Purpose

Linked issue: close #1893

Brief change log

  • Introduce a option to control how long the tiering will last before force to complete
  • Tiering Enumerator will record the deadline of tiering for each table, when the deadline reach for a table
    • mark all the pending split for the table to force to ignore so that although reader recieve the split, it can just ignore it to force to complete the tiering (we still need to send the split since the tiering commiter operator need to collect all the split for one round of tiering)
    • send a event to TieringReader to notify it the table reach the max duration
  • when TieringReader` recieves the event, force to complete the split

Tests

TieringSourceReaderTest, TieringSourceReaderTest#testTableReachMaxTieringDuration,TieringITCase

API and Format

Documentation

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements tiering timeout support to prevent long-running table tiering operations from blocking other tables. The feature introduces a maximum duration for tiering a single table, after which it will be force-completed or skipped.

Key Changes

  • Added forceIgnore flag to TieringSplit and its subclasses to mark splits that should be skipped due to timeout
  • Implemented periodic timeout checking in TieringSourceEnumerator with configurable max duration and detection interval
  • Introduced TieringReachMaxDurationEvent to notify readers when a table reaches max tiering duration
  • Updated split handling in TieringSplitReader to force-complete in-progress log splits and skip new splits when timeout occurs

Reviewed changes

Copilot reviewed 16 out of 16 changed files in this pull request and generated 18 comments.

Show a summary per file
File Description
TieringSplit.java Added forceIgnore field and methods to mark splits for skipping
TieringSnapshotSplit.java Added constructors supporting forceIgnore parameter
TieringLogSplit.java Added constructors supporting forceIgnore parameter
TieringSplitSerializer.java Updated serialization to include forceIgnore field
TieringSourceEnumerator.java Implemented periodic timeout checking, deadline tracking, and timeout event broadcasting
TieringSplitReader.java Added logic to force-complete log splits and skip splits marked with forceIgnore
TieringSourceReader.java Integrated timeout event handling and custom fetcher manager
TieringSourceFetcherManager.java New class to manage timeout notifications to split readers
TieringReachMaxDurationEvent.java New event class to signal table timeout to readers
TieringSourceOptions.java Added configuration options for max duration and detection interval
TieringSource.java Updated builders to support new timeout configuration parameters
LakeTieringJobBuilder.java Wired up timeout configuration from Fluss config
TieringSplitGenerator.java Removed hardcoded numberOfSplits parameter, changed log levels to DEBUG
TieringSplitSerializerTest.java Added tests for forceIgnore serialization and updated string representations
TieringSourceEnumeratorTest.java Added timeout test, refactored assertions to use containsExactlyInAnyOrderElementsOf, added helper methods
TieringSourceReaderTest.java New test file for testing timeout event handling at reader level
Comments suppressed due to low confidence (2)

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java:339

  • The log level was changed from INFO to DEBUG for this message about skipping splits when offset conditions are met. This is appropriate as it reduces log verbosity for expected behavior. However, ensure this doesn't make it harder to diagnose why certain tables aren't being tiered, as this condition might occur frequently during normal operation and could be useful for troubleshooting.
                LOG.debug(
                        "The lastCommittedBucketOffset {} is equals or bigger than latestBucketOffset {}, skip generating split for bucket {}",
                        lastCommittedBucketOffset,
                        latestBucketOffset,
                        tableBucket);
                return Optional.empty();
            }
        }
    }

    private Optional<TieringSplit> generateSplitForLogTableBucket(
            TablePath tablePath,
            TableBucket tableBucket,
            @Nullable String partitionName,
            @Nullable Long lastCommittedBucketOffset,
            long latestBucketOffset) {
        if (latestBucketOffset <= 0) {
            LOG.debug(
                    "The latestBucketOffset {} is equals or less than 0, skip generating split for bucket {}",
                    latestBucketOffset,
                    tableBucket);
            return Optional.empty();
        }

        // the bucket is never been tiered
        if (lastCommittedBucketOffset == null) {
            // the bucket is never been tiered, scan fluss log from the earliest offset
            return Optional.of(
                    new TieringLogSplit(
                            tablePath,
                            tableBucket,
                            partitionName,
                            EARLIEST_OFFSET,
                            latestBucketOffset));
        } else {
            // the bucket has been tiered, scan remain fluss log
            if (lastCommittedBucketOffset < latestBucketOffset) {
                return Optional.of(
                        new TieringLogSplit(
                                tablePath,
                                tableBucket,
                                partitionName,
                                lastCommittedBucketOffset,
                                latestBucketOffset));
            }
        }
        LOG.debug(
                "The lastCommittedBucketOffset {} is equals or bigger than latestBucketOffset {}, skip generating split for bucket {}",
                lastCommittedBucketOffset,
                latestBucketOffset,
                tableBucket);

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:168

  • When a table reaches the max tiering duration, the timeout check only forces completion for log splits (lines 159-161 and 292). However, snapshot splits can also be in progress. If a table times out while processing snapshot splits, they won't be force-completed, leading to potential hangs or inconsistent behavior. Consider extending the timeout handling to snapshot splits as well.
        // may read snapshot firstly
        if (currentSnapshotSplitReader != null) {
            // for snapshot split, we don't force to complete it
            // since we rely on the log offset for the snapshot to
            // do next tiering, if force to complete, we can't get the log offset
            CloseableIterator<RecordAndPos> recordIterator = currentSnapshotSplitReader.readBatch();
            if (recordIterator == null) {
                LOG.info("Split {} is finished", currentSnapshotSplit.splitId());
                return finishCurrentSnapshotSplit();
            } else {
                return forSnapshotSplitRecords(
                        currentSnapshotSplit.getTableBucket(), recordIterator);
            }
        } else {
            if (currentLogScanner != null) {
                if (timeoutTables.contains(currentTableId)) {
                    return forceCompleteTieringLogRecords();
                }
                ScanRecords scanRecords = currentLogScanner.poll(pollTimeout);
                // force to complete records
                return forLogRecords(scanRecords);
            } else {
                return emptyTableBucketWriteResultWithSplitIds();
            }
        }

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@luoyuxia luoyuxia force-pushed the tiering-support-commit-by-time branch 3 times, most recently from 466629a to bbd9d38 Compare December 25, 2025 08:55
@luoyuxia luoyuxia requested a review from Copilot December 25, 2025 08:58
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 19 out of 19 changed files in this pull request and generated 7 comments.

Comments suppressed due to low confidence (1)

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:350

  • There's a potential race condition between reading and modifying pendingSplits. The handleReachMaxTieringDurationTables method (lines 301-313) iterates and modifies pendingSplits, while assignSplits (line 350) also modifies it. Both methods can be called concurrently from different async callbacks, but only readersAwaitingSplit is synchronized, not pendingSplits. This could lead to concurrent modification exceptions or inconsistent state.
        // todo: do we need to add lock?
        synchronized (readersAwaitingSplit) {
            if (!readersAwaitingSplit.isEmpty()) {
                final Integer[] readers = readersAwaitingSplit.toArray(new Integer[0]);
                for (Integer nextAwaitingReader : readers) {
                    if (!context.registeredReaders().containsKey(nextAwaitingReader)) {
                        readersAwaitingSplit.remove(nextAwaitingReader);
                        continue;
                    }
                    if (!pendingSplits.isEmpty()) {
                        TieringSplit tieringSplit = pendingSplits.remove(0);

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@luoyuxia luoyuxia force-pushed the tiering-support-commit-by-time branch from bbd9d38 to 9ae01ad Compare December 25, 2025 09:55
@luoyuxia luoyuxia marked this pull request as ready for review December 25, 2025 11:32
@luoyuxia luoyuxia requested a review from Copilot December 25, 2025 11:32
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 19 out of 19 changed files in this pull request and generated 6 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@luoyuxia luoyuxia force-pushed the tiering-support-commit-by-time branch from 9ae01ad to bb3f75e Compare December 25, 2025 11:56
@luoyuxia luoyuxia requested a review from leonardBang December 25, 2025 11:56
@luoyuxia luoyuxia force-pushed the tiering-support-commit-by-time branch from bb3f75e to c72407d Compare December 25, 2025 12:00
@luoyuxia
Copy link
Contributor Author

@leonardBang Coud you please help me review this pr when you got some time

Copy link
Contributor

@leonardBang leonardBang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @luoyuxia for the work, I left some minor comment.

But I also have a question for current design, Could we address large splits case via current design? The data freshness would be worse for large log split from my understanding.

throw new IllegalArgumentException(
"Partition name and partition id must be both null or both not null.");
}
this.forceIgnore = forceIgnore;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forceIgnore is not clear especially with any explanation, here we want to use a member to represent how to read of TieringSplit, IIUC, the meaning of this status is skip reading data for current split, how about skipCurrentRound and add java doc for method or member?

}
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);

boolean forceIgnore = in.readBoolean();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hint: we can add a note we do not need consider compatibility here.

.defaultValue(Duration.ofMinutes(30))
.withDescription(
"The maximum duration for tiering a single table. If tiering a table exceeds this duration, "
+ "it will be force completed: the tiering will be finalized and committed to the data lake "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after the table wast force completed, would it be tired again for next round ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'll be mark as finished and wait (about the data fressness interval) to scheduled to pending again. Do we need to just put into the pending queue for the table that is forced completed?

@luoyuxia
Copy link
Contributor Author

luoyuxia commented Jan 9, 2026

Thanks @luoyuxia for the work, I left some minor comment.

But I also have a question for current design, Could we address large splits case via current design? The data freshness would be worse for large log split from my understanding.

Hi, thanks for reviewing my pr. Yes, large log split will harm data fressnes.
But What do you mean address large splits case?
In the current design, if we have a large split, and aussming the max tiering duration is 10 min, after 10 min, the large split will be forced to commit.

@leonardBang
Copy link
Contributor

Thanks @luoyuxia for the work, I left some minor comment.
But I also have a question for current design, Could we address large splits case via current design? The data freshness would be worse for large log split from my understanding.

Hi, thanks for reviewing my pr. Yes, large log split will harm data fressnes. But What do you mean address large splits case? In the current design, if we have a large split, and aussming the max tiering duration is 10 min, after 10 min, the large split will be forced to commit.

I mean for the large log table, the data freshness goal(i.e. 5minute) would be worse to 10 minutes, 15 minutes after some scheduled rounds, maybe in that case we need monitor these tables and scale up tiering service resources.

@luoyuxia
Copy link
Contributor Author

Thanks @luoyuxia for the work, I left some minor comment.
But I also have a question for current design, Could we address large splits case via current design? The data freshness would be worse for large log split from my understanding.

Hi, thanks for reviewing my pr. Yes, large log split will harm data fressnes. But What do you mean address large splits case? In the current design, if we have a large split, and aussming the max tiering duration is 10 min, after 10 min, the large split will be forced to commit.

I mean for the large log table, the data freshness goal(i.e. 5minute) would be worse to 10 minutes, 15 minutes after some scheduled rounds, maybe in that case we need monitor these tables and scale up tiering service resources.

Yes, your are right. But IIUC, the data freshness goal is a best effort when tiering resource is enough, but when tiering resource is not enough, it's normal that we can't reach to the goal. But a round of tiering of a table reach the max duraiton, it means that the resource is not enough, we need to scale up tiering service resources..

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 23 out of 23 changed files in this pull request and generated 8 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@luoyuxia luoyuxia force-pushed the tiering-support-commit-by-time branch from dc4f4a3 to 8942b0b Compare January 21, 2026 07:27
@luoyuxia luoyuxia requested a review from Copilot January 21, 2026 07:27
@luoyuxia
Copy link
Contributor Author

@leonardBang Thanks for your review. Comments are addressed. And I also remove the lake.tiering.table.duration.detect-interval to make code clean. PTAL

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 23 out of 23 changed files in this pull request and generated 13 comments.

Comments suppressed due to low confidence (1)

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java:697

  • The removed method sortSplits was used to make tests deterministic by sorting splits before comparison. Replacing this with containsExactlyInAnyOrderElementsOf is correct and cleaner, but ensure that the equals() and hashCode() methods of TieringSplit and its subclasses properly consider the new skipCurrentRound field (which they do on lines 156-157 and 162-163 in TieringSplit.java). This change looks correct.
    }

    private void verifyTieringSplitAssignment(
            FlussMockSplitEnumeratorContext<TieringSplit> context,
            int expectedSplitSize,
            TablePath expectedTablePath)

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Contributor

@leonardBang leonardBang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @luoyuxia for the update, +1

@luoyuxia luoyuxia force-pushed the tiering-support-commit-by-time branch from 88c65ca to 201d606 Compare January 21, 2026 13:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Tiering Service supports splitting large splits and committing them separately to the lake.

2 participants