Flink Connector Fails to Validate Tables with Custom Properties #2425

@LiebingYu

Search before asking

  • I searched in the issues and found nothing similar.

Fluss version

main (development)

Please describe the bug 🐞

Problem Description

When a Fluss table contains custom properties (e.g., owner, team, created_by), the Flink connector fails with a ValidationException during table source/sink creation. This is a common use case in enterprise environments where users add metadata properties to track table ownership and other organizational information.

Expected Behavior

The Flink connector should accept and ignore custom properties that are stored in the Fluss table's customProperties field. By design, custom properties are not interpreted by Fluss core; they are stored as part of the table's metadata so that users can persist their own user-defined properties.

Actual Behavior

The Flink connector throws a ValidationException with a message like:

Unsupported options:

owner

This happens because FlinkTableFactory calls helper.validateExcept() with only a fixed set of prefixes to skip: "table.", "client.", and a datalake-specific prefix (e.g., "paimon."). Arbitrary custom properties such as owner match none of these prefixes, so they are reported as unsupported.

How to Reproduce

  1. Create a Fluss table with a custom property:
CREATE TABLE my_table (
  id BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'bucket.num' = '10',
  'owner' = 'user@example.com'
);
  2. Try to read from or write to this table using Flink:
SELECT * FROM my_table;
  3. The connector will fail with ValidationException: Unsupported options: owner

Root Cause Analysis

The issue is in the FlinkTableFactory class:

Source Creation:

List<String> prefixesToSkip = new ArrayList<>(Arrays.asList("table.", "client."));
datalakeFormat.ifPresent(dataLakeFormat -> prefixesToSkip.add(dataLakeFormat + "."));
helper.validateExcept(prefixesToSkip.toArray(new String[0]));

Sink Creation:

if (datalakeFormat.isPresent()) {
    helper.validateExcept("table.", "client.", datalakeFormat.get() + ".");
} else {
    helper.validateExcept("table.", "client.");
}

The validation logic does not exclude custom properties, even though:

  1. The Fluss server stores custom properties separately from table properties (see TableDescriptor.getCustomProperties())
  2. Custom properties are explicitly designed to be "not understood by Fluss" but stored as metadata
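
To make the failure mode concrete, here is a minimal, self-contained sketch of a prefix-based check. It is a simplified model for illustration only, not Flink's actual validateExcept implementation. The class PrefixValidationSketch, the method unsupportedOptions, and the key 'table.hypothetical.key' are made up for this example, and treating 'bucket.num' as an option the factory declares is an assumption based on the reproduction above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class PrefixValidationSketch {

    /**
     * Simplified model: a key is accepted if the factory declares it or if it
     * starts with one of the skipped prefixes. Everything else is reported.
     */
    static List<String> unsupportedOptions(
            Map<String, String> options, Set<String> declaredKeys, String... prefixesToSkip) {
        List<String> unsupported = new ArrayList<>();
        // Copy into a TreeMap so the reported keys come out in sorted order.
        for (String key : new TreeMap<>(options).keySet()) {
            if (declaredKeys.contains(key)) {
                continue; // option understood by the connector
            }
            boolean skipped = false;
            for (String prefix : prefixesToSkip) {
                if (key.startsWith(prefix)) {
                    skipped = true; // e.g. "table." or "client."
                    break;
                }
            }
            if (!skipped) {
                unsupported.add(key); // custom properties end up here
            }
        }
        return unsupported;
    }

    public static void main(String[] args) {
        Map<String, String> options = Map.of(
                "bucket.num", "10",
                "table.hypothetical.key", "x", // hypothetical key, only to show prefix skipping
                "owner", "user@example.com");
        // Assumption: "bucket.num" is among the options the factory declares.
        Set<String> declared = Set.of("bucket.num");
        System.out.println(unsupportedOptions(options, declared, "table.", "client."));
        // prints [owner]
    }
}
```

Because 'owner' is neither declared nor covered by a skipped prefix, it is the only key left over, which matches the "Unsupported options: owner" message.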

Solution

Proposed Fix

We can simply remove the option validation and allow all custom properties to pass through.
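
As a rough illustration of what "pass through" could look like, the sketch below keeps only the options the connector understands and silently ignores the rest instead of throwing. It is a standalone model under stated assumptions, not the actual FlinkTableFactory change. LenientOptionsSketch and recognizedOptions are hypothetical names, and treating 'bucket.num' as a declared option is an assumption.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class LenientOptionsSketch {

    /**
     * Returns the options the connector would act on. Keys that are neither
     * declared nor covered by a known prefix (i.e. custom properties) are
     * ignored rather than rejected.
     */
    static Map<String, String> recognizedOptions(
            Map<String, String> options, Set<String> declaredKeys, String... knownPrefixes) {
        Map<String, String> recognized = new TreeMap<>();
        for (Map.Entry<String, String> entry : options.entrySet()) {
            String key = entry.getKey();
            boolean known = declaredKeys.contains(key);
            for (String prefix : knownPrefixes) {
                if (key.startsWith(prefix)) {
                    known = true;
                    break;
                }
            }
            if (known) {
                recognized.put(key, entry.getValue());
            }
            // Otherwise: a custom property such as "owner"; no exception is raised.
        }
        return recognized;
    }

    public static void main(String[] args) {
        Map<String, String> options = Map.of(
                "bucket.num", "10",
                "owner", "user@example.com");
        // Assumption: "bucket.num" is among the options the factory declares.
        System.out.println(recognizedOptions(options, Set.of("bucket.num"), "table.", "client."));
        // prints {bucket.num=10}
    }
}
```

The trade-off of dropping validation entirely is that a misspelled connector option would also be ignored silently rather than reported.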

Are you willing to submit a PR?

  • I'm willing to submit a PR!
