Skip to content

Commit

Permalink
Handling UInt8, UInt16
Browse files Browse the repository at this point in the history
  • Loading branch information
bakwc committed Oct 31, 2024
1 parent 6c309d5 commit b7e7a6c
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 4 deletions.
22 changes: 18 additions & 4 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@ class MysqlToClickhouseConverter:
def __init__(self, db_replicator: 'DbReplicator' = None):
self.db_replicator = db_replicator

def convert_type(self, mysql_type):
def convert_type(self, mysql_type, parameters):

is_unsigned = 'unsigned' in parameters.lower()

print(" === check mysql_type", mysql_type, parameters)

if mysql_type == 'int':
return 'Int32'
if mysql_type == 'integer':
Expand All @@ -82,10 +87,14 @@ def convert_type(self, mysql_type):
return 'Bool'
if mysql_type == 'bool':
return 'Bool'
if mysql_type == 'smallint':
if 'smallint' in mysql_type:
if is_unsigned:
return 'UInt16'
return 'Int16'
if 'tinyint' in mysql_type:
return 'Int16'
if is_unsigned:
return 'UInt8'
return 'Int8'
if 'datetime' in mysql_type:
return mysql_type.replace('datetime', 'DateTime64')
if 'longtext' in mysql_type:
Expand Down Expand Up @@ -120,7 +129,8 @@ def convert_field_type(self, mysql_type, mysql_parameters):
mysql_type = mysql_type.lower()
mysql_parameters = mysql_parameters.lower()
not_null = 'not null' in mysql_parameters
clickhouse_type = self.convert_type(mysql_type)
clickhouse_type = self.convert_type(mysql_type, mysql_parameters)
print(" === result type:", clickhouse_type)
if not not_null:
clickhouse_type = f'Nullable({clickhouse_type})'
return clickhouse_type
Expand Down Expand Up @@ -159,6 +169,10 @@ def convert_record(self, mysql_record, mysql_field_types, clickhouse_field_types
if mysql_field_type == 'json' and 'String' in clickhouse_field_type:
if not isinstance(clickhouse_field_value, str):
clickhouse_field_value = json.dumps(convert_bytes(clickhouse_field_value))
if 'UInt16' in clickhouse_field_type and clickhouse_field_value < 0:
clickhouse_field_value = 65536 + clickhouse_field_value
if 'UInt8' in clickhouse_field_type and clickhouse_field_value < 0:
clickhouse_field_value = 256 + clickhouse_field_value
clickhouse_record.append(clickhouse_field_value)
return tuple(clickhouse_record)

Expand Down
57 changes: 57 additions & 0 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,3 +581,60 @@ def test_different_types_1():
commit=True,
)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 3)


def test_numeric_types_and_limits():
cfg = config.Settings()
cfg.load(CONFIG_FILE)

mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=cfg.mysql,
)

ch = clickhouse_api.ClickhouseApi(
database=TEST_DB_NAME,
clickhouse_settings=cfg.clickhouse,
)

prepare_env(cfg, mysql, ch)

mysql.execute("SET sql_mode = 'ALLOW_INVALID_DATES';")

mysql.execute(f'''
CREATE TABLE {TEST_TABLE_NAME} (
`id` int unsigned NOT NULL AUTO_INCREMENT,
name varchar(255),
test1 smallint,
test2 smallint unsigned,
test3 TINYINT,
test4 TINYINT UNSIGNED,
PRIMARY KEY (id)
);
''')

mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (name, test1, test2, test3, test4) VALUES ('Ivan', -20000, 50000, -30, 100);",
commit=True,
)

binlog_replicator_runner = BinlogReplicatorRunner()
binlog_replicator_runner.run()
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME)
db_replicator_runner.run()

assert_wait(lambda: TEST_DB_NAME in ch.get_databases())

ch.execute_command(f'USE {TEST_DB_NAME}')

assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)

mysql.execute(
f"INSERT INTO {TEST_TABLE_NAME} (name, test1, test2, test3, test4) VALUES ('Peter', -10000, 60000, -120, 250);",
commit=True,
)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, 'test2=60000')) == 1)

assert_wait(lambda: len(ch.select(TEST_TABLE_NAME, 'test4=250')) == 1)

0 comments on commit b7e7a6c

Please sign in to comment.