Merge "Change executor interface to accept channels"

This commit is contained in:
Zuul 2020-08-18 16:02:08 +00:00 committed by Gerrit Code Review
commit 51e027be7d
3 changed files with 62 additions and 27 deletions

View File

@ -58,6 +58,5 @@ func NewRunCommand(rootSettings *environment.AirshipCTLSettings) *cobra.Command
"dry-run", "dry-run",
false, false,
"simulate phase execution") "simulate phase execution")
// TODO add kubeconfig flags when https://review.opendev.org/#/c/744382 is merged
return runCmd return runCmd
} }

View File

@ -26,10 +26,9 @@ import (
// Executor interface should be implemented by each runner // Executor interface should be implemented by each runner
type Executor interface { type Executor interface {
Run(RunOptions) <-chan events.Event Run(<-chan events.Event, RunOptions)
Render(io.Writer, RenderOptions) error Render(io.Writer, RenderOptions) error
Validate() error Validate() error
Wait(WaitOptions) <-chan events.Event
} }
// RunOptions holds options for run method // RunOptions holds options for run method
@ -49,14 +48,15 @@ type WaitOptions struct {
} }
// ExecutorFactory for executor instantiation // ExecutorFactory for executor instantiation
// First argument is document object which represents executor type ExecutorFactory func(config ExecutorConfig) (Executor, error)
// configuration.
// Second argument is document bundle used by executor. // ExecutorConfig container to store all executor options
// Third argument airship configuration settings since each phase type ExecutorConfig struct {
// has to be aware of execution context and global settings PhaseName string
type ExecutorFactory func( ClusterName string
document.Document,
document.Bundle, ExecutorDocument document.Document
*environment.AirshipCTLSettings, ExecutorBundle document.Bundle
kubeconfig.Interface, AirshipSettings *environment.AirshipCTLSettings
) (Executor, error) KubeConfig kubeconfig.Interface
}

View File

@ -21,11 +21,15 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
airshipv1 "opendev.org/airship/airshipctl/pkg/api/v1alpha1" airshipv1 "opendev.org/airship/airshipctl/pkg/api/v1alpha1"
"opendev.org/airship/airshipctl/pkg/config"
"opendev.org/airship/airshipctl/pkg/document" "opendev.org/airship/airshipctl/pkg/document"
"opendev.org/airship/airshipctl/pkg/environment" "opendev.org/airship/airshipctl/pkg/environment"
"opendev.org/airship/airshipctl/pkg/events" "opendev.org/airship/airshipctl/pkg/events"
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
k8sutils "opendev.org/airship/airshipctl/pkg/k8s/utils"
"opendev.org/airship/airshipctl/pkg/log" "opendev.org/airship/airshipctl/pkg/log"
"opendev.org/airship/airshipctl/pkg/phase/ifc" "opendev.org/airship/airshipctl/pkg/phase/ifc"
"opendev.org/airship/airshipctl/pkg/util"
) )
// ExecutorRegistry returns map with executor factories // ExecutorRegistry returns map with executor factories
@ -109,7 +113,7 @@ func (p *Cmd) GetExecutor(phase *airshipv1.Phase) (ifc.Executor, error) {
ByGvk(refGVK.Group, refGVK.Version, refGVK.Kind). ByGvk(refGVK.Group, refGVK.Version, refGVK.Kind).
ByName(phaseConfig.ExecutorRef.Name). ByName(phaseConfig.ExecutorRef.Name).
ByNamespace(phaseConfig.ExecutorRef.Namespace) ByNamespace(phaseConfig.ExecutorRef.Namespace)
doc, err := bundle.SelectOne(selector) executorDoc, err := bundle.SelectOne(selector)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -131,21 +135,43 @@ func (p *Cmd) GetExecutor(phase *airshipv1.Phase) (ifc.Executor, error) {
if !found { if !found {
return nil, ErrExecutorNotFound{GVK: refGVK} return nil, ErrExecutorNotFound{GVK: refGVK}
} }
// When https://review.opendev.org/#/c/744382 add provider from there.
return executorFactory(doc, executorDocBundle, p.AirshipCTLSettings, nil) kubeConfPath := p.AirshipCTLSettings.Config.KubeConfigPath()
homeDir := util.UserHomeDir()
workDir := filepath.Join(homeDir, config.AirshipConfigDir)
fs := document.NewDocumentFs()
source := kubeconfig.FromFile(kubeConfPath, fs)
fileOption := kubeconfig.InjectFilePath(kubeConfPath, fs)
tempRootOption := kubeconfig.InjectTempRoot(workDir)
kubeConfig := kubeconfig.NewKubeConfig(source, fileOption, tempRootOption)
// TODO add function to decide on how to build kubeconfig instead of hardcoding it here,
// when more kubeconfigs sources are available.
return executorFactory(ifc.ExecutorConfig{
ExecutorBundle: executorDocBundle,
PhaseName: phase.Name,
ExecutorDocument: executorDoc,
AirshipSettings: p.AirshipCTLSettings,
KubeConfig: kubeConfig,
})
} }
// Exec particular phase // Exec starts executor goroutine and processes the events
func (p *Cmd) Exec(name string) error { func (p *Cmd) Exec(name string) error {
runCh := make(chan events.Event)
processor := events.NewDefaultProcessor(k8sutils.Streams())
go func() {
executor, err := p.getPhaseExecutor(name) executor, err := p.getPhaseExecutor(name)
if err != nil { if err != nil {
return err handleError(err, runCh)
return
} }
ch := executor.Run(ifc.RunOptions{ executor.Run(runCh, ifc.RunOptions{
Debug: p.AirshipCTLSettings.Debug, Debug: p.Debug,
DryRun: p.DryRun, DryRun: p.DryRun,
}) })
return p.Processor.Process(ch) }()
return processor.Process(runCh)
} }
// Plan shows available phase names // Plan shows available phase names
@ -178,3 +204,13 @@ func (p *Cmd) Plan() (map[string][]string, error) {
} }
return result, nil return result, nil
} }
func handleError(err error, ch chan events.Event) {
ch <- events.Event{
Type: events.ErrorType,
ErrorEvent: events.ErrorEvent{
Error: err,
},
}
close(ch)
}