feat: cluster and direct SQL support for clickhouse models (#5511)
* cluster and config support for clickhouse models

* changes for on_cluster property

* small self review

* small self review

* unit tests for clickhouse cluster

* unit tests for clickhouse cluster

* lint fix plus other ut changes

* revert config change
k-anshul committed Aug 28, 2024
1 parent 23494f1 commit 1bd7671
Showing 21 changed files with 1,378 additions and 340 deletions.
168 changes: 115 additions & 53 deletions go.mod

Large diffs are not rendered by default.

439 changes: 312 additions & 127 deletions go.sum

Large diffs are not rendered by default.

4 changes: 0 additions & 4 deletions runtime/compilers/rillv1/parse_model.go
@@ -106,10 +106,6 @@ func (p *Parser) parseModel(node *Node) error {
inputProps["sql"] = sql
}

// Validate input details
if len(inputProps) == 0 {
return errors.New(`model does not identify any input properties (try entering a SQL query)`)
}
inputPropsPB, err := structpb.NewStruct(inputProps)
if err != nil {
return fmt.Errorf(`found invalid input property type: %w`, err)
9 changes: 9 additions & 0 deletions runtime/drivers/clickhouse/clickhouse.go
@@ -74,6 +74,13 @@ var spec = drivers.Spec{
DisplayName: "SSL",
Description: "Use SSL to connect to the ClickHouse server",
},
{
Key: "cluster",
Type: drivers.StringPropertyType,
Required: false,
DisplayName: "cluster",
Description: "Cluster name",
},
},
ImplementsOLAP: true,
}
@@ -91,6 +98,8 @@ type configProperties struct {
Port int `mapstructure:"port"`
// SSL determines whether secured connection need to be established. To be set when setting individual fields.
SSL bool `mapstructure:"ssl"`
// Cluster name. Required for running distributed queries.
Cluster string `mapstructure:"cluster"`
// EnableCache controls whether to enable cache for Clickhouse queries.
EnableCache bool `mapstructure:"enable_cache"`
// LogQueries controls whether to log the raw SQL passed to OLAP.Execute.
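For orientation, here is a minimal sketch of opening the ClickHouse connector with the new `cluster` setting, modelled on the `drivers.Open` call used in the test change further below. The import paths, DSN, and cluster name are assumptions for illustration and are not part of this commit:

	package main

	import (
		"log"

		"github.com/rilldata/rill/runtime/drivers"
		_ "github.com/rilldata/rill/runtime/drivers/clickhouse" // assumed blank import to register the driver
		"github.com/rilldata/rill/runtime/pkg/activity"
		"go.uber.org/zap"
	)

	func main() {
		// "cluster" maps to configProperties.Cluster; per the doc comment above it is
		// required for running distributed queries.
		conn, err := drivers.Open("clickhouse", "default", map[string]any{
			"dsn":     "clickhouse://clickhouse:clickhouse@localhost:9000", // placeholder DSN
			"cluster": "my_cluster",                                        // hypothetical cluster name
		}, activity.NewNoopClient(), zap.NewNop())
		if err != nil {
			log.Fatal(err)
		}
		defer conn.Close()
	}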
42 changes: 42 additions & 0 deletions runtime/drivers/clickhouse/information_schema.go
@@ -2,6 +2,7 @@ package clickhouse

import (
"context"
"database/sql"
"errors"

"github.com/jmoiron/sqlx"
@@ -165,3 +166,44 @@ func (i informationSchema) scanTables(rows *sqlx.Rows) ([]*drivers.Table, error)

return res, nil
}

func (i informationSchema) entityType(ctx context.Context, db, name string) (typ string, onCluster bool, err error) {
conn, release, err := i.c.acquireMetaConn(ctx)
if err != nil {
return "", false, err
}
defer func() { _ = release() }()

var q string
if i.c.config.Cluster == "" {
q = `SELECT
multiIf(engine IN ('MaterializedView', 'View'), 'VIEW', engine = 'Dictionary', 'DICTIONARY', 'TABLE') AS type,
0 AS is_on_cluster
FROM system.tables AS t
JOIN system.databases AS db ON t.database = db.name
WHERE t.database = coalesce(?, currentDatabase()) AND t.name = ?`
} else {
q = `SELECT
multiIf(engine IN ('MaterializedView', 'View'), 'VIEW', engine = 'Dictionary', 'DICTIONARY', 'TABLE') AS type,
countDistinct(_shard_num) > 1 AS is_on_cluster
FROM clusterAllReplicas(` + i.c.config.Cluster + `, system.tables) AS t
JOIN system.databases AS db ON t.database = db.name
WHERE t.database = coalesce(?, currentDatabase()) AND t.name = ?
GROUP BY engine, t.name`
}
var args []any
if db == "" {
args = []any{nil, name}
} else {
args = []any{db, name}
}
row := conn.QueryRowxContext(ctx, q, args...)
err = row.Scan(&typ, &onCluster)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return "", false, drivers.ErrNotFound
}
return "", false, err
}
return typ, onCluster, nil
}
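A rough sketch of how the new entityType helper might be consumed from within the clickhouse package (the call site below is an assumption for illustration; only the helper itself is part of this commit):

	// Given an informationSchema value `i` and a context `ctx`:
	typ, onCluster, err := i.entityType(ctx, "", "my_model") // empty db falls back to currentDatabase()
	if errors.Is(err, drivers.ErrNotFound) {
		// the object does not exist yet
	} else if err != nil {
		// handle other errors
	}
	// typ is "TABLE", "VIEW" or "DICTIONARY"; onCluster is true only when a cluster is
	// configured and the object was found on more than one shard.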
4 changes: 2 additions & 2 deletions runtime/drivers/clickhouse/information_schema_test.go
@@ -1,4 +1,4 @@
package clickhouse
package clickhouse_test

import (
"context"
@@ -37,7 +37,7 @@ func TestInformationSchema(t *testing.T) {
port, err := clickHouseContainer.MappedPort(ctx, "9000/tcp")
require.NoError(t, err)

conn, err := driver{}.Open("default", map[string]any{"dsn": fmt.Sprintf("clickhouse://clickhouse:clickhouse@%v:%v", host, port.Port())}, activity.NewNoopClient(), zap.NewNop())
conn, err := drivers.Open("clickhouse", "default", map[string]any{"dsn": fmt.Sprintf("clickhouse://clickhouse:clickhouse@%v:%v", host, port.Port())}, activity.NewNoopClient(), zap.NewNop())
require.NoError(t, err)
prepareConn(t, conn)
t.Run("testInformationSchemaAll", func(t *testing.T) { testInformationSchemaAll(t, conn) })
42 changes: 22 additions & 20 deletions runtime/drivers/clickhouse/model_executor_self.go
@@ -22,11 +22,6 @@ func (e *selfToSelfExecutor) Concurrency(desired int) (int, bool) {
}

func (e *selfToSelfExecutor) Execute(ctx context.Context, opts *drivers.ModelExecuteOptions) (*drivers.ModelResult, error) {
olap, ok := e.c.AsOLAP(e.c.instanceID)
if !ok {
return nil, fmt.Errorf("output connector is not OLAP")
}

inputProps := &ModelInputProperties{}
if err := mapstructure.WeakDecode(opts.InputProperties, inputProps); err != nil {
return nil, fmt.Errorf("failed to parse input properties: %w", err)
@@ -39,25 +34,23 @@ func (e *selfToSelfExecutor) Execute(ctx context.Context, opts *drivers.ModelExe
if err := mapstructure.WeakDecode(opts.OutputProperties, outputProps); err != nil {
return nil, fmt.Errorf("failed to parse output properties: %w", err)
}
if outputProps.Typ == "" && outputProps.Materialize == nil {
outputProps.Materialize = &opts.Env.DefaultMaterialize
}
if err := outputProps.Validate(opts); err != nil {
return nil, fmt.Errorf("invalid output properties: %w", err)
}
if outputProps.Typ != "DICTIONARY" && inputProps.SQL == "" {
return nil, fmt.Errorf("input SQL is required")
}

usedModelName := false
if outputProps.Table == "" {
outputProps.Table = opts.ModelName
usedModelName = true
}

materialize := opts.Env.DefaultMaterialize
if outputProps.Materialize != nil {
materialize = *outputProps.Materialize
}
if opts.IncrementalRun && !materialize {
return nil, fmt.Errorf("incremental models are only supported for materialized models")
}

asView := !materialize
asView := outputProps.Typ == "VIEW"
tableName := outputProps.Table
if outputProps.QuerySettings != "" {
// Note: This will lead to failures if user sets settings both in query and output properties
@@ -72,27 +65,27 @@ func (e *selfToSelfExecutor) Execute(ctx context.Context, opts *drivers.ModelExe

// Drop the staging view/table if it exists.
// NOTE: This intentionally drops the end table if not staging changes.
if t, err := olap.InformationSchema().Lookup(ctx, "", "", stagingTableName); err == nil {
_ = olap.DropTable(ctx, stagingTableName, t.View)
if t, err := e.c.InformationSchema().Lookup(ctx, "", "", stagingTableName); err == nil {
_ = e.c.DropTable(ctx, stagingTableName, t.View)
}

// Create the table
err := olap.CreateTableAsSelect(ctx, stagingTableName, asView, inputProps.SQL, opts.OutputProperties)
err := e.c.CreateTableAsSelect(ctx, stagingTableName, asView, inputProps.SQL, mustToMap(outputProps))
if err != nil {
_ = olap.DropTable(ctx, stagingTableName, asView)
_ = e.c.DropTable(ctx, stagingTableName, asView)
return nil, fmt.Errorf("failed to create model: %w", err)
}

// Rename the staging table to the final table name
if stagingTableName != tableName {
err = olapForceRenameTable(ctx, olap, stagingTableName, asView, tableName)
err = olapForceRenameTable(ctx, e.c, stagingTableName, asView, tableName)
if err != nil {
return nil, fmt.Errorf("failed to rename staged model: %w", err)
}
}
} else {
// Insert into the table
err := olap.InsertTableAsSelect(ctx, tableName, inputProps.SQL, false, true, outputProps.IncrementalStrategy, outputProps.UniqueKey)
err := e.c.InsertTableAsSelect(ctx, tableName, inputProps.SQL, false, true, outputProps.IncrementalStrategy, outputProps.UniqueKey)
if err != nil {
return nil, fmt.Errorf("failed to incrementally insert into table: %w", err)
}
@@ -117,3 +110,12 @@ func (e *selfToSelfExecutor) Execute(ctx context.Context, opts *drivers.ModelExe
Table: tableName,
}, nil
}

func mustToMap(o *ModelOutputProperties) map[string]any {
m := make(map[string]any)
err := mapstructure.WeakDecode(o, &m)
if err != nil {
panic(fmt.Errorf("failed to encode output properties: %w", err))
}
return m
}
112 changes: 91 additions & 21 deletions runtime/drivers/clickhouse/model_manager.go
@@ -16,9 +16,6 @@ type ModelInputProperties struct {
}

func (p *ModelInputProperties) Validate() error {
if p.SQL == "" {
return fmt.Errorf("missing property 'sql'")
}
return nil
}

@@ -27,10 +24,20 @@ type ModelOutputProperties struct {
Materialize *bool `mapstructure:"materialize"`
UniqueKey []string `mapstructure:"unique_key"`
IncrementalStrategy drivers.IncrementalStrategy `mapstructure:"incremental_strategy"`
// Typ to materialize the model into. Possible values include `TABLE`, `VIEW` or `DICTIONARY`. Optional.
Typ string `mapstructure:"type"`
// Columns sets the column names and data types. If unspecified these are detected from the select query by clickhouse.
// It is also possible to set indexes with this property.
// Example : (id UInt32, username varchar, email varchar, created_at datetime, INDEX idx1 username TYPE set(100) GRANULARITY 3)
Columns string `mapstructure:"columns"`
// Config can be used to set the table parameters like engine, partition key in SQL format without setting individual properties.
// It also allows creating dictionaries using a source.
// Example:
// ENGINE = MergeTree
// PARTITION BY toYYYYMM(__time)
// ORDER BY __time
// TTL d + INTERVAL 1 MONTH DELETE
Config string `mapstructure:"config"`
// Engine sets the table engine. Default: MergeTree
Engine string `mapstructure:"engine"`
// OrderBy sets the order by clause. Default: tuple() for MergeTree and not set for other engines
@@ -47,21 +54,41 @@ type ModelOutputProperties struct {
TableSettings string `mapstructure:"table_settings"`
// QuerySettings sets the settings clause used in insert/create table as select queries.
QuerySettings string `mapstructure:"query_settings"`
// DistributedSettings is table settings for distributed table.
DistributedSettings string `mapstructure:"distributed.settings"`
// DistributedShardingKey is the sharding key for distributed table.
DistributedShardingKey string `mapstructure:"distributed.sharding_key"`
}

func (p *ModelOutputProperties) Validate(opts *drivers.ModelExecuteOptions) error {
if p.Config != "" {
if p.Engine != "" || p.OrderBy != "" || p.PartitionBy != "" || p.PrimaryKey != "" || p.SampleBy != "" || p.TTL != "" || p.TableSettings != "" {
return fmt.Errorf("`config` property cannot be used with individual properties")
}
}
p.Typ = strings.ToUpper(p.Typ)
if p.Typ != "" && p.Materialize != nil {
return fmt.Errorf("cannot set both `type` and `materialize` properties")
}
if p.Materialize != nil {
if *p.Materialize {
p.Typ = "TABLE"
} else {
p.Typ = "VIEW"
}
}
if opts.Incremental || opts.SplitRun {
if p.Materialize != nil && !*p.Materialize {
if p.Typ != "" && p.Typ != "TABLE" {
return fmt.Errorf("incremental or split models must be materialized")
}
p.Materialize = boolPtr(true)
p.Typ = "TABLE"
}
if p.Typ == "" {
p.Typ = "VIEW"
}

if opts.InputConnector != opts.OutputConnector {
if p.Materialize != nil && !*p.Materialize {
return fmt.Errorf("models that output to a different connector must be materialized")
}
p.Materialize = boolPtr(true)
if p.Typ == "DICTIONARY" && p.Columns == "" {
return fmt.Errorf("model materialized as dictionary must specify columns")
}

switch p.IncrementalStrategy {
Expand All @@ -76,18 +103,61 @@ func (p *ModelOutputProperties) Validate(opts *drivers.ModelExecuteOptions) erro
return nil
}

func (p *ModelOutputProperties) tblConfig() string {
if p.Config != "" {
return p.Config
}
var sb strings.Builder
// engine with default
if p.Engine != "" {
fmt.Fprintf(&sb, "ENGINE = %s", p.Engine)
} else {
fmt.Fprintf(&sb, "ENGINE = MergeTree")
}

// order_by
if p.OrderBy != "" {
fmt.Fprintf(&sb, " ORDER BY %s", p.OrderBy)
} else if p.Engine == "MergeTree" {
// need ORDER BY for MergeTree
// it is optional for many other engines
fmt.Fprintf(&sb, " ORDER BY tuple()")
}

// partition_by
if p.PartitionBy != "" {
fmt.Fprintf(&sb, " PARTITION BY %s", p.PartitionBy)
}

// primary_key
if p.PrimaryKey != "" {
fmt.Fprintf(&sb, " PRIMARY KEY %s", p.PrimaryKey)
}

// sample_by
if p.SampleBy != "" {
fmt.Fprintf(&sb, " SAMPLE BY %s", p.SampleBy)
}

// ttl
if p.TTL != "" {
fmt.Fprintf(&sb, " TTL %s", p.TTL)
}

// settings
if p.TableSettings != "" {
fmt.Fprintf(&sb, " %s", p.TableSettings)
}
return sb.String()
}
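As a worked example (not part of the diff), tracing tblConfig for one representative set of output properties:

	// Given:
	p := &ModelOutputProperties{
		Engine:      "MergeTree",
		PartitionBy: "toYYYYMM(__time)",
		TTL:         "__time + INTERVAL 1 MONTH DELETE",
	}
	// p.tblConfig() produces:
	//   ENGINE = MergeTree ORDER BY tuple() PARTITION BY toYYYYMM(__time) TTL __time + INTERVAL 1 MONTH DELETE
	// If `config` is set, it is returned verbatim (Validate rejects mixing it with the
	// individual properties above).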

type ModelResultProperties struct {
Table string `mapstructure:"table"`
View bool `mapstructure:"view"`
UsedModelName bool `mapstructure:"used_model_name"`
}

func (c *connection) Rename(ctx context.Context, res *drivers.ModelResult, newName string, env *drivers.ModelEnv) (*drivers.ModelResult, error) {
olap, ok := c.AsOLAP(c.instanceID)
if !ok {
return nil, fmt.Errorf("connector is not an OLAP")
}

resProps := &ModelResultProperties{}
if err := mapstructure.WeakDecode(res.Properties, resProps); err != nil {
return nil, fmt.Errorf("failed to parse previous result properties: %w", err)
@@ -97,7 +167,7 @@ func (c *connection) Rename(ctx context.Context, res *drivers.ModelResult, newNa
return res, nil
}

err := olapForceRenameTable(ctx, olap, resProps.Table, resProps.View, newName)
err := olapForceRenameTable(ctx, c, resProps.Table, resProps.View, newName)
if err != nil {
return nil, fmt.Errorf("failed to rename model: %w", err)
}
@@ -134,15 +204,15 @@ func (c *connection) Delete(ctx context.Context, res *drivers.ModelResult) error

stagingTable, err := olap.InformationSchema().Lookup(ctx, "", "", stagingTableNameFor(res.Table))
if err == nil {
_ = olap.DropTable(ctx, stagingTable.Name, stagingTable.View)
_ = c.DropTable(ctx, stagingTable.Name, stagingTable.View)
}

table, err := olap.InformationSchema().Lookup(ctx, "", "", res.Table)
if err != nil {
return err
}

return olap.DropTable(ctx, table.Name, table.View)
return c.DropTable(ctx, table.Name, table.View)
}

func (c *connection) MergeSplitResults(a, b *drivers.ModelResult) (*drivers.ModelResult, error) {
Expand All @@ -160,7 +230,7 @@ func stagingTableNameFor(table string) string {

// olapForceRenameTable renames a table or view from fromName to toName in the OLAP connector.
// If a view or table already exists with toName, it is overwritten.
func olapForceRenameTable(ctx context.Context, olap drivers.OLAPStore, fromName string, fromIsView bool, toName string) error {
func olapForceRenameTable(ctx context.Context, c *connection, fromName string, fromIsView bool, toName string) error {
if fromName == "" || toName == "" {
return fmt.Errorf("cannot rename empty table name: fromName=%q, toName=%q", fromName, toName)
}
@@ -180,15 +250,15 @@ func olapForceRenameTable(ctx context.Context, olap drivers.OLAPStore, fromName
// Renaming a table to the same name with different casing is not supported. Workaround by renaming to a temporary name first.
if strings.EqualFold(fromName, toName) {
tmpName := fmt.Sprintf("__rill_tmp_rename_%s_%s", typ, toName)
err := olap.RenameTable(ctx, fromName, tmpName, fromIsView)
err := c.RenameTable(ctx, fromName, tmpName, fromIsView)
if err != nil {
return err
}
fromName = tmpName
}

// Do the rename
return olap.RenameTable(ctx, fromName, toName, fromIsView)
return c.RenameTable(ctx, fromName, toName, fromIsView)
}

func boolPtr(b bool) *bool {