Skip to content

Commit

Permalink
Add leader election to starting multi-project-controller
Browse files Browse the repository at this point in the history
  • Loading branch information
panslava committed Jan 15, 2025
1 parent afb7f1f commit fd75ee7
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 7 deletions.
17 changes: 11 additions & 6 deletions cmd/glbc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,19 @@ func main() {
systemHealth := systemhealth.NewSystemHealth(rootLogger)
go app.RunHTTPServer(systemHealth.HealthCheck, rootLogger)

hostname, err := os.Hostname()
if err != nil {
klog.Fatalf("unable to get hostname: %v", err)
}

if flags.F.EnableMultiProjectMode {
rootLogger.Info("Multi-project mode is enabled, starting project-syncer")

runWithWg(func() {
multiprojectstart.Start(
err := multiprojectstart.StartWithLeaderElection(
context.Background(),
leaderElectKubeClient,
hostname,
kubeConfig,
rootLogger,
kubeClient,
Expand All @@ -249,6 +256,9 @@ func main() {
namer,
stopCh,
)
if err != nil {
rootLogger.Error(err, "Failed to start multi-project syncer")
}
}, rOption.wg)

// Wait for the multi-project syncer to finish.
Expand Down Expand Up @@ -297,11 +307,6 @@ func main() {
}
ctx := ingctx.NewControllerContext(kubeClient, backendConfigClient, frontendConfigClient, firewallCRClient, svcNegClient, svcAttachmentClient, networkClient, nodeTopologyClient, eventRecorderKubeClient, cloud, namer, kubeSystemUID, ctxConfig, rootLogger)

hostname, err := os.Hostname()
if err != nil {
klog.Fatalf("unable to get hostname: %v", err)
}

leOption := leaderElectionOption{
client: leaderElectKubeClient,
recorder: ctx.Recorder(flags.F.LeaderElection.LockObjectNamespace),
Expand Down
80 changes: 79 additions & 1 deletion pkg/multiproject/start/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package start
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"os"

"k8s.io/apimachinery/pkg/types"
informers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/ingress-gce/cmd/glbc/app"
ingresscontext "k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/flags"
Expand All @@ -18,13 +22,87 @@ import (
"k8s.io/ingress-gce/pkg/neg/syncers/labels"
providerconfigclient "k8s.io/ingress-gce/pkg/providerconfig/client/clientset/versioned"
providerconfiginformers "k8s.io/ingress-gce/pkg/providerconfig/client/informers/externalversions"
"k8s.io/ingress-gce/pkg/recorders"
svcnegclient "k8s.io/ingress-gce/pkg/svcneg/client/clientset/versioned"
"k8s.io/ingress-gce/pkg/utils/namer"
"k8s.io/klog/v2"
)

func Start(
const multiProjectLeaderElectionLockName = "ingress-gce-multi-project-lock"

func StartWithLeaderElection(
ctx context.Context,
leaderElectKubeClient kubernetes.Interface,
hostname string,
kubeConfig *rest.Config,
logger klog.Logger,
kubeClient kubernetes.Interface,
svcNegClient svcnegclient.Interface,
kubeSystemUID types.UID,
eventRecorderKubeClient kubernetes.Interface,
rootNamer *namer.Namer,
stopCh <-chan struct{},
) error {
recordersManager := recorders.NewManager(eventRecorderKubeClient, logger)

leConfig, err := makeLeaderElectionConfig(leaderElectKubeClient, hostname, recordersManager, kubeConfig, logger, kubeClient, svcNegClient, kubeSystemUID, eventRecorderKubeClient, rootNamer, stopCh)
if err != nil {
return err
}

leaderelection.RunOrDie(ctx, *leConfig)
logger.Info("Multi-project controller exited.")

return nil
}

func makeLeaderElectionConfig(
leaderElectKubeClient kubernetes.Interface,
hostname string,
recordersManager *recorders.Manager,
kubeConfig *rest.Config,
logger klog.Logger,
kubeClient kubernetes.Interface,
svcNegClient svcnegclient.Interface,
kubeSystemUID types.UID,
eventRecorderKubeClient kubernetes.Interface,
rootNamer *namer.Namer,
stopCh <-chan struct{},
) (*leaderelection.LeaderElectionConfig, error) {
recorder := recordersManager.Recorder(flags.F.LeaderElection.LockObjectNamespace)
// add a uniquifier so that two processes on the same host don't accidentally both become active
id := fmt.Sprintf("%v_%x", hostname, rand.Intn(1e6))

rl, err := resourcelock.New(resourcelock.LeasesResourceLock,
flags.F.LeaderElection.LockObjectNamespace,
multiProjectLeaderElectionLockName,
leaderElectKubeClient.CoreV1(),
leaderElectKubeClient.CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
})
if err != nil {
return nil, fmt.Errorf("couldn't create resource lock: %v", err)
}

return &leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: flags.F.LeaderElection.LeaseDuration.Duration,
RenewDeadline: flags.F.LeaderElection.RenewDeadline.Duration,
RetryPeriod: flags.F.LeaderElection.RetryPeriod.Duration,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(context.Context) {
start(kubeConfig, logger, kubeClient, svcNegClient, kubeSystemUID, eventRecorderKubeClient, rootNamer, stopCh)
},
OnStoppedLeading: func() {
logger.Info("Stop running multi-project leader election")
},
},
}, nil
}

func start(
kubeConfig *rest.Config,
logger klog.Logger,
kubeClient kubernetes.Interface,
Expand Down

0 comments on commit fd75ee7

Please sign in to comment.