From 4f4f9f08f105e87ddf769f45b7cf148e67b0a94b Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Thu, 30 Nov 2023 18:37:35 -0300 Subject: [PATCH] webrtc-load-tester: Migrate from Cloud Run to Compute Engine (#348) * .github: Fix failure notification * wrtc-lt: Migrate from Cloud Run to Compute Engine We found out UDP networking issues with Cloud Run and need to go lower level. This pretty much does that. * wrtc-lt: Fix instance template deletion * wrtc-lt/compute: Switch to premium network tier Ingress rate is free anyway, so let's use it! * wrtc-lt/orch: Fix VM group resource deletion * wrtc-lt/compute: Add monitoring and logging * .github: Update load-test action with new params * [DEV] .github: Run branch container image * wrtc-lt/orch: Fix template vs group deletion order * wrtc-lt/orch: Increase streaming duration VMs take longer to start * wrtc-lt/orch: Increase timeout for deleting VM group VMs are buuuuuuulky * Revert "[DEV] .github: Run branch container image" This reverts commit 79948f166a7a2df11d062bfde436f2dfa8f4c999. --- .github/workflows/load-test.yaml | 5 +- cmd/webrtc-load-tester/gcloud/compute.go | 266 +++++++++++++++++++ cmd/webrtc-load-tester/gcloud/jobs.go | 6 + cmd/webrtc-load-tester/roles/orchestrator.go | 168 ++++++------ cmd/webrtc-load-tester/roles/player.go | 19 +- go.mod | 5 +- go.sum | 6 +- 7 files changed, 374 insertions(+), 101 deletions(-) create mode 100644 cmd/webrtc-load-tester/gcloud/compute.go diff --git a/.github/workflows/load-test.yaml b/.github/workflows/load-test.yaml index d40f5902..59da9e78 100644 --- a/.github/workflows/load-test.yaml +++ b/.github/workflows/load-test.yaml @@ -186,8 +186,7 @@ jobs: LT_WEBRTC_PLAYBACK_MANIFEST_URL: "${{ steps.env.outputs.playback-manifest-url }}" LT_WEBRTC_PLAYBACK_JWT_PRIVATE_KEY: ${{ steps.env.outputs.playback-jwt-private-key }} LT_WEBRTC_PLAYBACK_VIEWERS_PER_WORKER: 10 - LT_WEBRTC_PLAYBACK_VIEWERS_PER_CPU: 2 - LT_WEBRTC_PLAYBACK_MEMORY_PER_VIEWER_MIB: 400 + LT_WEBRTC_PLAYBACK_MACHINE_TYPE: n2-highcpu-4 LT_WEBRTC_PLAYBACK_REGION_VIEWERS_JSON: '${{ inputs.playback-region-viewers-json }}' LT_WEBRTC_PLAYBACK_BASE_SCREENSHOT_FOLDER_OS: ${{ secrets.LOAD_TEST_SCREENSHOT_FOLDER_OS }} LT_WEBRTC_GOOGLE_CREDENTIALS_JSON: '${{ secrets.LOAD_TEST_GOOGLE_CREDENTIALS_JSON }}' @@ -227,7 +226,7 @@ jobs: DISCORD_USERNAME: ${{ github.triggering_actor }} DISCORD_EMBEDS: > [{ - "title": "${{ inputs.playback-protocol }} load test has failed!, + "title": "${{ inputs.playback-protocol }} load test has failed!", "url": "${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}", "color": 8388608, "author": { diff --git a/cmd/webrtc-load-tester/gcloud/compute.go b/cmd/webrtc-load-tester/gcloud/compute.go new file mode 100644 index 00000000..c2981d83 --- /dev/null +++ b/cmd/webrtc-load-tester/gcloud/compute.go @@ -0,0 +1,266 @@ +package gcloud + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + konlet "github.com/GoogleCloudPlatform/konlet/gce-containers-startup/types" + "github.com/golang/glog" + compute "google.golang.org/api/compute/v1" + "google.golang.org/api/googleapi" + "gopkg.in/yaml.v2" +) + +var ( + computeClient *compute.Service + + konletRestartPolicyNever = konlet.RestartPolicyNever // just so we can take an address easily +) + +type VMTemplateSpec struct { + ContainerImage string + Role string + Args []string + + TestID string + MachineType string +} + +func CreateVMTemplate(ctx context.Context, spec VMTemplateSpec) (url, name string, err error) { + name = 
fmt.Sprintf("load-tester-%s-%s", spec.TestID[:8], spec.Role) + + containerSpec, err := yaml.Marshal(konlet.ContainerSpec{ + Spec: konlet.ContainerSpecStruct{ + Containers: []konlet.Container{{ + Name: "load-tester", + Image: spec.ContainerImage, + Command: []string{"webrtc-load-tester"}, + Args: append([]string{spec.Role}, spec.Args...), + }}, + RestartPolicy: &konletRestartPolicyNever, + }, + }) + if err != nil { + return "", "", fmt.Errorf("error creating VM container spec: %w", err) + } + + template := &compute.InstanceTemplate{ + Name: name, + Description: "test-id=" + spec.TestID, + Properties: &compute.InstanceProperties{ + MachineType: spec.MachineType, + Disks: []*compute.AttachedDisk{ + { + Type: "PERSISTENT", + Boot: true, + AutoDelete: true, + InitializeParams: &compute.AttachedDiskInitializeParams{ + SourceImage: "projects/cos-cloud/global/images/family/cos-stable", + DiskSizeGb: 10, + }, + }, + }, + NetworkInterfaces: []*compute.NetworkInterface{ + { + Name: "global/networks/default", + AccessConfigs: []*compute.AccessConfig{ + { + Name: "External NAT", + Type: "ONE_TO_ONE_NAT", + NetworkTier: "PREMIUM", + }, + }, + }, + }, + Metadata: &compute.Metadata{ + Items: []*compute.MetadataItems{ + { + Key: "gce-container-declaration", + Value: googleapi.String(string(containerSpec)), + }, + { + Key: "google-monitoring-enabled", + Value: googleapi.String("true"), + }, + { + Key: "google-logging-enabled", + Value: googleapi.String("true"), + }, + }, + }}, + } + + op, err := computeClient.InstanceTemplates.Insert(projectID, template).Context(ctx).Do() + if err != nil { + return "", "", fmt.Errorf("error creating GCE instance template: %w", err) + } + + err = waitForOperation(ctx, op) + if err != nil { + return "", "", fmt.Errorf("error creating GCE instance template: %w", err) + } + + template, err = computeClient.InstanceTemplates.Get(projectID, name).Context(ctx).Do() + if err != nil { + return "", "", fmt.Errorf("error getting GCE instance template: %w", err) + } + + return template.SelfLink, name, nil +} + +func DeleteVMTemplate(templateName string) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + + glog.Infof("Deleting VM template: %s", templateName) + op, err := computeClient.InstanceTemplates.Delete(projectID, templateName).Context(ctx).Do() + if err != nil { + glog.Errorf("error deleting GCE instance template: %v", err) + return + } + + err = waitForOperation(ctx, op) + if err != nil { + glog.Errorf("error deleting GCE instance template: %v", err) + return + } +} + +func CreateVMGroup(ctx context.Context, spec VMTemplateSpec, templateURL, region string, numInstances int64) (string, error) { + name := fmt.Sprintf("load-tester-%s-%s-%s", spec.TestID[:8], spec.Role, region) + instanceGroupManager := &compute.InstanceGroupManager{ + Name: name, + Description: "test-id=" + spec.TestID, + BaseInstanceName: name, + InstanceTemplate: templateURL, + TargetSize: numInstances, + } + + op, err := computeClient.RegionInstanceGroupManagers.Insert(projectID, region, instanceGroupManager).Context(ctx).Do() + if err != nil { + return "", fmt.Errorf("error creating instance group: %w", err) + } + + return name, waitForOperation(ctx, op) +} + +// DeleteVMGroup deletes a VM group and waits for the operation to complete. It +// doesn't receive a ctx because it's meant to run as a cleanup on shutdown. 
+func DeleteVMGroup(region, groupName string) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + glog.Infof("Deleting VM group: %s", groupName) + op, err := computeClient.RegionInstanceGroupManagers.Delete(projectID, region, groupName).Context(ctx).Do() + + if err != nil { + glog.Errorf("error deleting VM group %s: %v", groupName, err) + return + } + + if err = waitForOperation(ctx, op); err != nil { + glog.Errorf("error deleting VM group %s: %v", groupName, err) + return + } +} + +type VMGroupInfo struct { + Region, Name string +} + +func DeleteVMGroups(groups []VMGroupInfo) { + wg := sync.WaitGroup{} + wg.Add(len(groups)) + + for _, group := range groups { + go func(group VMGroupInfo) { + defer wg.Done() + DeleteVMGroup(group.Region, group.Name) + }(group) + } + + wg.Wait() +} + +func CheckVMGroupStatus(ctx context.Context, region, groupName string) error { + instances, err := computeClient.RegionInstanceGroupManagers.ListManagedInstances(projectID, region, groupName).Context(ctx).Do() + if err != nil { + return fmt.Errorf("error getting VM group instances: %w", err) + } + + status := map[string]int{} + running := true + for _, instance := range instances.ManagedInstances { + status[instance.InstanceStatus]++ + running = running && instance.InstanceStatus == "RUNNING" + } + + glog.Infof("VM group %s: running=%v status=%v", + simpleName(groupName), running, status) + + return nil +} + +func ListVMGroups(ctx context.Context, region, testID string) ([]string, error) { + groups, err := computeClient.RegionInstanceGroupManagers.List(projectID, region).Context(ctx).Do() + if err != nil { + return nil, fmt.Errorf("error listing VM groups: %w", err) + } + + var groupNames []string + for _, group := range groups.Items { + if group.Description == "test-id="+testID { + groupNames = append(groupNames, group.Name) + } + } + + return groupNames, nil +} + +func ListVMTemplates(ctx context.Context, testID string) ([]string, error) { + templates, err := computeClient.InstanceTemplates.List(projectID).Context(ctx).Do() + if err != nil { + return nil, fmt.Errorf("error listing VM templates: %w", err) + } + + var templateNames []string + for _, template := range templates.Items { + if template.Description == "test-id="+testID { + templateNames = append(templateNames, template.Name) + } + } + + return templateNames, nil +} + +func waitForOperation(ctx context.Context, op *compute.Operation) (err error) { + for { + var currentOp *compute.Operation + if op.Region == "" { + currentOp, err = computeClient.GlobalOperations.Get(projectID, op.Name).Context(ctx).Do() + } else { + // op.Region is a fully qualified URL, grab only the last path segment which is the region name + region := op.Region[strings.LastIndex(op.Region, "/")+1:] + currentOp, err = computeClient.RegionOperations.Get(projectID, region, op.Name).Context(ctx).Do() + } + if err != nil { + return fmt.Errorf("error getting operation status: %w", err) + } + + if currentOp.Status == "DONE" { + if currentOp.Error != nil { + errMsgs := []string{} + for _, err := range currentOp.Error.Errors { + errMsgs = append(errMsgs, err.Message) + } + return fmt.Errorf("operation error: %v", strings.Join(errMsgs, "; ")) + } + return nil + } + + time.Sleep(3 * time.Second) + } +} diff --git a/cmd/webrtc-load-tester/gcloud/jobs.go b/cmd/webrtc-load-tester/gcloud/jobs.go index 69a0f94f..966646e8 100644 --- a/cmd/webrtc-load-tester/gcloud/jobs.go +++ b/cmd/webrtc-load-tester/gcloud/jobs.go @@ -10,6 +10,7 @@ import ( run 
"cloud.google.com/go/run/apiv2" "cloud.google.com/go/run/apiv2/runpb" "github.com/golang/glog" + compute "google.golang.org/api/compute/v1" "google.golang.org/api/iterator" "google.golang.org/api/option" "google.golang.org/protobuf/types/known/durationpb" @@ -33,6 +34,11 @@ func InitClients(ctx context.Context, credentialsJSON, credsProjectID string) (e return err } + computeClient, err = compute.NewService(ctx, credsOpt) + if err != nil { + return err + } + projectID = credsProjectID return nil diff --git a/cmd/webrtc-load-tester/roles/orchestrator.go b/cmd/webrtc-load-tester/roles/orchestrator.go index bb05550b..878c6b6d 100644 --- a/cmd/webrtc-load-tester/roles/orchestrator.go +++ b/cmd/webrtc-load-tester/roles/orchestrator.go @@ -4,7 +4,6 @@ import ( "context" "flag" "fmt" - "math" "net/url" "os" "path" @@ -17,7 +16,7 @@ import ( "github.com/livepeer/stream-tester/cmd/webrtc-load-tester/utils" ) -const jobsPollingInterval = 1 * time.Minute +const statusPollingInterval = 30 * time.Second type loadTestArguments struct { TestID string // set one to recover a running test. auto-generated if not provided @@ -45,8 +44,7 @@ type loadTestArguments struct { JWTPrivateKey string RegionViewersJSON map[string]int ViewersPerWorker int - ViewersPerCPU float64 - MemoryPerViewerMiB int + MachineType string DelayBetweenRegions time.Duration BaseScreenshotFolderOS *url.URL ScreenshotPeriod time.Duration @@ -78,8 +76,7 @@ func Orchestrator() { fs.StringVar(&cliFlags.Playback.JWTPrivateKey, "playback-jwt-private-key", "", "Private key to sign JWT tokens for access controlled playback") utils.JSONVarFlag(fs, &cliFlags.Playback.RegionViewersJSON, "playback-region-viewers-json", `{"us-central1":100,"europe-west2":100}`, "JSON object of Google Cloud regions to the number of viewers that should be simulated there. Notice that the values must be multiples of playback-viewers-per-worker, and up to 1000 x that") fs.IntVar(&cliFlags.Playback.ViewersPerWorker, "playback-viewers-per-worker", 10, "Number of viewers to simulate per worker") - fs.Float64Var(&cliFlags.Playback.ViewersPerCPU, "playback-viewers-per-cpu", 2, "Number of viewers to allocate per CPU on player jobs") - fs.IntVar(&cliFlags.Playback.MemoryPerViewerMiB, "playback-memory-per-viewer-mib", 300, "Amount of memory to allocate per viewer (browser tab)") + fs.StringVar(&cliFlags.Playback.MachineType, "playback-machine-type", "n2-highcpu-4", "Machine type to use for player jobs") fs.DurationVar(&cliFlags.Playback.DelayBetweenRegions, "playback-delay-between-regions", 1*time.Minute, "How long to wait between starting jobs on different regions") utils.URLVarFlag(fs, &cliFlags.Playback.BaseScreenshotFolderOS, "playback-base-screenshot-folder-os", "", "Object Store URL for a folder where to save screenshots of the player. 
If unset, no screenshots will be taken") fs.DurationVar(&cliFlags.Playback.ScreenshotPeriod, "playback-screenshot-period", 1*time.Minute, "How often to take a screenshot of the player") @@ -168,33 +165,46 @@ func runLoadTest(ctx context.Context, args loadTestArguments) (err error) { glog.Infof("Access the stream at: https://%s", path.Join(args.APIServer, "/dashboard/streams", stream.ID)) - var jobsToDelete []string - defer func() { gcloud.DeleteJobs(jobsToDelete) }() + streamerSpec := streamerVMSpec(args, stream.StreamKey) + streamerTemplateURL, streamerTemplateName, err := gcloud.CreateVMTemplate(ctx, streamerVMSpec(args, stream.StreamKey)) + if err != nil { + return fmt.Errorf("failed to create streamer VM template: %w", err) + } + defer gcloud.DeleteVMTemplate(streamerTemplateName) - streamerJob, streamer, err := gcloud.CreateJob(ctx, streamerJobSpec(args, stream.StreamKey)) + playerSpec := playerVMSpec(args, stream.PlaybackID) + playerTemplateURL, playerTemplateName, err := gcloud.CreateVMTemplate(ctx, playerSpec) if err != nil { - return fmt.Errorf("failed to create streamer job: %w", err) + return fmt.Errorf("failed to create player VM template: %w", err) } - jobsToDelete = append(jobsToDelete, streamerJob.Name) + defer gcloud.DeleteVMTemplate(playerTemplateName) + + var createdVMGroups []gcloud.VMGroupInfo + defer func() { gcloud.DeleteVMGroups(createdVMGroups) }() - glog.Infof("Streamer job created on region %s: %s (execution: %s)", args.Streamer.Region, streamer.Job, streamer.Name) + streamerGroup, err := gcloud.CreateVMGroup(ctx, streamerSpec, streamerTemplateURL, args.Streamer.Region, 1) + if err != nil { + return fmt.Errorf("failed to create streamer VM group: %w", err) + } + createdVMGroups = append(createdVMGroups, gcloud.VMGroupInfo{args.Streamer.Region, streamerGroup}) + + glog.Infof("Streamer VM created on region %s: %s", args.Streamer.Region, streamerGroup) - executions := []string{streamer.Name} for region, numViewers := range args.Playback.RegionViewersJSON { glog.Infof("Waiting %s before starting player in %s", args.Playback.DelayBetweenRegions, region) wait(ctx, args.Playback.DelayBetweenRegions) - viewerJob, viewer, err := gcloud.CreateJob(ctx, playerJobSpec(args, region, numViewers, stream.PlaybackID)) + numInstances := int64(numViewers / args.Playback.ViewersPerWorker) + viewerGroup, err := gcloud.CreateVMGroup(ctx, playerSpec, playerTemplateURL, region, numInstances) if err != nil { - return fmt.Errorf("failed to create player job: %w", err) + return fmt.Errorf("failed to create player VM group in %s: %w", region, err) } - jobsToDelete = append(jobsToDelete, viewerJob.Name) + createdVMGroups = append(createdVMGroups, gcloud.VMGroupInfo{region, viewerGroup}) - glog.Infof("Player job created on region %s: %s (execution: %s)", region, viewer.Job, viewer.Name) - executions = append(executions, viewer.Name) + glog.Infof("Player VM group created on region %s: %s", region, viewerGroup) } - waitTestFinished(ctx, stream.ID, executions) + waitTestFinished(ctx, stream.ID, createdVMGroups, false) return nil } @@ -206,79 +216,80 @@ func recoverLoadTest(ctx context.Context, args loadTestArguments) error { glog.Infof("Recovering test with ID %s", args.TestID) wait(ctx, 5*time.Second) - var executions []string - var jobsToDelete []string - defer func() { gcloud.DeleteJobs(jobsToDelete) }() + templates, err := gcloud.ListVMTemplates(ctx, args.TestID) + if err != nil { + return fmt.Errorf("failed to list VM templates: %w", err) + } + for _, template := range templates { + defer 
gcloud.DeleteVMTemplate(template) + } + + var vmGroups []gcloud.VMGroupInfo + defer func() { gcloud.DeleteVMGroups(vmGroups) }() for region := range args.Playback.RegionViewersJSON { - regionExecs, err := gcloud.ListExecutions(ctx, region, args.TestID) + regionGroups, err := gcloud.ListVMGroups(ctx, region, args.TestID) if err != nil { return fmt.Errorf("failed to list executions on region %s: %w", region, err) } - ownedJobs := map[string]bool{} - for _, exec := range regionExecs { - executions = append(executions, exec.Name) - - if job := gcloud.FullJobName(region, exec.Job); !ownedJobs[job] { - ownedJobs[job] = true - glog.Infof("Taking ownership of %s job on region %s", job, region) - jobsToDelete = append(jobsToDelete, job) - } + for _, group := range regionGroups { + vmGroups = append(vmGroups, gcloud.VMGroupInfo{region, group}) } } - waitTestFinished(ctx, args.TestID, executions) + waitTestFinished(ctx, args.TestID, vmGroups, true) return nil } -func waitTestFinished(ctx context.Context, streamID string, executions []string) { +func waitTestFinished(ctx context.Context, streamID string, vmGroups []gcloud.VMGroupInfo, isRecover bool) { glog.Infof("Waiting for test to finish...") - ticker := time.NewTicker(jobsPollingInterval) + ticker := time.NewTicker(statusPollingInterval) defer ticker.Stop() + // assume the stream was already active if we're recovering a running test + streamWasActive := isRecover for { - select { - case <-ctx.Done(): - return - case <-ticker.C: + stream, err := studioApi.GetStream(streamID, false) + if err != nil { + glog.Errorf("Error getting stream status: %v\n", err) + continue } + streamWasActive = streamWasActive || stream.IsActive - if streamID != "" { - stream, err := studioApi.GetStream(streamID, false) + glog.Infof("Stream status: isActive=%v lastSeen=%s sourceDurationSec=%.2f sourceSegments=%d transcodedSegments=%d", + stream.IsActive, time.UnixMilli(stream.LastSeen).Format(time.RFC3339), stream.SourceSegmentsDuration, stream.SourceSegments, stream.TranscodedSegments) + + for _, group := range vmGroups { + err := gcloud.CheckVMGroupStatus(ctx, group.Region, group.Name) if err != nil { - glog.Errorf("Error getting stream status: %v\n", err) - continue + glog.Errorf("Error checking VM group status: %v\n", err) } - - glog.Infof("Stream status: lastSeen=%s sourceDurationSec=%.2f sourceSegments=%d transcodedSegments=%d", - time.UnixMilli(stream.LastSeen).Format(time.RFC3339), stream.SourceSegmentsDuration, stream.SourceSegments, stream.TranscodedSegments) } - allFinished := true - for _, exec := range executions { - finished := gcloud.CheckExecutionStatus(ctx, exec) - allFinished = allFinished && finished - } - - if allFinished { + // we consider the test finished if we saw the stream active at least + // once and then it went inactive. 
+ if streamWasActive && !stream.IsActive { glog.Infof("Test finished") break } + + select { + case <-ctx.Done(): + return + case <-ticker.C: + } } } -func streamerJobSpec(args loadTestArguments, streamKey string) gcloud.JobSpec { +func streamerVMSpec(args loadTestArguments, streamKey string) gcloud.VMTemplateSpec { // Stream for a little longer since viewers join slowly - additionalStreamDelay := args.Playback.DelayBetweenRegions * time.Duration(1+len(args.Playback.RegionViewersJSON)) + additionalStreamDelay := args.Playback.DelayBetweenRegions*time.Duration(len(args.Playback.RegionViewersJSON)) + 3*time.Minute duration := args.TestDuration + additionalStreamDelay - timeout := duration + 10*time.Minute - - return gcloud.JobSpec{ - Region: args.Streamer.Region, + return gcloud.VMTemplateSpec{ ContainerImage: args.ContainerImage, Role: "streamer", Args: []string{ @@ -287,19 +298,15 @@ func streamerJobSpec(args loadTestArguments, streamKey string) gcloud.JobSpec { "-input-file", args.Streamer.InputFile, "-duration", duration.String(), }, - Timeout: timeout, - TestID: args.TestID, - NumTasks: 1, - CPUs: 1, - MemoryMiB: 512, + TestID: args.TestID, + MachineType: "n1-standard-1", } } -func playerJobSpec(args loadTestArguments, region string, viewers int, playbackID string) gcloud.JobSpec { +func playerVMSpec(args loadTestArguments, playbackID string) gcloud.VMTemplateSpec { simultaneous := args.Playback.ViewersPerWorker - numTasks := viewers / args.Playback.ViewersPerWorker - timeout := args.TestDuration + 10*time.Minute + // numTasks := viewers / args.Playback.ViewersPerWorker playbackURL := "" if args.Playback.ManifestURL != "" { @@ -316,38 +323,19 @@ func playerJobSpec(args loadTestArguments, region string, viewers int, playbackI } if args.Playback.BaseScreenshotFolderOS != nil { jobArgs = append(jobArgs, - "-screenshot-folder-os", args.Playback.BaseScreenshotFolderOS.JoinPath(args.TestID, region).String(), + "-screenshot-folder-os", args.Playback.BaseScreenshotFolderOS.JoinPath(args.TestID).String(), "-screenshot-period", args.Playback.ScreenshotPeriod.String(), ) } - return gcloud.JobSpec{ - Region: region, - + return gcloud.VMTemplateSpec{ ContainerImage: args.ContainerImage, Role: "player", Args: jobArgs, - Timeout: timeout, - - TestID: args.TestID, - NumTasks: numTasks, - CPUs: allowedCPUValue(float64(simultaneous) / args.Playback.ViewersPerCPU), // Defaults to 1 CPU every 2 viewers - MemoryMiB: int(math.Ceil(float64(simultaneous*args.Playback.MemoryPerViewerMiB)/128) * 128), // Round up to 128MB increments - } -} -// allowedCPUValue returns the first allowed value for the CPU equal or higher than the requested value. The allowed -// values for the CPU are [1 2 4 6 8]. Cloud Run supports values <1 but we don't. 
-func allowedCPUValue(viewersPerCPU float64) int { - allowedValues := []int{1, 2, 4, 6, 8} - cpuInt := int(math.Ceil(viewersPerCPU)) - - for _, v := range allowedValues { - if v >= cpuInt { - return v - } + TestID: args.TestID, + MachineType: args.Playback.MachineType, } - return allowedValues[len(allowedValues)-1] } func wait(ctx context.Context, dur time.Duration) { diff --git a/cmd/webrtc-load-tester/roles/player.go b/cmd/webrtc-load-tester/roles/player.go index e544966f..c7a69601 100644 --- a/cmd/webrtc-load-tester/roles/player.go +++ b/cmd/webrtc-load-tester/roles/player.go @@ -126,11 +126,13 @@ func runSinglePlayerTest(ctx context.Context, args playerArguments, idx uint) er if args.ScreenshotFolderOS == nil { tasks = append(tasks, chromedp.Sleep(args.TestDuration)) } else { - osFolder := args.ScreenshotFolderOS. - JoinPath(getHostname(), fmt.Sprintf("player-%d", idx)). - String() + osFolder := args.ScreenshotFolderOS + if region := getRegion(); region != "" { + osFolder = osFolder.JoinPath(region) + } + osFolder = osFolder.JoinPath(getHostname(), fmt.Sprintf("player-%d", idx)) - driver, err := drivers.ParseOSURL(osFolder, true) + driver, err := drivers.ParseOSURL(osFolder.String(), true) if err != nil { return err } @@ -213,6 +215,15 @@ func getHostname() string { return hostname } +func getRegion() string { + if zone := os.Getenv("ZONE"); zone != "" { + return zone + } else if gceZone := os.Getenv("GCE_ZONE"); gceZone != "" { + return gceZone + } + return "" +} + func addQuery(urlStr, name, value string) string { u, err := url.Parse(urlStr) if err != nil { diff --git a/go.mod b/go.mod index 44a03b8a..7cae9582 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/golang/glog v1.1.2 github.com/gosuri/uilive v0.0.3 // indirect github.com/gosuri/uiprogress v0.0.1 - github.com/livepeer/go-api-client v0.4.14-0.20231122152358-8756b0dfa4f4 + github.com/livepeer/go-api-client v0.4.14-0.20231130155418-dd87e78bea93 github.com/livepeer/go-livepeer v0.7.2-0.20231110152159-b17a70dfe719 github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07 github.com/livepeer/leaderboard-serverless v1.0.0 @@ -97,7 +97,7 @@ require ( google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b // indirect google.golang.org/protobuf v1.31.0 gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect + gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 // indirect howett.net/plist v1.0.0 // indirect ) @@ -110,6 +110,7 @@ require ( require ( cloud.google.com/go/run v1.3.1 + github.com/GoogleCloudPlatform/konlet v0.0.0-20221118094820-015a8324dd48 github.com/chromedp/chromedp v0.9.3 github.com/golang-jwt/jwt/v4 v4.5.0 github.com/livepeer/catalyst-api v0.1.1 diff --git a/go.sum b/go.sum index 82f3cc71..59e827c7 100644 --- a/go.sum +++ b/go.sum @@ -80,6 +80,8 @@ github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBp github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DataDog/zstd v1.5.2 h1:vUG4lAyuPCXO0TLbXvPv7EB7cNK1QV/luu55UHLrrn8= +github.com/GoogleCloudPlatform/konlet v0.0.0-20221118094820-015a8324dd48 h1:3WllLHYiJb0o2iy6QKSSFjC+ejLGZ74+rQ5tHCEdqKM= +github.com/GoogleCloudPlatform/konlet v0.0.0-20221118094820-015a8324dd48/go.mod h1:rc3M3P9GlhytgXmz3APA1TzcVeF18r0HqJ8Uu3Fhi3Y= github.com/Necroforger/dgrouter v0.0.0-20200517224846-e66453b957c1 
h1:3OHJOlf0r1CVSA1E3Ts4uLWsCnucYndMRjNk4rFiQdE= github.com/Necroforger/dgrouter v0.0.0-20200517224846-e66453b957c1/go.mod h1:FdMxPfOp4ppZW2OJjLagSMri7g5k9luvTm7Y3aIxQSc= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= @@ -553,8 +555,8 @@ github.com/libp2p/go-netroute v0.2.0 h1:0FpsbsvuSnAhXFnCY0VLFbJOzaK0VnP0r1QT/o4n github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo= github.com/livepeer/catalyst-api v0.1.1 h1:WP4rHH88b+lsxo33wPCjl0yvqVDNyxkleZH1sA0M5GE= github.com/livepeer/catalyst-api v0.1.1/go.mod h1:d6XPE9ehhCutWhCqqcmlYqQa+e9bf3Ke92x+gRZlzoQ= -github.com/livepeer/go-api-client v0.4.14-0.20231122152358-8756b0dfa4f4 h1:epSCD9cs+5gZy1lI8Ibjz7nRpcjvRDwF9/J/7IQkYIU= -github.com/livepeer/go-api-client v0.4.14-0.20231122152358-8756b0dfa4f4/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= +github.com/livepeer/go-api-client v0.4.14-0.20231130155418-dd87e78bea93 h1:vQYapLFJ9EyRWTjOsJr1ullF0wiazRme2fSJDZnFrIs= +github.com/livepeer/go-api-client v0.4.14-0.20231130155418-dd87e78bea93/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= github.com/livepeer/go-livepeer v0.7.2-0.20231110152159-b17a70dfe719 h1:468kFmwQFaI00eNCLL8qA5XuIBMwqqVgKEXvqS7msa8= github.com/livepeer/go-livepeer v0.7.2-0.20231110152159-b17a70dfe719/go.mod h1:d6qTStiNmXTQ/5YLB9fhzgDV9MdXg3KmqESQpur2Ak0= github.com/livepeer/go-tools v0.3.0 h1:xK0mJyPWWyvj9Oi9nfLglhCtk0KM8883WB7VO1oPF8g=
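
Note for reviewers unfamiliar with konlet: the instance template created by CreateVMTemplate embeds the container definition as YAML under the gce-container-declaration metadata key, which the container startup agent (konlet) on Container-Optimized OS reads at boot to pull and run the image. Below is a minimal standalone sketch of what that metadata value looks like, built with the same konlet types and yaml.v2 call used in the patch; the image tag and args are placeholders, not values from this change.

package main

import (
	"fmt"

	konlet "github.com/GoogleCloudPlatform/konlet/gce-containers-startup/types"
	"gopkg.in/yaml.v2"
)

func main() {
	// Matches the patch: load-tester containers should not be restarted after they exit.
	restartNever := konlet.RestartPolicyNever

	spec := konlet.ContainerSpec{
		Spec: konlet.ContainerSpecStruct{
			Containers: []konlet.Container{{
				Name:    "load-tester",
				Image:   "livepeer/webrtc-load-tester:placeholder", // placeholder image, not from the patch
				Command: []string{"webrtc-load-tester"},
				Args:    []string{"player", "-playback-url", "https://example.com/index.m3u8"}, // placeholder args
			}},
			RestartPolicy: &restartNever,
		},
	}

	// This YAML is the shape of what CreateVMTemplate stores in the
	// gce-container-declaration metadata item of the instance template.
	out, err := yaml.Marshal(spec)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}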