Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Alerts should not wake up idle ClickHouse service #5787

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
26 changes: 23 additions & 3 deletions runtime/drivers/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"crypto/tls"
"errors"
"fmt"
"net/http"
"sync"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/XSAM/otelsql"
Expand Down Expand Up @@ -105,9 +108,17 @@ type configProperties struct {
// EmbedPort is the port to run Clickhouse locally (0 is random port).
EmbedPort int `mapstructure:"embed_port"`
// DataDir is the path to directory where db files will be created.
DataDir string `mapstructure:"data_dir"`
TempDir string `mapstructure:"temp_dir"`
CanScaleToZero bool `mapstructure:"can_scale_to_zero"`
DataDir string `mapstructure:"data_dir"`
TempDir string `mapstructure:"temp_dir"`

// CanScaleToZero is used to determine if the OLAP instance can be scaled to zero.
// In absence of configs required to make requests to the cloud API or in case of API errors service status will be set to this flag.
CanScaleToZero bool `mapstructure:"can_scale_to_zero"`
// CloudAPIKeyID, CloudAPIKeySecret, OrganizationID, ServiceID are used to make requests to the cloud API
APIKeyID string `mapstructure:"api_key_id"`
APIKeySecret string `mapstructure:"api_key_secret"`
OrganizationID string `mapstructure:"organization_id"`
ServiceID string `mapstructure:"service_id"`
Comment on lines +117 to +121
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These fields should probably be prefixed with cloud_ to clarify that they are CH Cloud specific.

}

// Open connects to Clickhouse using std API.
Expand Down Expand Up @@ -244,6 +255,15 @@ type connection struct {
opts *clickhouse.Options
// embed is embedded clickhouse server for local run
embed *embedClickHouse

// cloudAPI is http client used to make requests to the cloud API
cloudAPI http.Client
// scaledToZero is the cached service status.
scaledToZero bool
// statusCheckedAt is the time when the service status was last checked and cached. Cached status is valid for 10 minutes.
statusCheckedAt time.Time
// statusCheckMutex is used to synchronize access to scaledToZero and statusCheckedAt
statusCheckMutex sync.Mutex
}

// Ping implements drivers.Handle.
Expand Down
52 changes: 51 additions & 1 deletion runtime/drivers/clickhouse/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package clickhouse

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"

Expand Down Expand Up @@ -503,7 +505,55 @@ func (c *connection) renameTable(ctx context.Context, oldName, newName, onCluste
}

func (c *connection) MayBeScaledToZero(ctx context.Context) bool {
return c.config.CanScaleToZero
if c.config.APIKeyID == "" {
// no api key provided resort to the config set
return c.config.CanScaleToZero
}

c.statusCheckMutex.Lock()
defer c.statusCheckMutex.Unlock()
// check if stauts is cached
if !c.statusCheckedAt.IsZero() && time.Since(c.statusCheckedAt) <= time.Minute*10 {
return c.scaledToZero
}
Comment on lines +515 to +518
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think caching for 10 mins is too long, makes it too likely to lead to a feedback loop that keeps it alive


ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("https://api.clickhouse.cloud/v1/organizations/%s/services/%s", c.config.OrganizationID, c.config.ServiceID), http.NoBody)
if err != nil {
c.logger.Warn("failed to create clickhouse cloud API request", zap.Error(err))
return c.config.CanScaleToZero
}
req.SetBasicAuth(c.config.APIKeyID, c.config.APIKeySecret)

resp, err := c.cloudAPI.Do(req)
if err != nil {
c.logger.Warn("failed to get clickhouse cloud API response", zap.Error(err))
return c.config.CanScaleToZero
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
c.logger.Warn("failed to get clickhouse cloud API response", zap.Int("status_code", resp.StatusCode))
return c.config.CanScaleToZero
}

// parse response
var response struct {
Result struct {
State string `json:"state"`
} `json:"result"`
}
err = json.NewDecoder(resp.Body).Decode(&response)
if err != nil {
c.logger.Warn("failed to decode clickhouse cloud API response", zap.Error(err))
return c.config.CanScaleToZero
}
scaledToZero := strings.EqualFold(response.Result.State, "idle")
// also cache the result
c.scaledToZero = scaledToZero
c.statusCheckedAt = time.Now()
return scaledToZero
}

// acquireMetaConn gets a connection from the pool for "meta" queries like information schema (i.e. fast queries).
Expand Down
47 changes: 44 additions & 3 deletions runtime/reconcilers/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,15 @@ func (r *AlertReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceNa
// Evaluate the trigger time of the alert. If triggered by schedule, we use the "clean" scheduled time.
// Note: Correction for watermarks and intervals is done in checkAlert.
var triggerTime time.Time
if scheduleTrigger && !adhocTrigger && !specHashTrigger && !refsTrigger {
onlyScheduledTrigger := scheduleTrigger && !adhocTrigger && !specHashTrigger && !refsTrigger
if onlyScheduledTrigger {
triggerTime = a.State.NextRunOn.AsTime()
} else {
triggerTime = time.Now()
}

// Run alert queries and send notifications
executeErr := r.executeAll(ctx, self, a, triggerTime, adhocTrigger)
executeErr := r.executeAll(ctx, self, a, triggerTime, adhocTrigger, onlyScheduledTrigger)

// If we were cancelled, exit without updating any other trigger-related state.
// NOTE: We don't set Retrigger here because we'll leave re-scheduling to whatever cancelled the reconciler.
Expand Down Expand Up @@ -376,7 +377,20 @@ func (r *AlertReconciler) setTriggerFalse(ctx context.Context, n *runtimev1.Reso

// executeAll runs queries and (maybe) sends notifications for the alert. It also adds entries to a.State.ExecutionHistory.
// By default, an alert is checked once for the current watermark, but if a.Spec.IntervalsIsoDuration is set, it will be checked *for each* interval that has elapsed since the previous execution watermark.
func (r *AlertReconciler) executeAll(ctx context.Context, self *runtimev1.Resource, a *runtimev1.Alert, triggerTime time.Time, adhocTrigger bool) error {
func (r *AlertReconciler) executeAll(ctx context.Context, self *runtimev1.Resource, a *runtimev1.Alert, triggerTime time.Time, adhocTrigger, onlyScheduledTrigger bool) error {
// Skip if OLAP is in idle state and alerts are configured to skip
if onlyScheduledTrigger {
err := r.validateOLAPState(ctx, self)
if err != nil {
skipErr := &skipError{}
if !errors.As(err, skipErr) {
return err
}
r.C.Logger.Info("Skipped alert check", zap.String("name", self.Meta.Name.Name), zap.String("reason", skipErr.reason))
return nil
}
}
Comment on lines +381 to +392
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is satisfactory because it means a scheduled alert may never get checked. Ideally we would want to find out when it was last alive and check the alert if it has been alive since the last alert check.

Also see the use of hasStreamingRef and AlertsDefaultStreamingRefreshCron – these exist to support alerting on external tables, so ideally the autoscaling handling happens around those code paths. For example, hasStreamingRef already traverses the refs looking for the metrics view to inspect its properties (similar to the new validateOLAPState function).


// Enforce timeout
timeout := alertCheckDefaultTimeout
if a.Spec.TimeoutSeconds > 0 {
Expand Down Expand Up @@ -826,6 +840,33 @@ func (r *AlertReconciler) computeInheritedWatermark(ctx context.Context, refs []
return t, !t.IsZero(), nil
}

// validateOLAPState inspects the alert's metrics view refs and checks whether
// the backing OLAP connector may currently be scaled to zero. It returns a
// skipError when the alert check should be skipped, a non-skip error on
// lookup/acquisition failure, and nil when the check may proceed.
func (r *AlertReconciler) validateOLAPState(ctx context.Context, self *runtimev1.Resource) error {
	// Resolve the valid spec of the (last) metrics view referenced by the alert.
	var spec *runtimev1.MetricsViewSpec
	for _, ref := range self.Meta.Refs {
		if ref.Kind == runtime.ResourceKindMetricsView {
			res, err := r.C.Get(ctx, ref, false)
			if err != nil {
				return err
			}
			spec = res.GetMetricsView().State.ValidSpec
		}
	}
	// No valid metrics view ref found; nothing to validate.
	if spec == nil {
		return nil
	}

	olap, release, err := r.C.AcquireOLAP(ctx, spec.Connector)
	if err != nil {
		return err
	}
	defer release()

	if olap.MayBeScaledToZero(ctx) {
		return skipError{reason: "OLAP may be scaled to zero"}
	}
	return nil
}

// calculateAlertExecutionTimes calculates the execution times for an alert, taking into consideration the alert's intervals configuration and previous executions.
// If the alert is not configured to run on intervals, it will return a slice containing only the current watermark.
// If the alert should not be executed, it returns a skipError explaining why.
Expand Down
64 changes: 32 additions & 32 deletions web-common/src/runtime-client/gen/index.schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,26 @@ export const V1ResourceEvent = {
RESOURCE_EVENT_DELETE: "RESOURCE_EVENT_DELETE",
} as const;

// V1Resource is the generated client-side mirror of the runtime's Resource
// message: `meta` carries common resource metadata, and the remaining fields
// are kind-specific payloads — presumably only the field matching the
// resource's kind is populated (NOTE(review): generated code; confirm against
// the runtime proto definition rather than editing by hand).
export interface V1Resource {
meta?: V1ResourceMeta;
projectParser?: V1ProjectParser;
source?: V1SourceV2;
model?: V1ModelV2;
metricsView?: V1MetricsViewV2;
explore?: V1Explore;
migration?: V1Migration;
report?: V1Report;
alert?: V1Alert;
pullTrigger?: V1PullTrigger;
refreshTrigger?: V1RefreshTrigger;
bucketPlanner?: V1BucketPlanner;
theme?: V1Theme;
component?: V1Component;
canvas?: V1Canvas;
api?: V1API;
connector?: V1ConnectorV2;
}

export type V1ResolveComponentResponseRendererProperties = {
[key: string]: any;
};
Expand All @@ -835,6 +855,13 @@ If it resolves to false, the other fields are not set. */
rendererProperties?: V1ResolveComponentResponseRendererProperties;
}

export interface V1ReportState {
nextRunOn?: string;
currentExecution?: V1ReportExecution;
executionHistory?: V1ReportExecution[];
executionCount?: number;
}

export type V1ReportSpecAnnotations = { [key: string]: string };

export interface V1ReportSpec {
Expand Down Expand Up @@ -863,13 +890,6 @@ export interface V1ReportExecution {
finishedOn?: string;
}

export interface V1ReportState {
nextRunOn?: string;
currentExecution?: V1ReportExecution;
executionHistory?: V1ReportExecution[];
executionCount?: number;
}

export interface V1Report {
spec?: V1ReportSpec;
state?: V1ReportState;
Expand All @@ -896,26 +916,6 @@ export interface V1RefreshTrigger {
state?: V1RefreshTriggerState;
}

export interface V1Resource {
meta?: V1ResourceMeta;
projectParser?: V1ProjectParser;
source?: V1SourceV2;
model?: V1ModelV2;
metricsView?: V1MetricsViewV2;
explore?: V1Explore;
migration?: V1Migration;
report?: V1Report;
alert?: V1Alert;
pullTrigger?: V1PullTrigger;
refreshTrigger?: V1RefreshTrigger;
bucketPlanner?: V1BucketPlanner;
theme?: V1Theme;
component?: V1Component;
canvas?: V1Canvas;
api?: V1API;
connector?: V1ConnectorV2;
}

export interface V1RefreshModelTrigger {
/** The model to refresh. */
model?: string;
Expand Down Expand Up @@ -1243,11 +1243,6 @@ export interface V1MetricsViewToplistResponse {
data?: V1MetricsViewToplistResponseDataItem[];
}

export interface V1MetricsViewSort {
name?: string;
ascending?: boolean;
}

export interface V1MetricsViewToplistRequest {
instanceId?: string;
metricsViewName?: string;
Expand Down Expand Up @@ -1344,6 +1339,11 @@ It's set to true if the metrics view is based on an externally managed table. */
streaming?: boolean;
}

export interface V1MetricsViewSort {
name?: string;
ascending?: boolean;
}

export interface V1MetricsViewSearchResponse {
results?: MetricsViewSearchResponseSearchResult[];
}
Expand Down
Loading