summaryrefslogtreecommitdiff
path: root/cmd/run.go
diff options
context:
space:
mode:
Diffstat (limited to 'cmd/run.go')
-rw-r--r--cmd/run.go353
1 files changed, 353 insertions, 0 deletions
diff --git a/cmd/run.go b/cmd/run.go
new file mode 100644
index 0000000..8016c8f
--- /dev/null
+++ b/cmd/run.go
@@ -0,0 +1,353 @@
+package main
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "strings"
+ "sync"
+ "text/template"
+ "time"
+
+ "github.com/argoproj-labs/argocd-image-updater/pkg/argocd"
+ "github.com/argoproj-labs/argocd-image-updater/pkg/common"
+ "github.com/argoproj-labs/argocd-image-updater/pkg/env"
+ "github.com/argoproj-labs/argocd-image-updater/pkg/health"
+ "github.com/argoproj-labs/argocd-image-updater/pkg/log"
+ "github.com/argoproj-labs/argocd-image-updater/pkg/metrics"
+ "github.com/argoproj-labs/argocd-image-updater/pkg/registry"
+ "github.com/argoproj-labs/argocd-image-updater/pkg/version"
+
+ "github.com/spf13/cobra"
+
+ "golang.org/x/sync/semaphore"
+)
+
+// newRunCommand implements "run" command
+func newRunCommand() *cobra.Command {
+ var cfg *ImageUpdaterConfig = &ImageUpdaterConfig{}
+ var once bool
+ var kubeConfig string
+ var disableKubernetes bool
+ var warmUpCache bool = true
+ var commitMessagePath string
+ var commitMessageTpl string
+ var runCmd = &cobra.Command{
+ Use: "run",
+ Short: "Runs the argocd-image-updater with a set of options",
+ RunE: func(cmd *cobra.Command, args []string) error {
+ if err := log.SetLogLevel(cfg.LogLevel); err != nil {
+ return err
+ }
+
+ if once {
+ cfg.CheckInterval = 0
+ cfg.HealthPort = 0
+ }
+
+ // Enforce sane --max-concurrency values
+ if cfg.MaxConcurrency < 1 {
+ return fmt.Errorf("--max-concurrency must be greater than 1")
+ }
+
+ log.Infof("%s %s starting [loglevel:%s, interval:%s, healthport:%s]",
+ version.BinaryName(),
+ version.Version(),
+ strings.ToUpper(cfg.LogLevel),
+ getPrintableInterval(cfg.CheckInterval),
+ getPrintableHealthPort(cfg.HealthPort),
+ )
+
+ // User can specify a path to a template used for Git commit messages
+ if commitMessagePath != "" {
+ tpl, err := ioutil.ReadFile(commitMessagePath)
+ if err != nil {
+ if errors.Is(err, os.ErrNotExist) {
+ log.Warnf("commit message template at %s does not exist, using default", commitMessagePath)
+ commitMessageTpl = common.DefaultGitCommitMessage
+ } else {
+ log.Fatalf("could not read commit message template: %v", err)
+ }
+ } else {
+ commitMessageTpl = string(tpl)
+ }
+ }
+
+ if commitMessageTpl == "" {
+ log.Infof("Using default Git commit messages")
+ commitMessageTpl = common.DefaultGitCommitMessage
+ }
+
+ if tpl, err := template.New("commitMessage").Parse(commitMessageTpl); err != nil {
+ log.Fatalf("could not parse commit message template: %v", err)
+ } else {
+ log.Debugf("Successfully parsed commit message template")
+ cfg.GitCommitMessage = tpl
+ }
+
+ // Load registries configuration early on. We do not consider it a fatal
+ // error when the file does not exist, but we emit a warning.
+ if cfg.RegistriesConf != "" {
+ st, err := os.Stat(cfg.RegistriesConf)
+ if err != nil || st.IsDir() {
+ log.Warnf("Registry configuration at %s could not be read: %v -- using default configuration", cfg.RegistriesConf, err)
+ } else {
+ err = registry.LoadRegistryConfiguration(cfg.RegistriesConf, false)
+ if err != nil {
+ log.Errorf("Could not load registry configuration from %s: %v", cfg.RegistriesConf, err)
+ return nil
+ }
+ }
+ }
+
+ if cfg.CheckInterval > 0 && cfg.CheckInterval < 60*time.Second {
+ log.Warnf("Check interval is very low - it is not recommended to run below 1m0s")
+ }
+
+ var err error
+ if !disableKubernetes {
+ ctx := context.Background()
+ cfg.KubeClient, err = getKubeConfig(ctx, cfg.ArgocdNamespace, kubeConfig)
+ if err != nil {
+ log.Fatalf("could not create K8s client: %v", err)
+ }
+ if cfg.ClientOpts.ServerAddr == "" {
+ cfg.ClientOpts.ServerAddr = fmt.Sprintf("argocd-server.%s", cfg.KubeClient.Namespace)
+ }
+ }
+ if cfg.ClientOpts.ServerAddr == "" {
+ cfg.ClientOpts.ServerAddr = defaultArgoCDServerAddr
+ }
+
+ if token := os.Getenv("ARGOCD_TOKEN"); token != "" && cfg.ClientOpts.AuthToken == "" {
+ log.Debugf("Using ArgoCD API credentials from environment ARGOCD_TOKEN")
+ cfg.ClientOpts.AuthToken = token
+ }
+
+ log.Infof("ArgoCD configuration: [apiKind=%s, server=%s, auth_token=%v, insecure=%v, grpc_web=%v, plaintext=%v]",
+ cfg.ApplicationsAPIKind,
+ cfg.ClientOpts.ServerAddr,
+ cfg.ClientOpts.AuthToken != "",
+ cfg.ClientOpts.Insecure,
+ cfg.ClientOpts.GRPCWeb,
+ cfg.ClientOpts.Plaintext,
+ )
+
+ // Health server will start in a go routine and run asynchronously
+ var hsErrCh chan error
+ var msErrCh chan error
+ if cfg.HealthPort > 0 {
+ log.Infof("Starting health probe server TCP port=%d", cfg.HealthPort)
+ hsErrCh = health.StartHealthServer(cfg.HealthPort)
+ }
+
+ if cfg.MetricsPort > 0 {
+ log.Infof("Starting metrics server on TCP port=%d", cfg.MetricsPort)
+ msErrCh = metrics.StartMetricsServer(cfg.MetricsPort)
+ }
+
+ if warmUpCache {
+ err := warmupImageCache(cfg)
+ if err != nil {
+ log.Errorf("Error warming up cache: %v", err)
+ return err
+ }
+ }
+
+ // This is our main loop. We leave it only when our health probe server
+ // returns an error.
+ for {
+ select {
+ case err := <-hsErrCh:
+ if err != nil {
+ log.Errorf("Health probe server exited with error: %v", err)
+ } else {
+ log.Infof("Health probe server exited gracefully")
+ }
+ return nil
+ case err := <-msErrCh:
+ if err != nil {
+ log.Errorf("Metrics server exited with error: %v", err)
+ } else {
+ log.Infof("Metrics server exited gracefully")
+ }
+ return nil
+ default:
+ if lastRun.IsZero() || time.Since(lastRun) > cfg.CheckInterval {
+ result, err := runImageUpdater(cfg, false)
+ if err != nil {
+ log.Errorf("Error: %v", err)
+ } else {
+ log.Infof("Processing results: applications=%d images_considered=%d images_skipped=%d images_updated=%d errors=%d",
+ result.NumApplicationsProcessed,
+ result.NumImagesConsidered,
+ result.NumSkipped,
+ result.NumImagesUpdated,
+ result.NumErrors)
+ }
+ lastRun = time.Now()
+ }
+ }
+ if cfg.CheckInterval == 0 {
+ break
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+ log.Infof("Finished.")
+ return nil
+ },
+ }
+
+ runCmd.Flags().StringVar(&cfg.ApplicationsAPIKind, "applications-api", env.GetStringVal("APPLICATIONS_API", applicationsAPIKindK8S), "API kind that is used to manage Argo CD applications ('kubernetes' or 'argocd')")
+ runCmd.Flags().StringVar(&cfg.ClientOpts.ServerAddr, "argocd-server-addr", env.GetStringVal("ARGOCD_SERVER", ""), "address of ArgoCD API server")
+ runCmd.Flags().BoolVar(&cfg.ClientOpts.GRPCWeb, "argocd-grpc-web", env.GetBoolVal("ARGOCD_GRPC_WEB", false), "use grpc-web for connection to ArgoCD")
+ runCmd.Flags().BoolVar(&cfg.ClientOpts.Insecure, "argocd-insecure", env.GetBoolVal("ARGOCD_INSECURE", false), "(INSECURE) ignore invalid TLS certs for ArgoCD server")
+ runCmd.Flags().BoolVar(&cfg.ClientOpts.Plaintext, "argocd-plaintext", env.GetBoolVal("ARGOCD_PLAINTEXT", false), "(INSECURE) connect without TLS to ArgoCD server")
+ runCmd.Flags().StringVar(&cfg.ClientOpts.AuthToken, "argocd-auth-token", "", "use token for authenticating to ArgoCD (unsafe - consider setting ARGOCD_TOKEN env var instead)")
+ runCmd.Flags().BoolVar(&cfg.DryRun, "dry-run", false, "run in dry-run mode. If set to true, do not perform any changes")
+ runCmd.Flags().DurationVar(&cfg.CheckInterval, "interval", 2*time.Minute, "interval for how often to check for updates")
+ runCmd.Flags().StringVar(&cfg.LogLevel, "loglevel", env.GetStringVal("IMAGE_UPDATER_LOGLEVEL", "info"), "set the loglevel to one of trace|debug|info|warn|error")
+ runCmd.Flags().StringVar(&kubeConfig, "kubeconfig", "", "full path to kubernetes client configuration, i.e. ~/.kube/config")
+ runCmd.Flags().IntVar(&cfg.HealthPort, "health-port", 8080, "port to start the health server on, 0 to disable")
+ runCmd.Flags().IntVar(&cfg.MetricsPort, "metrics-port", 8081, "port to start the metrics server on, 0 to disable")
+ runCmd.Flags().BoolVar(&once, "once", false, "run only once, same as specifying --interval=0 and --health-port=0")
+ runCmd.Flags().StringVar(&cfg.RegistriesConf, "registries-conf-path", defaultRegistriesConfPath, "path to registries configuration file")
+ runCmd.Flags().BoolVar(&disableKubernetes, "disable-kubernetes", false, "do not create and use a Kubernetes client")
+ runCmd.Flags().IntVar(&cfg.MaxConcurrency, "max-concurrency", 10, "maximum number of update threads to run concurrently")
+ runCmd.Flags().StringVar(&cfg.ArgocdNamespace, "argocd-namespace", "", "namespace where ArgoCD runs in (current namespace by default)")
+ runCmd.Flags().StringSliceVar(&cfg.AppNamePatterns, "match-application-name", nil, "patterns to match application name against")
+ runCmd.Flags().BoolVar(&warmUpCache, "warmup-cache", true, "whether to perform a cache warm-up on startup")
+ runCmd.Flags().StringVar(&cfg.GitCommitUser, "git-commit-user", env.GetStringVal("GIT_COMMIT_USER", "argocd-image-updater"), "Username to use for Git commits")
+ runCmd.Flags().StringVar(&cfg.GitCommitMail, "git-commit-email", env.GetStringVal("GIT_COMMIT_EMAIL", "noreply@argoproj.io"), "E-Mail address to use for Git commits")
+ runCmd.Flags().StringVar(&commitMessagePath, "git-commit-message-path", defaultCommitTemplatePath, "Path to a template to use for Git commit messages")
+ runCmd.Flags().BoolVar(&cfg.DisableKubeEvents, "disable-kube-events", env.GetBoolVal("IMAGE_UPDATER_KUBE_EVENTS", false), "Disable kubernetes events")
+
+ return runCmd
+}
+
+// Main loop for argocd-image-controller
+func runImageUpdater(cfg *ImageUpdaterConfig, warmUp bool) (argocd.ImageUpdaterResult, error) {
+ result := argocd.ImageUpdaterResult{}
+ var err error
+ var argoClient argocd.ArgoCD
+ switch cfg.ApplicationsAPIKind {
+ case applicationsAPIKindK8S:
+ argoClient, err = argocd.NewK8SClient(cfg.KubeClient)
+ case applicationsAPIKindArgoCD:
+ argoClient, err = argocd.NewAPIClient(&cfg.ClientOpts)
+ default:
+ return argocd.ImageUpdaterResult{}, fmt.Errorf("application api '%s' is not supported", cfg.ApplicationsAPIKind)
+ }
+ if err != nil {
+ return result, err
+ }
+ cfg.ArgoClient = argoClient
+
+ apps, err := cfg.ArgoClient.ListApplications()
+ if err != nil {
+ log.WithContext().
+ AddField("argocd_server", cfg.ClientOpts.ServerAddr).
+ AddField("grpc_web", cfg.ClientOpts.GRPCWeb).
+ AddField("grpc_webroot", cfg.ClientOpts.GRPCWebRootPath).
+ AddField("plaintext", cfg.ClientOpts.Plaintext).
+ AddField("insecure", cfg.ClientOpts.Insecure).
+ Errorf("error while communicating with ArgoCD")
+ return result, err
+ }
+
+ // Get the list of applications that are allowed for updates, that is, those
+ // applications which have correct annotation.
+ appList, err := argocd.FilterApplicationsForUpdate(apps, cfg.AppNamePatterns)
+ if err != nil {
+ return result, err
+ }
+
+ metrics.Applications().SetNumberOfApplications(len(appList))
+
+ if !warmUp {
+ log.Infof("Starting image update cycle, considering %d annotated application(s) for update", len(appList))
+ }
+
+ syncState := argocd.NewSyncIterationState()
+
+ // Allow a maximum of MaxConcurrency number of goroutines to exist at the
+ // same time. If in warm-up mode, set to 1 explicitly.
+ var concurrency int = cfg.MaxConcurrency
+ if warmUp {
+ concurrency = 1
+ }
+ var dryRun bool = cfg.DryRun
+ if warmUp {
+ dryRun = true
+ }
+ sem := semaphore.NewWeighted(int64(concurrency))
+
+ var wg sync.WaitGroup
+ wg.Add(len(appList))
+
+ for app, curApplication := range appList {
+ lockErr := sem.Acquire(context.TODO(), 1)
+ if lockErr != nil {
+ log.Errorf("Could not acquire semaphore for application %s: %v", app, lockErr)
+ // Release entry in wait group on error, too - we're never gonna execute
+ wg.Done()
+ continue
+ }
+
+ go func(app string, curApplication argocd.ApplicationImages) {
+ defer sem.Release(1)
+ log.Debugf("Processing application %s", app)
+ upconf := &argocd.UpdateConfiguration{
+ NewRegFN: registry.NewClient,
+ ArgoClient: cfg.ArgoClient,
+ KubeClient: cfg.KubeClient,
+ UpdateApp: &curApplication,
+ DryRun: dryRun,
+ GitCommitUser: cfg.GitCommitUser,
+ GitCommitEmail: cfg.GitCommitMail,
+ GitCommitMessage: cfg.GitCommitMessage,
+ DisableKubeEvents: cfg.DisableKubeEvents,
+ }
+ res := argocd.UpdateApplication(upconf, syncState)
+ result.NumApplicationsProcessed += 1
+ result.NumErrors += res.NumErrors
+ result.NumImagesConsidered += res.NumImagesConsidered
+ result.NumImagesUpdated += res.NumImagesUpdated
+ result.NumSkipped += res.NumSkipped
+ if !warmUp && !cfg.DryRun {
+ metrics.Applications().IncreaseImageUpdate(app, res.NumImagesUpdated)
+ }
+ metrics.Applications().IncreaseUpdateErrors(app, res.NumErrors)
+ metrics.Applications().SetNumberOfImagesWatched(app, res.NumImagesConsidered)
+ wg.Done()
+ }(app, curApplication)
+ }
+
+ // Wait for all goroutines to finish
+ wg.Wait()
+
+ return result, nil
+}
+
+// warmupImageCache performs a cache warm-up, which is basically one cycle of
+// the image update process with dryRun set to true and a maximum concurrency
+// of 1, i.e. sequential processing.
+func warmupImageCache(cfg *ImageUpdaterConfig) error {
+ log.Infof("Warming up image cache")
+ _, err := runImageUpdater(cfg, true)
+ if err != nil {
+ return nil
+ }
+ entries := 0
+ eps := registry.ConfiguredEndpoints()
+ for _, ep := range eps {
+ r, err := registry.GetRegistryEndpoint(ep)
+ if err == nil {
+ entries += r.Cache.NumEntries()
+ }
+ }
+ log.Infof("Finished cache warm-up, pre-loaded %d meta data entries from %d registries", entries, len(eps))
+ return nil
+}