Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Performance mega boost - queue per app #1990

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

bnetzi
Copy link

@bnetzi bnetzi commented Apr 17, 2024

Issues related

#1138
#1574
#783
#1489

Purpose of this PR

First of all - this PR is mainly a Draft which we think should be discussed, and that's why we are submitting it even though we haven't added documentation not unit tests.

The original design of spark-operator posses one queue which is used for all the applications.
This design is causing a huge latency issue when trying to deal with hundreds / thousands of applications concurrently.

In benchmarks me and my fellows from Mobileye preformed, we showed clearly a linear latency increase depend on the amount of apps handled.
When getting to more than 500 applications, the avg time from creating a spark application object until pod creation is ~130 seconds for app to start. When going up to 1200 apps concurrently it can go Up to 20 minutes on average for each spark application to be created.
Scale up vertically would not be helpful as the CPU is doing nothing, most of the time is spent on the queue mutex.

The change we are presenting here is to create a queue for each app.
It required a big change all around the code, but it is not changing the main flow in any way.

Our benchmarks showed that even with 1000 apps concurrently, Avg time for application creation is ~7 seconds

We also added a nice feature of using memoryLimit for driver / executor which is larger than the request by using the admission webhook.

Proposed changes:

  • create a queue for each app
  • Add a parameter to controll qps and burst for k8s api
  • adding memoryLimit option for executors and drivers

Change Category

  • Feature (non-breaking change which adds functionality)

What are we still missing:

  • fixing broken unit test
  • documentation
  • Peer review

I would point out that this code currently runs on our production environment with massive scale without any issues.

@vara-bonthu
Copy link
Contributor

/assign @ChenYi015
/assign @yuchaoran2011

Please review the changes when you get a chance

Copy link
Contributor

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please ask for approval from yuchaoran2011. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@bnetzi bnetzi marked this pull request as ready for review June 16, 2024 21:39
Copy link
Contributor

@yuchaoran2011 yuchaoran2011 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Impressive work! A few questions:

  • What do we need K8s burst and QPS for?
  • What's the impact on memory and CPU consumption of setting a higher or lower frequency of queue cleaning?
  • Will there be edge cases where a queue's mutex is already deleted due to not receiving any updates for a while but the application still exists?

@bnetzi
Copy link
Author

bnetzi commented Jun 18, 2024

Impressive work! A few questions:

  • What do we need K8s burst and QPS for?
  • What's the impact on memory and CPU consumption of setting a higher or lower frequency of queue cleaning?
  • Will there be edge cases where a queue's mutex is already deleted due to not receiving any updates for a while but the application still exists?

Hi @yuchaoran2011 thanks for reviewing!

  • Qps: we first did all the rest of the code and did not increase qps and Burst and the outcome was pretty good, but several issues still bothered us:
  1. CPU usage was very high at high scale
  2. Even though app submit was fast, application status updates were very slow and could take 20 minutes at scale, which was really bad for us
  3. At some point app were just stuck forever without status, even though the app was finished, and when spark operator restarted - it submitted the apps again as they had no status and no resources related
    After profiling we discovered most of the time was spent on kiss API exponential back off that was caused from low qps and Burst
    (We also tested just raising the qps and Burst without the rest of our changes and it was not preforming well)
  • As for memory and CPU consumption - we mostly tried to avoid running the queue clean often as it's locking a map when it cleans it. We haven't noticed high memory nor CPU but I can provide some CPU and memory consumption graphs to show the affects of amount of apps etc.

  • about the edge case, yes of course it can happen, especially of you set low values of queue cleanup intervals, but it does not cause any harm as every access to an app queue is always by the get or create method and the cleanup is not cleaning queues that aren't empty

@luohao
Copy link

luohao commented Jun 18, 2024

Thanks for this patch!

We also ran into similar issues at scale. What we observed was a race condition between the SparkOperator reconciliation and k8s Pod GC. We have autoscaler that terminates nodes when no Pods is running so completed Pods are deleted from API server when the node is terminated. If the SparkOperator could not update the status in time, it won't find the driver Pod and thus make the app as failed due to driver pod not found.

Copy link
Contributor

@vara-bonthu vara-bonthu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ben-tzionlmb Looking forward to see this feature to be merged. Would be nice to add some tests with this feature.

Comment on lines +77 to +78
- -max-queue-time-without-update-in-minutes=30
- -queue-cleaner-interval-in-minutes=10
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make these two values configurable? Would there be a possibility for users to modify these values?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure I will

timestamp=$(date +%Y-%m-%d_%H-%M-%S)

# Function to fetch goroutine profile
fetch_goroutine_profile() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this script used anywhere? Couldn't find the references or docs.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This script is used for manual profiling which we've added as part of this feature (disabled by default). I probably need to add some documenting on how to use it

@yuchaoran2011
Copy link
Contributor

@bnetzi Thanks for the clarification. That makes sense. The default is 50, if I remember correctly. Doubling the Spark operator QPS strikes a balance between allowing for more requests from operator to be processed and not overloading k8s API server. Overall the idea looks really solid. I second fleshing out this PR to include tests

Comment on lines +80 to +84
type AppQueuesMap struct {
appQueueMutex *sync.RWMutex
appQueueMap map[string]AppQueueInfo
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The naming of this struct is strange. The overall type name is AppQueuesMap but a field is named similarly appQueueMap. Can we give the struct type a more intuitive name? Something that indicates that it includes the mutex?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is not good naming, I will change it

@bnetzi
Copy link
Author

bnetzi commented Jun 19, 2024

@bnetzi Thanks for the clarification. That makes sense. The default is 50, if I remember correctly. Doubling the Spark operator QPS strikes a balance between allowing for more requests from operator to be processed and not overloading k8s API server. Overall the idea looks really solid. I second fleshing out this PR to include tests

The default in golang is actually 5 qps and 10 burst which is very low.

Maybe we should put default value which is lower than 100 and 200 by default for small k8s clusters or such with mix usage

@bnetzi bnetzi force-pushed the master branch 2 times, most recently from 53e6a2b to 0f3789f Compare June 19, 2024 14:41
@vincentye38
Copy link

Please help me to understand your statement in the description more

Scale up vertically would not be helpful as the CPU is doing nothing, most of the time is spent on the queue mutex.

For the original runWorker()->processNext(), processNext()'s queue.Get() get a spark-app to process. Multiple workers calling runWorker() each will pull spark-app items from queue sequentially because of queue mutex, but the items will be processed concurrently inside each worker's processNext(). Is 20 minutes on average incurred by processing the item, but not by blocking in queue.Get()?

@yuchaoran2011
Copy link
Contributor

@bnetzi I think @vincentye38 brought up a valid point above. What he's proposing (i.e. having more workers concurrently process items) complements the solution proposed in your PR. Have you tried bumping up the number of workers to see what effect that has on performance?

@bnetzi
Copy link
Author

bnetzi commented Jun 20, 2024

@vincentye38 @yuchaoran2011 Thanks for commenting

First I would mention that we obviously increased the amount of controller threads while raising the cpu, and it did not help at all. The weirdest thing is that although we were stuck and everything was very slow - the cpu was only 5% on a 32 vCPU instance.

I think that in order to understand it we need to understand some of the mechanism and parameters involved here.

In my understanding (and this behavior is what we saw in the logs as well) each of the go routines is waiting in line to preform a q.Get() but the next item in queue be processed by another go routine only when q.Done() is called which is after we are doing the sync. It actually makes a lot of sense as if we have an event that triggers app creation, and then some app update - we want the creation to be done and the app state to be synced before we are preforming the next operation.
But this design makes all other go routines to wait for this task to be done. So the controller threads not making a significant impact on performance.

Also - the operator is using a a RateLimitingInterface workqueue, and this queue is also adding waits when the rate limit is reached, so it makes everything even worse. We tried to increase the limits and it helped a little but did not help enough, and that's why we decided on a queue per app approach.

Another point to mention here, (and in my opinion it is what causing us to get to finally 20 minutes after running a long time in a large scale env) is that we have a vast amount of events per app. It is not just app creation, driver pod start and finish, but also each executor pod state change is an event. This is causing much longer waiting time as there are much more

@vincentye38
Copy link

@bnetzi Thank you for explaining. It's interesting that the cpu utilization is so low.

each of the go routines is waiting in line to preform a q.Get() but the next item in queue be processed by another go routine only when q.Done() is called which is after we are doing the sync.

My understanding from the source code of q.Get() and q.Done() is that go routines can pickup other items from the queue. the queue deduplicates item which has identical one in the queue or in process when the item is inserted. The new identical item will be set aside into dirty set util the one in process is marked by q.done(item), then the new item will be moved from dirty to queue. WorkQueue is a dedup-queue. so q.done() doesn't block other go routine to q.Get() other item.

I guess the problem that cause low cpu utilization, low throughput and blocking items being dequeued is RateLimitingInterface. Could you mind to test out using WorkQueue directory without wrapping it with RateLimitingInterface? Thank you.

@bnetzi
Copy link
Author

bnetzi commented Jun 20, 2024

@bnetzi Thank you for explaining. It's interesting that the cpu utilization is so low.

each of the go routines is waiting in line to preform a q.Get() but the next item in queue be processed by another go routine only when q.Done() is called which is after we are doing the sync.

My understanding from the source code of q.Get() and q.Done() is that go routines can pickup other items from the queue. the queue deduplicates item which has identical one in the queue or in process when the item is inserted. The new identical item will be set aside into dirty set util the one in process is marked by q.done(item), then the new item will be moved from dirty to queue. WorkQueue is a dedup-queue. so q.done() doesn't block other go routine to q.Get() other item.

I guess the problem that cause low cpu utilization, low throughput and blocking items being dequeued is RateLimitingInterface. Could you mind to test out using WorkQueue directory without wrapping it with RateLimitingInterface? Thank you.

Although from the source code it looks like it - I don't think the way you describe it is the way it actually works. We are missing something that would explain exactly why it doesn't work in parallel.
The things I know for sure are - in reality it did not ran the important code segments in parallel, profiling with pprof showed most of time the code is stuck on mutex, and increasing the rate limit even to absurd numbers (100,000) did not change this behavior.

In any case - in my opinion the solution we came up with is a better design overall and it actually works for us in large scale production and after extensive tests. I don't have the capacity nor the resources to preform more tests at large scale just in order to understand the bug here, especially as we have a working version . However, If you can try to create a version without rate limit and test it yourself I think it would be beneficial

@bnetzi
Copy link
Author

bnetzi commented Jun 20, 2024

Adding a google doc I've made for illustrating the issue and our solution

https://docs.google.com/document/u/0/d/14x76jK7naRjxeFCkqUe7MgoDzZjYETixXXa828o0kSM/mobilebasic

@luohao
Copy link

luohao commented Jul 10, 2024

I wanted to share that we've been running a build where we replaced the RateLimitingInterface with the default workqueue. We've seen a significant reduction in reconciliation delay, even without increasing the client QPS as suggested in this PR. Occasionally, during peak hours, we still experience up to a 5-minute delay before the process starts, but this might not be entirely due to the operator reconciliation delay.

We haven't noticed any negative impacts from removing the rate limiter. This makes me wonder why the spark-operator uses it. What specific scenario is it designed to protect against?

@robscc
Copy link

robscc commented Jul 10, 2024

I wanted to share that we've been running a build where we replaced the RateLimitingInterface with the default workqueue. We've seen a significant reduction in reconciliation delay, even without increasing the client QPS as suggested in this PR. Occasionally, during peak hours, we still experience up to a 5-minute delay before the process starts, but this might not be entirely due to the operator reconciliation delay.

We haven't noticed any negative impacts from removing the rate limiter. This makes me wonder why the spark-operator uses it. What specific scenario is it designed to protect against?

ratelimit is necessary to control the pressure of apiserver and limit the repeated updates of the spark application

LastUpdateTs time.Time
}

// A struck that will have map of queues and mutex for the map
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

struct

// assert.NotNil(t, m.appQueueMap[app.Name].LastUpdateTs)

// Check that the SparkApplication was enqueued.
q := ctrl.GetOrCreateRelevantQueue(app.Name).Queue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should GetOrCreateRelevantQueue with from calling key, err := keyFunc(app)

// Check that m is not nil.
assert.NotNil(t, m)
// Check that m.appQueueMap[appName].Queue is not nil.
assert.NotNil(t, m.appQueueMap[app.Name])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be
key, _ := keyFunc(app)
m.appQueueMap[key]

// Start starts the Controller by registering a watcher for SparkApplication objects.
func (c *Controller) Start(workers int, stopCh <-chan struct{}) error {
func (c *Controller) Start(maxAge time.Duration, checkInterval time.Duration, stopCh <-chan struct{}) error {
// Wait for all involved caches to be synced, before processing items from the queue is started.
if !cache.WaitForCacheSync(stopCh, c.cacheSynced) {
return fmt.Errorf("timed out waiting for cache to sync")
}
Copy link
Contributor

@tiechengsu tiechengsu Aug 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change, we are breaking the leader election feature, as previously the worker thread creation is blocked by startCh so that only the leader operator has active worker thread. But we right now we are creating worker thread as new sparkapp comes in
https://www.kubeflow.org/docs/components/spark-operator/user-guide/leader-election/

One potential solution is that we rely on startCh (which previously used to block worker thread creation) to set a global variable. e.g. startWorker , which will only be set to true when the operator receive from startCh and become leader. And then we can skip start worker thread if startWorker is false

@xyzyng
Copy link

xyzyng commented Sep 27, 2024

We are doing load test on spark operator and we also run into this issue. The test is run on an EKS cluster with 50 m5.4xlarge EC2 instance. We keep increasing the input traffic load by creating many spark application from client, we noticed spark job submitted from operator can keep up with the speed of the input rate at the begining, but once the input rate goes between 250 or 300 jobs per minute the spark-submit rate become stalled and cannot further increase. The start job latency once increased to more than 800 seconds. We are running operator with 30 worker and tried increase further but it does not really help. Looking at the metrics, we noticed the same behavior that CPU utilization of the operator is always below 10 vcores, but at the same time, the workqueue depth keep increasing to even more than 1000, where we have to stop the test.

We suspect we are mostly hit by not able to increase qps and burst as we see lots of such errors in operator log

I0926 11:50:49.403544      10 request.go:682] Waited for 39.99761059s due to client-side throttling, not priority and fairness, request: PUT:https://172.20.0.1:443/apis/sparkoperator.k8s.io/v1beta2/namespaces/spark-job4/sparkapplications/spark-pi-e6a3693e-6512-4e30-ad7e-69b4f33bdb66/status
I0926 11:50:59.403555      10 request.go:682] Waited for 39.997706246s due to client-side throttling, not priority and fairness, request: PUT:https://172.20.0.1:443/apis/sparkoperator.k8s.io/v1beta2/namespaces/spark-job3/sparkapplications/spark-pi-3fa65941-2ca1-44d4-8898-d1071ee0a2ee/status

But we may also be impacted by the single workqueue mutex given the high traffic.

It will be really appreciated if this issue can be fixed in further spark operator versions. Thanks.

@bnetzi
Copy link
Author

bnetzi commented Sep 27, 2024

For all the followers if this MR, per the massive change in spark operator design, I've waited with this MR to see if it is still relevant after the changes done.

I haven't had time to test V2 at scale yet, nor to accommodate my changes to V2.
I would be happy to know if anyone already tested V2 at scale and have some conclusions

@ImpSy
Copy link
Contributor

ImpSy commented Sep 27, 2024

My company actually went with a different implementation (customizing the workqueue ratelimiter) to handle high load clusters
The change from manually creating the controller (historical implemtation) to using controller-runtime (V2 changes) make it still relevant

I've actually open a PR to backport it here -> #2186

To add context we implemented this to fix a customer issue because their airflow would submit 25K SparkApplication CR at the same time
Obviously this is not a silver bullet, you also need to increase cpu/memory and the number of controller threads

@xyzyng
Copy link

xyzyng commented Sep 28, 2024

For all the followers if this MR, per the massive change in spark operator design, I've waited with this MR to see if it is still relevant after the changes done.

I haven't had time to test V2 at scale yet, nor to accommodate my changes to V2. I would be happy to know if anyone already tested V2 at scale and have some conclusions

I actually tested V2 operator on the cluster with same size (50 ec2 m5.4xlarge) today see similar behavior as V1. The job start latency is 2-4 seconds when the input rate is around 50-100 jobs per minutes. After increasing it to 150, start latency goes to 12s, which is stills table. Further increased to 200 job per minutes causes the start latency increased continuously, at one time it reached 150s. It is also interesting that after the operator run with 200 job per min for some time (maybe after 30 minutes), the start latency and running job number all dropped to 0 (and at same time SUBMITTED/COMPLETED/SUCCEEDING job number all keep increasing), if I randomly sample job from operator log, the latency still increases to more than 4 minutes.

@bnetzi
Copy link
Author

bnetzi commented Sep 28, 2024

@xyzyng from what @ImpSy wrote it sounds like the main issue with V2 is just some tweaks with the controller runtime configurations (which make sense, since it used for many high volume operators)

@vara-bonthu - referring you to the discussion here, I think we just need to provide in general more options to configure the controller runtime, and that my PR is irrelevant

@gangahiremath
Copy link

@bnetzi , @vara-bonthu , regarding the point 'referring you to the discussion here, I think we just need to provide in general more options to configure the controller runtime, and that my PR is irrelevant',

Summary of changes :

     - port spark-submit to golang
     - this removes JVM invocation, hence, performance-wise faster
     - no dependency on apache spark(as the frequency and quantity of changes to driver pod going to be minimal in future releases of apache spark)

@vara-bonthu
Copy link
Contributor

@bnetzi , @vara-bonthu , regarding the point 'referring you to the discussion here, I think we just need to provide in general more options to configure the controller runtime, and that my PR is irrelevant',

Summary of changes :

     - port spark-submit to golang
     - this removes JVM invocation, hence, performance-wise faster
     - no dependency on apache spark(as the frequency and quantity of changes to driver pod going to be minimal in future releases of apache spark)

Thanks @gangahiremath ! Please add your feedback to this issue #2193

@c-h-afzal
Copy link

Hey @bnetzi -

  • we at Salesforce are running performance tests for Spark Operator using your PR branch.. From the recent conversation in this thread, it seems like V2 operator (without this PR) still suffers from performance issues as mentioned by @xyzyng . Can you please let us know the following:
  1. Do you intend to pursue this MR and do you still see the one mutex per queue change as the major reason for low performance in V2?

  2. Can you update your PR branch with the latest (or V2) changes and we can try to run performance tests on that branch. That’ll help us test your change+V2 for performance.

  3. Do you or any of the other folks on this thread, suspect JVM spin-up times as a possible reason for spark job submission latency?

Thank you.

@bnetzi
Copy link
Author

bnetzi commented Oct 15, 2024

@bnetzi , @vara-bonthu , regarding the point 'referring you to the discussion here, I think we just need to provide in general more options to configure the controller runtime, and that my PR is irrelevant',

Summary of changes :

     - port spark-submit to golang
     - this removes JVM invocation, hence, performance-wise faster
     - no dependency on apache spark(as the frequency and quantity of changes to driver pod going to be minimal in future releases of apache spark)

I'm not sure that MR 2186 is the solution here, as there are also qps and burst configurations for other k8s API users (such as CRD Listers etc.)
Unfortunately I don't have the time nor resources to test V2 at scale and detect the performance issues there. I would probably be able to get to it only on mid November.
Would be great if someone else can test it.

As for the JVM overhead. Although it has some performance impact, moving to golang submit would require an operator version for every change in spark submit and would probably tend to break / have missing functions more often on new spark versions.
I believe we can get to a good enough solution without adding it.

@bnetzi
Copy link
Author

bnetzi commented Oct 15, 2024

Hey @bnetzi -

  • we at Salesforce are running performance tests for Spark Operator using your PR branch.. From the recent conversation in this thread, it seems like V2 operator (without this PR) still suffers from performance issues as mentioned by @xyzyng . Can you please let us know the following:
  1. Do you intend to pursue this MR and do you still see the one mutex per queue change as the major reason for low performance in V2?
  2. Can you update your PR branch with the latest (or V2) changes and we can try to run performance tests on that branch. That’ll help us test your change+V2 for performance.
  3. Do you or any of the other folks on this thread, suspect JVM spin-up times as a possible reason for spark job submission latency?

Thank you.

I can't promise when, but if someone else wouldn't in the meantime - I will make sure V2 won't have scale performance issues. It might take a while but our platform is heavily based on spark operator and we do no intend to be stuck with V1.

The thing is in V2 the whole code was changed and many infra issues were fixed by moving to be based on controller runtime.
So the way I see it - work queue per app might not longer be the solution. As I suggested in other comments in this MR, tweaking the k8s config (qps + burst) for the controller + the work queue might be a simpler more effective way here. But only testing would find out.

@ImpSy
Copy link
Contributor

ImpSy commented Oct 17, 2024

Another solution could be to move the submission phase to a dedicated pod
That way app reporting will not have side effect on the most important part of the operator (launching spark applications)

@bnetzi
Copy link
Author

bnetzi commented Oct 21, 2024

Another solution could be to move the submission phase to a dedicated pod That way app reporting will not have side effect on the most important part of the operator (launching spark applications)

I think in the first place the reason we have a queue is to make sure all actions on the pod are preformed by order. If you would separate it you might change this behavior.
For example - what would happen if while the creation pod is trying to create the pod, there is an API request to delete the app and app is deleted before pod creation? Probably what would happen as the app is the owner ref, the pod would be created but the k8s garbage collector will terminate it eventually, but when? It might cause very weird bugs.

But in general I do think the spark operator pod is doing way too much work, especially executors related stuff and should be separated to different containers at least

@ImpSy
Copy link
Contributor

ImpSy commented Oct 22, 2024

I think in the first place the reason we have a queue is to make sure all actions on the pod are preformed by order. If you would separate it you might change this behavior.

I only made this proposition because that's the implementation that the apache/spark-kubernetes-operator decided to make
But I feel like 2 components running the reconciliation on the resources (Application CR in that case) would not break the order processing since the Submitted State would only be set after a successful submission by the submission worker and the controller would pick the lifecycle of the app afterwards

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.