Skip to content

Commit

Permalink
Connector refactor (#2763)
Browse files Browse the repository at this point in the history
* connector refactor initial commit

* moving tests

* added some todos

* some fixes

* fmt fix

* quick cleanup

* review changes

* connector plus driver merge

* ui fix

* ut fix

* small fix
  • Loading branch information
k-anshul committed Jul 17, 2023
1 parent 8719932 commit bbc104a
Show file tree
Hide file tree
Showing 68 changed files with 3,527 additions and 2,588 deletions.
9 changes: 5 additions & 4 deletions cli/cmd/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/rilldata/rill/cli/pkg/gitutil"
"github.com/rilldata/rill/cli/pkg/telemetry"
adminv1 "github.com/rilldata/rill/proto/gen/rill/admin/v1"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/compilers/rillv1beta"
"github.com/rilldata/rill/runtime/connectors"
"github.com/rilldata/rill/runtime/pkg/fileutil"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -482,7 +482,7 @@ func variablesFlow(ctx context.Context, projectPath, projectName string) {
}

// collect all sources
srcs := make([]*connectors.Source, 0)
srcs := make([]*runtimev1.Source, 0)
for _, c := range connectorList {
if !c.AnonymousAccess {
srcs = append(srcs, c.Sources...)
Expand All @@ -495,9 +495,10 @@ func variablesFlow(ctx context.Context, projectPath, projectName string) {
warn := color.New(color.Bold).Add(color.FgYellow)
warn.Printf("\nCould not ingest all sources. Rill requires credentials for the following sources:\n\n")
for _, src := range srcs {
if _, ok := src.Properties["path"]; ok {
props := src.Properties.AsMap()
if _, ok := props["path"]; ok {
// print URL wherever applicable
warn.Printf(" - %s\n", src.Properties["path"])
warn.Printf(" - %s\n", props["path"])
} else {
warn.Printf(" - %s\n", src.Name)
}
Expand Down
18 changes: 10 additions & 8 deletions cli/cmd/env/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/rilldata/rill/cli/pkg/gitutil"
"github.com/rilldata/rill/cli/pkg/telemetry"
adminv1 "github.com/rilldata/rill/proto/gen/rill/admin/v1"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/compilers/rillv1beta"
"github.com/rilldata/rill/runtime/connectors"
"github.com/rilldata/rill/runtime/pkg/fileutil"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -146,7 +146,7 @@ func VariablesFlow(ctx context.Context, projectPath string, tel *telemetry.Telem
}

// collect all sources
srcs := make([]*connectors.Source, 0)
srcs := make([]*runtimev1.Source, 0)
for _, c := range connectorList {
if !c.AnonymousAccess {
srcs = append(srcs, c.Sources...)
Expand All @@ -159,9 +159,10 @@ func VariablesFlow(ctx context.Context, projectPath string, tel *telemetry.Telem
tel.Emit(telemetry.ActionDataAccessStart)
fmt.Printf("Finish deploying your project by providing access to the data store. Rill does not have access to the following data sources:\n\n")
for _, src := range srcs {
if _, ok := src.Properties["path"]; ok {
props := src.Properties.AsMap()
if _, ok := props["path"]; ok {
// print URL wherever applicable
fmt.Printf(" - %s\n", src.Properties["path"])
fmt.Printf(" - %s\n", props["path"])
} else {
fmt.Printf(" - %s\n", src.Name)
}
Expand All @@ -173,7 +174,7 @@ func VariablesFlow(ctx context.Context, projectPath string, tel *telemetry.Telem
// ignore asking for credentials if external source can be access anonymously
continue
}
connectorVariables := c.Spec.ConnectorVariables
connectorVariables := c.Spec.ConfigProperties
if len(connectorVariables) != 0 {
fmt.Printf("\nConnector %q requires credentials.\n", c.Type)
if c.Spec.ServiceAccountDocs != "" {
Expand All @@ -184,11 +185,12 @@ func VariablesFlow(ctx context.Context, projectPath string, tel *telemetry.Telem
if c.Spec.Help != "" {
fmt.Println(c.Spec.Help)
}
for _, prop := range connectorVariables {
for i := range connectorVariables {
prop := connectorVariables[i]
question := &survey.Question{}
msg := fmt.Sprintf("connector.%s.%s", c.Name, prop.Key)
if prop.Help != "" {
msg = fmt.Sprintf(msg+" (%s)", prop.Help)
if prop.Hint != "" {
msg = fmt.Sprintf(msg+" (%s)", prop.Hint)
}

if prop.Secret {
Expand Down
7 changes: 3 additions & 4 deletions cli/cmd/runtime/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ import (
"golang.org/x/sync/errgroup"

// Load infra drivers and connectors for runtime
_ "github.com/rilldata/rill/runtime/connectors/gcs"
_ "github.com/rilldata/rill/runtime/connectors/https"
_ "github.com/rilldata/rill/runtime/connectors/motherduck"
_ "github.com/rilldata/rill/runtime/connectors/s3"
_ "github.com/rilldata/rill/runtime/drivers/druid"
_ "github.com/rilldata/rill/runtime/drivers/duckdb"
_ "github.com/rilldata/rill/runtime/drivers/file"
_ "github.com/rilldata/rill/runtime/drivers/gcs"
_ "github.com/rilldata/rill/runtime/drivers/github"
_ "github.com/rilldata/rill/runtime/drivers/https"
_ "github.com/rilldata/rill/runtime/drivers/postgres"
_ "github.com/rilldata/rill/runtime/drivers/s3"
_ "github.com/rilldata/rill/runtime/drivers/sqlite"
)

Expand Down
2 changes: 1 addition & 1 deletion runtime/caches.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (c *connectionCache) get(ctx context.Context, instanceID, driver, dsn strin
if instanceID != "default" {
logger = c.logger.With(zap.String("instance_id", instanceID), zap.String("driver", driver))
}
conn, err := drivers.Open(driver, dsn, logger)
conn, err := drivers.Open(driver, map[string]any{"dsn": dsn}, logger)
if err != nil {
return nil, err
}
Expand Down
76 changes: 44 additions & 32 deletions runtime/compilers/rillv1beta/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,31 @@ import (
"path/filepath"

"github.com/bmatcuk/doublestar/v4"
"github.com/rilldata/rill/runtime/connectors"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/fileutil"
"github.com/rilldata/rill/runtime/services/catalog/artifacts"
"github.com/rilldata/rill/runtime/services/catalog/migrator/models"
"github.com/rilldata/rill/runtime/services/catalog/migrator/sources"
"go.uber.org/zap"
)

// TODO :: return this to build support for all kind of variables
type Variables struct {
ProjectVariables []connectors.VariableSchema
ProjectVariables []drivers.PropertySchema
Connectors []*Connector
}

type Connector struct {
Name string
Type string
Sources []*connectors.Source
Spec connectors.Spec
Sources []*runtimev1.Source
Spec drivers.Spec
AnonymousAccess bool
}

func ExtractConnectors(ctx context.Context, projectPath string) ([]*Connector, error) {
allSources := make([]*connectors.Source, 0)
allSources := make([]*runtimev1.Source, 0)

// get sources from files
sourcesPath := filepath.Join(projectPath, "sources")
Expand Down Expand Up @@ -62,28 +63,27 @@ func ExtractConnectors(ctx context.Context, projectPath string) ([]*Connector, e
}

// keeping a map to dedup connectors
connectorMap := make(map[key][]*connectors.Source)
connectorMap := make(map[key][]*runtimev1.Source)
for _, src := range allSources {
connector, ok := connectors.Connectors[src.Connector]
connector, ok := drivers.Connectors[src.Connector]
if !ok {
return nil, fmt.Errorf("no source connector defined for type %q", src.Connector)
}

// ignoring error since failure to resolve this should not break the deployment flow
// this can fail under cases such as full or host/bucket of URI is a variable
access, _ := connector.HasAnonymousAccess(ctx, &connectors.Env{}, src)
access, _ := connector.HasAnonymousSourceAccess(ctx, source(src.Connector, src), zap.NewNop())
c := key{Name: src.Connector, Type: src.Connector, AnonymousAccess: access}
srcs, ok := connectorMap[c]
if !ok {
srcs = make([]*connectors.Source, 0)
srcs = make([]*runtimev1.Source, 0)
}
srcs = append(srcs, src)
connectorMap[c] = srcs
}

result := make([]*Connector, 0)
for k, v := range connectorMap {
connector := connectors.Connectors[k.Type]
connector := drivers.Connectors[k.Type]
result = append(result, &Connector{
Name: k.Name,
Type: k.Type,
Expand All @@ -95,25 +95,16 @@ func ExtractConnectors(ctx context.Context, projectPath string) ([]*Connector, e
return result, nil
}

func readSource(ctx context.Context, path string) (*connectors.Source, error) {
func readSource(ctx context.Context, path string) (*runtimev1.Source, error) {
catalog, err := read(ctx, path)
if err != nil {
return nil, err
}

apiSource := catalog.GetSource()
source := &connectors.Source{
Name: apiSource.Name,
Connector: apiSource.Connector,
Properties: apiSource.Properties.AsMap(),
ExtractPolicy: apiSource.GetPolicy(),
Timeout: apiSource.GetTimeoutSeconds(),
}

return source, nil
return catalog.GetSource(), nil
}

func readEmbeddedSources(ctx context.Context, path string) ([]*connectors.Source, error) {
func readEmbeddedSources(ctx context.Context, path string) ([]*runtimev1.Source, error) {
catalog, err := read(ctx, path)
if err != nil {
return nil, err
Expand All @@ -122,8 +113,8 @@ func readEmbeddedSources(ctx context.Context, path string) ([]*connectors.Source
apiModel := catalog.GetModel()
dependencies := models.ExtractTableNames(apiModel.Sql)

embeddedSourcesMap := make(map[string]*connectors.Source)
embeddedSources := make([]*connectors.Source, 0)
embeddedSourcesMap := make(map[string]*runtimev1.Source)
embeddedSources := make([]*runtimev1.Source, 0)

for _, dependency := range dependencies {
source, ok := sources.ParseEmbeddedSource(dependency)
Expand All @@ -134,13 +125,8 @@ func readEmbeddedSources(ctx context.Context, path string) ([]*connectors.Source
continue
}

connSource := &connectors.Source{
Name: source.Name,
Connector: source.Connector,
Properties: source.Properties.AsMap(),
}
embeddedSourcesMap[source.Name] = connSource
embeddedSources = append(embeddedSources, connSource)
embeddedSourcesMap[source.Name] = source
embeddedSources = append(embeddedSources, source)
}

return embeddedSources, nil
Expand Down Expand Up @@ -172,3 +158,29 @@ type key struct {
Type string
AnonymousAccess bool
}

func source(connector string, src *runtimev1.Source) drivers.Source {
props := src.Properties.AsMap()
switch connector {
case "s3":
return &drivers.BucketSource{
Properties: props,
}
case "gcs":
return &drivers.BucketSource{
Properties: props,
}
case "https":
return &drivers.FileSource{
Properties: props,
}
case "local_file":
return &drivers.FileSource{
Properties: props,
}
case "motherduck":
return &drivers.DatabaseSource{}
default:
return nil
}
}
4 changes: 2 additions & 2 deletions runtime/compilers/rillv1beta/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (
"github.com/rilldata/rill/runtime/services/catalog/testutils"
"github.com/stretchr/testify/require"

_ "github.com/rilldata/rill/runtime/connectors/gcs"
_ "github.com/rilldata/rill/runtime/connectors/s3"
_ "github.com/rilldata/rill/runtime/drivers/duckdb"
_ "github.com/rilldata/rill/runtime/drivers/file"
_ "github.com/rilldata/rill/runtime/drivers/gcs"
_ "github.com/rilldata/rill/runtime/drivers/s3"
_ "github.com/rilldata/rill/runtime/drivers/sqlite"
)

Expand Down
10 changes: 5 additions & 5 deletions runtime/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func (r *Runtime) Registry() drivers.RegistryStore {
registry, ok := r.metastore.RegistryStore()
registry, ok := r.metastore.AsRegistry()
if !ok {
// Verified as registry in New, so this should never happen
panic("metastore is not a registry")
Expand All @@ -28,7 +28,7 @@ func (r *Runtime) Repo(ctx context.Context, instanceID string) (drivers.RepoStor
return nil, err
}

repo, ok := conn.RepoStore()
repo, ok := conn.AsRepoStore()
if !ok {
// Verified as repo when instance is created, so this should never happen
return nil, fmt.Errorf("connection for instance '%s' is not a repo", instanceID)
Expand All @@ -48,7 +48,7 @@ func (r *Runtime) OLAP(ctx context.Context, instanceID string) (drivers.OLAPStor
return nil, err
}

olap, ok := conn.OLAPStore()
olap, ok := conn.AsOLAP()
if !ok {
// Verified as OLAP when instance is created, so this should never happen
return nil, fmt.Errorf("connection for instance '%s' is not an olap", instanceID)
Expand All @@ -69,7 +69,7 @@ func (r *Runtime) Catalog(ctx context.Context, instanceID string) (drivers.Catal
return nil, err
}

store, ok := conn.CatalogStore()
store, ok := conn.AsCatalogStore()
if !ok {
// Verified as CatalogStore when instance is created, so this should never happen
return nil, fmt.Errorf("instance cannot embed catalog")
Expand All @@ -78,7 +78,7 @@ func (r *Runtime) Catalog(ctx context.Context, instanceID string) (drivers.Catal
return store, nil
}

store, ok := r.metastore.CatalogStore()
store, ok := r.metastore.AsCatalogStore()
if !ok {
return nil, fmt.Errorf("metastore cannot serve as catalog")
}
Expand Down
Loading

0 comments on commit bbc104a

Please sign in to comment.