Merge "Modify applier to use event from constructor"

This commit is contained in:
Zuul 2020-08-21 15:50:54 +00:00 committed by Gerrit Code Review
commit 6db611c1dd
5 changed files with 99 additions and 88 deletions

View File

@ -52,13 +52,14 @@ type Applier struct {
Streams genericclioptions.IOStreams
Poller poller.Poller
ManifestReaderFactory utils.ManifestReaderFactory
eventChannel chan events.Event
}
// ReaderFactory function that returns reader factory interface
type ReaderFactory func(validate bool, bundle document.Bundle, factory cmdutil.Factory) manifestreader.ManifestReader
// NewApplier returns instance of Applier
func NewApplier(f cmdutil.Factory, streams genericclioptions.IOStreams) *Applier {
func NewApplier(eventCh chan events.Event, f cmdutil.Factory, streams genericclioptions.IOStreams) *Applier {
return &Applier{
Factory: f,
Streams: streams,
@ -66,76 +67,68 @@ func NewApplier(f cmdutil.Factory, streams genericclioptions.IOStreams) *Applier
Driver: &Adaptor{
CliUtilsApplier: cliapply.NewApplier(f, streams),
},
eventChannel: eventCh,
}
}
// ApplyBundle apply bundle to kubernetes cluster
func (a *Applier) ApplyBundle(bundle document.Bundle, ao ApplyOptions) <-chan events.Event {
eventCh := make(chan events.Event)
go func() {
defer close(eventCh)
if bundle == nil {
// TODO add this to errors
handleError(eventCh, ErrApplyNilBundle{})
return
func (a *Applier) ApplyBundle(bundle document.Bundle, ao ApplyOptions) {
defer close(a.eventChannel)
log.Debugf("Getting infos for bundle, inventory id is %s", ao.BundleName)
infos, err := a.getInfos(ao.BundleName, bundle)
if err != nil {
handleError(a.eventChannel, err)
return
}
ctx := context.Background()
ch := a.Driver.Run(ctx, infos, cliApplyOptions(ao))
for e := range ch {
a.eventChannel <- events.Event{
Type: events.ApplierType,
ApplierEvent: e,
}
log.Printf("Applying bundle, inventory id: %s", ao.BundleName)
// TODO Get this selector from document package instead
// Selector to filter invenotry document from bundle
selector := document.
NewSelector().
ByLabel(clicommon.InventoryLabel).
ByKind(document.ConfigMapKind)
// if we could find exactly one inventory document, we don't do anything else with it
_, err := bundle.SelectOne(selector)
// if we got an error, which means we could not find Config Map with invetory ID at rest
// now we need to generate and inject one at runtime
if err != nil && errors.As(err, &document.ErrDocNotFound{}) {
log.Debug("Inventory Object config Map not found, auto generating Invetory object")
invDoc, innerErr := NewInventoryDocument(ao.BundleName)
if innerErr != nil {
// this should never happen
log.Debug("Failed to create new invetory document")
handleError(eventCh, innerErr)
return
}
log.Debugf("Injecting Invetory Object: %v into bundle", invDoc)
innerErr = bundle.Append(invDoc)
if innerErr != nil {
log.Debug("Couldn't append bunlde with inventory document")
handleError(eventCh, innerErr)
return
}
log.Debugf("Making sure that inventory object namespace %s exists", invDoc.GetNamespace())
innerErr = a.ensureNamespaceExists(invDoc.GetNamespace())
if innerErr != nil {
handleError(eventCh, innerErr)
return
}
} else if err != nil {
handleError(eventCh, err)
return
}
}
func (a *Applier) getInfos(bundleName string, bundle document.Bundle) ([]*resource.Info, error) {
if bundle == nil {
return nil, ErrApplyNilBundle{}
}
selector := document.
NewSelector().
ByLabel(clicommon.InventoryLabel).
ByKind(document.ConfigMapKind)
// if we could find exactly one inventory document, we don't do anything else with it
_, err := bundle.SelectOne(selector)
// if we got an error, which means we could not find Config Map with invetory ID at rest
// now we need to generate and inject one at runtime
if err != nil && errors.As(err, &document.ErrDocNotFound{}) {
log.Debug("Inventory Object config Map not found, auto generating Invetory object")
invDoc, innerErr := NewInventoryDocument(bundleName)
if innerErr != nil {
// this should never happen
log.Debug("Failed to create new invetory document")
return nil, innerErr
}
err = a.Driver.Initialize(a.Poller)
if err != nil {
handleError(eventCh, err)
return
log.Debugf("Injecting Invetory Object: %v into bundle", invDoc)
innerErr = bundle.Append(invDoc)
if innerErr != nil {
log.Debug("Couldn't append bunlde with inventory document")
return nil, innerErr
}
ctx := context.Background()
infos, err := a.ManifestReaderFactory(false, bundle, a.Factory).Read()
if err != nil {
handleError(eventCh, err)
return
log.Debugf("Making sure that inventory object namespace %s exists", invDoc.GetNamespace())
innerErr = a.ensureNamespaceExists(invDoc.GetNamespace())
if innerErr != nil {
return nil, innerErr
}
ch := a.Driver.Run(ctx, infos, cliApplyOptions(ao))
for e := range ch {
eventCh <- events.Event{
Type: events.ApplierType,
ApplierEvent: e,
}
}
}()
return eventCh
} else if err != nil {
return nil, err
}
if err = a.Driver.Initialize(a.Poller); err != nil {
return nil, err
}
return a.ManifestReaderFactory(false, bundle, a.Factory).Read()
}
func (a *Applier) ensureNamespaceExists(name string) error {

View File

@ -36,11 +36,13 @@ import (
)
func TestFakeApplier(t *testing.T) {
a := applier.NewFakeApplier(genericclioptions.IOStreams{
In: os.Stdin,
Out: os.Stdout,
ErrOut: os.Stderr,
}, k8stest.SuccessEvents(), k8stest.FakeFactory(t, []k8stest.ClientHandler{}))
ch := make(chan events.Event)
a := applier.NewFakeApplier(ch,
genericclioptions.IOStreams{
In: os.Stdin,
Out: os.Stdout,
ErrOut: os.Stderr,
}, k8stest.SuccessEvents(), k8stest.FakeFactory(t, []k8stest.ClientHandler{}))
assert.NotNil(t, a)
}
@ -120,7 +122,8 @@ func TestApplierRun(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
// create default applier
a := applier.NewApplier(f, s)
eventChan := make(chan events.Event)
a := applier.NewApplier(eventChan, f, s)
opts := applier.ApplyOptions{
WaitTimeout: time.Second * 5,
BundleName: "test-bundle",
@ -132,9 +135,10 @@ func TestApplierRun(t *testing.T) {
if tt.poller != nil {
a.Poller = tt.poller
}
ch := a.ApplyBundle(tt.bundle, opts)
// start writing to channel
go a.ApplyBundle(tt.bundle, opts)
var airEvents []events.Event
for e := range ch {
for e := range eventChan {
airEvents = append(airEvents, e)
}
var errs []error

View File

@ -28,6 +28,7 @@ import (
pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/object"
"opendev.org/airship/airshipctl/pkg/events"
"opendev.org/airship/airshipctl/pkg/k8s/utils"
)
@ -75,12 +76,16 @@ func (fa FakeAdaptor) WithInitError(err error) FakeAdaptor {
}
// NewFakeApplier returns applier with events you want
func NewFakeApplier(streams genericclioptions.IOStreams, events []applyevent.Event, f cmdutil.Factory) *Applier {
func NewFakeApplier(
eventCh chan events.Event,
streams genericclioptions.IOStreams,
events []applyevent.Event, f cmdutil.Factory) *Applier {
return &Applier{
Driver: NewFakeAdaptor().WithEvents(events),
Poller: &FakePoller{},
Factory: f,
ManifestReaderFactory: utils.DefaultManifestReaderFactory,
eventChannel: eventCh,
}
}

View File

@ -30,21 +30,23 @@ import (
// Options is an abstraction used to apply the phase
type Options struct {
RootSettings *environment.AirshipCTLSettings
Applier *applier.Applier
Processor events.EventProcessor
WaitTimeout time.Duration
DryRun bool
Prune bool
PhaseName string
WaitTimeout time.Duration
RootSettings *environment.AirshipCTLSettings
Applier *applier.Applier
Processor events.EventProcessor
EventChannel chan events.Event
}
// Initialize Options with required field, such as Applier
func (o *Options) Initialize() {
f := utils.FactoryFromKubeConfigPath(o.RootSettings.KubeConfigPath)
streams := utils.Streams()
o.Applier = applier.NewApplier(f, streams)
o.EventChannel = make(chan events.Event)
o.Applier = applier.NewApplier(o.EventChannel, f, streams)
o.Processor = events.NewDefaultProcessor(streams)
}
@ -87,6 +89,6 @@ func (o *Options) Run() error {
if err != nil {
return err
}
ch := o.Applier.ApplyBundle(bundle, ao)
return o.Processor.Process(ch)
go o.Applier.ApplyBundle(bundle, ao)
return o.Processor.Process(o.EventChannel)
}

View File

@ -23,10 +23,12 @@ import (
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
applyevent "sigs.k8s.io/cli-utils/pkg/apply/event"
"opendev.org/airship/airshipctl/pkg/config"
"opendev.org/airship/airshipctl/pkg/document"
"opendev.org/airship/airshipctl/pkg/environment"
"opendev.org/airship/airshipctl/pkg/events"
"opendev.org/airship/airshipctl/pkg/k8s/applier"
"opendev.org/airship/airshipctl/pkg/phase/apply"
"opendev.org/airship/airshipctl/testutil"
@ -59,18 +61,14 @@ func TestDeploy(t *testing.T) {
tests := []struct {
name string
expectedErrorString string
cliApplier *applier.Applier
clusterPurposes map[string]*config.ClusterPurpose
phaseName string
events []applyevent.Event
}{
{
name: "success",
expectedErrorString: "",
cliApplier: applier.NewFakeApplier(genericclioptions.IOStreams{
In: os.Stdin,
Out: os.Stdout,
ErrOut: os.Stderr,
}, k8sutils.SuccessEvents(), f),
events: k8sutils.SuccessEvents(),
},
{
name: "missing clusters",
@ -94,8 +92,17 @@ func TestDeploy(t *testing.T) {
ao.Initialize()
ao.PhaseName = "initinfra"
ao.DryRun = true
if tt.cliApplier != nil {
ao.Applier = tt.cliApplier
if tt.events != nil {
ch := make(chan events.Event)
cliApplier := applier.NewFakeApplier(
ch,
genericclioptions.IOStreams{
In: os.Stdin,
Out: os.Stdout,
ErrOut: os.Stderr,
}, k8sutils.SuccessEvents(), f)
ao.Applier = cliApplier
ao.EventChannel = ch
}
if tt.clusterPurposes != nil {
ao.RootSettings.Config.Clusters = tt.clusterPurposes