Adding loadbalancer service

This commit creates a Secret that defines a haproxy.cfg and mounts that
Secret into a newly created load balancer pod running the haproxy image.
A NodePort Service is then created for the load balancer pod. The load
balancer distributes traffic across the nodes labeled as masters.

Co-Authored-By: Francis Yi <francisyi19@gmail.com>
Co-Authored-By: Kostyantyn Kalynovskyi <kkalynovskyi@mirantis.com>
Change-Id: I91eb23892254ed2e7a68e13fe2b10489415d8589
Rick Bartra authored 2020-12-08 15:17:57 -06:00, committed by Francis Bacon Yi
parent 952c27bd04
commit 59694518d2
17 changed files with 559 additions and 310 deletions
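For orientation before the diff: the controller derives all three load balancer object names from the `SIPCluster` name. A minimal sketch of that naming convention, extracted from the `pkg/services` code below (the cluster name is illustrative):

```go
package main

import "fmt"

// lbObjectNames mirrors the naming used by the new load balancer service in
// pkg/services below: the pod and its config secret share a name, and the
// NodePort service adds a "-service" suffix.
func lbObjectNames(sipClusterName string) (pod, secret, service string) {
	pod = sipClusterName + "-load-balancer"
	secret = sipClusterName + "-load-balancer"
	service = sipClusterName + "-load-balancer-service"
	return pod, secret, service
}

func main() {
	fmt.Println(lbObjectNames("sub-cluster")) // illustrative cluster name
}
```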

@@ -46,9 +46,7 @@ spec:
       nodeInterfaceId:
         type: string
       nodePorts:
-        items:
-          type: integer
-        type: array
+        type: integer
       nodelabels:
         additionalProperties:
           type: string

@@ -24,3 +24,16 @@ subjects:
 - kind: ServiceAccount
   name: default
   namespace: sip-cluster-system
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: cluster-infra-service-rolebinding
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: sipcluster-infra-service
+subjects:
+- kind: ServiceAccount
+  name: default
+  namespace: sip-cluster-system

@@ -50,6 +50,25 @@ rules:
   - ""
   resources:
   - namespaces
+  - pods
+  - secrets
+  verbs:
+  - create
+  - delete
+  - update
+  - get
+  - list
+  - watch
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: sipcluster-infra-service
+rules:
+- apiGroups:
+  - ""
+  resources:
+  - services
   verbs:
   - create
   - delete

@@ -24,13 +24,10 @@ spec:
   loadbalancer:
     optional:
       clusterIP: 1.2.3.4 #<-- this aligns to the VIP IP for undercloud k8s
-    image: haproxy:foo
+    image: haproxy:2.3.2
     nodeLabels:
     - airship-masters
-    nodePorts:
-    - 7000
-    - 7001
-    - 7002
+    nodePort: 30000
     nodeInterfaceId: oam-ipv4
   jumppod:
     optional:
@@ -38,13 +35,11 @@ spec:
     image: sshpod:foo
     nodeLabels:
     - airship-masters
-    nodePorts:
-    - 7022
+    nodePort: 7022
    nodeInterfaceId: oam-ipv4
   authpod:
     image: sshpod:foo
     nodeLabels:
     - airship-masters
-    nodePorts:
-    - 7023
+    nodePort: 7023
     nodeInterfaceId: oam-ipv4

@@ -125,7 +125,7 @@ type InfraConfig struct {
 	OptionalData  *OptsConfig       `json:"optional,omitempty"`
 	Image         string            `json:"image,omitempty"`
 	NodeLabels    map[string]string `json:"nodelabels,omitempty"`
-	NodePorts     []int             `json:"nodePorts,omitempty"`
+	NodePort      int               `json:"nodePort,omitempty"`
 	NodeInterface string            `json:"nodeInterfaceId,omitempty"`
 }
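A minimal, self-contained sketch of what the renamed field means for CR authors: a spec that previously carried a `nodePorts` list now carries a single `nodePort` (the values below are illustrative):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// InfraConfig is trimmed here to the fields relevant to this change.
type InfraConfig struct {
	Image         string `json:"image,omitempty"`
	NodePort      int    `json:"nodePort,omitempty"`
	NodeInterface string `json:"nodeInterfaceId,omitempty"`
}

func main() {
	raw := []byte(`{"image":"haproxy:2.3.2","nodePort":30000,"nodeInterfaceId":"oam-ipv4"}`)
	var cfg InfraConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.NodePort) // 30000
}
```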

@@ -39,11 +39,6 @@ func (in *InfraConfig) DeepCopyInto(out *InfraConfig) {
 			(*out)[key] = val
 		}
 	}
-	if in.NodePorts != nil {
-		in, out := &in.NodePorts, &out.NodePorts
-		*out = make([]int, len(*in))
-		copy(*out, *in)
-	}
 }

 // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InfraConfig.

@@ -75,7 +75,7 @@ func (r *SIPClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		return ctrl.Result{}, err
 	}

-	err = r.deployInfra(sip, machines)
+	err = r.deployInfra(sip, machines, log)
 	if err != nil {
 		log.Error(err, "unable to deploy infrastructure services")
 		return ctrl.Result{}, err
@@ -187,23 +187,14 @@ func (r *SIPClusterReconciler) gatherVBMH(ctx context.Context, sip airshipv1.SIP
 	return machines, nil
 }

-func (r *SIPClusterReconciler) deployInfra(sip airshipv1.SIPCluster, machines *airshipvms.MachineList) error {
-	for sName, sConfig := range sip.Spec.InfraServices {
-		// Instantiate
-		service, err := airshipsvc.NewService(sName, sConfig)
-		if err != nil {
-			return err
-		}
-
-		// Lets deploy the Service
-		err = service.Deploy(sip, machines, r.Client)
-		if err != nil {
-			return err
-		}
-
-		// Did it deploy correctly, lets check
-		err = service.Validate()
+func (r *SIPClusterReconciler) deployInfra(sip airshipv1.SIPCluster, machines *airshipvms.MachineList, logger logr.Logger) error {
+	if err := airshipsvc.CreateNS(sip.Spec.ClusterName, r.Client); err != nil {
+		return err
+	}
+	newServiceSet := airshipsvc.NewServiceSet(logger, sip, machines, r.Client)
+	serviceList := newServiceSet.ServiceList()
+	for _, svc := range serviceList {
+		err := svc.Deploy()
 		if err != nil {
 			return err
 		}
@@ -225,19 +216,16 @@ Such as i.e. what are we doing with the labels on the vBMHs
 **/
 func (r *SIPClusterReconciler) finalize(ctx context.Context, sip airshipv1.SIPCluster) error {
 	logger := logr.FromContext(ctx)
-	for sName, sConfig := range sip.Spec.InfraServices {
-		service, err := airshipsvc.NewService(sName, sConfig)
-		if err != nil {
-			return err
-		}
-		err = service.Finalize(sip, r.Client)
+	machines := &airshipvms.MachineList{}
+	serviceSet := airshipsvc.NewServiceSet(logger, sip, machines, r.Client)
+	serviceList := serviceSet.ServiceList()
+	for _, svc := range serviceList {
+		err := svc.Finalize()
 		if err != nil {
 			return err
 		}
 	}

-	err := airshipsvc.FinalizeCommon(sip, r.Client)
+	err := serviceSet.Finalize()
 	if err != nil {
 		return err
 	}
@@ -246,7 +234,6 @@ func (r *SIPClusterReconciler) finalize(ctx context.Context, sip airshipv1.SIPCl
 	// 2- Let me now select the ones that meet the scheduling criteria
 	// If I schedule successfully then
 	// If not complete schedule, then throw an error.
-	machines := &airshipvms.MachineList{}
 	logger.Info("finalize sip machines", "machines", machines.String())
 	// Update the list of Machines.
 	err = machines.GetCluster(sip, r.Client)
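The reconciler now programs against the `InfraService` interface (defined in pkg/services/set.go below) instead of concrete service types. A minimal sketch of a fake implementation that could exercise the `deployInfra` loop in isolation; the type and its behavior are hypothetical, not part of this commit, and assume the `airshipv1` import alias used throughout:

```go
// fakeService is a hypothetical stand-in satisfying the InfraService
// contract (Deploy/Finalize/Type) that deployInfra and finalize rely on.
type fakeService struct {
	deployed bool
}

func (f *fakeService) Deploy() error   { f.deployed = true; return nil }
func (f *fakeService) Finalize() error { f.deployed = false; return nil }
func (f *fakeService) Type() airshipv1.InfraService {
	return airshipv1.LoadBalancerService
}
```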

@@ -76,6 +76,7 @@ var _ = BeforeSuite(func(done Done) {
 	k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{
 		Scheme:             scheme.Scheme,
+		MetricsBindAddress: "0",
 	})
 	Expect(err).ToNot(HaveOccurred())

@@ -1,33 +0,0 @@
-/*
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    https://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package services
-
-import (
-	airshipv1 "sipcluster/pkg/api/v1"
-)
-
-type AuthHost struct {
-	Service
-}
-
-func newAuthHost(infraCfg airshipv1.InfraConfig) InfrastructureService {
-	authhost := &AuthHost{
-		Service: Service{
-			serviceName: airshipv1.AuthHostService,
-			config:      infraCfg,
-		},
-	}
-	return authhost
-}

@@ -1,111 +0,0 @@
-/*
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    https://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package services
-
-import (
-	"context"
-	"fmt"
-
-	airshipv1 "sipcluster/pkg/api/v1"
-	airshipvms "sipcluster/pkg/vbmh"
-
-	corev1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"sigs.k8s.io/controller-runtime/pkg/client"
-)
-
-// Infrastructure interface should be implemented by each Tenant Required
-// Infrastructure Service
-// Init : prepares the Service
-// Deploy : deploys the service
-// Validate : will make sure that the deployment is successful
-type InfrastructureService interface {
-	Deploy(airshipv1.SIPCluster, *airshipvms.MachineList, client.Client) error
-	Validate() error
-	Finalize(airshipv1.SIPCluster, client.Client) error
-}
-
-// Generic Service Factory
-type Service struct {
-	serviceName airshipv1.InfraService
-	config      airshipv1.InfraConfig
-}
-
-func (s *Service) Deploy(sip airshipv1.SIPCluster, machines *airshipvms.MachineList, c client.Client) error {
-	if err := s.createNS(sip.Spec.ClusterName, c); err != nil {
-		return err
-	}
-	// Take the data from the appropriate Machines
-	// Prepare the Config
-	fmt.Printf("Deploy Service:%v \n", s.serviceName)
-	return nil
-}
-
-func (s *Service) createNS(serviceNamespaceName string, c client.Client) error {
-	ns := &corev1.Namespace{}
-	key := client.ObjectKey{Name: serviceNamespaceName}
-	if err := c.Get(context.Background(), key, ns); err == nil {
-		// Namespace already exists
-		return nil
-	}
-
-	serviceNamespace := &corev1.Namespace{
-		TypeMeta: metav1.TypeMeta{
-			APIVersion: corev1.SchemeGroupVersion.String(),
-			Kind:       "Namespace",
-		},
-		ObjectMeta: metav1.ObjectMeta{
-			Name: serviceNamespaceName,
-		},
-	}
-	return c.Create(context.TODO(), serviceNamespace)
-}
-
-func (s *Service) Validate() error {
-	fmt.Printf("Validate Service:%v \n", s.serviceName)
-	return nil
-}
-
-func (s *Service) Finalize(sip airshipv1.SIPCluster, c client.Client) error {
-	return nil
-}
-
-func FinalizeCommon(sip airshipv1.SIPCluster, c client.Client) error {
-	serviceNamespace := &corev1.Namespace{
-		TypeMeta: metav1.TypeMeta{
-			APIVersion: corev1.SchemeGroupVersion.String(),
-			Kind:       "Namespace",
-		},
-		ObjectMeta: metav1.ObjectMeta{
-			Name: sip.Spec.ClusterName,
-		},
-	}
-	return c.Delete(context.TODO(), serviceNamespace)
-}
-
-// Service Factory
-func NewService(infraName airshipv1.InfraService, infraCfg airshipv1.InfraConfig) (InfrastructureService, error) {
-	switch infraName {
-	case airshipv1.LoadBalancerService:
-		return newLoadBalancer(infraCfg), nil
-	case airshipv1.JumpHostService:
-		return newJumpHost(infraCfg), nil
-	case airshipv1.AuthHostService:
-		return newAuthHost(infraCfg), nil
-	}
-	return nil, ErrInfraServiceNotSupported{}
-}

@@ -1,59 +0,0 @@
-/*
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    https://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package services
-
-import (
-	airshipv1 "sipcluster/pkg/api/v1"
-)
-
-type JumpHost struct {
-	Service
-}
-
-func newJumpHost(infraCfg airshipv1.InfraConfig) InfrastructureService {
-	return &JumpHost{
-		Service: Service{
-			serviceName: airshipv1.JumpHostService,
-			config:      infraCfg,
-		},
-	}
-}
-
-/*
-The SIP Cluster operator will manufacture a jump host pod specifically for this
-tenant cluster. Much like we did above for master nodes by extracting IP
-addresses, we would need to extract the `oam-ipv4` ip address for all nodes and
-create a configmap to bind mount into the pod so it understands what host IPs
-represent the clusters.
-
-The expectation is the Jump Pod runs `sshd` protected by `uam` to allow
-operators to SSH directly to the Jump Pod and authenticate via UAM to
-immediately access their cluster.
-
-It will provide the following functionality over SSH:
-
-- The Jump Pod will be fronted by a `NodePort` service to allow incoming ssh.
-- The Jump Pod will be UAM secured (for SSH)
-- Bind mount in cluster-specific SSH key for cluster
-- Ability to Power Cycle the cluster VMs
-- A kubectl binary and kubeconfig (cluster-admin) for the cluster
-- SSH access to the cluster node VMs
-- Libvirt console logs for the VMs
-  - We will secure libvirt with tls and provide keys to every jump host
-    with curated interfaces to extract logs remotely for all VMs for their
-    clusters.
-*/

@@ -15,71 +15,228 @@
 package services

 import (
-	"fmt"
+	"bytes"
+	"html/template"
+
+	"github.com/go-logr/logr"
+	"k8s.io/apimachinery/pkg/types"

 	airshipv1 "sipcluster/pkg/api/v1"
 	airshipvms "sipcluster/pkg/vbmh"

+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )

-type LoadBalancer struct {
-	Service
-}
+const (
+	// ConfigSecretName name of the haproxy config secret name/volume/mount
+	ConfigSecretName = "haproxy-config"
+	// DefaultBalancerImage is the image that will be used as load balancer
+	DefaultBalancerImage = "haproxy:2.3.2"
+)

-func (l *LoadBalancer) Deploy(sip airshipv1.SIPCluster, machines *airshipvms.MachineList, c client.Client) error {
-	// Take the data from the appropriate Machines
-	// Prepare the Config
-	err := l.Service.Deploy(sip, machines, c)
-	if err != nil {
-		return err
-	}
-	return l.Prepare(sip, machines, c)
-}
+func (lb loadBalancer) Deploy() error {
+	if lb.config.Image == "" {
+		lb.config.Image = DefaultBalancerImage
+	}
+	if lb.config.NodePort < 30000 || lb.config.NodePort > 32767 {
+		lb.logger.Info("Either NodePort is not defined in the CR or NodePort is not in the required range of 30000-32767")
+		return nil
+	}
+
+	pod, secret, err := lb.generatePodAndSecret()
+	if err != nil {
+		return err
+	}
+
+	lb.logger.Info("Applying loadbalancer secret", "secret", secret.GetNamespace()+"/"+secret.GetName())
+	err = applyRuntimeObject(client.ObjectKey{Name: secret.GetName(), Namespace: secret.GetNamespace()}, secret, lb.client)
+	if err != nil {
+		return err
+	}
+
+	lb.logger.Info("Applying loadbalancer pod", "pod", pod.GetNamespace()+"/"+pod.GetName())
+	err = applyRuntimeObject(client.ObjectKey{Name: pod.GetName(), Namespace: pod.GetNamespace()}, pod, lb.client)
+	if err != nil {
+		return err
+	}
+
+	lbService, err := lb.generateService()
+	if err != nil {
+		return err
+	}
+	lb.logger.Info("Applying loadbalancer service", "service", lbService.GetNamespace()+"/"+lbService.GetName())
+	err = applyRuntimeObject(client.ObjectKey{Name: lbService.GetName(), Namespace: lbService.GetNamespace()}, lbService, lb.client)
+	if err != nil {
+		return err
+	}

-func (l *LoadBalancer) Prepare(sip airshipv1.SIPCluster, machines *airshipvms.MachineList, c client.Client) error {
-	fmt.Printf("%s.Prepare machines:%s \n", l.Service.serviceName, machines)
-	for _, machine := range machines.Machines {
-		if machine.VMRole == airshipv1.VMMaster {
-			ip := machine.Data.IPOnInterface[sip.Spec.InfraServices[l.Service.serviceName].NodeInterface]
-			fmt.Printf("%s.Prepare for machine:%s ip is %s\n", l.Service.serviceName, machine, ip)
-		}
-	}
 	return nil
 }

-func newLoadBalancer(infraCfg airshipv1.InfraConfig) InfrastructureService {
-	return &LoadBalancer{
-		Service: Service{
-			serviceName: airshipv1.LoadBalancerService,
-			config:      infraCfg,
-		},
-	}
-}
+func (lb loadBalancer) generatePodAndSecret() (*corev1.Pod, *corev1.Secret, error) {
+	secret, err := lb.generateSecret()
+	if err != nil {
+		return nil, nil, err
+	}
+	pod := &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      lb.sipName.Name + "-load-balancer",
+			Namespace: lb.sipName.Namespace,
+			Labels:    map[string]string{"lb-name": lb.sipName.Namespace + "-haproxy"},
+		},
+		Spec: corev1.PodSpec{
+			Containers: []corev1.Container{
+				{
+					Name:  "balancer",
+					Image: lb.config.Image,
+					Ports: []corev1.ContainerPort{
+						{
+							Name:          "http",
+							ContainerPort: 6443,
+						},
+					},
+					VolumeMounts: []corev1.VolumeMount{
+						{
+							Name:      ConfigSecretName,
+							MountPath: "/usr/local/etc/haproxy",
+						},
+					},
+				},
+			},
+			Volumes: []corev1.Volume{
+				{
+					Name: ConfigSecretName,
+					VolumeSource: corev1.VolumeSource{
+						Secret: &corev1.SecretVolumeSource{
+							SecretName: secret.GetName(),
+						},
+					},
+				},
+			},
+		},
+	}
+	return pod, secret, nil
+}

-/*
-:::warning
-For the loadbalanced interface a **static assignment** via network data is
-required. For now, we will not support updates to this field without manual
-intervention. In other words, there is no expectation that the SIP operator
-watches `BareMetalHost` objects and reacts to changes in the future. The
-expectation would instead be to re-deliver the `SIPCluster` object to force a
-no-op update so the load balancer configuration is updated.
-:::
+func (lb loadBalancer) generateSecret() (*corev1.Secret, error) {
+	p := proxy{
+		FrontPort: 6443,
+		Backends:  make([]backend, 0),
+	}
+	for _, machine := range lb.machines.Machines {
+		if machine.VMRole == airshipv1.VMMaster {
+			name := machine.BMH.Name
+			namespace := machine.BMH.Namespace
+			ip, exists := machine.Data.IPOnInterface[lb.config.NodeInterface]
+			if !exists {
+				lb.logger.Info("Machine does not have backend interface to be forwarded to",
+					"interface", lb.config.NodeInterface,
+					"machine", namespace+"/"+name,
+				)
+				continue
+			}
+			p.Backends = append(p.Backends, backend{IP: ip, Name: machine.BMH.Name, Port: 6443})
+		}
+	}
+	secretData, err := generateTemplate(p)
+	if err != nil {
+		return nil, err
+	}
+	return &corev1.Secret{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      lb.sipName.Name + "-load-balancer",
+			Namespace: lb.sipName.Namespace,
+		},
+		Type: corev1.SecretTypeOpaque,
+		Data: map[string][]byte{
+			"haproxy.cfg": secretData,
+		},
+	}, nil
+}

-By extracting these IP addresses from the appropriate/defined interface for each
-master node, we can build our loadbalancer service endpoint list to feed to
-haproxy. In other words, the SIP Cluster will now manufacture an haproxy
-configuration file that directs traffic to all IP endpoints found above over
-port 6443. For example:
+func (lb loadBalancer) generateService() (*corev1.Service, error) {
+	return &corev1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      lb.sipName.Name + "-load-balancer-service",
+			Namespace: lb.sipName.Namespace,
+		},
+		Spec: corev1.ServiceSpec{
+			Ports: []corev1.ServicePort{
+				{
+					Name:     "http",
+					Port:     6443,
+					NodePort: int32(lb.config.NodePort),
+				},
+			},
+			Selector: map[string]string{"lb-name": lb.sipName.Namespace + "-haproxy"},
+			Type:     corev1.ServiceTypeNodePort,
+		},
+	}, nil
+}
+
+type proxy struct {
+	FrontPort int
+	Backends  []backend
+}
+
+type backend struct {
+	IP   string
+	Name string
+	Port int
+}
+
+type loadBalancer struct {
+	client   client.Client
+	sipName  types.NamespacedName
+	logger   logr.Logger
+	config   airshipv1.InfraConfig
+	machines *airshipvms.MachineList
+}
+
+func newLB(name, namespace string,
+	logger logr.Logger,
+	config airshipv1.InfraConfig,
+	machines *airshipvms.MachineList,
+	client client.Client) loadBalancer {
+	return loadBalancer{
+		sipName: types.NamespacedName{
+			Name:      name,
+			Namespace: namespace,
+		},
+		logger:   logger,
+		config:   config,
+		machines: machines,
+		client:   client,
+	}
+}
+
+func (lb loadBalancer) Finalize() error {
+	// TODO: implement deletion of the load balancer
+	return nil
+}
+
+// Type type of the service
+func (lb loadBalancer) Type() airshipv1.InfraService {
+	return airshipv1.LoadBalancerService
+}
+
+func generateTemplate(p proxy) ([]byte, error) {
+	tmpl, err := template.New("haproxy-config").Parse(defaultTemplate)
+	if err != nil {
+		return nil, err
+	}
+	w := bytes.NewBuffer([]byte{})
+	if err := tmpl.Execute(w, p); err != nil {
+		return nil, err
+	}
+	rendered := w.Bytes()
+	return rendered, nil
+}

-``` gotpl
-global
-  log /dev/stdout local0
-  log /dev/stdout local1 notice
+var defaultTemplate = `global
+  log stdout format raw local0
   daemon
 defaults
   log global
@@ -90,23 +247,11 @@ defaults
   timeout client 50000
   timeout server 50000
 frontend control-plane
-  bind *:6443
+  bind *:{{ .FrontPort }}
   default_backend kube-apiservers
 backend kube-apiservers
   option httpchk GET /healthz
-{% for i in range(1, number_masters) %}
-  server {{ cluster_name }}-{{ i }} {{ vm_master_ip }}:6443 check check-ssl verify none
-{% end %}
-```
-
-This will be saved as a configmap and mounted into the cluster specific haproxy
-daemonset across all undercloud control nodes.
-
-We will then create a Kubernetes NodePort `Service` that will direct traffic on
-the infrastructure `nodePort` defined in the SIP Cluster definition to these
-haproxy workloads.
-
-At this point, the SIP Cluster controller can now label the VMs appropriately
-so they'll be scheduled by the Cluster-API process.
-*/
+{{- range .Backends }}
+{{- $backEnd := . }}
+server {{ $backEnd.Name }} {{ $backEnd.IP }}:{{ $backEnd.Port }} check check-ssl verify none
+{{ end -}}`
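Since the removed design notes referenced the rendered configuration, here is a small, self-contained sketch that renders the same kind of haproxy.cfg `generateTemplate` produces, with two illustrative backends. It uses text/template rather than the html/template import above, which is sufficient for plain-text config; the template is trimmed to the backend section:

```go
package main

import (
	"os"
	"text/template"
)

type backend struct {
	IP   string
	Name string
	Port int
}

type proxy struct {
	FrontPort int
	Backends  []backend
}

// A trimmed version of defaultTemplate above, enough to show how the
// backend list is rendered into server lines.
const haproxyTmpl = `frontend control-plane
  bind *:{{ .FrontPort }}
  default_backend kube-apiservers
backend kube-apiservers
  option httpchk GET /healthz
{{- range .Backends }}
  server {{ .Name }} {{ .IP }}:{{ .Port }} check check-ssl verify none
{{- end }}
`

func main() {
	p := proxy{
		FrontPort: 6443,
		Backends: []backend{
			{IP: "192.168.0.1", Name: "node01", Port: 6443}, // illustrative IPs
			{IP: "192.168.0.2", Name: "node02", Port: 6443},
		},
	}
	tmpl := template.Must(template.New("haproxy-config").Parse(haproxyTmpl))
	// Prints one "server ..." line per master backend after the frontend stanza.
	if err := tmpl.Execute(os.Stdout, p); err != nil {
		panic(err)
	}
}
```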

@@ -1,13 +1,73 @@
 package services_test

 import (
+	"path/filepath"
 	"testing"

+	airshipv1 "sipcluster/pkg/api/v1"
+
+	metal3 "github.com/metal3-io/baremetal-operator/apis/metal3.io/v1alpha1"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+	"k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/rest"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/envtest"
+	logf "sigs.k8s.io/controller-runtime/pkg/log"
+	"sigs.k8s.io/controller-runtime/pkg/log/zap"
 )

 func TestServices(t *testing.T) {
 	RegisterFailHandler(Fail)
 	RunSpecs(t, "Services Suite")
 }
+
+var cfg *rest.Config
+var k8sClient client.Client
+var testEnv *envtest.Environment
+var logger = zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))
+
+var _ = BeforeSuite(func(done Done) {
+	logf.SetLogger(logger)
+
+	By("bootstrapping test environment")
+	testEnv = &envtest.Environment{
+		CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")},
+	}
+
+	var err error
+	cfg, err = testEnv.Start()
+	Expect(err).ToNot(HaveOccurred())
+	Expect(cfg).ToNot(BeNil())
+
+	err = airshipv1.AddToScheme(scheme.Scheme)
+	Expect(err).NotTo(HaveOccurred())
+
+	err = metal3.AddToScheme(scheme.Scheme)
+	Expect(err).NotTo(HaveOccurred())
+
+	k8sManager, err := ctrl.NewManager(cfg, ctrl.Options{
+		Scheme:             scheme.Scheme,
+		MetricsBindAddress: "0",
+	})
+	Expect(err).ToNot(HaveOccurred())
+
+	k8sClient = k8sManager.GetClient()
+	Expect(k8sClient).ToNot(BeNil())
+
+	go func() {
+		err = k8sManager.Start(ctrl.SetupSignalHandler())
+		Expect(err).ToNot(HaveOccurred())
+	}()
+
+	close(done)
+}, 60)
+
+var _ = AfterSuite(func() {
+	By("tearing down the test environment")
+	err := testEnv.Stop()
+	Expect(err).ToNot(HaveOccurred())
+})

@@ -0,0 +1,94 @@
+package services_test
+
+import (
+	"context"
+
+	airshipv1 "sipcluster/pkg/api/v1"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/types"
+
+	"sipcluster/pkg/services"
+	"sipcluster/pkg/vbmh"
+	"sipcluster/testutil"
+)
+
+var _ = Describe("Service Set", func() {
+	Context("When new SIP cluster is created", func() {
+		It("Deploys services", func() {
+			By("Getting machine IPs and creating secrets, pods, and nodeport service")
+			bmh1, _ := testutil.CreateBMH(1, "default", "control-plane", 1)
+			bmh2, _ := testutil.CreateBMH(2, "default", "control-plane", 2)
+			m1 := &vbmh.Machine{
+				BMH: *bmh1,
+				Data: &vbmh.MachineData{
+					IPOnInterface: map[string]string{
+						"eno3": "192.168.0.1",
+					},
+				},
+			}
+			m2 := &vbmh.Machine{
+				BMH: *bmh2,
+				Data: &vbmh.MachineData{
+					IPOnInterface: map[string]string{
+						"eno3": "192.168.0.2",
+					},
+				},
+			}
+			sip := testutil.CreateSIPCluster("default", "default", 1, 1)
+			machineList := &vbmh.MachineList{
+				Machines: map[string]*vbmh.Machine{
+					bmh1.GetName(): m1,
+					bmh2.GetName(): m2,
+				},
+			}
+			set := services.NewServiceSet(logger, *sip, machineList, k8sClient)
+
+			serviceList := set.ServiceList()
+			Expect(serviceList).To(HaveLen(1))
+			Eventually(func() error {
+				return testDeployment(serviceList[0], sip)
+			}, 5, 1).Should(Succeed())
+		})
+	})
+})
+
+func testDeployment(sl services.InfraService, sip *airshipv1.SIPCluster) error {
+	err := sl.Deploy()
+	if err != nil {
+		return err
+	}
+
+	pod := &corev1.Pod{}
+	err = k8sClient.Get(context.Background(), types.NamespacedName{
+		Namespace: "default",
+		Name:      sip.GetName() + "-load-balancer",
+	}, pod)
+	if err != nil {
+		return err
+	}
+
+	secret := &corev1.Secret{}
+	err = k8sClient.Get(context.Background(), types.NamespacedName{
+		Namespace: "default",
+		Name:      sip.GetName() + "-load-balancer",
+	}, secret)
+	if err != nil {
+		return err
+	}
+
+	service := &corev1.Service{}
+	err = k8sClient.Get(context.Background(), types.NamespacedName{
+		Namespace: "default",
+		Name:      sip.GetName() + "-load-balancer-service",
+	}, service)
+	if err != nil {
+		return err
+	}
+	return nil
+}

pkg/services/set.go (new file)

@@ -0,0 +1,140 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    https://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package services
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/go-logr/logr"
+	corev1 "k8s.io/api/core/v1"
+	apierror "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	airshipv1 "sipcluster/pkg/api/v1"
+	airshipvms "sipcluster/pkg/vbmh"
+)
+
+// InfraService generalizes infrastructure services
+type InfraService interface {
+	Deploy() error
+	Finalize() error
+	Type() airshipv1.InfraService
+}
+
+// ServiceSet provides access to infrastructure services
+type ServiceSet struct {
+	logger   logr.Logger
+	sip      airshipv1.SIPCluster
+	machines *airshipvms.MachineList
+	client   client.Client
+	services map[airshipv1.InfraService]InfraService
+}
+
+// NewServiceSet returns new instance of ServiceSet
+func NewServiceSet(
+	logger logr.Logger,
+	sip airshipv1.SIPCluster,
+	machines *airshipvms.MachineList,
+	client client.Client) ServiceSet {
+	logger = logger.WithValues("SIPCluster", types.NamespacedName{Name: sip.GetName(), Namespace: sip.GetNamespace()})
+	return ServiceSet{
+		logger:   logger,
+		sip:      sip,
+		client:   client,
+		machines: machines,
+	}
+}
+
+// LoadBalancer returns loadbalancer service
+func (ss ServiceSet) LoadBalancer() (InfraService, error) {
+	lb, ok := ss.services[airshipv1.LoadBalancerService]
+	if !ok {
+		ss.logger.Info("sip cluster doesn't have loadbalancer infrastructure service defined")
+		return lb, fmt.Errorf("loadbalancer service is not defined for sip cluster '%s'/'%s'",
+			ss.sip.GetNamespace(),
+			ss.sip.GetName())
+	}
+	return lb, nil
+}
+
+func (ss ServiceSet) Finalize() error {
+	serviceNamespace := &corev1.Namespace{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: corev1.SchemeGroupVersion.String(),
+			Kind:       "Namespace",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: ss.sip.Spec.ClusterName,
+		},
+	}
+	return ss.client.Delete(context.TODO(), serviceNamespace)
+}
+
+// CreateNS creates the service namespace if it does not already exist
+func CreateNS(serviceNamespaceName string, c client.Client) error {
+	ns := &corev1.Namespace{}
+	key := client.ObjectKey{Name: serviceNamespaceName}
+	if err := c.Get(context.Background(), key, ns); err == nil {
+		// Namespace already exists
+		return nil
+	}
+
+	serviceNamespace := &corev1.Namespace{
+		TypeMeta: metav1.TypeMeta{
+			APIVersion: corev1.SchemeGroupVersion.String(),
+			Kind:       "Namespace",
+		},
+		ObjectMeta: metav1.ObjectMeta{
+			Name: serviceNamespaceName,
+		},
+	}
+	return c.Create(context.TODO(), serviceNamespace)
+}
+
+// ServiceList returns all services defined in Set
+func (ss ServiceSet) ServiceList() []InfraService {
+	var serviceList []InfraService
+	for serviceType, serviceConfig := range ss.sip.Spec.InfraServices {
+		switch serviceType {
+		case airshipv1.LoadBalancerService:
+			ss.logger.Info("Service defined for SIP cluster", "service type", serviceType)
+			serviceList = append(serviceList,
+				newLB(ss.sip.GetName(),
+					ss.sip.Spec.ClusterName,
+					ss.logger,
+					serviceConfig,
+					ss.machines,
+					ss.client))
+		default:
+			ss.logger.Info("Service type is unknown to SIPCluster controller", "service type", serviceType)
+		}
+	}
+	return serviceList
+}
+
+func applyRuntimeObject(key client.ObjectKey, obj client.Object, c client.Client) error {
+	ctx := context.Background()
+	// Read into a copy so the desired state in obj is not clobbered by the
+	// cluster's current state before the Update call.
+	existing := obj.DeepCopyObject().(client.Object)
+	switch err := c.Get(ctx, key, existing); {
+	case apierror.IsNotFound(err):
+		return c.Create(ctx, obj)
+	case err == nil:
+		obj.SetResourceVersion(existing.GetResourceVersion())
+		return c.Update(ctx, obj)
+	default:
+		return err
+	}
+}
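`applyRuntimeObject` gives the services a single create-or-update primitive for any `client.Object`. A minimal sketch of how another, hypothetical object would be applied through it, assuming the function lives in the same `pkg/services` package; the ConfigMap name and data are illustrative, not part of this commit:

```go
// ensureTuningConfigMap is a hypothetical caller: it applies a ConfigMap
// through the same create-or-update path used for the load balancer's
// secret, pod, and service.
func ensureTuningConfigMap(c client.Client, namespace string) error {
	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "haproxy-tuning", // illustrative name
			Namespace: namespace,
		},
		Data: map[string]string{"maxconn": "4096"},
	}
	key := client.ObjectKey{Name: cm.GetName(), Namespace: cm.GetNamespace()}
	return applyRuntimeObject(key, cm, c)
}
```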

@@ -120,7 +120,7 @@ var _ = Describe("MachineList", func() {
 				NodeLabels: map[string]string{
 					"test": "true",
 				},
-				NodePorts:     []int{7000, 7001, 7002},
+				NodePort:      30000,
 				NodeInterface: "oam-ipv4",
 			},
 		}

@@ -229,7 +229,12 @@ func CreateSIPCluster(name string, namespace string, masters int, workers int) *
 				},
 			},
 		},
-		InfraServices: map[airshipv1.InfraService]airshipv1.InfraConfig{},
+		InfraServices: map[airshipv1.InfraService]airshipv1.InfraConfig{
+			airshipv1.LoadBalancerService: {
+				NodeInterface: "eno3",
+				NodePort:      30000,
+			},
+		},
 	},
 	Status: airshipv1.SIPClusterStatus{},
 }