Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasfrank committed Jul 20, 2023
1 parent 8592768 commit 296422d
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 66 deletions.
21 changes: 11 additions & 10 deletions broker/machinebroker/server/machine_pool_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,22 @@ func (s *Server) PoolInfo(ctx context.Context, req *ori.PoolInfoRequest) (*ori.P
}

var (
sharedCPU, staticCPU int64
sharedMemory, staticMemory uint64
sharedCPU, staticCPU = resource.NewQuantity(0, resource.DecimalSI), resource.NewQuantity(0, resource.DecimalSI)
sharedMemory, staticMemory = resource.NewQuantity(0, resource.BinarySI), resource.NewQuantity(0, resource.BinarySI)
)

for _, onmetalMachinePool := range onmetalMachinePoolList.Items {
staticCPU += onmetalMachinePool.Status.Capacity.Name(corev1alpha1.ResourceCPU, resource.DecimalSI).AsDec().UnscaledBig().Int64()
sharedCPU += onmetalMachinePool.Status.Capacity.Name(corev1alpha1.ResourceSharedCPU, resource.DecimalSI).AsDec().UnscaledBig().Int64()
sharedCPU.Add(*onmetalMachinePool.Status.Capacity.Name(corev1alpha1.ResourceCPU, resource.DecimalSI))
sharedCPU.Add(*onmetalMachinePool.Status.Capacity.Name(corev1alpha1.ResourceSharedCPU, resource.DecimalSI))

staticMemory += onmetalMachinePool.Status.Capacity.Name(corev1alpha1.ResourceMemory, resource.BinarySI).AsDec().UnscaledBig().Uint64()
sharedMemory += onmetalMachinePool.Status.Capacity.Name(corev1alpha1.ResourceSharedMemory, resource.BinarySI).AsDec().UnscaledBig().Uint64()
staticMemory.Add(*onmetalMachinePool.Status.Capacity.Name(corev1alpha1.ResourceMemory, resource.BinarySI))
sharedMemory.Add(*onmetalMachinePool.Status.Capacity.Name(corev1alpha1.ResourceSharedMemory, resource.BinarySI))
}

return &ori.PoolInfoResponse{
SharedCpu: sharedCPU,
StaticCpu: staticCPU,
SharedMemory: sharedMemory,
StaticMemory: staticMemory,
SharedCpu: sharedCPU.AsDec().UnscaledBig().Int64(),
StaticCpu: staticCPU.AsDec().UnscaledBig().Int64(),
SharedMemory: sharedMemory.AsDec().UnscaledBig().Uint64(),
StaticMemory: staticMemory.AsDec().UnscaledBig().Uint64(),
}, nil
}
2 changes: 1 addition & 1 deletion cmd/onmetal-controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func main() {

if controllers.Enabled(machineSchedulerController) {
schedulerCache := scheduler.NewCache(mgr.GetLogger(), scheduler.DefaultCacheStrategy)
if mgr.Add(schedulerCache); err != nil {
if err := mgr.Add(schedulerCache); err != nil {
setupLog.Error(err, "unable to create cache", "controller", "MachineSchedulerCache")
os.Exit(1)
}
Expand Down
57 changes: 2 additions & 55 deletions internal/controllers/compute/scheduler/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package scheduler
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/util/wait"
"sync"
"time"

Expand All @@ -12,9 +11,8 @@ import (
"golang.org/x/exp/maps"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
toolscache "k8s.io/client-go/tools/cache"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
ccache "sigs.k8s.io/controller-runtime/pkg/cache"
)

type CacheStrategy interface {
Expand Down Expand Up @@ -79,60 +77,22 @@ type instanceState struct {
func NewCache(log logr.Logger, strategy CacheStrategy) *Cache {
return &Cache{
log: log,
startWait: make(chan struct{}),
assumedInstances: sets.New[types.UID](),
instanceStates: make(map[types.UID]*instanceState),
nodes: make(map[string]*ContainerInfo),
strategy: strategy,
}
}

type listener struct {
handler toolscache.ResourceEventHandler
}

type updateNotification struct {
oldObj any
newObj any
}

type addNotification struct {
newObj any
isInInitialList bool
}

type deleteNotification struct {
oldObj any
}

func (r *listener) add(evt any) {
switch n := evt.(type) {
case addNotification:
r.handler.OnAdd(n, n.isInInitialList)
case updateNotification:
r.handler.OnUpdate(n.oldObj, n.newObj)
case deleteNotification:
r.handler.OnDelete(n.oldObj)
}
}

type Cache struct {
mu sync.RWMutex

startedMu sync.Mutex
started bool
startWait chan struct{}

log logr.Logger
cache ccache.Cache
log logr.Logger

assumedInstances sets.Set[types.UID]
instanceStates map[types.UID]*instanceState
nodes map[string]*ContainerInfo

containerListenersMu sync.RWMutex
containerListeners sets.Set[*listener]

strategy CacheStrategy
}

Expand Down Expand Up @@ -183,12 +143,10 @@ func (c *Cache) Snapshot() *Snapshot {
}

func (c *Cache) IsAssumedInstance(instance *v1alpha1.Machine) (bool, error) {
log := c.log.WithValues("Instance", klog.KObj(instance))
key, err := c.strategy.Key(instance)
if err != nil {
return false, err
}
log = log.WithValues("InstanceKey", key)

c.mu.RLock()
defer c.mu.RUnlock()
Expand Down Expand Up @@ -256,15 +214,6 @@ func (c *Cache) FinishBinding(instance *v1alpha1.Machine) error {
return nil
}

func (c *Cache) distributeContainerEvent(evt any) {
c.containerListenersMu.RLock()
defer c.containerListenersMu.RUnlock()

for l := range c.containerListeners {
l.add(evt)
}
}

func (c *Cache) AddContainer(node *v1alpha1.MachinePool) {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -275,7 +224,6 @@ func (c *Cache) AddContainer(node *v1alpha1.MachinePool) {
c.nodes[node.Name] = n
}
n.node = node
return
}

func (c *Cache) UpdateContainer(_, newNode *v1alpha1.MachinePool) {
Expand All @@ -288,7 +236,6 @@ func (c *Cache) UpdateContainer(_, newNode *v1alpha1.MachinePool) {
c.nodes[newNode.Name] = n
}
n.node = newNode
return
}

func (c *Cache) RemoveContainer(node *v1alpha1.MachinePool) error {
Expand Down

0 comments on commit 296422d

Please sign in to comment.