Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: Fix rename partitioned table is not atomic (#9133) #9219

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ namespace DB
M(force_set_mocked_s3_object_mtime) \
M(force_stop_background_checkpoint_upload) \
M(skip_seek_before_read_dmfile) \
M(force_schema_sync_diff_fail) \
M(exception_after_large_write_exceed) \
M(proactive_flush_force_set_type) \
M(exception_when_fetch_disagg_pages) \
Expand Down Expand Up @@ -159,6 +160,7 @@ namespace DB
M(random_pipeline_model_cancel_failpoint) \
M(random_pipeline_model_execute_prefix_failpoint) \
M(random_pipeline_model_execute_suffix_failpoint) \
M(random_ddl_fail_when_rename_partitions) \
M(random_spill_to_disk_failpoint) \
M(random_region_persister_latency_failpoint) \
M(random_restore_from_disk_failpoint) \
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Databases/DatabasesCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ class DatabaseSnapshotIterator : public IDatabaseIterator
Tables::iterator it;

public:
DatabaseSnapshotIterator(Tables & tables_)
explicit DatabaseSnapshotIterator(Tables & tables_)
: tables(tables_)
, it(tables.begin())
{}

DatabaseSnapshotIterator(Tables && tables_)
explicit DatabaseSnapshotIterator(Tables && tables_)
: tables(tables_)
, it(tables.begin())
{}
Expand Down Expand Up @@ -127,15 +127,15 @@ class DatabaseWithOwnTablesBase : public IDatabase

void shutdown() override;

virtual ~DatabaseWithOwnTablesBase() override;
~DatabaseWithOwnTablesBase() override;

protected:
String name;

mutable std::mutex mutex;
Tables tables;

DatabaseWithOwnTablesBase(String name_)
explicit DatabaseWithOwnTablesBase(String name_)
: name(std::move(name_))
{}
};
Expand Down
117 changes: 99 additions & 18 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ Table::Table(
, database_name(database_name_)
, database_id(database_id_)
, table_name(table_name_)
, col_id(table_info_.columns.size())
, col_id(table_info.columns.size())
{}

MockTiDB::MockTiDB()
Expand Down Expand Up @@ -141,7 +141,7 @@ TablePtr MockTiDB::dropTableInternal(Context & context, const TablePtr & table,

void MockTiDB::dropDB(Context & context, const String & database_name, bool drop_regions)
{
std::lock_guard lock(tables_mutex);
std::scoped_lock lock(tables_mutex);

std::vector<String> table_names;
std::for_each(tables_by_id.begin(), tables_by_id.end(), [&](const auto & pair) {
Expand All @@ -168,7 +168,7 @@ void MockTiDB::dropDB(Context & context, const String & database_name, bool drop

void MockTiDB::dropTable(Context & context, const String & database_name, const String & table_name, bool drop_regions)
{
std::lock_guard lock(tables_mutex);
std::scoped_lock lock(tables_mutex);

auto table = dropTableByNameImpl(context, database_name, table_name, drop_regions);
if (!table)
Expand All @@ -186,7 +186,7 @@ void MockTiDB::dropTable(Context & context, const String & database_name, const

void MockTiDB::dropTableById(Context & context, const TableID & table_id, bool drop_regions)
{
std::lock_guard lock(tables_mutex);
std::scoped_lock lock(tables_mutex);

auto table = dropTableByIdImpl(context, table_id, drop_regions);
if (!table)
Expand Down Expand Up @@ -319,7 +319,7 @@ TableID MockTiDB::newTable(
const String & handle_pk_name,
const String & engine_type)
{
std::lock_guard lock(tables_mutex);
std::scoped_lock lock(tables_mutex);

String qualified_name = database_name + "." + table_name;
if (tables_by_name.find(qualified_name) != tables_by_name.end())
Expand All @@ -338,13 +338,55 @@ TableID MockTiDB::newTable(
return addTable(database_name, std::move(*table_info));
}

std::tuple<TableID, std::vector<TableID>> MockTiDB::newPartitionTable(
const String & database_name,
const String & table_name,
const ColumnsDescription & columns,
Timestamp tso,
const String & handle_pk_name,
const String & engine_type,
const Strings & part_names)
{
std::scoped_lock lock(tables_mutex);

String qualified_name = database_name + "." + table_name;
if (tables_by_name.find(qualified_name) != tables_by_name.end())
{
throw Exception("Mock TiDB table " + qualified_name + " already exists", ErrorCodes::TABLE_ALREADY_EXISTS);
}

if (databases.find(database_name) == databases.end())
{
throw Exception("MockTiDB not found db: " + database_name, ErrorCodes::LOGICAL_ERROR);
}

std::vector<TableID> physical_table_ids;
physical_table_ids.reserve(part_names.size());
auto table_info = parseColumns(table_name, columns, handle_pk_name, engine_type);
table_info->id = table_id_allocator++;
table_info->is_partition_table = true;
table_info->partition.enable = true;
for (const auto & part_name : part_names)
{
PartitionDefinition part_def;
part_def.id = table_id_allocator++;
part_def.name = part_name;
table_info->partition.definitions.emplace_back(part_def);
++table_info->partition.num;
physical_table_ids.emplace_back(part_def.id);
}
table_info->update_timestamp = tso;
auto logical_table_id = addTable(database_name, std::move(*table_info));
return {logical_table_id, physical_table_ids};
}

std::vector<TableID> MockTiDB::newTables(
const String & database_name,
const std::vector<std::tuple<String, ColumnsDescription, String>> & tables,
Timestamp tso,
const String & engine_type)
{
std::lock_guard lock(tables_mutex);
std::scoped_lock lock(tables_mutex);
std::vector<TableID> table_ids;
table_ids.reserve(tables.size());
if (databases.find(database_name) == databases.end())
Expand Down Expand Up @@ -431,7 +473,7 @@ TableID MockTiDB::newPartition(
Timestamp tso,
bool is_add_part)
{
std::lock_guard lock(tables_mutex);
std::scoped_lock lock(tables_mutex);

TablePtr logical_table = getTableByID(belong_logical_table);
TableID partition_id = table_id_allocator++; // allocate automatically
Expand All @@ -446,7 +488,7 @@ TableID MockTiDB::newPartition(
Timestamp tso,
bool is_add_part)
{
std::lock_guard lock(tables_mutex);
std::scoped_lock lock(tables_mutex);

TablePtr logical_table = getTableByNameInternal(database_name, table_name);
return newPartitionImpl(logical_table, partition_id, toString(partition_id), tso, is_add_part);
Expand Down Expand Up @@ -493,7 +535,7 @@ TableID MockTiDB::newPartitionImpl(

void MockTiDB::dropPartition(const String & database_name, const String & table_name, TableID partition_id)
{
std::lock_guard lock(tables_mutex);
std::scoped_lock lock(tables_mutex);

TablePtr table = getTableByNameInternal(database_name, table_name);
TableInfo & table_info = table->table_info;
Expand Down Expand Up @@ -527,7 +569,7 @@ void MockTiDB::addColumnToTable(
const NameAndTypePair & column,
const Field & default_value)
{
std::lock_guard lock(tables_mutex);
std::scoped_lock lock(tables_mutex);

TablePtr table = getTableByNameInternal(database_name, table_name);
String qualified_name = database_name + "." + table_name;
Expand Down Expand Up @@ -556,7 +598,7 @@ void MockTiDB::addColumnToTable(

void MockTiDB::dropColumnFromTable(const String & database_name, const String & table_name, const String & column_name)
{
std::lock_guard lock(tables_mutex);
std::scoped_lock lock(tables_mutex);

TablePtr table = getTableByNameInternal(database_name, table_name);
String qualified_name = database_name + "." + table_name;
Expand Down Expand Up @@ -586,7 +628,7 @@ void MockTiDB::modifyColumnInTable(
const String & table_name,
const NameAndTypePair & column)
{
std::lock_guard lock(tables_mutex);
std::scoped_lock lock(tables_mutex);

TablePtr table = getTableByNameInternal(database_name, table_name);
String qualified_name = database_name + "." + table_name;
Expand Down Expand Up @@ -623,7 +665,7 @@ void MockTiDB::renameColumnInTable(
const String & old_column_name,
const String & new_column_name)
{
std::lock_guard lock(tables_mutex);
std::scoped_lock lock(tables_mutex);

TablePtr table = getTableByNameInternal(database_name, table_name);
String qualified_name = database_name + "." + table_name;
Expand Down Expand Up @@ -656,7 +698,7 @@ void MockTiDB::renameColumnInTable(

void MockTiDB::renameTable(const String & database_name, const String & table_name, const String & new_table_name)
{
std::lock_guard lock(tables_mutex);
std::scoped_lock lock(tables_mutex);

TablePtr table = getTableByNameInternal(database_name, table_name);
String qualified_name = database_name + "." + table_name;
Expand All @@ -681,9 +723,48 @@ void MockTiDB::renameTable(const String & database_name, const String & table_na
version_diff[version] = diff;
}

void MockTiDB::renameTableTo(
const String & database_name,
const String & table_name,
const String & new_database_name,
const String & new_table_name)
{
std::scoped_lock lock(tables_mutex);

TablePtr table = getTableByNameInternal(database_name, table_name);
String qualified_name = database_name + "." + table_name;
String new_qualified_name = new_database_name + "." + new_table_name;

const auto old_database_id = table->database_id;
const auto new_database = databases.find(new_database_name);
RUNTIME_CHECK_MSG(
new_database != databases.end(),
"new_database is not exist in MockTiDB, new_database_name={}",
new_database_name);
table->database_id = new_database->second; // set new_database_id

TableInfo new_table_info = table->table_info;
new_table_info.name = new_table_name;
auto new_table
= std::make_shared<Table>(new_database_name, table->database_id, new_table_name, std::move(new_table_info));

tables_by_id[new_table->table_info.id] = new_table;
tables_by_name.erase(qualified_name);
tables_by_name.emplace(new_qualified_name, new_table);

version++;
SchemaDiff diff;
diff.type = SchemaActionType::RenameTable;
diff.schema_id = table->database_id;
diff.old_schema_id = old_database_id;
diff.table_id = table->id();
diff.version = version;
version_diff[version] = diff;
}

void MockTiDB::renameTables(const std::vector<std::tuple<std::string, std::string, std::string>> & table_name_map)
{
std::lock_guard lock(tables_mutex);
std::scoped_lock lock(tables_mutex);
version++;
SchemaDiff diff;
for (const auto & [database_name, table_name, new_table_name] : table_name_map)
Expand Down Expand Up @@ -723,7 +804,7 @@ void MockTiDB::renameTables(const std::vector<std::tuple<std::string, std::strin

void MockTiDB::truncateTable(const String & database_name, const String & table_name)
{
std::lock_guard lock(tables_mutex);
std::scoped_lock lock(tables_mutex);

TablePtr table = getTableByNameInternal(database_name, table_name);

Expand All @@ -745,7 +826,7 @@ void MockTiDB::truncateTable(const String & database_name, const String & table_

Int64 MockTiDB::regenerateSchemaMap()
{
std::lock_guard lock(tables_mutex);
std::scoped_lock lock(tables_mutex);

SchemaDiff diff;
diff.type = SchemaActionType::None;
Expand All @@ -758,7 +839,7 @@ Int64 MockTiDB::regenerateSchemaMap()

TablePtr MockTiDB::getTableByName(const String & database_name, const String & table_name)
{
std::lock_guard lock(tables_mutex);
std::scoped_lock lock(tables_mutex);

return getTableByNameInternal(database_name, table_name);
}
Expand Down
17 changes: 17 additions & 0 deletions dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ class MockTiDB : public ext::Singleton<MockTiDB>
const String & handle_pk_name,
const String & engine_type);

// Mock to create a partition table with given partition names
// Return <logical_table_id, [physical_table_id0, physical_table_id1, ...]>
std::tuple<TableID, std::vector<TableID>> newPartitionTable(
const String & database_name,
const String & table_name,
const ColumnsDescription & columns,
Timestamp tso,
const String & handle_pk_name,
const String & engine_type,
const Strings & part_names);

std::vector<TableID> newTables(
const String & database_name,
const std::vector<std::tuple<String, ColumnsDescription, String>> & tables,
Expand Down Expand Up @@ -135,6 +146,12 @@ class MockTiDB : public ext::Singleton<MockTiDB>
const String & new_column_name);

void renameTable(const String & database_name, const String & table_name, const String & new_table_name);
// Rename table to another database
void renameTableTo(
const String & database_name,
const String & table_name,
const String & new_database_name,
const String & new_table_name);

void renameTables(const std::vector<std::tuple<std::string, std::string, std::string>> & table_name_map);

Expand Down
Loading