Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ public class TsTable {
"When there are object fields, the %s %s shall not be '.', '..' or contain './', '.\\'.";
protected String tableName;

private final Map<String, TsTableColumnSchema> columnSchemaMap = new LinkedHashMap<>();
private final Map<String, Integer> tagColumnIndexMap = new HashMap<>();
private final Map<String, Integer> idColumnIndexMap = new HashMap<>();
// Copy-on-Write maps for thread-safe access without read locks
private volatile Map<String, TsTableColumnSchema> columnSchemaMap = new LinkedHashMap<>();
private volatile Map<String, Integer> tagColumnIndexMap = new HashMap<>();

private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

Expand Down Expand Up @@ -114,7 +114,6 @@ public TsTable(String tableName, ImmutableList<TsTableColumnSchema> columnSchema
public TsTable(TsTable origin) {
this.tableName = origin.tableName;
origin.columnSchemaMap.forEach((col, schema) -> this.columnSchemaMap.put(col, schema.copy()));
this.idColumnIndexMap.putAll(origin.idColumnIndexMap);
this.props = origin.props == null ? null : new HashMap<>(origin.props);
this.ttlValue = origin.ttlValue;
this.tagNums = origin.tagNums;
Expand Down Expand Up @@ -149,23 +148,95 @@ public TsTableColumnSchema getColumnSchema(final String columnName) {
}

/**
* Execute a write operation with optimistic lock support. This method handles the write flag and
* version increment automatically.
* Execute a write operation with Copy-on-Write support. This method creates new copies of the
* maps before modification to ensure thread-safe reads without locks.
*
* @param writeOperation the write operation to execute
* @param writeOperation the write operation to execute on the new map copies
*/
private void executeWrite(Runnable writeOperation) {
private void executeWrite(WriteOperation writeOperation) {
readWriteLock.writeLock().lock();
isNotWrite.set(false);
try {
writeOperation.run();
// Copy-on-Write: create local copies first
Map<String, TsTableColumnSchema> newColumnSchemaMap = new LinkedHashMap<>(columnSchemaMap);
Map<String, Integer> newTagColumnIndexMap = new HashMap<>(tagColumnIndexMap);

// Execute write operation on local copies
writeOperation.execute(newColumnSchemaMap, newTagColumnIndexMap);

// After write completes, atomically update the class fields
columnSchemaMap = newColumnSchemaMap;
tagColumnIndexMap = newTagColumnIndexMap;
} finally {
instanceVersion.incrementAndGet();
isNotWrite.set(true);
readWriteLock.writeLock().unlock();
}
}

/**
* Execute a rename operation. columnSchemaMap is modified directly, tagColumnIndexMap is copied
* for thread safety.
*
* @param oldName the old column name
* @param newName the new column name
* @param renameOperation the rename operation to create renamed schema and update tag map
*/
private void executeRename(
final String oldName, final String newName, RenameOperation renameOperation) {
readWriteLock.writeLock().lock();
isNotWrite.set(false);
try {
// Copy tagColumnIndexMap for thread safety
final Map<String, Integer> newTagColumnIndexMap = new HashMap<>(tagColumnIndexMap);

// Get expected schema before renaming
final TsTableColumnSchema expectedSchema = columnSchemaMap.get(oldName);
if (expectedSchema == null) {
return;
}

// Rebuild columnSchemaMap in single pass
final LinkedHashMap<String, TsTableColumnSchema> newColumnSchemaMap =
new LinkedHashMap<>(columnSchemaMap.size());
for (Map.Entry<String, TsTableColumnSchema> entry : columnSchemaMap.entrySet()) {
final String currentKey = entry.getKey();
final TsTableColumnSchema currentSchema = entry.getValue();

if (currentSchema == expectedSchema) {
// This is the column to rename, create renamed schema and update tag map
final TsTableColumnSchema renamedSchema =
renameOperation.createRenamedSchema(expectedSchema, newTagColumnIndexMap);
newColumnSchemaMap.put(newName, renamedSchema);
} else {
// Not the column to rename, add as is
newColumnSchemaMap.put(currentKey, currentSchema);
}
}

// Atomically update both maps
columnSchemaMap = newColumnSchemaMap;
tagColumnIndexMap = newTagColumnIndexMap;
} finally {
instanceVersion.incrementAndGet();
isNotWrite.set(true);
readWriteLock.writeLock().unlock();
}
}

@FunctionalInterface
private interface WriteOperation {
void execute(
Map<String, TsTableColumnSchema> columnSchemaMap, Map<String, Integer> tagColumnIndexMap);
}

@FunctionalInterface
private interface RenameOperation {

TsTableColumnSchema createRenamedSchema(
TsTableColumnSchema expectedSchema, Map<String, Integer> tagMap);
}

public int getTagColumnOrdinal(final String columnName) {
readWriteLock.readLock().lock();
try {
Expand Down Expand Up @@ -201,67 +272,71 @@ public List<TsTableColumnSchema> getTagColumnSchemaList() {

// Currently only supports device view
public void renameTable(final String newName) {
executeWrite(() -> tableName = newName);
executeWrite((colMap, tagMap) -> tableName = newName);
}

public void addColumnSchema(final TsTableColumnSchema columnSchema) {
executeWrite(
() -> {
columnSchemaMap.put(columnSchema.getColumnName(), columnSchema);
(colMap, tagMap) -> {
colMap.put(columnSchema.getColumnName(), columnSchema);
if (columnSchema.getColumnCategory().equals(TsTableColumnCategory.TAG)) {
tagNums++;
tagColumnIndexMap.put(columnSchema.getColumnName(), tagNums - 1);
tagMap.put(columnSchema.getColumnName(), tagNums - 1);
} else if (columnSchema.getColumnCategory().equals(TsTableColumnCategory.FIELD)) {
fieldNum++;
}
});
}

public void renameColumnSchema(final String oldName, final String newName) {
executeWrite(
() -> {
// Ensures idempotency
if (columnSchemaMap.containsKey(oldName)) {
final TsTableColumnSchema schema = columnSchemaMap.remove(oldName);
final Map<String, String> oldProps = schema.getProps();
oldProps.computeIfAbsent(TreeViewSchema.ORIGINAL_NAME, k -> schema.getColumnName());
switch (schema.getColumnCategory()) {
case TAG:
columnSchemaMap.put(
newName, new TagColumnSchema(newName, schema.getDataType(), oldProps));
break;
case FIELD:
columnSchemaMap.put(
newName,
new FieldColumnSchema(
newName,
schema.getDataType(),
((FieldColumnSchema) schema).getEncoding(),
((FieldColumnSchema) schema).getCompressor(),
oldProps));
break;
case ATTRIBUTE:
columnSchemaMap.put(
newName, new AttributeColumnSchema(newName, schema.getDataType(), oldProps));
break;
case TIME:
default:
// Do nothing
columnSchemaMap.put(oldName, schema);
}
executeRename(
oldName,
newName,
(expectedSchema, tagMap) -> {
// Create renamed schema
final Map<String, String> props = expectedSchema.getProps();
props.computeIfAbsent(TreeViewSchema.ORIGINAL_NAME, k -> expectedSchema.getColumnName());

final TsTableColumnSchema renamedSchema;
switch (expectedSchema.getColumnCategory()) {
case TAG:
renamedSchema = new TagColumnSchema(newName, expectedSchema.getDataType(), props);
// Update tagColumnIndexMap: new name's tag value is old name's value
final Integer index = tagMap.remove(oldName);
if (index != null) {
tagMap.put(newName, index);
}
break;
case FIELD:
renamedSchema =
new FieldColumnSchema(
newName,
expectedSchema.getDataType(),
((FieldColumnSchema) expectedSchema).getEncoding(),
((FieldColumnSchema) expectedSchema).getCompressor(),
props);
break;
case ATTRIBUTE:
renamedSchema =
new AttributeColumnSchema(newName, expectedSchema.getDataType(), props);
break;
case TIME:
default:
throw new SchemaExecutionException("TIME column cannot be renamed");
}
return renamedSchema;
});
}

public void removeColumnSchema(final String columnName) {
executeWrite(
() -> {
final TsTableColumnSchema columnSchema = columnSchemaMap.get(columnName);
(colMap, tagMap) -> {
final TsTableColumnSchema columnSchema = colMap.get(columnName);
if (columnSchema != null
&& columnSchema.getColumnCategory().equals(TsTableColumnCategory.TAG)) {
throw new SchemaExecutionException("Cannot remove an tag column: " + columnName);
} else if (columnSchema != null) {
columnSchemaMap.remove(columnName);
colMap.remove(columnName);
if (columnSchema.getColumnCategory().equals(TsTableColumnCategory.FIELD)) {
fieldNum--;
}
Expand Down Expand Up @@ -354,7 +429,7 @@ public Optional<String> getPropValue(final String propKey) {

public void addProp(final String key, final String value) {
executeWrite(
() -> {
(colMap, tagMap) -> {
if (props == null) {
props = new HashMap<>();
}
Expand All @@ -364,7 +439,7 @@ public void addProp(final String key, final String value) {

public void removeProp(final String key) {
executeWrite(
() -> {
(colMap, tagMap) -> {
if (props == null) {
return;
}
Expand Down Expand Up @@ -410,7 +485,7 @@ public static TsTable deserialize(final ByteBuffer buffer) {
}

public void setProps(Map<String, String> props) {
executeWrite(() -> this.props = props);
executeWrite((colMap, tagMap) -> this.props = props);
}

public void checkTableNameAndObjectNames4Object() throws MetadataException {
Expand Down
Loading