Skip to content

Commit

Permalink
Create additional NEGs based on subnets in Node Topology CR.
Browse files Browse the repository at this point in the history
* Query Node Topology CR for the current set of NEGs in the cluster.
* When ensureNetworkEndpointGroups(), ensure NEGs are properly
  provisioned in the non-default subnets as well.
  • Loading branch information
sawsa307 committed Oct 23, 2024
1 parent 95f94ab commit 264bd6c
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 7 deletions.
1 change: 1 addition & 0 deletions pkg/neg/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2115,6 +2115,7 @@ func newTestNode(name string, unschedulable bool) *apiv1.Node {
Name: name,
},
Spec: apiv1.NodeSpec{
PodCIDR: "10.100.1.0/24",
Unschedulable: unschedulable,
},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ const (
negName1 = "neg1"

defaultTestSubnet = "default"
defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/proj/regions/us-central1/subnetworks/default"
defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/mock-project/regions/test-region/subnetworks/default"
)

func NewTestSyncerManager(kubeClient kubernetes.Interface) (*syncerManager, *gce.Cloud, *negtypes.TestContext) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/neg/readiness/reflector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

const (
defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/proj/regions/us-central1/subnetworks/default"
defaultTestSubnetURL = "https://www.googleapis.com/compute/v1/projects/mock-project/regions/test-region/subnetworks/default"

defaultTestSubnet = "default"
nonDefaultTestSubnet = "non-default"
Expand Down
91 changes: 91 additions & 0 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"google.golang.org/api/googleapi"
apiv1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -35,6 +36,7 @@ import (
"k8s.io/ingress-gce/pkg/network"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/ingress-gce/pkg/utils/endpointslices"
"k8s.io/ingress-gce/pkg/utils/namer"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -471,6 +473,13 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error {
}
}

if flags.F.EnableMultiSubnetClusterPhase1 {
negs, needToUpdate, errs := s.ensureNetworkEndpointGroupsFromNodeTopology(zones, negsByLocation)
negObjRefs = append(negObjRefs, negs...)
updateNEGStatus = updateNEGStatus && needToUpdate
errList = append(errList, errs...)
}

if updateNEGStatus {
s.updateInitStatus(negObjRefs, errList)
}
Expand All @@ -479,6 +488,88 @@ func (s *transactionSyncer) ensureNetworkEndpointGroups() error {
return utilerrors.NewAggregate(errList)
}

func (s *transactionSyncer) ensureNetworkEndpointGroupsFromNodeTopology(zones []string, negsByLocation map[string]int) ([]negv1beta1.NegObjectReference, bool, []error) {
var errList []error
var negObjRefs []negv1beta1.NegObjectReference

// Get subnets from NodeTopologyCRD.
subnetConfigs, err := s.zoneGetter.ListSubnets(s.logger)
if err != nil {
errList = append(errList, err)
return nil, true, errList
}

defaultSubnet, err := utils.KeyName(s.networkInfo.SubnetworkURL)
if err != nil {
s.logger.Error(err, "Errored getting default subnet from NetworkInfo")
errList = append(errList, err)
return nil, true, errList
}

for _, subnetConfig := range subnetConfigs {
// Skip default subnet since it has already been ensured.
if subnetConfig.Name == defaultSubnet {
continue
}

// Determine the NEG name for the non-default subnet NEGs.
negName := s.namer.NonDefaultSubnetNEG(s.NegSyncerKey.Namespace, s.NegSyncerKey.Name, subnetConfig.Name, s.NegSyncerKey.PortTuple.Port)
if s.customName {
if len(s.NegSyncerKey.NegName) > 56 {
s.logger.Error(err, "Unable to create custom NEGs in non-default subnet", "customNegName", s.NegSyncerKey.Name)
errList = append(errList, ErrCustomNEGNameTooLong)
continue
}
negName = fmt.Sprintf("%s-%s", s.NegSyncerKey.Name, namer.SubnetHash(subnetConfig.Name))
}

// Use a networkInfo with non-default subnet as subnetURL.
networkInfoInNonDefaultSubnet := s.networkInfo
resourceID, err := cloud.ParseResourceURL(subnetConfig.SubnetPath)
if err != nil {
s.logger.Error(err, "Failed to parse subnet path", "subnetPath", subnetConfig.SubnetPath)
errList = append(errList, err)
continue
}

// Add compute and version GA prefix.
networkInfoInNonDefaultSubnet.SubnetworkURL = cloud.SelfLink(meta.VersionGA, resourceID.ProjectID, "subnetworks", resourceID.Key)

for _, zone := range zones {
negObj, err := ensureNetworkEndpointGroup(
s.Namespace,
s.Name,
negName,
zone,
s.NegSyncerKey.String(),
s.kubeSystemUID,
fmt.Sprint(s.NegSyncerKey.PortTuple.Port),
s.NegSyncerKey.NegType,
s.cloud,
s.serviceLister,
s.recorder,
s.NegSyncerKey.GetAPIVersion(),
s.customName,
networkInfoInNonDefaultSubnet,
s.logger,
)
if err != nil {
errList = append(errList, err)
if errors.Is(err, utils.ErrNEGUsedByAnotherSyncer) {
return nil, false, errList
}
}

if s.svcNegClient != nil && err == nil {
negObjRefs = append(negObjRefs, negObj)
negsByLocation[zone]++
}
}
}
return negObjRefs, true, errList

}

// syncNetworkEndpoints spins off go routines to execute NEG operations
func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet, endpointPodLabelMap labels.EndpointPodLabelMap, migrationZone negtypes.EndpointGroupInfo) error {
syncFunc := func(endpointMap map[negtypes.EndpointGroupInfo]negtypes.NetworkEndpointSet, operation transactionOp) error {
Expand Down
147 changes: 147 additions & 0 deletions pkg/neg/syncers/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"testing"
"time"

nodetopologyv1 "github.com/GoogleCloudPlatform/gke-networking-api/apis/nodetopology/v1"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -1338,6 +1339,152 @@ func TestTransactionSyncerWithNegCR(t *testing.T) {
}
}

func TestEnsureNetworkEndpointGroupsFromNodeTopology(t *testing.T) {
zones := []string{negtypes.TestZone1, negtypes.TestZone2, negtypes.TestZone3}
testNetworkURL := cloud.SelfLink(meta.VersionGA, "mock-project", "networks", meta.GlobalKey(defaultTestSubnet))
testSubnetworkURL := cloud.SelfLink(meta.VersionGA, "mock-project", "subnetworks", meta.RegionalKey(defaultTestSubnet, "test-region"))
testNegType := negtypes.VmIpPortEndpointType
additionalTestSubnet := "additional-subnet"
additionalTestSubnetworkURL := cloud.SelfLink(meta.VersionGA, "mock-project", "subnetworks", meta.RegionalKey(additionalTestSubnet, "test-region"))

nodeTopologyCrWithDefaultSubnetOnly := nodetopologyv1.NodeTopology{
TypeMeta: metav1.TypeMeta{
Kind: "NodeTopology",
APIVersion: "networking.gke.io/v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "default",
},
Status: nodetopologyv1.NodeTopologyStatus{
Subnets: []nodetopologyv1.SubnetConfig{
{Name: defaultTestSubnet, SubnetPath: fmt.Sprintf("projects/mock-project/regions/test-region/subnetworks/%s", defaultTestSubnet)},
},
},
}
nodeTopologyCrWithAdditionalSubnets := nodeTopologyCrWithDefaultSubnetOnly
nodeTopologyCrWithAdditionalSubnets.Status.Subnets = append(nodeTopologyCrWithAdditionalSubnets.Status.Subnets,
nodetopologyv1.SubnetConfig{
Name: additionalTestSubnet,
SubnetPath: fmt.Sprintf("projects/mock-project/regions/test-region/subnetworks/%s", additionalTestSubnet),
},
)

currNodeTopologyCRName := flags.F.NodeTopologyCRName
prevFlag := flags.F.EnableMultiSubnetClusterPhase1
defer func() {
flags.F.NodeTopologyCRName = currNodeTopologyCRName
flags.F.EnableMultiSubnetClusterPhase1 = prevFlag
}()
flags.F.NodeTopologyCRName = "default"
flags.F.EnableMultiSubnetClusterPhase1 = true

testCases := []struct {
desc string
customNEGName string
nodeTopologyCr *nodetopologyv1.NodeTopology
negDesc string
expectError bool
// expectNeedToUpdate indicates whether there is any conflicting NEG description.
// When there is conflict, we do not update NEG Object Ref.
expectNeedToUpdate bool
}{
{
desc: "NodeTopology CR doesn't exist",
expectError: true,
expectNeedToUpdate: true,
},
{
desc: "NodeTopology CR only contains default subnet",
nodeTopologyCr: &nodeTopologyCrWithDefaultSubnetOnly,
expectNeedToUpdate: true,
},
{
desc: "NodeTopology CR contains additional subnets, auto-generated NEG name",
nodeTopologyCr: &nodeTopologyCrWithAdditionalSubnets,
expectNeedToUpdate: true,
},
{
desc: "NodeTopology CR contains additional subnets, custom NEG name not exceeding character limit",
customNEGName: "custom-neg",
nodeTopologyCr: &nodeTopologyCrWithAdditionalSubnets,
expectError: false,
expectNeedToUpdate: true,
},
{
desc: "NodeTopology CR contains additional subnets, custom NEG name exceeding character limit",
customNEGName: "012345678901234567890123456789012345678901234567890123456", // 57 characters
nodeTopologyCr: &nodeTopologyCrWithAdditionalSubnets,
expectError: true,
expectNeedToUpdate: true,
},
{
desc: "NodeTopology CR contains additional subnets, conflicting NEG description",
nodeTopologyCr: &nodeTopologyCrWithAdditionalSubnets,
negDesc: utils.NegDescription{
ClusterUID: kubeSystemUID,
Namespace: testServiceNamespace,
ServiceName: testServiceName,
Port: "81", // Expected port to be 80
}.String(),
expectError: true,
expectNeedToUpdate: false,
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
fakeCloud := negtypes.NewFakeNetworkEndpointGroupCloud(testSubnetworkURL, testNetworkURL)

_, syncer := newTestTransactionSyncer(fakeCloud, testNegType, tc.customNEGName != "")
zonegetter.SetNodeTopologyHasSynced(syncer.zoneGetter, func() bool { return true })
if tc.customNEGName != "" {
syncer.NegSyncerKey.NegName = tc.customNEGName
}
if tc.negDesc != "" {
negName := syncer.namer.NonDefaultSubnetNEG(syncer.Namespace, syncer.Name, additionalTestSubnet, syncer.PortTuple.Port)
for _, zone := range zones {
err := fakeCloud.CreateNetworkEndpointGroup(&composite.NetworkEndpointGroup{
Version: syncer.NegSyncerKey.GetAPIVersion(),
Name: negName,
NetworkEndpointType: string(syncer.NegSyncerKey.NegType),
Network: fakeCloud.NetworkURL(),
Subnetwork: fakeCloud.SubnetworkURL(),
Description: tc.negDesc,
}, zone, klog.TODO())
if err != nil {
t.Fatalf("Failed to create NEG: %v", err)
}
}
}

negsByLocation := make(map[string]int)
if tc.nodeTopologyCr != nil {
if err := zonegetter.AddNodeTopologyCR(syncer.zoneGetter, tc.nodeTopologyCr); err != nil {
t.Fatalf("Failed to create Node Topology CR: %v", err)
}
}
negObjRefs, needToUpdate, errs := syncer.ensureNetworkEndpointGroupsFromNodeTopology(zones, negsByLocation)

if tc.expectError && len(errs) == 0 {
t.Errorf("Got no errors after ensureNetworkEndpointGroupsFromNodeTopology(), expected errors")
}
if !tc.expectError && len(errs) != 0 {
t.Errorf("Got errors %v after ensureNetworkEndpointGroupsFromNodeTopology(), expected no errors", errs)
}
if needToUpdate != tc.expectNeedToUpdate {
t.Errorf("Got needToUpdate = %v, expected %v", needToUpdate, tc.expectNeedToUpdate)
}
if needToUpdate {
for _, negObjRef := range negObjRefs {
if negObjRef.SubnetURL != additionalTestSubnetworkURL {
t.Errorf("Got subnetURL = %s for NEG %s, expected %s", negObjRef.SubnetURL, negObjRef.SelfLink, additionalTestSubnetworkURL)
}
}
}
})
}
}

// TestUpdateInitStatusWithMultiSubnetCluster iterates over different zone
// transition situation, and checks if NEG Object Reference in the corresponding
// zone has the expected State.
Expand Down
3 changes: 3 additions & 0 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import (
"k8s.io/klog/v2"
)

var ErrCustomNEGNameTooLong = fmt.Errorf("custom NEG name exceeds 56 characters limit")

const (
MAX_NETWORK_ENDPOINTS_PER_BATCH = 500
// For each NEG, only retries 15 times to process it.
Expand Down Expand Up @@ -148,6 +150,7 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService
ServiceName: svcName,
Port: port,
}
fmt.Printf("remove me: expectedDesc = %v \n", expectedDesc)
if customName && neg.Description == "" {
negLogger.Error(nil, "Found Neg with custom name but empty description")
return negv1beta1.NegObjectReference{}, fmt.Errorf("found a custom named neg %s with an empty description", negName)
Expand Down
2 changes: 1 addition & 1 deletion pkg/neg/types/fakes.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (f *FakeNetworkEndpointGroupCloud) SubnetworkURL() string {
}

func (f *FakeNetworkEndpointGroupCloud) NetworkProjectID() string {
return "test-network-project-id"
return "mock-project"
}

func (f *FakeNetworkEndpointGroupCloud) Region() string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/namer/l4_namer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (namer *L4Namer) L4NonDefaultSubnetNEG(namespace, name, subnetName string)
namer.v2Prefix,
namer.v2ClusterUID,
getTrimmedNamespacedName(namespace, name, maximumL4CombinedLength-subnetHashLength-1),
subnetHash(subnetName),
SubnetHash(subnetName),
namer.getClusterSuffix(namespace, name),
}, "-")
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/utils/namer/namer.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (n *Namer) NEG(namespace, name string, port int32) string {
// subnets(e.g.: us-central1-subnet, us-central2-subnet).
func (n *Namer) NonDefaultSubnetNEG(namespace, name, subnetName string, port int32) string {
portStr := fmt.Sprintf("%v", port)
hashedSubnet := subnetHash(subnetName)
hashedSubnet := SubnetHash(subnetName)
truncFields := TrimFieldsEvenly(maxNEGDescriptiveLabel-subnetHashLength-1, namespace, name, portStr)
truncNamespace := truncFields[0]
truncName := truncFields[1]
Expand Down Expand Up @@ -511,8 +511,8 @@ func negSuffix(uid, namespace, name, port, subset string) string {
return negHash[:8]
}

// subnetHash returns hash code with 6 characters
func subnetHash(subnetName string) string {
// SubnetHash returns hash code with 6 characters
func SubnetHash(subnetName string) string {
subnetHash := fmt.Sprintf("%x", sha256.Sum256([]byte(subnetName)))
return subnetHash[:6]
}

0 comments on commit 264bd6c

Please sign in to comment.