Skip to content

Commit

Permalink
incorporating review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ushabelgur committed Jul 10, 2024
1 parent 209af93 commit bb44927
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 83 deletions.
2 changes: 1 addition & 1 deletion broker/machinebroker/server/event_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *Server) filterEvents(events []*iri.Event, filter *iri.EventFilter) []*i
}

func (s *Server) ListEvents(ctx context.Context, req *iri.ListEventsRequest) (*iri.ListEventsResponse, error) {
ironcoreMachineList, err := s.listIroncoreMachine(ctx)
ironcoreMachineList, err := s.listIroncoreMachines(ctx)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions broker/machinebroker/server/machine_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

func (s *Server) listAggregateIronCoreMachines(ctx context.Context) ([]AggregateIronCoreMachine, error) {
ironcoreMachineList, err := s.listIroncoreMachine(ctx)
ironcoreMachineList, err := s.listIroncoreMachines(ctx)
if err != nil {
return nil, fmt.Errorf("error listing ironcore machines: %w", err)
}
Expand Down Expand Up @@ -55,7 +55,7 @@ func (s *Server) listAggregateIronCoreMachines(ctx context.Context) ([]Aggregate
return res, nil
}

func (s *Server) listIroncoreMachine(ctx context.Context) (*computev1alpha1.MachineList, error) {
func (s *Server) listIroncoreMachines(ctx context.Context) (*computev1alpha1.MachineList, error) {
ironcoreMachineList := &computev1alpha1.MachineList{}
if err := s.cluster.Client().List(ctx, ironcoreMachineList,
client.InNamespace(s.cluster.Namespace()),
Expand Down
31 changes: 15 additions & 16 deletions iri/testing/machine/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

iri "github.com/ironcore-dev/ironcore/iri/apis/machine/v1alpha1"
"github.com/ironcore-dev/ironcore/iri/apis/meta/v1alpha1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -63,12 +62,17 @@ type FakeMachineClassStatus struct {
iri.MachineClassStatus
}

type FakeMachineEvents struct {
iri.MachineEvents
}

type FakeRuntimeService struct {
sync.Mutex

Machines map[string]*FakeMachine
MachineClassStatus map[string]*FakeMachineClassStatus
GetExecURL func(req *iri.ExecRequest) string
MachineEvents map[string]*FakeMachineEvents
}

// ListEvents implements machine.RuntimeService.
Expand All @@ -77,21 +81,8 @@ func (r *FakeRuntimeService) ListEvents(ctx context.Context, req *iri.ListEvents
defer r.Unlock()
machineEvents := []*iri.MachineEvents{}

for _, m := range r.Machines {
machineEvent := &iri.MachineEvents{
InvolvedObjectMeta: &v1alpha1.ObjectMetadata{
Id: m.Metadata.Id,
Labels: m.Metadata.Labels,
},
Events: []*iri.Event{{
Spec: &iri.EventSpec{
Reason: "testing",
Message: "this is test event",
Type: "Normal",
EventTime: time.Now().Unix(),
}}},
}
machineEvents = append(machineEvents, machineEvent)
for _, m := range r.MachineEvents {
machineEvents = append(machineEvents, &m.MachineEvents)
}

return &iri.ListEventsResponse{MachineEvents: machineEvents}, nil
Expand All @@ -101,6 +92,7 @@ func NewFakeRuntimeService() *FakeRuntimeService {
return &FakeRuntimeService{
Machines: make(map[string]*FakeMachine),
MachineClassStatus: make(map[string]*FakeMachineClassStatus),
MachineEvents: make(map[string]*FakeMachineEvents),
}
}

Expand Down Expand Up @@ -131,6 +123,13 @@ func (r *FakeRuntimeService) SetGetExecURL(f func(req *iri.ExecRequest) string)
r.GetExecURL = f
}

func (r *FakeRuntimeService) SetEvents(machineId string, events *FakeMachineEvents) {
r.Lock()
defer r.Unlock()

r.MachineEvents[machineId] = events
}

func (r *FakeRuntimeService) Version(ctx context.Context, req *iri.VersionRequest) (*iri.VersionResponse, error) {
return &iri.VersionResponse{
RuntimeName: FakeRuntimeName,
Expand Down
56 changes: 11 additions & 45 deletions poollet/machinepoollet/mem/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@ package mem
import (
"context"
"fmt"
"sync"
"time"

"github.com/go-logr/logr"
"github.com/gogo/protobuf/proto"
computev1alpha1 "github.com/ironcore-dev/ironcore/api/compute/v1alpha1"
"github.com/ironcore-dev/ironcore/iri/apis/machine"
iri "github.com/ironcore-dev/ironcore/iri/apis/machine/v1alpha1"
"github.com/ironcore-dev/ironcore/poollet/machinepoollet/api/v1alpha1"
"golang.org/x/exp/maps"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -28,54 +25,33 @@ import (
type Generic struct {
record.EventRecorder
client.Client
mu sync.RWMutex

sync bool
synced chan struct{}

machineEventsByID map[string]*iri.MachineEvents

machineRuntime machine.RuntimeService

relistPeriod time.Duration
}

func isNewEventsPresent(oldMachineEventsByID map[string]*iri.MachineEvents, machineEvent *iri.MachineEvents, machineID string) bool {
oldMachineEvent, ok := oldMachineEventsByID[machineID]
if !ok {
return true
}
return !proto.Equal(machineEvent, oldMachineEvent)
lastFetched time.Time
}

func (g *Generic) relist(ctx context.Context, log logr.Logger) error {
log.V(1).Info("Relisting machine cluster events")
toEventFilterTime := time.Now()
fromEventFilterTime := toEventFilterTime.Add(-1 * g.relistPeriod)
res, err := g.machineRuntime.ListEvents(ctx, &iri.ListEventsRequest{
Filter: &iri.EventFilter{EventsFromTime: fromEventFilterTime.Unix(), EventsToTime: toEventFilterTime.Add(-1 * g.relistPeriod).Unix()},
Filter: &iri.EventFilter{EventsFromTime: g.lastFetched.Unix(), EventsToTime: toEventFilterTime.Unix()},
})
if err != nil {
return fmt.Errorf("error listing machine cluster events: %w", err)
}

g.mu.Lock()
defer g.mu.Unlock()

oldMachineEventsByID := maps.Clone(g.machineEventsByID)
g.lastFetched = toEventFilterTime

maps.Clear(g.machineEventsByID)

var shouldPublishEvents bool
for _, machineEvent := range res.MachineEvents {
if machine, err := g.getMachine(ctx, machineEvent.InvolvedObjectMeta.GetLabels()); err == nil {
shouldPublishEvents = isNewEventsPresent(oldMachineEventsByID, machineEvent, string(machine.GetUID()))
if shouldPublishEvents {
for _, event := range machineEvent.Events {
g.Eventf(machine, event.Spec.Type, event.Spec.Reason, event.Spec.Message)
}
for _, event := range machineEvent.Events {
g.Eventf(machine, event.Spec.Type, event.Spec.Reason, event.Spec.Message)
}
g.machineEventsByID[string(machine.GetUID())] = machineEvent
}
}

Expand All @@ -101,6 +77,7 @@ func (g *Generic) getMachine(ctx context.Context, labels map[string]string) (*co

func (g *Generic) Start(ctx context.Context) error {
log := ctrl.LoggerFrom(ctx).WithName("mem")
g.lastFetched = time.Now()
wait.UntilWithContext(ctx, func(ctx context.Context) {
if err := g.relist(ctx, log); err != nil {
log.Error(err, "Error relisting")
Expand All @@ -109,16 +86,6 @@ func (g *Generic) Start(ctx context.Context) error {
return nil
}

func (g *Generic) GetMachineEventFor(ctx context.Context, machineID string) ([]*iri.Event, error) {
g.mu.RLock()
defer g.mu.RUnlock()

if byName, ok := g.machineEventsByID[machineID]; ok {
return byName.Events, nil
}
return nil, ErrNoMatchingMachineEvents
}

func (g *Generic) WaitForSync(ctx context.Context) error {
select {
case <-g.synced:
Expand All @@ -141,11 +108,10 @@ func setGenericOptionsDefaults(o *GenericOptions) {
func NewGeneric(client client.Client, runtime machine.RuntimeService, recorder record.EventRecorder, opts GenericOptions) MachineEventMapper {
setGenericOptionsDefaults(&opts)
return &Generic{
synced: make(chan struct{}),
machineEventsByID: map[string]*iri.MachineEvents{},
Client: client,
machineRuntime: runtime,
relistPeriod: opts.RelistPeriod,
EventRecorder: recorder,
synced: make(chan struct{}),
Client: client,
machineRuntime: runtime,
relistPeriod: opts.RelistPeriod,
EventRecorder: recorder,
}
}
69 changes: 52 additions & 17 deletions poollet/machinepoollet/mem/generic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,31 @@ import (

computev1alpha1 "github.com/ironcore-dev/ironcore/api/compute/v1alpha1"
iri "github.com/ironcore-dev/ironcore/iri/apis/machine/v1alpha1"
"github.com/ironcore-dev/ironcore/iri/testing/machine"
"github.com/ironcore-dev/ironcore/iri/apis/meta/v1alpha1"
fakemachine "github.com/ironcore-dev/ironcore/iri/testing/machine"
"github.com/ironcore-dev/ironcore/poollet/machinepoollet/controllers"
"github.com/ironcore-dev/ironcore/poollet/machinepoollet/mcm"
"github.com/ironcore-dev/ironcore/poollet/machinepoollet/mem"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
metricserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
)

var _ = Describe("MachineController", func() {
var srv = &machine.FakeRuntimeService{}
var srv = &fakemachine.FakeRuntimeService{}
var machineEventMapper mem.MachineEventMapper
ns, mp, mc := SetupTest()

BeforeEach(func(ctx SpecContext) {
*srv = *machine.NewFakeRuntimeService()
srv.SetMachineClasses([]*machine.FakeMachineClassStatus{
*srv = *fakemachine.NewFakeRuntimeService()
srv.SetMachineClasses([]*fakemachine.FakeMachineClassStatus{
{
MachineClassStatus: iri.MachineClassStatus{
MachineClass: &iri.MachineClass{
Expand All @@ -58,7 +61,7 @@ var _ = Describe("MachineController", func() {
})
Expect(k8sManager.Add(machineClassMapper)).To(Succeed())

machineEventMapper = mem.NewGeneric(k8sManager.GetClient(), srv, &record.FakeRecorder{}, mem.GenericOptions{
machineEventMapper = mem.NewGeneric(k8sManager.GetClient(), srv, k8sManager.GetEventRecorderFor("test"), mem.GenericOptions{
RelistPeriod: 2 * time.Second,
})
Expect(k8sManager.Add(machineEventMapper)).To(Succeed())
Expand All @@ -67,8 +70,8 @@ var _ = Describe("MachineController", func() {
EventRecorder: &record.FakeRecorder{},
Client: k8sManager.GetClient(),
MachineRuntime: srv,
MachineRuntimeName: machine.FakeRuntimeName,
MachineRuntimeVersion: machine.FakeVersion,
MachineRuntimeName: fakemachine.FakeRuntimeName,
MachineRuntimeVersion: fakemachine.FakeVersion,
MachineClassMapper: machineClassMapper,
MachinePoolName: mp.Name,
DownwardAPILabels: map[string]string{
Expand Down Expand Up @@ -107,17 +110,49 @@ var _ = Describe("MachineController", func() {
HaveField("Machines", HaveLen(1)),
))

By("getting mchine events")
Eventually(func(g Gomega) []*iri.Event {
resp, err := machineEventMapper.GetMachineEventFor(ctx, string(machine.GetUID()))
By("setting an event for iri machine")
_, iriMachine := GetSingleMapEntry(srv.Machines)
machineEvent := &fakemachine.FakeMachineEvents{
MachineEvents: iri.MachineEvents{
InvolvedObjectMeta: &v1alpha1.ObjectMetadata{
Id: iriMachine.Metadata.Id,
Labels: iriMachine.Metadata.Labels,
},
Events: []*iri.Event{{
Spec: &iri.EventSpec{
Reason: "testing",
Message: "this is test event",
Type: "Normal",
EventTime: time.Now().Unix(),
}}},
},
}
srv.SetEvents(iriMachine.Metadata.Id, machineEvent)

By("validating event has been emitted for correct mchine")
machineEventList := &corev1.EventList{}
selectorField := fields.Set{}
selectorField["involvedObject.name"] = machine.GetName()
Eventually(func(g Gomega) []corev1.Event {
err := k8sClient.List(ctx, machineEventList,
client.InNamespace(ns.Name), client.MatchingFieldsSelector{Selector: selectorField.AsSelector()},
)
g.Expect(err).NotTo(HaveOccurred())
return resp
}).Should(ConsistOf(SatisfyAll(
HaveField("Spec", SatisfyAll(
HaveField("Reason", Equal("testing")),
HaveField("Message", Equal("this is test event")),
HaveField("Type", Equal(corev1.EventTypeNormal)),
)),
return machineEventList.Items
}).Should(ContainElement(SatisfyAll(
HaveField("Reason", Equal("testing")),
HaveField("Message", Equal("this is test event")),
HaveField("Type", Equal(corev1.EventTypeNormal)),
)))
})
})

func GetSingleMapEntry[K comparable, V any](m map[K]V) (K, V) {
if n := len(m); n != 1 {
Fail(fmt.Sprintf("Expected for map to have a single entry but got %d", n), 1)
}
for k, v := range m {
return k, v
}
panic("unreachable")
}
2 changes: 0 additions & 2 deletions poollet/machinepoollet/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@ import (
"context"
"errors"

iri "github.com/ironcore-dev/ironcore/iri/apis/machine/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

var ErrNoMatchingMachineEvents = errors.New("no matching machine events")

type MachineEventMapper interface {
manager.Runnable
GetMachineEventFor(ctx context.Context, machineID string) ([]*iri.Event, error)
WaitForSync(ctx context.Context) error
}

0 comments on commit bb44927

Please sign in to comment.