Change executor interface to accept channels

Now Run method will accept channels that they will write
events to, this will allow us to pass channels we want to them,
instead of execting them to return us a channel.

Wait method is removed from interface for now, as it needs more
design.

Change-Id: Ibd47dfe49172f537b79bcc8f83f7c76aece8e862
This commit is contained in:
Kostiantyn Kalynovskyi 2020-08-17 23:34:35 -05:00
parent efc46c2203
commit 2f1b520ab2
3 changed files with 62 additions and 27 deletions

View File

@ -58,6 +58,5 @@ func NewRunCommand(rootSettings *environment.AirshipCTLSettings) *cobra.Command
"dry-run",
false,
"simulate phase execution")
// TODO add kubeconfig flags when https://review.opendev.org/#/c/744382 is merged
return runCmd
}

View File

@ -26,10 +26,9 @@ import (
// Executor interface should be implemented by each runner
type Executor interface {
Run(RunOptions) <-chan events.Event
Run(<-chan events.Event, RunOptions)
Render(io.Writer, RenderOptions) error
Validate() error
Wait(WaitOptions) <-chan events.Event
}
// RunOptions holds options for run method
@ -49,14 +48,15 @@ type WaitOptions struct {
}
// ExecutorFactory for executor instantiation
// First argument is document object which represents executor
// configuration.
// Second argument is document bundle used by executor.
// Third argument airship configuration settings since each phase
// has to be aware of execution context and global settings
type ExecutorFactory func(
document.Document,
document.Bundle,
*environment.AirshipCTLSettings,
kubeconfig.Interface,
) (Executor, error)
type ExecutorFactory func(config ExecutorConfig) (Executor, error)
// ExecutorConfig container to store all executor options
type ExecutorConfig struct {
PhaseName string
ClusterName string
ExecutorDocument document.Document
ExecutorBundle document.Bundle
AirshipSettings *environment.AirshipCTLSettings
KubeConfig kubeconfig.Interface
}

View File

@ -21,11 +21,15 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
airshipv1 "opendev.org/airship/airshipctl/pkg/api/v1alpha1"
"opendev.org/airship/airshipctl/pkg/config"
"opendev.org/airship/airshipctl/pkg/document"
"opendev.org/airship/airshipctl/pkg/environment"
"opendev.org/airship/airshipctl/pkg/events"
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
k8sutils "opendev.org/airship/airshipctl/pkg/k8s/utils"
"opendev.org/airship/airshipctl/pkg/log"
"opendev.org/airship/airshipctl/pkg/phase/ifc"
"opendev.org/airship/airshipctl/pkg/util"
)
// ExecutorRegistry returns map with executor factories
@ -109,7 +113,7 @@ func (p *Cmd) GetExecutor(phase *airshipv1.Phase) (ifc.Executor, error) {
ByGvk(refGVK.Group, refGVK.Version, refGVK.Kind).
ByName(phaseConfig.ExecutorRef.Name).
ByNamespace(phaseConfig.ExecutorRef.Namespace)
doc, err := bundle.SelectOne(selector)
executorDoc, err := bundle.SelectOne(selector)
if err != nil {
return nil, err
}
@ -131,21 +135,43 @@ func (p *Cmd) GetExecutor(phase *airshipv1.Phase) (ifc.Executor, error) {
if !found {
return nil, ErrExecutorNotFound{GVK: refGVK}
}
// When https://review.opendev.org/#/c/744382 add provider from there.
return executorFactory(doc, executorDocBundle, p.AirshipCTLSettings, nil)
kubeConfPath := p.AirshipCTLSettings.Config.KubeConfigPath()
homeDir := util.UserHomeDir()
workDir := filepath.Join(homeDir, config.AirshipConfigDir)
fs := document.NewDocumentFs()
source := kubeconfig.FromFile(kubeConfPath, fs)
fileOption := kubeconfig.InjectFilePath(kubeConfPath, fs)
tempRootOption := kubeconfig.InjectTempRoot(workDir)
kubeConfig := kubeconfig.NewKubeConfig(source, fileOption, tempRootOption)
// TODO add function to decide on how to build kubeconfig instead of hardcoding it here,
// when more kubeconfigs sources are available.
return executorFactory(ifc.ExecutorConfig{
ExecutorBundle: executorDocBundle,
PhaseName: phase.Name,
ExecutorDocument: executorDoc,
AirshipSettings: p.AirshipCTLSettings,
KubeConfig: kubeConfig,
})
}
// Exec particular phase
// Exec starts executor goroutine and processes the events
func (p *Cmd) Exec(name string) error {
executor, err := p.getPhaseExecutor(name)
if err != nil {
return err
}
ch := executor.Run(ifc.RunOptions{
Debug: p.AirshipCTLSettings.Debug,
DryRun: p.DryRun,
})
return p.Processor.Process(ch)
runCh := make(chan events.Event)
processor := events.NewDefaultProcessor(k8sutils.Streams())
go func() {
executor, err := p.getPhaseExecutor(name)
if err != nil {
handleError(err, runCh)
return
}
executor.Run(runCh, ifc.RunOptions{
Debug: p.Debug,
DryRun: p.DryRun,
})
}()
return processor.Process(runCh)
}
// Plan shows available phase names
@ -178,3 +204,13 @@ func (p *Cmd) Plan() (map[string][]string, error) {
}
return result, nil
}
func handleError(err error, ch chan events.Event) {
ch <- events.Event{
Type: events.ErrorType,
ErrorEvent: events.ErrorEvent{
Error: err,
},
}
close(ch)
}