Merge "Add kubernetes apply executor"

This commit is contained in:
Zuul 2020-08-21 17:36:14 +00:00 committed by Gerrit Code Review
commit aedfafa9e5
8 changed files with 514 additions and 2 deletions

View File

@ -45,6 +45,7 @@ func init() {
&Phase{},
&PhasePlan{},
&KubeConfig{},
&KubernetesApply{},
)
_ = AddToScheme(Scheme) //nolint:errcheck
}

View File

@ -0,0 +1,46 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +kubebuilder:object:root=true
// KubernetesApply provides instructions on how to apply resources to kubernetes cluster
type KubernetesApply struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Config ApplyConfig `json:"config,omitempty"`
}
// ApplyConfig provides instructions on how to apply resources to kubernetes cluster
type ApplyConfig struct {
WaitOptions ApplyWaitOptions `json:"waitOptions,omitempty"`
PruneOptions ApplyPruneOptions `json:"pruneOptions,omitempty"`
}
// ApplyWaitOptions provides instructions how to wait for kubernetes resources
type ApplyWaitOptions struct {
// Timeout in seconds
Timeout int `json:"timeout,omitempty"`
}
// ApplyPruneOptions provides instructions how to prune for kubernetes resources
type ApplyPruneOptions struct {
Prune bool `json:"prune,omitempty"`
}

View File

@ -33,4 +33,8 @@ type Phase struct {
type PhaseConfig struct {
ExecutorRef *corev1.ObjectReference `json:"executorRef"`
DocumentEntryPoint string `json:"documentEntryPoint"`
// Name used to identify a cluster that the phase belongs to
ClusterName string `json:"clusterName"`
// ClusterNamespace identifies the namespace that the phase belongs to
ClusterNamespace string `json:"clusterNamespace"`
}

View File

@ -9,6 +9,53 @@ import (
"k8s.io/apimachinery/pkg/runtime"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ApplyConfig) DeepCopyInto(out *ApplyConfig) {
*out = *in
out.WaitOptions = in.WaitOptions
out.PruneOptions = in.PruneOptions
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApplyConfig.
func (in *ApplyConfig) DeepCopy() *ApplyConfig {
if in == nil {
return nil
}
out := new(ApplyConfig)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ApplyPruneOptions) DeepCopyInto(out *ApplyPruneOptions) {
*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApplyPruneOptions.
func (in *ApplyPruneOptions) DeepCopy() *ApplyPruneOptions {
if in == nil {
return nil
}
out := new(ApplyPruneOptions)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ApplyWaitOptions) DeepCopyInto(out *ApplyWaitOptions) {
*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApplyWaitOptions.
func (in *ApplyWaitOptions) DeepCopy() *ApplyWaitOptions {
if in == nil {
return nil
}
out := new(ApplyWaitOptions)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Clusterctl) DeepCopyInto(out *Clusterctl) {
*out = *in
@ -35,6 +82,13 @@ func (in *Clusterctl) DeepCopyInto(out *Clusterctl) {
*out = new(MoveOptions)
**out = **in
}
if in.AdditionalComponentVariables != nil {
in, out := &in.AdditionalComponentVariables, &out.AdditionalComponentVariables
*out = make(map[string]string, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Clusterctl.
@ -116,6 +170,32 @@ func (in *KubeConfig) DeepCopyObject() runtime.Object {
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *KubernetesApply) DeepCopyInto(out *KubernetesApply) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
out.Config = in.Config
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KubernetesApply.
func (in *KubernetesApply) DeepCopy() *KubernetesApply {
if in == nil {
return nil
}
out := new(KubernetesApply)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *KubernetesApply) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *MoveOptions) DeepCopyInto(out *MoveOptions) {
*out = *in

View File

@ -94,7 +94,7 @@ func TestApplierRun(t *testing.T) {
},
{
name: "bundle failure",
expectedString: "Can not apply nil bundle, airship.kubernetes.Client",
expectedString: "Cannot apply nil bundle",
expectErr: true,
},
{

View File

@ -33,5 +33,5 @@ type ErrApplyNilBundle struct {
}
func (e ErrApplyNilBundle) Error() string {
return "Can not apply nil bundle, airship.kubernetes.Client"
return "Cannot apply nil bundle"
}

118
pkg/k8s/applier/executor.go Normal file
View File

@ -0,0 +1,118 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package applier
import (
"io"
"time"
"sigs.k8s.io/cli-utils/pkg/common"
airshipv1 "opendev.org/airship/airshipctl/pkg/api/v1alpha1"
"opendev.org/airship/airshipctl/pkg/config"
"opendev.org/airship/airshipctl/pkg/document"
"opendev.org/airship/airshipctl/pkg/errors"
"opendev.org/airship/airshipctl/pkg/events"
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
"opendev.org/airship/airshipctl/pkg/k8s/utils"
"opendev.org/airship/airshipctl/pkg/log"
"opendev.org/airship/airshipctl/pkg/phase/ifc"
)
// ExecutorOptions provide a way to configure executor
type ExecutorOptions struct {
BundleName string
ExecutorDocument document.Document
ExecutorBundle document.Bundle
Kubeconfig kubeconfig.Interface
AirshipConfig *config.Config
}
// Executor applies resources to kubernetes
type Executor struct {
Options ExecutorOptions
apiObject *airshipv1.KubernetesApply
cleanup kubeconfig.Cleanup
}
// NewExecutor returns instance of executor
func NewExecutor(opts ExecutorOptions) (*Executor, error) {
apiObj := &airshipv1.KubernetesApply{}
err := opts.ExecutorDocument.ToAPIObject(apiObj, airshipv1.Scheme)
if err != nil {
return nil, err
}
return &Executor{
Options: opts,
apiObject: apiObj,
}, nil
}
// Run executor, should be performed in separate go routine
func (e *Executor) Run(ch chan events.Event, runOpts ifc.RunOptions) {
applier, filteredBundle, err := e.prepareApplier(ch)
if err != nil {
handleError(ch, err)
close(ch)
return
}
defer e.cleanup()
dryRunStrategy := common.DryRunNone
if runOpts.DryRun {
dryRunStrategy = common.DryRunClient
}
applyOptions := ApplyOptions{
DryRunStrategy: dryRunStrategy,
Prune: e.apiObject.Config.PruneOptions.Prune,
BundleName: e.Options.BundleName,
WaitTimeout: time.Second * time.Duration(e.apiObject.Config.WaitOptions.Timeout),
}
applier.ApplyBundle(filteredBundle, applyOptions)
}
func (e *Executor) prepareApplier(ch chan events.Event) (*Applier, document.Bundle, error) {
log.Debug("Getting kubeconfig file information from kubeconfig provider")
path, cleanup, err := e.Options.Kubeconfig.GetFile()
if err != nil {
return nil, nil, err
}
if e.Options.ExecutorBundle == nil {
return nil, nil, ErrApplyNilBundle{}
}
log.Debug("Filtering out documents that shouldn't be applied to kubernetes from document bundle")
bundle, err := e.Options.ExecutorBundle.SelectBundle(document.NewDeployToK8sSelector())
if err != nil {
cleanup()
return nil, nil, err
}
// set up cleanup only if all calls up to here were successful
e.cleanup = cleanup
factory := utils.FactoryFromKubeConfigPath(path)
streams := utils.Streams()
return NewApplier(ch, factory, streams), bundle, nil
}
// Validate document set
func (e *Executor) Validate() error {
return errors.ErrNotImplemented{}
}
// Render document set
func (e *Executor) Render(w io.Writer, _ ifc.RenderOptions) error {
return e.Options.ExecutorBundle.Write(w)
}

View File

@ -0,0 +1,263 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package applier_test
import (
"bytes"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"opendev.org/airship/airshipctl/pkg/config"
"opendev.org/airship/airshipctl/pkg/document"
"opendev.org/airship/airshipctl/pkg/events"
"opendev.org/airship/airshipctl/pkg/k8s/applier"
"opendev.org/airship/airshipctl/pkg/k8s/kubeconfig"
"opendev.org/airship/airshipctl/pkg/k8s/utils"
"opendev.org/airship/airshipctl/pkg/phase/ifc"
"opendev.org/airship/airshipctl/testutil/fs"
)
const (
ValidExecutorDoc = `apiVersion: airshipit.org/v1alpha1
kind: KubernetesApply
metadata:
labels:
airshipit.org/deploy-k8s: "false"
name: kubernetes-apply
config:
waitOptions:
timeout: 600
pruneOptions:
prune: false
`
ValidExecutorDocNamespaced = `apiVersion: airshipit.org/v1alpha1
kind: KubernetesApply
metadata:
labels:
airshipit.org/deploy-k8s: "false"
name: kubernetes-apply-namespaced
namespace: bundle
config:
waitOptions:
timeout: 600
pruneOptions:
prune: false
`
testValidKubeconfig = `apiVersion: v1
clusters:
- cluster:
certificate-authority-data: ca-data
server: https://10.0.1.7:6443
name: kubernetes_target
contexts:
- context:
cluster: kubernetes_target
user: kubernetes-admin
name: kubernetes-admin@kubernetes
current-context: ""
kind: Config
preferences: {}
users:
- name: kubernetes-admin
user:
client-certificate-data: cert-data
client-key-data: client-keydata
`
)
func TestNewExecutor(t *testing.T) {
tests := []struct {
name string
cfgDoc string
expectedErr string
airConfig *config.Config
kubeconf kubeconfig.Interface
bundleFunc func(t *testing.T) document.Bundle
}{
{
name: "valid executor",
cfgDoc: ValidExecutorDoc,
kubeconf: testKubeconfig(testValidKubeconfig),
airConfig: makeDefaultConfig(),
bundleFunc: func(t *testing.T) document.Bundle {
return newBundle("testdata/source_bundle", t)
},
},
{
name: "wrong config document",
cfgDoc: `apiVersion: v1
kind: ConfigMap
metadata:
name: first-map
namespace: default
labels:
cli-utils.sigs.k8s.io/inventory-id: "some id"`,
expectedErr: "wrong config document",
airConfig: makeDefaultConfig(),
bundleFunc: func(t *testing.T) document.Bundle {
return newBundle("testdata/source_bundle", t)
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
doc, err := document.NewDocumentFromBytes([]byte(tt.cfgDoc))
require.NoError(t, err)
require.NotNil(t, doc)
exec, err := applier.NewExecutor(
applier.ExecutorOptions{
ExecutorDocument: doc,
ExecutorBundle: tt.bundleFunc(t),
Kubeconfig: tt.kubeconf,
AirshipConfig: tt.airConfig,
})
if tt.expectedErr != "" {
require.Error(t, err)
assert.Contains(t, err.Error(), "")
assert.Nil(t, exec)
} else {
require.NoError(t, err)
require.NotNil(t, exec)
}
})
}
}
// TODO We need valid test that checks that actuall bundle has arrived to applier
// for that we need a way to inject fake applier, which is not doable with `black box` test currently
// since we tests are in different package from executor
func TestExecutorRun(t *testing.T) {
tests := []struct {
name string
containsErr string
kubeconf kubeconfig.Interface
execDoc document.Document
bundleFunc func(t *testing.T) document.Bundle
airConfig *config.Config
}{
{
name: "cant read kubeconfig error",
containsErr: "no such file or directory",
airConfig: makeDefaultConfig(),
bundleFunc: func(t *testing.T) document.Bundle {
return newBundle("testdata/source_bundle", t)
},
kubeconf: testKubeconfig(`invalid kubeconfig`),
execDoc: toKubernetesApply(t, ValidExecutorDocNamespaced),
},
{
name: "Nil bundle provided",
execDoc: toKubernetesApply(t, ValidExecutorDoc),
containsErr: "Cannot apply nil bundle",
kubeconf: testKubeconfig(testValidKubeconfig),
airConfig: makeDefaultConfig(),
bundleFunc: func(t *testing.T) document.Bundle {
return nil
},
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
exec, err := applier.NewExecutor(
applier.ExecutorOptions{
ExecutorDocument: tt.execDoc,
AirshipConfig: tt.airConfig,
ExecutorBundle: tt.bundleFunc(t),
Kubeconfig: tt.kubeconf,
})
require.NoError(t, err)
require.NotNil(t, exec)
ch := make(chan events.Event)
go exec.Run(ch, ifc.RunOptions{})
processor := events.NewDefaultProcessor(utils.Streams())
err = processor.Process(ch)
if tt.containsErr != "" {
require.Error(t, err)
assert.Contains(t, err.Error(), tt.containsErr)
} else {
assert.NoError(t, err)
}
})
}
}
func TestRender(t *testing.T) {
execDoc, err := document.NewDocumentFromBytes([]byte(ValidExecutorDoc))
require.NoError(t, err)
require.NotNil(t, execDoc)
exec, err := applier.NewExecutor(applier.ExecutorOptions{
ExecutorBundle: newBundle("testdata/source_bundle", t),
ExecutorDocument: execDoc,
})
require.NoError(t, err)
require.NotNil(t, exec)
writerReader := bytes.NewBuffer([]byte{})
err = exec.Render(writerReader, ifc.RenderOptions{})
require.NoError(t, err)
result := writerReader.String()
assert.Contains(t, result, "ReplicationController")
}
func makeDefaultConfig() *config.Config {
conf := &config.Config{
CurrentContext: "default",
Contexts: map[string]*config.Context{
"default": {
Manifest: "default-manifest",
},
},
Manifests: map[string]*config.Manifest{
"default-manifest": {
MetadataPath: "metadata-path",
TargetPath: "testdata",
},
},
}
return conf
}
// toKubernetesApply converts string to document object
func toKubernetesApply(t *testing.T, s string) document.Document {
doc, err := document.NewDocumentFromBytes([]byte(s))
require.NoError(t, err)
require.NotNil(t, doc)
return doc
}
func testKubeconfig(stringData string) kubeconfig.Interface {
return kubeconfig.NewKubeConfig(
kubeconfig.FromByte([]byte(stringData)),
kubeconfig.InjectFileSystem(
fs.MockFileSystem{
MockTempFile: func(root, pattern string) (document.File, error) {
return fs.TestFile{
MockName: func() string { return "kubeconfig-142398" },
MockWrite: func() (int, error) { return 0, nil },
MockClose: func() error { return nil },
}, nil
},
MockRemoveAll: func() error { return nil },
},
))
}