-
Notifications
You must be signed in to change notification settings - Fork 487
Tiering support commit by time #2185
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
38395e1 to
46c293e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR implements tiering timeout support to prevent long-running table tiering operations from blocking other tables. The feature introduces a maximum duration for tiering a single table, after which it will be force-completed or skipped.
Key Changes
- Added
forceIgnoreflag toTieringSplitand its subclasses to mark splits that should be skipped due to timeout - Implemented periodic timeout checking in
TieringSourceEnumeratorwith configurable max duration and detection interval - Introduced
TieringReachMaxDurationEventto notify readers when a table reaches max tiering duration - Updated split handling in
TieringSplitReaderto force-complete in-progress log splits and skip new splits when timeout occurs
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 18 comments.
Show a summary per file
| File | Description |
|---|---|
| TieringSplit.java | Added forceIgnore field and methods to mark splits for skipping |
| TieringSnapshotSplit.java | Added constructors supporting forceIgnore parameter |
| TieringLogSplit.java | Added constructors supporting forceIgnore parameter |
| TieringSplitSerializer.java | Updated serialization to include forceIgnore field |
| TieringSourceEnumerator.java | Implemented periodic timeout checking, deadline tracking, and timeout event broadcasting |
| TieringSplitReader.java | Added logic to force-complete log splits and skip splits marked with forceIgnore |
| TieringSourceReader.java | Integrated timeout event handling and custom fetcher manager |
| TieringSourceFetcherManager.java | New class to manage timeout notifications to split readers |
| TieringReachMaxDurationEvent.java | New event class to signal table timeout to readers |
| TieringSourceOptions.java | Added configuration options for max duration and detection interval |
| TieringSource.java | Updated builders to support new timeout configuration parameters |
| LakeTieringJobBuilder.java | Wired up timeout configuration from Fluss config |
| TieringSplitGenerator.java | Removed hardcoded numberOfSplits parameter, changed log levels to DEBUG |
| TieringSplitSerializerTest.java | Added tests for forceIgnore serialization and updated string representations |
| TieringSourceEnumeratorTest.java | Added timeout test, refactored assertions to use containsExactlyInAnyOrderElementsOf, added helper methods |
| TieringSourceReaderTest.java | New test file for testing timeout event handling at reader level |
Comments suppressed due to low confidence (2)
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitGenerator.java:339
- The log level was changed from INFO to DEBUG for this message about skipping splits when offset conditions are met. This is appropriate as it reduces log verbosity for expected behavior. However, ensure this doesn't make it harder to diagnose why certain tables aren't being tiered, as this condition might occur frequently during normal operation and could be useful for troubleshooting.
LOG.debug(
"The lastCommittedBucketOffset {} is equals or bigger than latestBucketOffset {}, skip generating split for bucket {}",
lastCommittedBucketOffset,
latestBucketOffset,
tableBucket);
return Optional.empty();
}
}
}
private Optional<TieringSplit> generateSplitForLogTableBucket(
TablePath tablePath,
TableBucket tableBucket,
@Nullable String partitionName,
@Nullable Long lastCommittedBucketOffset,
long latestBucketOffset) {
if (latestBucketOffset <= 0) {
LOG.debug(
"The latestBucketOffset {} is equals or less than 0, skip generating split for bucket {}",
latestBucketOffset,
tableBucket);
return Optional.empty();
}
// the bucket is never been tiered
if (lastCommittedBucketOffset == null) {
// the bucket is never been tiered, scan fluss log from the earliest offset
return Optional.of(
new TieringLogSplit(
tablePath,
tableBucket,
partitionName,
EARLIEST_OFFSET,
latestBucketOffset));
} else {
// the bucket has been tiered, scan remain fluss log
if (lastCommittedBucketOffset < latestBucketOffset) {
return Optional.of(
new TieringLogSplit(
tablePath,
tableBucket,
partitionName,
lastCommittedBucketOffset,
latestBucketOffset));
}
}
LOG.debug(
"The lastCommittedBucketOffset {} is equals or bigger than latestBucketOffset {}, skip generating split for bucket {}",
lastCommittedBucketOffset,
latestBucketOffset,
tableBucket);
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java:168
- When a table reaches the max tiering duration, the timeout check only forces completion for log splits (lines 159-161 and 292). However, snapshot splits can also be in progress. If a table times out while processing snapshot splits, they won't be force-completed, leading to potential hangs or inconsistent behavior. Consider extending the timeout handling to snapshot splits as well.
// may read snapshot firstly
if (currentSnapshotSplitReader != null) {
// for snapshot split, we don't force to complete it
// since we rely on the log offset for the snapshot to
// do next tiering, if force to complete, we can't get the log offset
CloseableIterator<RecordAndPos> recordIterator = currentSnapshotSplitReader.readBatch();
if (recordIterator == null) {
LOG.info("Split {} is finished", currentSnapshotSplit.splitId());
return finishCurrentSnapshotSplit();
} else {
return forSnapshotSplitRecords(
currentSnapshotSplit.getTableBucket(), recordIterator);
}
} else {
if (currentLogScanner != null) {
if (timeoutTables.contains(currentTableId)) {
return forceCompleteTieringLogRecords();
}
ScanRecords scanRecords = currentLogScanner.poll(pollTimeout);
// force to complete records
return forLogRecords(scanRecords);
} else {
return emptyTableBucketWriteResultWithSplitIds();
}
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
.../src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
Show resolved
Hide resolved
.../src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
Show resolved
Hide resolved
.../src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
Outdated
Show resolved
Hide resolved
...link-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java
Outdated
Show resolved
Hide resolved
...uss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
Outdated
Show resolved
Hide resolved
...uss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
Outdated
Show resolved
Hide resolved
...ss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java
Show resolved
Hide resolved
...common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializer.java
Outdated
Show resolved
Hide resolved
...uss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
Outdated
Show resolved
Hide resolved
...-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringReachMaxDurationEvent.java
Show resolved
Hide resolved
466629a to
bbd9d38
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 7 comments.
Comments suppressed due to low confidence (1)
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:350
- There's a potential race condition between reading and modifying pendingSplits. The handleReachMaxTieringDurationTables method (lines 301-313) iterates and modifies pendingSplits, while assignSplits (line 350) also modifies it. Both methods can be called concurrently from different async callbacks, but only readersAwaitingSplit is synchronized, not pendingSplits. This could lead to concurrent modification exceptions or inconsistent state.
// todo: do we need to add lock?
synchronized (readersAwaitingSplit) {
if (!readersAwaitingSplit.isEmpty()) {
final Integer[] readers = readersAwaitingSplit.toArray(new Integer[0]);
for (Integer nextAwaitingReader : readers) {
if (!context.registeredReaders().containsKey(nextAwaitingReader)) {
readersAwaitingSplit.remove(nextAwaitingReader);
continue;
}
if (!pendingSplits.isEmpty()) {
TieringSplit tieringSplit = pendingSplits.remove(0);
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...uss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
Outdated
Show resolved
Hide resolved
fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
Outdated
Show resolved
Hide resolved
fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
Outdated
Show resolved
Hide resolved
...k/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java
Outdated
Show resolved
Hide resolved
...uss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.
bbd9d38 to
9ae01ad
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...uss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
Outdated
Show resolved
Hide resolved
...uss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
Outdated
Show resolved
Hide resolved
...uss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
Outdated
Show resolved
Hide resolved
...link-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java
Show resolved
Hide resolved
.../src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
Outdated
Show resolved
Hide resolved
9ae01ad to
bb3f75e
Compare
bb3f75e to
c72407d
Compare
|
@leonardBang Coud you please help me review this pr when you got some time |
leonardBang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @luoyuxia for the work, I left some minor comment.
But I also have a question for current design, Could we address large splits case via current design? The data freshness would be worse for large log split from my understanding.
...uss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java
Outdated
Show resolved
Hide resolved
| throw new IllegalArgumentException( | ||
| "Partition name and partition id must be both null or both not null."); | ||
| } | ||
| this.forceIgnore = forceIgnore; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
forceIgnore is not clear especially with any explanation, here we want to use a member to represent how to read of TieringSplit, IIUC, the meaning of this status is skip reading data for current split, how about skipCurrentRound and add java doc for method or member?
...uss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java
Outdated
Show resolved
Hide resolved
| } | ||
| TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); | ||
|
|
||
| boolean forceIgnore = in.readBoolean(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hint: we can add a note we do not need consider compatibility here.
| .defaultValue(Duration.ofMinutes(30)) | ||
| .withDescription( | ||
| "The maximum duration for tiering a single table. If tiering a table exceeds this duration, " | ||
| + "it will be force completed: the tiering will be finalized and committed to the data lake " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
after the table wast force completed, would it be tired again for next round ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'll be mark as finished and wait (about the data fressness interval) to scheduled to pending again. Do we need to just put into the pending queue for the table that is forced completed?
Hi, thanks for reviewing my pr. Yes, large log split will harm data fressnes. |
I mean for the large log table, the data freshness goal(i.e. 5minute) would be worse to 10 minutes, 15 minutes after some scheduled rounds, maybe in that case we need monitor these tables and scale up tiering service resources. |
Yes, your are right. But IIUC, the data freshness goal is a best effort when tiering resource is enough, but when tiering resource is not enough, it's normal that we can't reach to the goal. But a round of tiering of a table reach the max duraiton, it means that the resource is not enough, we need to scale up tiering service resources.. |
c72407d to
dc4f4a3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 23 out of 23 changed files in this pull request and generated 8 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...nk/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java
Outdated
Show resolved
Hide resolved
...uss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
Show resolved
Hide resolved
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java
Show resolved
Hide resolved
...nk/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java
Show resolved
Hide resolved
fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
Show resolved
Hide resolved
...uss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
Show resolved
Hide resolved
...-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java
Show resolved
Hide resolved
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java
Show resolved
Hide resolved
dc4f4a3 to
8942b0b
Compare
8942b0b to
88c65ca
Compare
|
@leonardBang Thanks for your review. Comments are addressed. And I also remove the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 23 out of 23 changed files in this pull request and generated 13 comments.
Comments suppressed due to low confidence (1)
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java:697
- The removed method
sortSplitswas used to make tests deterministic by sorting splits before comparison. Replacing this withcontainsExactlyInAnyOrderElementsOfis correct and cleaner, but ensure that theequals()andhashCode()methods ofTieringSplitand its subclasses properly consider the newskipCurrentRoundfield (which they do on lines 156-157 and 162-163 in TieringSplit.java). This change looks correct.
}
private void verifyTieringSplitAssignment(
FlussMockSplitEnumeratorContext<TieringSplit> context,
int expectedSplitSize,
TablePath expectedTablePath)
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
Show resolved
Hide resolved
...uss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
Show resolved
Hide resolved
...nk/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java
Outdated
Show resolved
Hide resolved
.../src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
Show resolved
Hide resolved
...uss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
Show resolved
Hide resolved
...link-common/src/test/java/org/apache/fluss/flink/tiering/source/TieringSourceReaderTest.java
Show resolved
Hide resolved
...uss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
Show resolved
Hide resolved
.../src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
Show resolved
Hide resolved
...uss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java
Show resolved
Hide resolved
leonardBang
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @luoyuxia for the update, +1
88c65ca to
201d606
Compare
Purpose
Linked issue: close #1893
Brief change log
TieringReaderto notify it the table reach the max durationTests
TieringSourceReaderTest,TieringSourceReaderTest#testTableReachMaxTieringDuration,TieringITCaseAPI and Format
Documentation