Skip to content

Commit

Permalink
DATE formatting fix for query results (#2679)
Browse files Browse the repository at this point in the history
* date formatting fix for query results

* Map Druid DATE type correctly

* Move conversion to pbutil

* Handle DATE type

* Handle other toplist APIs

* Fix bug

* Fix schema temp column bug

---------

Co-authored-by: Benjamin Egelund-Müller <[email protected]>
  • Loading branch information
k-anshul and begelundmuller committed Jun 26, 2023
1 parent 1c88307 commit 425f14d
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 49 deletions.
2 changes: 1 addition & 1 deletion runtime/drivers/druid/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func databaseTypeToPB(dbt string, nullable bool) (*runtimev1.Type, error) {
case "TIMESTAMP":
t.Code = runtimev1.Type_CODE_TIMESTAMP
case "DATE":
t.Code = runtimev1.Type_CODE_TIMESTAMP
t.Code = runtimev1.Type_CODE_DATE
case "OTHER":
t.Code = runtimev1.Type_CODE_JSON
}
Expand Down
81 changes: 55 additions & 26 deletions runtime/pkg/pbutil/pbutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,31 @@ import (
"unicode/utf8"

"github.com/marcboeker/go-duckdb"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"google.golang.org/protobuf/types/known/structpb"
)

// ToValue converts any value to a google.protobuf.Value. It's similar to
// structpb.NewValue, but adds support for a few extra primitive types.
func ToValue(v any) (*structpb.Value, error) {
func ToValue(v any, t *runtimev1.Type) (*structpb.Value, error) {
switch v := v.(type) {
// In addition to the extra supported types, we also override handling for
// maps and lists since we need to use valToPB on nested fields.
case map[string]any:
v2, err := ToStruct(v)
var t2 *runtimev1.StructType
if t != nil {
t2 = t.StructType
}
v2, err := ToStruct(v, t2)
if err != nil {
return nil, err
}

return structpb.NewStructValue(v2), nil
case []any:
v2, err := ToListValue(v)
v2, err := ToListValue(v, t)
if err != nil {
return nil, err
}

return structpb.NewListValue(v2), nil
// Handle types not handled by structpb.NewValue
case int8:
Expand All @@ -41,6 +44,10 @@ func ToValue(v any) (*structpb.Value, error) {
case uint16:
return structpb.NewNumberValue(float64(v)), nil
case time.Time:
if t != nil && t.Code == runtimev1.Type_CODE_DATE {
s := v.Format(time.DateOnly)
return structpb.NewStringValue(s), nil
}
s := v.Format(time.RFC3339Nano)
return structpb.NewStringValue(s), nil
case float32:
Expand Down Expand Up @@ -71,21 +78,23 @@ func ToValue(v any) (*structpb.Value, error) {
v2, _ := new(big.Rat).SetFrac(v.Value, denom).Float64()
return structpb.NewNumberValue(v2), nil
case duckdb.Map:
return ToValue(map[any]any(v))
return ToValue(map[any]any(v), t)
case map[any]any:
v2, err := ToStructCoerceKeys(v)
var t2 *runtimev1.MapType
if t != nil {
t2 = t.MapType
}
v2, err := ToStructCoerceKeys(v, t2)
if err != nil {
return nil, err
}

return structpb.NewStructValue(v2), nil
case duckdb.Interval:
m := map[string]any{"months": v.Months, "days": v.Days, "micros": v.Micros}
v2, err := ToStruct(m)
v2, err := ToStruct(m, nil)
if err != nil {
return nil, err
}

return structpb.NewStructValue(v2), nil
default:
// Default handling for basic types (ints, string, etc.)
Expand All @@ -95,32 +104,48 @@ func ToValue(v any) (*structpb.Value, error) {

// ToStruct converts a map to a google.protobuf.Struct. It's similar to
// structpb.NewStruct(), but it recurses on valToPB instead of structpb.NewValue
// to add support for more types.
func ToStruct(v map[string]any) (*structpb.Struct, error) {
// to add support for more types. Providing t as a type hint is optional.
func ToStruct(v map[string]any, t *runtimev1.StructType) (*structpb.Struct, error) {
x := &structpb.Struct{Fields: make(map[string]*structpb.Value, len(v))}
for k, v := range v {
if !utf8.ValidString(k) {
return nil, fmt.Errorf("invalid UTF-8 in string: %q", k)
if t == nil {
for k, v := range v {
if !utf8.ValidString(k) {
return nil, fmt.Errorf("invalid UTF-8 in string: %q", k)
}
var err error
x.Fields[k], err = ToValue(v, nil)
if err != nil {
return nil, err
}
}
var err error
x.Fields[k], err = ToValue(v)
if err != nil {
return nil, err
} else {
for _, f := range t.Fields {
var err error
x.Fields[f.Name], err = ToValue(v[f.Name], f.Type)
if err != nil {
return nil, err
}
}
}
return x, nil
}

// ToStructCoerceKeys converts a map with non-string keys to a google.protobuf.Struct.
// It attempts to coerce the keys to JSON strings.
func ToStructCoerceKeys(v map[any]any) (*structpb.Struct, error) {
// It attempts to coerce the keys to JSON strings. Providing t as a type hint is optional.
func ToStructCoerceKeys(v map[any]any, t *runtimev1.MapType) (*structpb.Struct, error) {
var keyType, valType *runtimev1.Type
if t != nil {
keyType = t.KeyType
valType = t.ValueType
}

x := &structpb.Struct{Fields: make(map[string]*structpb.Value, len(v))}
for k1, v := range v {
k2, ok := k1.(string)
if !ok {
// Encode k1 using ToValue (to correctly coerce time, big numbers, etc.) and then to JSON.
// This yields more idiomatic/consistent strings than using fmt.Sprintf("%v", k1).
val, err := ToValue(k1)
val, err := ToValue(k1, keyType)
if err != nil {
return nil, err
}
Expand All @@ -135,7 +160,7 @@ func ToStructCoerceKeys(v map[any]any) (*structpb.Struct, error) {
}

var err error
x.Fields[k2], err = ToValue(v)
x.Fields[k2], err = ToValue(v, valType)
if err != nil {
return nil, err
}
Expand All @@ -156,12 +181,16 @@ func trimQuotes(s string) string {

// ToListValue converts a map to a google.protobuf.List. It's similar to
// structpb.NewList(), but it recurses on valToPB instead of structpb.NewList
// to add support for more types.
func ToListValue(v []interface{}) (*structpb.ListValue, error) {
// to add support for more types. Providing t as a type hint is optional.
func ToListValue(v []interface{}, t *runtimev1.Type) (*structpb.ListValue, error) {
var elemType *runtimev1.Type
if t != nil {
elemType = t.ArrayElementType
}
x := &structpb.ListValue{Values: make([]*structpb.Value, len(v))}
for i, v := range v {
var err error
x.Values[i], err = ToValue(v)
x.Values[i], err = ToValue(v, elemType)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/pkg/pbutil/pbutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestToStructCoerceKeys(t *testing.T) {
expected, err := structpb.NewStruct(tt.Expected)
require.NoError(t, err)

actual, err := ToStructCoerceKeys(tt.Input)
actual, err := ToStructCoerceKeys(tt.Input, nil)
require.NoError(t, err)

require.True(t, proto.Equal(expected, actual))
Expand Down
16 changes: 14 additions & 2 deletions runtime/queries/column_timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/rilldata/rill/runtime"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/pbutil"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -160,6 +161,17 @@ func (q *ColumnTimeseries) Resolve(ctx context.Context, rt *runtime.Runtime, ins
return err
}

// Omit the time value from the result schema
schema := rows.Schema
if schema != nil {
for i, f := range schema.Fields {
if f.Name == tsAlias {
schema.Fields = slices.Delete(schema.Fields, i, i+1)
break
}
}
}

var data []*runtimev1.TimeSeriesValue
for rows.Next() {
rowMap := make(map[string]any)
Expand All @@ -177,9 +189,9 @@ func (q *ColumnTimeseries) Resolve(ctx context.Context, rt *runtime.Runtime, ins
rows.Close()
panic(fmt.Sprintf("unexpected type for timestamp column: %T", v))
}

delete(rowMap, tsAlias)
records, err := pbutil.ToStruct(rowMap)

records, err := pbutil.ToStruct(rowMap, schema)
if err != nil {
rows.Close()
return err
Expand Down
2 changes: 1 addition & 1 deletion runtime/queries/column_topk.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (q *ColumnTopK) Resolve(ctx context.Context, rt *runtime.Runtime, instanceI
return err
}

entry.Value, err = pbutil.ToValue(val)
entry.Value, err = pbutil.ToValue(val, safeFieldType(rows.Schema, 0))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/queries/metricsview.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func rowsToData(rows *drivers.Result) ([]*structpb.Struct, error) {
return nil, err
}

rowStruct, err := pbutil.ToStruct(rowMap)
rowStruct, err := pbutil.ToStruct(rowMap, rows.Schema)
if err != nil {
return nil, err
}
Expand Down
24 changes: 11 additions & 13 deletions runtime/queries/metricsview_comparison_toplist.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,9 @@ func (q *MetricsViewComparisonToplist) executeToplist(ctx context.Context, olap
if err != nil {
return err
}

defer rows.Close()

var data []*runtimev1.MetricsViewComparisonRow

for rows.Next() {
values, err := rows.SliceScan()
if err != nil {
Expand All @@ -111,7 +109,7 @@ func (q *MetricsViewComparisonToplist) executeToplist(ctx context.Context, olap
measureValues := make([]*runtimev1.MetricsViewComparisonValue, 0, len(q.MeasureNames))

for i, name := range q.MeasureNames {
v, err := pbutil.ToValue(values[1+i])
v, err := pbutil.ToValue(values[1+i], safeFieldType(rows.Schema, 1+i))
if err != nil {
return err
}
Expand All @@ -122,7 +120,7 @@ func (q *MetricsViewComparisonToplist) executeToplist(ctx context.Context, olap
})
}

dv, err := pbutil.ToValue(values[0])
dv, err := pbutil.ToValue(values[0], safeFieldType(rows.Schema, 0))
if err != nil {
return err
}
Expand Down Expand Up @@ -165,22 +163,22 @@ func (q *MetricsViewComparisonToplist) executeComparisonToplist(ctx context.Cont
measureValues := []*runtimev1.MetricsViewComparisonValue{}

for i, name := range q.MeasureNames {
bv, err := pbutil.ToValue(values[1+i*4])
bv, err := pbutil.ToValue(values[1+i*4], safeFieldType(rows.Schema, 1+i*4))
if err != nil {
return err
}

cv, err := pbutil.ToValue(values[2+i*4])
cv, err := pbutil.ToValue(values[2+i*4], safeFieldType(rows.Schema, 2+i*4))
if err != nil {
return err
}

da, err := pbutil.ToValue(values[3+i*4])
da, err := pbutil.ToValue(values[3+i*4], safeFieldType(rows.Schema, 3+i*4))
if err != nil {
return err
}

dr, err := pbutil.ToValue(values[4+i*4])
dr, err := pbutil.ToValue(values[4+i*4], safeFieldType(rows.Schema, 4+i*4))
if err != nil {
return err
}
Expand All @@ -194,7 +192,7 @@ func (q *MetricsViewComparisonToplist) executeComparisonToplist(ctx context.Cont
})
}

dv, err := pbutil.ToValue(values[0])
dv, err := pbutil.ToValue(values[0], safeFieldType(rows.Schema, 0))
if err != nil {
return err
}
Expand Down Expand Up @@ -340,7 +338,7 @@ func (q *MetricsViewComparisonToplist) buildMetricsComparisonTopListSQL(mv *runt

args := []any{}
if mv.TimeDimension == "" {
return "", nil, fmt.Errorf("Metrics view '%s' doesn't have time dimension", mv.Name)
return "", nil, fmt.Errorf("metrics view '%s' doesn't have time dimension", mv.Name)
}

td := safeName(mv.TimeDimension)
Expand Down Expand Up @@ -377,7 +375,7 @@ func (q *MetricsViewComparisonToplist) buildMetricsComparisonTopListSQL(mv *runt
for _, s := range q.Sort {
i, ok := measureMap[s.MeasureName]
if !ok {
return "", nil, fmt.Errorf("Metrics view '%s' doesn't contain '%s' sort column", q.MetricsViewName, s.MeasureName)
return "", nil, fmt.Errorf("metrics view '%s' doesn't contain '%s' sort column", q.MetricsViewName, s.MeasureName)
}
orderClause += ", "
subQueryOrderClause += ", "
Expand Down Expand Up @@ -554,13 +552,13 @@ func (q *MetricsViewComparisonToplist) Export(ctx context.Context, rt *runtime.R

func validateSort(sorts []*runtimev1.MetricsViewComparisonSort) error {
if len(sorts) == 0 {
return fmt.Errorf("Sorting is required")
return fmt.Errorf("sorting is required")
}
firstSort := sorts[0]

for _, s := range sorts {
if firstSort != s {
return fmt.Errorf("Diffirent sort types are not supported in a single query")
return fmt.Errorf("diffirent sort types are not supported in a single query")
}
}
return nil
Expand Down
16 changes: 14 additions & 2 deletions runtime/queries/metricsview_timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/rilldata/rill/runtime"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/pkg/pbutil"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -155,6 +156,17 @@ func (q *MetricsViewTimeSeries) resolveDruid(ctx context.Context, olap drivers.O
}
defer rows.Close()

// Omit the time value from the result schema
schema := rows.Schema
if schema != nil {
for i, f := range schema.Fields {
if f.Name == tsAlias {
schema.Fields = slices.Delete(schema.Fields, i, i+1)
break
}
}
}

var data []*runtimev1.TimeSeriesValue
for rows.Next() {
rowMap := make(map[string]any)
Expand All @@ -170,9 +182,9 @@ func (q *MetricsViewTimeSeries) resolveDruid(ctx context.Context, olap drivers.O
default:
panic(fmt.Sprintf("unexpected type for timestamp column: %T", v))
}

delete(rowMap, tsAlias)
records, err := pbutil.ToStruct(rowMap)

records, err := pbutil.ToStruct(rowMap, schema)
if err != nil {
return err
}
Expand Down
13 changes: 12 additions & 1 deletion runtime/queries/protoutil.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
package queries

import "google.golang.org/protobuf/proto"
import (
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"google.golang.org/protobuf/proto"
)

// safeFieldType returns the type of the field at index i, or nil if no type is found.
func safeFieldType(t *runtimev1.StructType, i int) *runtimev1.Type {
if t != nil && len(t.Fields) > i {
return t.Fields[i].Type
}
return nil
}

// sizeProtoMessage returns size of serialized proto message
func sizeProtoMessage(m proto.Message) int64 {
Expand Down
Loading

0 comments on commit 425f14d

Please sign in to comment.