Skip to content

Commit

Permalink
Potential fix for the incremental runs always run as a base backup (#63)
Browse files Browse the repository at this point in the history
* Potential fix for the incremental runs always run as a base backup

* param cleanups
  • Loading branch information
fuziontech authored May 26, 2024
1 parent c8962f5 commit c94e91f
Showing 1 changed file with 25 additions and 57 deletions.
82 changes: 25 additions & 57 deletions housewatch/clickhouse/backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def execute_backup(
query_settings: Dict[str, str | int] = {},
query_id: Optional[str] = None,
substitute_params: bool = True,
cluster: Optional[str] = None,
cluster: Optional[str] = "default",
aws_key: Optional[str] = None,
aws_secret: Optional[str] = None,
base_backup: Optional[str] = None,
Expand Down Expand Up @@ -60,6 +60,7 @@ def execute_backup(
response.append(item)
responses.append((shard, response))
if not is_sharded:
# if the table is not sharded then just run the backup on one node
return response
return response

Expand All @@ -83,39 +84,19 @@ def get_backup(backup, cluster=None):


def create_table_backup(
database, table, bucket, path, cluster=None, aws_key=None, aws_secret=None, base_backup=None, is_sharded=False
database, table, bucket, path, aws_key=None, aws_secret=None, base_backup=None, is_sharded=False, cluster="default"
):
if aws_key is None or aws_secret is None:
aws_key = settings.AWS_ACCESS_KEY_ID
aws_secret = settings.AWS_SECRET_ACCESS_KEY

query_settings = {}
if cluster:
QUERY = """BACKUP TABLE %(database)s.%(table)s
TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s/%(shard)s', '%(aws_key)s', '%(aws_secret)s')
ASYNC"""
return execute_backup(
QUERY,
{
"database": database,
"table": table,
"bucket": bucket,
"path": path,
"aws_key": aws_key,
"aws_secret": aws_secret,
},
query_settings=query_settings,
cluster=cluster,
aws_key=aws_key,
aws_secret=aws_secret,
base_backup=base_backup,
is_sharded=is_sharded,
)
QUERY = """BACKUP TABLE %(database)s.%(table)s
TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s', '%(aws_key)s', '%(aws_secret)s')
TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s/%(shard)s', '%(aws_key)s', '%(aws_secret)s')
ASYNC"""
if base_backup:
query_settings["base_backup"] = f"S3('{base_backup}', '{aws_key}', '{aws_secret}')"
return run_query(
return execute_backup(
QUERY,
{
"database": database,
Expand All @@ -126,41 +107,25 @@ def create_table_backup(
"aws_secret": aws_secret,
},
query_settings=query_settings,
use_cache=False,
cluster=cluster,
aws_key=aws_key,
aws_secret=aws_secret,
base_backup=base_backup,
is_sharded=is_sharded,
)


def create_database_backup(database, bucket, path, cluster=None, aws_key=None, aws_secret=None, base_backup=None):
def create_database_backup(database, bucket, path, aws_key=None, aws_secret=None, base_backup=None, cluster="default"):
if aws_key is None or aws_secret is None:
aws_key = settings.AWS_ACCESS_KEY_ID
aws_secret = settings.AWS_SECRET_ACCESS_KEY
query_settings = {}
if cluster:
QUERY = """BACKUP DATABASE %(database)s
TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s/%(shard)s', '%(aws_key)s', '%(aws_secret)s')
ASYNC"""

return execute_backup(
QUERY,
{
"database": database,
"bucket": bucket,
"path": path,
"aws_key": aws_key,
"aws_secret": aws_secret,
},
query_settings=query_settings,
cluster=cluster,
aws_key=aws_key,
aws_secret=aws_secret,
base_backup=base_backup,
)
QUERY = """BACKUP DATABASE %(database)s
TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s', '%(aws_key)s', '%(aws_secret)s')
QUERY = """BACKUP DATABASE %(database)s
TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s/%(shard)s', '%(aws_key)s', '%(aws_secret)s')
ASYNC"""
if base_backup:
query_settings["base_backup"] = f"S3('{base_backup}', '{aws_key}', '{aws_secret}')"
return run_query(
return execute_backup(
QUERY,
{
"database": database,
Expand All @@ -170,7 +135,10 @@ def create_database_backup(database, bucket, path, cluster=None, aws_key=None, a
"aws_secret": aws_secret,
},
query_settings=query_settings,
use_cache=False,
cluster=cluster,
aws_key=aws_key,
aws_secret=aws_secret,
base_backup=base_backup,
)


Expand All @@ -191,9 +159,9 @@ def run_backup(backup_id, incremental=False):
backup.database,
backup.bucket,
path,
backup.cluster,
backup.aws_access_key_id,
backup.aws_secret_access_key,
aws_key=backup.aws_access_key_id,
aws_secret=backup.aws_secret_access_key,
cluster=backup.cluster,
base_backup=base_backup,
)
elif backup.is_table_backup():
Expand All @@ -202,9 +170,9 @@ def run_backup(backup_id, incremental=False):
backup.table,
backup.bucket,
path,
backup.cluster,
backup.aws_access_key_id,
backup.aws_secret_access_key,
aws_key=backup.aws_access_key_id,
aws_secret=backup.aws_secret_access_key,
cluster=backup.cluster,
base_backup=base_backup,
is_sharded=backup.is_sharded,
)
Expand Down

0 comments on commit c94e91f

Please sign in to comment.