Skip to content

Commit

Permalink
Fixed syncer async handling and copyright statements
Browse files Browse the repository at this point in the history
  • Loading branch information
nikolay-andreev committed Nov 20, 2024
1 parent 27acae3 commit 622a43b
Show file tree
Hide file tree
Showing 20 changed files with 301 additions and 298 deletions.
83 changes: 60 additions & 23 deletions cmd/syncer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os/signal"
"regexp"
"strings"
"sync"
"syscall"
"time"

Expand All @@ -34,6 +35,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
rl "k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/sample-controller/pkg/signals"

"sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/node"
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/config"
Expand Down Expand Up @@ -248,11 +250,19 @@ func initSyncerComponents(ctx context.Context, clusterFlavor cnstypes.CnsCluster
log := logger.GetLogger(ctx)
// Disconnect vCenter sessions on restart
defer func() {
log.Info("Cleaning up vc sessions syncer components")
if r := recover(); r != nil {
fmt.Printf("panic: %+v", r)
cleanupSessions(ctx, r)
}
}()

var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()

errChan := make(chan error, 4)
defer close(errChan)

if err := manager.InitCommonModules(ctx, clusterFlavor, coInitParams); err != nil {
log.Errorf("Error initializing common modules for all flavors. Error: %+v", err)
os.Exit(1)
Expand All @@ -274,9 +284,13 @@ func initSyncerComponents(ctx context.Context, clusterFlavor cnstypes.CnsCluster
}
}

wg := sync.WaitGroup{}

// Initialize CNS Operator for Supervisor clusters.
if clusterFlavor == cnstypes.CnsClusterFlavorWorkload {
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
log.Info("Cleaning up vc sessions storage pool service")
if r := recover(); r != nil {
Expand All @@ -290,6 +304,7 @@ func initSyncerComponents(ctx context.Context, clusterFlavor cnstypes.CnsCluster
}
}()
}

if clusterFlavor == cnstypes.CnsClusterFlavorVanilla {
// Initialize node manager so that syncer components can
// retrieve NodeVM using the NodeID.
Expand Down Expand Up @@ -346,50 +361,72 @@ func initSyncerComponents(ctx context.Context, clusterFlavor cnstypes.CnsCluster
}
}
}

wg.Add(1)
go func() {
defer wg.Done()
defer func() {
log.Info("Cleaning up vc sessions cns operator")
if r := recover(); r != nil {
cleanupSessions(ctx, r)
}
}()
if err := manager.InitCnsOperator(ctx, clusterFlavor, configInfo, coInitParams); err != nil {
log.Errorf("Error initializing Cns Operator. Error: %+v", err)
utils.LogoutAllvCenterSessions(ctx)
os.Exit(0)
errChan <- fmt.Errorf("failed to initialize CNS operator: %w", err)
}
}()
if commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, common.BYOK_FSS) &&
clusterFlavor == cnstypes.CnsClusterFlavorWorkload {
if clusterFlavor == cnstypes.CnsClusterFlavorWorkload &&
commonco.ContainerOrchestratorUtility.IsFSSEnabled(ctx, common.BYOK_FSS) {
// Start BYOK Operator for Supervisor clusters.
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
cleanupSessions(ctx, r)
}
}()
if err := startByokOperator(ctx, clusterFlavor, configInfo); err != nil {
log.Errorf("Error running BYOK Operator. Error: %+v", err)
utils.LogoutAllvCenterSessions(ctx)
os.Exit(0)
errChan <- fmt.Errorf("failed to run BYOK operator: %w", err)
}
}()
}
syncer.PeriodicSyncIntervalInMin = *periodicSyncIntervalInMin
if err := syncer.InitMetadataSyncer(ctx, clusterFlavor, configInfo); err != nil {
log.Errorf("Error initializing Metadata Syncer. Error: %+v", err)
utils.LogoutAllvCenterSessions(ctx)
os.Exit(0)

wg.Add(1)
go func() {
defer wg.Done()
defer func() {
if r := recover(); r != nil {
cleanupSessions(ctx, r)
}
}()
syncer.PeriodicSyncIntervalInMin = *periodicSyncIntervalInMin
if err := syncer.InitMetadataSyncer(ctx, clusterFlavor, configInfo); err != nil {
errChan <- fmt.Errorf("failed to initialize Metadata Syncer: %w", err)
}
}()

defer func() {
utils.LogoutAllvCenterSessions(context.Background())
}()

defer func() {
log.Info("Terminating syncer components")
cancel()
wg.Wait()
}()

select {
case <-ctx.Done():
case <-signals.SetupSignalHandler().Done():
case err := <-errChan:
log.Error(err)
}
}
}

func startByokOperator(ctx context.Context,
clusterFlavor cnstypes.CnsClusterFlavor,
configInfo *config.ConfigurationInfo) error {
log := logger.GetLogger(ctx)

defer func() {
log.Info("Cleaning up vc sessions cns operator")
if r := recover(); r != nil {
cleanupSessions(ctx, r)
}
}()

mgr, err := byokoperator.NewManager(ctx, clusterFlavor, configInfo)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/stretchr/testify v1.9.0
github.com/vmware-tanzu/vm-operator/api v1.8.2
github.com/vmware-tanzu/vm-operator/external/byok v0.0.0-20241108223224-20f977201370
github.com/vmware/govmomi v0.47.0-alpha.0.0.20241111085955-321b6f0907a4
github.com/vmware/govmomi v0.46.1
go.uber.org/zap v1.26.0
golang.org/x/crypto v0.26.0
golang.org/x/sync v0.8.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dougm/pretty v0.0.0-20160325215624-add1dbc86daf h1:A2XbJkAuMMFy/9EftoubSKBUIyiOm6Z8+X5G7QpS6so=
github.com/dougm/pretty v0.0.0-20160325215624-add1dbc86daf/go.mod h1:7NQ3kWOx2cZOSjtcveTa5nqupVr2s6/83sG+rTlI7uA=
github.com/dougm/pretty v0.0.0-20171025230240-2ee9d7453c02 h1:tR3jsKPiO/mb6ntzk/dJlHZtm37CPfVp1C9KIo534+4=
github.com/dougm/pretty v0.0.0-20171025230240-2ee9d7453c02/go.mod h1:7NQ3kWOx2cZOSjtcveTa5nqupVr2s6/83sG+rTlI7uA=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/emicklei/go-restful/v3 v3.8.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc=
Expand Down Expand Up @@ -658,8 +658,8 @@ github.com/vmware-tanzu/vm-operator/api v1.8.2 h1:7cZHVusqAmAMFWvsiU7X5xontxdjas
github.com/vmware-tanzu/vm-operator/api v1.8.2/go.mod h1:vauVboD3sQxP+pb28TnI9wfrj+0nH2zSEc9Q7AzWJ54=
github.com/vmware-tanzu/vm-operator/external/byok v0.0.0-20241108223224-20f977201370 h1:XYJD5bIAMJJ7n2sTpXd0BvIwzebXMo53zcNG3AT7Wqo=
github.com/vmware-tanzu/vm-operator/external/byok v0.0.0-20241108223224-20f977201370/go.mod h1:mAGA/R19pPESFFsIP98hu1fOmTu1hwTk8MikZJTSdH0=
github.com/vmware/govmomi v0.47.0-alpha.0.0.20241111085955-321b6f0907a4 h1:IT6au955Ii7EF4rosunq64Ifs+41uHw0U1QrRrezzTA=
github.com/vmware/govmomi v0.47.0-alpha.0.0.20241111085955-321b6f0907a4/go.mod h1:XEwh4Y9aXKcJq3wLnpVkWl+/t1pxTEFa4ETfSVWQwvo=
github.com/vmware/govmomi v0.46.1 h1:RBoIR/vlGBYn+t7I1LFLLbDQSoBn+xN6gPYYMbFZ+80=
github.com/vmware/govmomi v0.46.1/go.mod h1:uoLVU9zlXC4p4GmLVG+ZJmBC0Gn3Q7mytOJvi39OhxA=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
Expand Down
39 changes: 33 additions & 6 deletions pkg/common/cns-lib/crypto/client.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package crypto

import (
Expand All @@ -12,7 +28,6 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/cns-lib/crypto/internal"
cnsconfig "sigs.k8s.io/vsphere-csi-driver/v3/pkg/common/config"
Expand Down Expand Up @@ -44,7 +59,7 @@ func NewClientWithConfig(ctx context.Context, config *rest.Config) (Client, erro
return nil, err
}

k8sClient, err := ctrlclient.New(config, client.Options{
k8sClient, err := ctrlclient.New(config, ctrlclient.Options{
Scheme: scheme,
})
if err != nil {
Expand Down Expand Up @@ -177,7 +192,10 @@ func (c *defaultClient) MarkEncryptedStorageClass(
return c.Client.Patch(ctx, &obj, objPatch)
}

func (c *defaultClient) isEncryptedStorageClass(ctx context.Context, storageClass *storagev1.StorageClass) (bool, string, error) {
func (c *defaultClient) isEncryptedStorageClass(
ctx context.Context,
storageClass *storagev1.StorageClass,
) (bool, string, error) {
var (
obj corev1.ConfigMap
objKey = ctrlclient.ObjectKey{
Expand All @@ -200,7 +218,10 @@ func (c *defaultClient) isEncryptedStorageClass(ctx context.Context, storageClas
return false, "", nil
}

func (c *defaultClient) GetEncryptionClass(ctx context.Context, name, namespace string) (*byokv1.EncryptionClass, error) {
func (c *defaultClient) GetEncryptionClass(
ctx context.Context,
name, namespace string,
) (*byokv1.EncryptionClass, error) {
var obj byokv1.EncryptionClass
key := ctrlclient.ObjectKey{Namespace: namespace, Name: name}
err := c.Client.Get(ctx, key, &obj)
Expand All @@ -210,7 +231,10 @@ func (c *defaultClient) GetEncryptionClass(ctx context.Context, name, namespace
return &obj, nil
}

func (c *defaultClient) GetDefaultEncryptionClass(ctx context.Context, namespace string) (*byokv1.EncryptionClass, error) {
func (c *defaultClient) GetDefaultEncryptionClass(
ctx context.Context,
namespace string,
) (*byokv1.EncryptionClass, error) {
var list byokv1.EncryptionClassList
if err := c.Client.List(
ctx,
Expand All @@ -231,7 +255,10 @@ func (c *defaultClient) GetDefaultEncryptionClass(ctx context.Context, namespace
return &list.Items[0], nil
}

func (c *defaultClient) GetEncryptionClassForPVC(ctx context.Context, name, namespace string) (*byokv1.EncryptionClass, error) {
func (c *defaultClient) GetEncryptionClassForPVC(
ctx context.Context,
name, namespace string,
) (*byokv1.EncryptionClass, error) {
var pvc corev1.PersistentVolumeClaim
pvcKey := ctrlclient.ObjectKey{Namespace: namespace, Name: name}
if err := c.Client.Get(ctx, pvcKey, &pvc); err != nil {
Expand Down
16 changes: 16 additions & 0 deletions pkg/common/cns-lib/crypto/constants.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package crypto

import (
Expand Down
16 changes: 16 additions & 0 deletions pkg/common/cns-lib/crypto/internal/storageclass.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package internal

import (
Expand Down
16 changes: 16 additions & 0 deletions pkg/common/cns-lib/crypto/kubernetes.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package crypto

import (
Expand Down
16 changes: 16 additions & 0 deletions pkg/common/cns-lib/crypto/util.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package crypto

import (
Expand Down
3 changes: 1 addition & 2 deletions pkg/kubernetes/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/sample-controller/pkg/signals"

"sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/logger"
)
Expand Down Expand Up @@ -77,7 +76,7 @@ func NewInformer(ctx context.Context, client clientset.Interface, inClusterClnt
if informerInstance == nil {
informerInstance = &InformerManager{
client: client,
stopCh: signals.SetupSignalHandler().Done(),
stopCh: ctx.Done(),
informerFactory: informers.NewSharedInformerFactory(client, noResyncPeriodFunc()),
}

Expand Down
Loading

0 comments on commit 622a43b

Please sign in to comment.