diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go
index 6fb30b1d3d..c19fd23aaa 100644
--- a/cmd/glbc/main.go
+++ b/cmd/glbc/main.go
@@ -234,12 +234,19 @@ func main() {
 
 	systemHealth := systemhealth.NewSystemHealth(rootLogger)
 	go app.RunHTTPServer(systemHealth.HealthCheck, rootLogger)
 
+	hostname, err := os.Hostname()
+	if err != nil {
+		klog.Fatalf("unable to get hostname: %v", err)
+	}
+
 	if flags.F.EnableMultiProjectMode {
 		rootLogger.Info("Multi-project mode is enabled, starting project-syncer")
 		runWithWg(func() {
-			multiprojectstart.Start(
+			err := multiprojectstart.StartWithLeaderElection(
 				context.Background(),
+				leaderElectKubeClient,
+				hostname,
 				kubeConfig,
 				rootLogger,
 				kubeClient,
@@ -249,6 +256,9 @@ func main() {
 				namer,
 				stopCh,
 			)
+			if err != nil {
+				rootLogger.Error(err, "Failed to start multi-project syncer")
+			}
 		}, rOption.wg)
 
 		// Wait for the multi-project syncer to finish.
@@ -297,11 +307,6 @@ func main() {
 	}
 	ctx := ingctx.NewControllerContext(kubeClient, backendConfigClient, frontendConfigClient, firewallCRClient, svcNegClient, svcAttachmentClient, networkClient, nodeTopologyClient, eventRecorderKubeClient, cloud, namer, kubeSystemUID, ctxConfig, rootLogger)
 
-	hostname, err := os.Hostname()
-	if err != nil {
-		klog.Fatalf("unable to get hostname: %v", err)
-	}
-
 	leOption := leaderElectionOption{
 		client:   leaderElectKubeClient,
 		recorder: ctx.Recorder(flags.F.LeaderElection.LockObjectNamespace),
diff --git a/pkg/multiproject/start/start.go b/pkg/multiproject/start/start.go
index 92ee3bfe77..7eb9d12c6d 100644
--- a/pkg/multiproject/start/start.go
+++ b/pkg/multiproject/start/start.go
@@ -3,12 +3,16 @@ package start
 import (
 	"context"
 	"encoding/json"
+	"fmt"
+	"math/rand"
 	"os"
 
 	"k8s.io/apimachinery/pkg/types"
 	informers "k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/leaderelection"
+	"k8s.io/client-go/tools/leaderelection/resourcelock"
 	"k8s.io/ingress-gce/cmd/glbc/app"
 	ingresscontext "k8s.io/ingress-gce/pkg/context"
 	"k8s.io/ingress-gce/pkg/flags"
@@ -18,13 +22,87 @@ import (
 	"k8s.io/ingress-gce/pkg/neg/syncers/labels"
 	providerconfigclient "k8s.io/ingress-gce/pkg/providerconfig/client/clientset/versioned"
 	providerconfiginformers "k8s.io/ingress-gce/pkg/providerconfig/client/informers/externalversions"
+	"k8s.io/ingress-gce/pkg/recorders"
 	svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
 	"k8s.io/ingress-gce/pkg/utils/namer"
 	"k8s.io/klog/v2"
 )
 
-func Start(
+const multiProjectLeaderElectionLockName = "ingress-gce-multi-project-lock"
+
+func StartWithLeaderElection(
 	ctx context.Context,
+	leaderElectKubeClient kubernetes.Interface,
+	hostname string,
+	kubeConfig *rest.Config,
+	logger klog.Logger,
+	kubeClient kubernetes.Interface,
+	svcNegClient svcnegclient.Interface,
+	kubeSystemUID types.UID,
+	eventRecorderKubeClient kubernetes.Interface,
+	rootNamer *namer.Namer,
+	stopCh <-chan struct{},
+) error {
+	recordersManager := recorders.NewManager(eventRecorderKubeClient, logger)
+
+	leConfig, err := makeLeaderElectionConfig(leaderElectKubeClient, hostname, recordersManager, kubeConfig, logger, kubeClient, svcNegClient, kubeSystemUID, eventRecorderKubeClient, rootNamer, stopCh)
+	if err != nil {
+		return err
+	}
+
+	leaderelection.RunOrDie(ctx, *leConfig)
+	logger.Info("Multi-project controller exited.")
+
+	return nil
+}
+
+func makeLeaderElectionConfig(
+	leaderElectKubeClient kubernetes.Interface,
+	hostname string,
+	recordersManager *recorders.Manager,
+	kubeConfig *rest.Config,
+	logger klog.Logger,
+	kubeClient kubernetes.Interface,
+	svcNegClient svcnegclient.Interface,
+	kubeSystemUID types.UID,
+	eventRecorderKubeClient kubernetes.Interface,
+	rootNamer *namer.Namer,
+	stopCh <-chan struct{},
+) (*leaderelection.LeaderElectionConfig, error) {
+	recorder := recordersManager.Recorder(flags.F.LeaderElection.LockObjectNamespace)
+	// add a uniquifier so that two processes on the same host don't accidentally both become active
+	id := fmt.Sprintf("%v_%x", hostname, rand.Intn(1e6))
+
+	rl, err := resourcelock.New(resourcelock.LeasesResourceLock,
+		flags.F.LeaderElection.LockObjectNamespace,
+		multiProjectLeaderElectionLockName,
+		leaderElectKubeClient.CoreV1(),
+		leaderElectKubeClient.CoordinationV1(),
+		resourcelock.ResourceLockConfig{
+			Identity:      id,
+			EventRecorder: recorder,
+		})
+	if err != nil {
+		return nil, fmt.Errorf("couldn't create resource lock: %v", err)
+	}
+
+	return &leaderelection.LeaderElectionConfig{
+		Lock:          rl,
+		LeaseDuration: flags.F.LeaderElection.LeaseDuration.Duration,
+		RenewDeadline: flags.F.LeaderElection.RenewDeadline.Duration,
+		RetryPeriod:   flags.F.LeaderElection.RetryPeriod.Duration,
+		Callbacks: leaderelection.LeaderCallbacks{
+			OnStartedLeading: func(context.Context) {
+				start(kubeConfig, logger, kubeClient, svcNegClient, kubeSystemUID, eventRecorderKubeClient, rootNamer, stopCh)
+			},
+			OnStoppedLeading: func() {
+				logger.Info("Stop running multi-project leader election")
+			},
+		},
+	}, nil
+}
+
+func start(
+	kubeConfig *rest.Config,
+	logger klog.Logger,
+	kubeClient kubernetes.Interface,