webrtc-load-tester: Migrate from Cloud Run to Compute Engine (#348)
* .github: Fix failure notification

* wrtc-lt: Migrate from Cloud Run to Compute Engine

We ran into UDP networking issues with Cloud Run
and need to run at a lower level. This change
does exactly that.

* wrtc-lt: Fix instance template deletion

* wrtc-lt/compute: Switch to premium network tier

Ingress traffic is free anyway, so let's use it!

* wrtc-lt/orch: Fix VM group resource deletion

* wrtc-lt/compute: Add monitoring and logging

* .github: Update load-test action with new params

* [DEV] .github: Run branch container image

* wrtc-lt/orch: Fix template vs group deletion order

* wrtc-lt/orch: Increase streaming duration

VMs take longer to start than Cloud Run jobs did

* wrtc-lt/orch: Increase timeout for deleting VM group

VMs are buuuuuuulky

* Revert "[DEV] .github: Run branch container image"

This reverts commit 79948f1.
victorges authored Nov 30, 2023
1 parent 278a106 commit 4f4f9f0
Showing 7 changed files with 374 additions and 101 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/load-test.yaml
@@ -186,8 +186,7 @@ jobs:
           LT_WEBRTC_PLAYBACK_MANIFEST_URL: "${{ steps.env.outputs.playback-manifest-url }}"
           LT_WEBRTC_PLAYBACK_JWT_PRIVATE_KEY: ${{ steps.env.outputs.playback-jwt-private-key }}
           LT_WEBRTC_PLAYBACK_VIEWERS_PER_WORKER: 10
-          LT_WEBRTC_PLAYBACK_VIEWERS_PER_CPU: 2
-          LT_WEBRTC_PLAYBACK_MEMORY_PER_VIEWER_MIB: 400
+          LT_WEBRTC_PLAYBACK_MACHINE_TYPE: n2-highcpu-4
           LT_WEBRTC_PLAYBACK_REGION_VIEWERS_JSON: '${{ inputs.playback-region-viewers-json }}'
           LT_WEBRTC_PLAYBACK_BASE_SCREENSHOT_FOLDER_OS: ${{ secrets.LOAD_TEST_SCREENSHOT_FOLDER_OS }}
           LT_WEBRTC_GOOGLE_CREDENTIALS_JSON: '${{ secrets.LOAD_TEST_GOOGLE_CREDENTIALS_JSON }}'
@@ -227,7 +226,7 @@ jobs:
           DISCORD_USERNAME: ${{ github.triggering_actor }}
           DISCORD_EMBEDS: >
             [{
-              "title": "${{ inputs.playback-protocol }} load test has failed!,
+              "title": "${{ inputs.playback-protocol }} load test has failed!",
               "url": "${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}",
               "color": 8388608,
               "author": {
266 changes: 266 additions & 0 deletions cmd/webrtc-load-tester/gcloud/compute.go
@@ -0,0 +1,266 @@
package gcloud

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	konlet "github.com/GoogleCloudPlatform/konlet/gce-containers-startup/types"
	"github.com/golang/glog"
	compute "google.golang.org/api/compute/v1"
	"google.golang.org/api/googleapi"
	"gopkg.in/yaml.v2"
)

var (
	computeClient *compute.Service

	konletRestartPolicyNever = konlet.RestartPolicyNever // just so we can take an address easily
)

// VMTemplateSpec describes the instance template to create for one load-tester role.
type VMTemplateSpec struct {
	ContainerImage string
	Role           string
	Args           []string

	TestID      string
	MachineType string
}

// CreateVMTemplate creates a GCE instance template that runs the load-tester
// container on Container-Optimized OS and returns the template's URL and name.
func CreateVMTemplate(ctx context.Context, spec VMTemplateSpec) (url, name string, err error) {
	name = fmt.Sprintf("load-tester-%s-%s", spec.TestID[:8], spec.Role)

	// konlet is the agent on Container-Optimized OS that reads this YAML spec
	// from instance metadata and starts the declared container on boot.
	containerSpec, err := yaml.Marshal(konlet.ContainerSpec{
		Spec: konlet.ContainerSpecStruct{
			Containers: []konlet.Container{{
				Name:    "load-tester",
				Image:   spec.ContainerImage,
				Command: []string{"webrtc-load-tester"},
				Args:    append([]string{spec.Role}, spec.Args...),
			}},
			RestartPolicy: &konletRestartPolicyNever,
		},
	})
	if err != nil {
		return "", "", fmt.Errorf("error creating VM container spec: %w", err)
	}

	template := &compute.InstanceTemplate{
		Name:        name,
		Description: "test-id=" + spec.TestID,
		Properties: &compute.InstanceProperties{
			MachineType: spec.MachineType,
			Disks: []*compute.AttachedDisk{
				{
					Type:       "PERSISTENT",
					Boot:       true,
					AutoDelete: true,
					InitializeParams: &compute.AttachedDiskInitializeParams{
						SourceImage: "projects/cos-cloud/global/images/family/cos-stable",
						DiskSizeGb:  10,
					},
				},
			},
			NetworkInterfaces: []*compute.NetworkInterface{
				{
					Name: "global/networks/default",
					AccessConfigs: []*compute.AccessConfig{
						{
							Name:        "External NAT",
							Type:        "ONE_TO_ONE_NAT",
							NetworkTier: "PREMIUM",
						},
					},
				},
			},
			Metadata: &compute.Metadata{
				Items: []*compute.MetadataItems{
					{
						Key:   "gce-container-declaration",
						Value: googleapi.String(string(containerSpec)),
					},
					{
						Key:   "google-monitoring-enabled",
						Value: googleapi.String("true"),
					},
					{
						Key:   "google-logging-enabled",
						Value: googleapi.String("true"),
					},
				},
			},
		},
	}

	op, err := computeClient.InstanceTemplates.Insert(projectID, template).Context(ctx).Do()
	if err != nil {
		return "", "", fmt.Errorf("error creating GCE instance template: %w", err)
	}

	err = waitForOperation(ctx, op)
	if err != nil {
		return "", "", fmt.Errorf("error creating GCE instance template: %w", err)
	}

	template, err = computeClient.InstanceTemplates.Get(projectID, name).Context(ctx).Do()
	if err != nil {
		return "", "", fmt.Errorf("error getting GCE instance template: %w", err)
	}

	return template.SelfLink, name, nil
}

// DeleteVMTemplate deletes an instance template and waits for the operation to
// complete. It doesn't receive a ctx because it's meant to run as a cleanup on
// shutdown.
func DeleteVMTemplate(templateName string) {
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
	defer cancel()

	glog.Infof("Deleting VM template: %s", templateName)
	op, err := computeClient.InstanceTemplates.Delete(projectID, templateName).Context(ctx).Do()
	if err != nil {
		glog.Errorf("error deleting GCE instance template: %v", err)
		return
	}

	err = waitForOperation(ctx, op)
	if err != nil {
		glog.Errorf("error deleting GCE instance template: %v", err)
		return
	}
}

// CreateVMGroup creates a regional managed instance group from the given
// template and waits for the operation to complete.
func CreateVMGroup(ctx context.Context, spec VMTemplateSpec, templateURL, region string, numInstances int64) (string, error) {
	name := fmt.Sprintf("load-tester-%s-%s-%s", spec.TestID[:8], spec.Role, region)
	instanceGroupManager := &compute.InstanceGroupManager{
		Name:             name,
		Description:      "test-id=" + spec.TestID,
		BaseInstanceName: name,
		InstanceTemplate: templateURL,
		TargetSize:       numInstances,
	}

	op, err := computeClient.RegionInstanceGroupManagers.Insert(projectID, region, instanceGroupManager).Context(ctx).Do()
	if err != nil {
		return "", fmt.Errorf("error creating instance group: %w", err)
	}

	return name, waitForOperation(ctx, op)
}

// DeleteVMGroup deletes a VM group and waits for the operation to complete. It
// doesn't receive a ctx because it's meant to run as a cleanup on shutdown.
func DeleteVMGroup(region, groupName string) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	glog.Infof("Deleting VM group: %s", groupName)
	op, err := computeClient.RegionInstanceGroupManagers.Delete(projectID, region, groupName).Context(ctx).Do()
	if err != nil {
		glog.Errorf("error deleting VM group %s: %v", groupName, err)
		return
	}

	if err = waitForOperation(ctx, op); err != nil {
		glog.Errorf("error deleting VM group %s: %v", groupName, err)
		return
	}
}

type VMGroupInfo struct {
	Region, Name string
}

// DeleteVMGroups deletes the given VM groups concurrently and returns once all
// deletions have finished.
func DeleteVMGroups(groups []VMGroupInfo) {
	wg := sync.WaitGroup{}
	wg.Add(len(groups))

	for _, group := range groups {
		go func(group VMGroupInfo) {
			defer wg.Done()
			DeleteVMGroup(group.Region, group.Name)
		}(group)
	}

	wg.Wait()
}

// CheckVMGroupStatus logs a summary of the instance statuses in a VM group,
// including whether every instance is already RUNNING.
func CheckVMGroupStatus(ctx context.Context, region, groupName string) error {
	instances, err := computeClient.RegionInstanceGroupManagers.ListManagedInstances(projectID, region, groupName).Context(ctx).Do()
	if err != nil {
		return fmt.Errorf("error getting VM group instances: %w", err)
	}

	status := map[string]int{}
	running := true
	for _, instance := range instances.ManagedInstances {
		status[instance.InstanceStatus]++
		running = running && instance.InstanceStatus == "RUNNING"
	}

	glog.Infof("VM group %s: running=%v status=%v",
		simpleName(groupName), running, status)

	return nil
}

// ListVMGroups returns the names of the VM groups in a region that were
// created for the given test ID.
func ListVMGroups(ctx context.Context, region, testID string) ([]string, error) {
	groups, err := computeClient.RegionInstanceGroupManagers.List(projectID, region).Context(ctx).Do()
	if err != nil {
		return nil, fmt.Errorf("error listing VM groups: %w", err)
	}

	var groupNames []string
	for _, group := range groups.Items {
		if group.Description == "test-id="+testID {
			groupNames = append(groupNames, group.Name)
		}
	}

	return groupNames, nil
}

// ListVMTemplates returns the names of the instance templates that were
// created for the given test ID.
func ListVMTemplates(ctx context.Context, testID string) ([]string, error) {
	templates, err := computeClient.InstanceTemplates.List(projectID).Context(ctx).Do()
	if err != nil {
		return nil, fmt.Errorf("error listing VM templates: %w", err)
	}

	var templateNames []string
	for _, template := range templates.Items {
		if template.Description == "test-id="+testID {
			templateNames = append(templateNames, template.Name)
		}
	}

	return templateNames, nil
}

// waitForOperation polls a global or regional compute operation every 3
// seconds until it reaches DONE, surfacing any errors the operation recorded.
func waitForOperation(ctx context.Context, op *compute.Operation) (err error) {
	for {
		var currentOp *compute.Operation
		if op.Region == "" {
			currentOp, err = computeClient.GlobalOperations.Get(projectID, op.Name).Context(ctx).Do()
		} else {
			// op.Region is a fully qualified URL, grab only the last path segment which is the region name
			region := op.Region[strings.LastIndex(op.Region, "/")+1:]
			currentOp, err = computeClient.RegionOperations.Get(projectID, region, op.Name).Context(ctx).Do()
		}
		if err != nil {
			return fmt.Errorf("error getting operation status: %w", err)
		}

		if currentOp.Status == "DONE" {
			if currentOp.Error != nil {
				errMsgs := []string{}
				for _, err := range currentOp.Error.Errors {
					errMsgs = append(errMsgs, err.Message)
				}
				return fmt.Errorf("operation error: %v", strings.Join(errMsgs, "; "))
			}
			return nil
		}

		time.Sleep(3 * time.Second)
	}
}
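
To see how these functions compose, here is a rough lifecycle sketch of the kind an orchestrator might run: create a template, fan regional VM groups out from it, poll status, then tear down. It is written inside the gcloud package only to avoid guessing the repository's module import path, and the role name, args, regions, instance count, and durations are all made-up placeholders; only the exported functions and their signatures come from the diff above.

package gcloud

import (
	"context"
	"time"
)

// runLoadTest is a hypothetical end-to-end flow. Deferred calls run in LIFO
// order, so the groups are deleted before the template they reference (the
// deletion-order fix mentioned in the commit message).
func runLoadTest(ctx context.Context, testID, image, credentialsJSON, projectID string) error {
	if err := InitClients(ctx, credentialsJSON, projectID); err != nil {
		return err
	}

	spec := VMTemplateSpec{
		ContainerImage: image,
		Role:           "player",                     // hypothetical role
		Args:           []string{"-duration", "10m"}, // hypothetical args
		TestID:         testID, // must be >= 8 chars; resource names use testID[:8]
		MachineType:    "n2-highcpu-4",
	}

	templateURL, templateName, err := CreateVMTemplate(ctx, spec)
	if err != nil {
		return err
	}
	defer DeleteVMTemplate(templateName)

	var groups []VMGroupInfo
	for _, region := range []string{"us-central1", "europe-west2"} { // hypothetical regions
		name, err := CreateVMGroup(ctx, spec, templateURL, region, 5)
		if err != nil {
			DeleteVMGroups(groups) // best-effort cleanup of groups created so far
			return err
		}
		groups = append(groups, VMGroupInfo{Region: region, Name: name})
	}
	defer DeleteVMGroups(groups)

	// Log a status summary every 15s while the (hypothetical) test runs.
	deadline := time.Now().Add(10 * time.Minute)
	for time.Now().Before(deadline) {
		for _, g := range groups {
			if err := CheckVMGroupStatus(ctx, g.Region, g.Name); err != nil {
				return err
			}
		}
		time.Sleep(15 * time.Second)
	}
	return nil
}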
6 changes: 6 additions & 0 deletions cmd/webrtc-load-tester/gcloud/jobs.go
@@ -10,6 +10,7 @@ import (
 	run "cloud.google.com/go/run/apiv2"
 	"cloud.google.com/go/run/apiv2/runpb"
 	"github.com/golang/glog"
+	compute "google.golang.org/api/compute/v1"
 	"google.golang.org/api/iterator"
 	"google.golang.org/api/option"
 	"google.golang.org/protobuf/types/known/durationpb"
@@ -33,6 +34,11 @@ func InitClients(ctx context.Context, credentialsJSON, credsProjectID string) (e
 		return err
 	}
 
+	computeClient, err = compute.NewService(ctx, credsOpt)
+	if err != nil {
+		return err
+	}
+
 	projectID = credsProjectID
 
 	return nil
(Diffs for the remaining 4 changed files are not shown.)
