Fix system namespace

Create tenant and network for 3 namespaces:
default, kube-system, kube-public

They will share "default" tenant and "kube-default-default-network"

Change-Id: I4cb195d544a5a8d6d3fdd356a3a89582daaafdbf
Implements: blueprint 	auth-controller
This commit is contained in:
Harry Zhang 2017-07-13 13:10:43 +08:00
parent 69dc3b3a6b
commit e2f1a14c1d
11 changed files with 183 additions and 63 deletions

View File

@ -24,17 +24,31 @@ var (
"path to kubernetes admin config file")
cloudconfig = pflag.String("cloudconfig", "/etc/stackube.conf",
"path to stackube config file")
systemCIDR = pflag.String("system-cidr", "10.10.10.10/24", "system Pod network CIDR")
systemGateway = pflag.String("system-gateway", "10.10.10.1", "system Pod network gateway")
)
func startControllers(kubeconfig, cloudconfig string) error {
// Creates a new tenant controller
tc, err := tenant.New(kubeconfig, cloudconfig)
// Creates a new Tenant controller
tc, err := tenant.NewTenantController(kubeconfig, cloudconfig)
if err != nil {
return err
}
// Creates a new Network controller
nc, err := network.NewNetworkController(
kubeconfig, cloudconfig)
if err != nil {
return err
}
// Creates a new RBAC controller
rm, err := rbacmanager.New(kubeconfig)
rm, err := rbacmanager.New(kubeconfig,
tc.GetTenantClient(),
nc.GetNetworkClient(),
*systemCIDR,
*systemGateway,
)
if err != nil {
return err
}
@ -46,14 +60,8 @@ func startControllers(kubeconfig, cloudconfig string) error {
wg.Go(func() error { return tc.Run(ctx.Done()) })
wg.Go(func() error { return rm.Run(ctx.Done()) })
networkController, err := network.NewNetworkController(
kubeconfig, cloudconfig)
if err != nil {
return err
}
// start network controller
wg.Go(func() error { return networkController.Run(ctx.Done()) })
wg.Go(func() error { return nc.Run(ctx.Done()) })
term := make(chan os.Signal)
signal.Notify(term, os.Interrupt, syscall.SIGTERM)

View File

@ -69,12 +69,13 @@ Test
password: "password"
# for now network should have the same name as namespace (to enforce 1:1 mapping)
$ cat test-network.yaml
apiVersion: "stackube.kubernetes.io/v1"
kind: Network
metadata:
name: test-net
name: test
namespace: test
spec:
cidr: "192.168.0.0/24"

View File

@ -44,7 +44,7 @@ Start installation:
Setup environment variables for kubectl and openstack client:
::
export KUBECONFIG=$HOME/admin.conf
export KUBECONFIG=/etc/kubernetes/admin.conf
source openrc admin admin
================

View File

@ -11,7 +11,6 @@ import (
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
apiv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/rest"
)
@ -47,12 +46,13 @@ func CreateTenantCRD(clientset apiextensionsclient.Interface) (*apiextensionsv1b
}
}
func WaitForTenantInstanceProcessed(kubeClient *rest.RESTClient, name string) error {
func WaitForTenantInstanceProcessed(tenantClient *rest.RESTClient, name string) error {
return wait.Poll(100*time.Millisecond, 10*time.Second, func() (bool, error) {
var tenant crv1.Tenant
err := kubeClient.Get().
err := tenantClient.Get().
Resource(crv1.TenantResourcePlural).
Namespace(apiv1.NamespaceDefault).
// namespace and tenant has same name
Namespace(name).
Name(name).
Do().Into(&tenant)

View File

@ -4,15 +4,18 @@ import (
"fmt"
"time"
crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1"
"git.openstack.org/openstack/stackube/pkg/auth-controller/rbacmanager/rbac"
"git.openstack.org/openstack/stackube/pkg/util"
"github.com/golang/glog"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
@ -22,13 +25,22 @@ const (
)
type Controller struct {
kclient *kubernetes.Clientset
nsInf cache.SharedIndexInformer
queue workqueue.RateLimitingInterface
kclient *kubernetes.Clientset
nsInf cache.SharedIndexInformer
queue workqueue.RateLimitingInterface
tenantClient *rest.RESTClient
networkClient *rest.RESTClient
systemCIDR string
systemGateway string
}
// New creates a new RBAC controller.
func New(kubeconfig string) (*Controller, error) {
func New(kubeconfig string,
tenantClient *rest.RESTClient,
networkClient *rest.RESTClient,
systemCIDR string,
systemGateway string,
) (*Controller, error) {
cfg, err := util.NewClusterConfig(kubeconfig)
if err != nil {
return nil, fmt.Errorf("init cluster config failed: %v", err)
@ -39,8 +51,12 @@ func New(kubeconfig string) (*Controller, error) {
}
o := &Controller{
kclient: client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "rbacmanager"),
kclient: client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "rbacmanager"),
tenantClient: tenantClient,
networkClient: networkClient,
systemCIDR: systemCIDR,
systemGateway: systemGateway,
}
o.nsInf = cache.NewSharedIndexInformer(
@ -127,10 +143,63 @@ func (c *Controller) handleNamespaceAdd(obj interface{}) {
if !ok {
return
}
// check if this is a system reserved namespace
if util.IsSystemNamespace(key) {
if err := c.initSystemReservedTenantNetwork(); err != nil {
glog.Error(err)
return
}
}
glog.V(4).Infof("Added namespace %s", key)
c.enqueue(key)
}
func (c *Controller) initSystemReservedTenantNetwork() error {
tenant := &crv1.Tenant{
ObjectMeta: metav1.ObjectMeta{
Name: util.SystemTenant,
},
Spec: crv1.TenantSpec{
UserName: util.SystemTenant,
Password: util.SystemPassword,
},
}
err := c.tenantClient.Post().
Namespace(util.SystemTenant).
Resource(crv1.TenantResourcePlural).
Body(tenant).
Do().Error()
if err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create system Tenant: %v", err)
}
// NOTE(harry): we do not support update Network, so although configurable,
// user can not update CIDR by changing the configuration, unless manually delete
// that system network. We may need to document this.
network := &crv1.Network{
ObjectMeta: metav1.ObjectMeta{
Name: util.SystemNetwork,
},
Spec: crv1.NetworkSpec{
CIDR: c.systemCIDR,
Gateway: c.systemGateway,
},
}
// network controller will always check if Tenant is ready so we will not wait here
err = c.networkClient.Post().
Resource(crv1.NetworkResourcePlural).
Namespace(util.SystemTenant).
Body(network).
Do().Error()
if err != nil && !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("failed to create system Network: %v", err)
}
return nil
}
func (c *Controller) handleNamespaceDelete(obj interface{}) {
key, ok := c.keyFunc(obj)
if !ok {

View File

@ -29,8 +29,8 @@ type TenantController struct {
openstackClient *openstack.Client
}
// New creates a new tenant controller.
func New(kubeconfig, cloudconfig string) (*TenantController, error) {
// NewTenantController creates a new tenant controller.
func NewTenantController(kubeconfig, cloudconfig string) (*TenantController, error) {
// Create OpenStack client from config
openStackClient, err := openstack.NewClient(cloudconfig)
if err != nil {
@ -85,6 +85,10 @@ func buildConfig(kubeconfig string) (*rest.Config, error) {
return rest.InClusterConfig()
}
func (c *TenantController) GetTenantClient() *rest.RESTClient {
return c.tenantClient
}
// Run the controller.
func (c *TenantController) Run(stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
@ -112,7 +116,7 @@ func (c *TenantController) Run(stopCh <-chan struct{}) error {
func (c *TenantController) onAdd(obj interface{}) {
tenant := obj.(*crv1.Tenant)
glog.V(3).Infof("Tenant controller received new object %q\n", tenant)
glog.V(3).Infof("Tenant controller received new object %v\n", tenant)
copyObj, err := c.tenantScheme.Copy(tenant)
if err != nil {
@ -134,7 +138,7 @@ func (c *TenantController) onDelete(obj interface{}) {
return
}
glog.V(3).Infof("Tenant controller received deleted tenant %q\n", tenant)
glog.V(3).Infof("Tenant controller received deleted tenant %v\n", tenant)
deleteOptions := &apismetav1.DeleteOptions{
TypeMeta: apismetav1.TypeMeta{

View File

@ -11,7 +11,6 @@ import (
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
apiv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/rest"
)
@ -47,12 +46,12 @@ func CreateNetworkCRD(clientset apiextensionsclient.Interface) (*apiextensionsv1
}
}
func WaitForNetworkInstanceProcessed(kubeClient *rest.RESTClient, name string) error {
func WaitForNetworkInstanceProcessed(networkClient *rest.RESTClient, name, namespace string) error {
return wait.Poll(100*time.Millisecond, 10*time.Second, func() (bool, error) {
var network crv1.Network
err := kubeClient.Get().
err := networkClient.Get().
Resource(crv1.NetworkResourcePlural).
Namespace(apiv1.NamespaceDefault).
Namespace(namespace).
Name(name).
Do().Into(&network)

View File

@ -15,7 +15,7 @@ import (
crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1"
crdClient "git.openstack.org/openstack/stackube/pkg/network-controller/client"
driver "git.openstack.org/openstack/stackube/pkg/openstack"
osDriver "git.openstack.org/openstack/stackube/pkg/openstack"
"git.openstack.org/openstack/stackube/pkg/util"
"github.com/golang/glog"
@ -23,16 +23,20 @@ import (
// Watcher is an network of watching on resource create/update/delete events
type NetworkController struct {
NetworkClient *rest.RESTClient
NetworkScheme *runtime.Scheme
Driver *driver.Client
networkClient *rest.RESTClient
networkScheme *runtime.Scheme
driver *osDriver.Client
}
func (c *NetworkController) GetNetworkClient() *rest.RESTClient {
return c.networkClient
}
func (c *NetworkController) Run(stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
source := cache.NewListWatchFromClient(
c.NetworkClient,
c.networkClient,
crv1.NetworkResourcePlural,
apiv1.NamespaceAll,
fields.Everything())
@ -49,6 +53,7 @@ func (c *NetworkController) Run(stopCh <-chan struct{}) error {
go networkInformer.Run(stopCh)
<-stopCh
return nil
}
@ -61,7 +66,7 @@ func buildConfig(kubeconfig string) (*rest.Config, error) {
func NewNetworkController(kubeconfig, openstackConfigFile string) (*NetworkController, error) {
// Create OpenStack client from config
openstack, err := driver.NewClient(openstackConfigFile)
openstack, err := osDriver.NewClient(openstackConfigFile)
if err != nil {
return nil, fmt.Errorf("Couldn't initialize openstack: %#v", err)
}
@ -89,21 +94,22 @@ func NewNetworkController(kubeconfig, openstackConfigFile string) (*NetworkContr
}
networkController := &NetworkController{
NetworkClient: networkClient,
NetworkScheme: networkScheme,
Driver: openstack,
networkClient: networkClient,
networkScheme: networkScheme,
driver: openstack,
}
return networkController, nil
}
func (c *NetworkController) onAdd(obj interface{}) {
network := obj.(*crv1.Network)
glog.Infof("[NETWORK CONTROLLER] OnAdd %s\n", network.ObjectMeta.SelfLink)
// glog.Infof("[NETWORK CONTROLLER] OnAdd %\n", network.ObjectMeta.SelfLink)
glog.Infof("[NETWORK CONTROLLER] OnAdd %#v\n", network)
// NEVER modify objects from the store. It's a read-only, local cache.
// You can use networkScheme.Copy() to make a deep copy of original object and modify this copy
// Or create a copy manually for better performance
copyObj, err := c.NetworkScheme.Copy(network)
copyObj, err := c.networkScheme.Copy(network)
if err != nil {
glog.Errorf("ERROR creating a deep copy of network object: %v\n", err)
return
@ -113,8 +119,8 @@ func (c *NetworkController) onAdd(obj interface{}) {
// This will:
// 1. Create Network in Neutron
// 2. Update Network TRP object status to Active or Failed
c.addNetworkToNeutron(networkCopy)
// 2. Update Network CRD object status to Active or Failed
c.addNetworkToDriver(networkCopy)
}
func (c *NetworkController) onUpdate(oldObj, newObj interface{}) {
@ -126,7 +132,7 @@ func (c *NetworkController) onDelete(obj interface{}) {
glog.V(4).Infof("NetworkController: network %s deleted", net.Name)
if net.Spec.NetworkID == "" {
networkName := util.BuildNetworkName(net.GetNamespace(), net.GetName())
err := c.Driver.DeleteNetwork(networkName)
err := c.driver.DeleteNetwork(networkName)
if err != nil {
glog.Errorf("NetworkController: delete network %s failed in networkprovider: %v", networkName, err)
} else {

View File

@ -3,13 +3,12 @@ package network
import (
"time"
"k8s.io/apimachinery/pkg/util/wait"
crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1"
drivertypes "git.openstack.org/openstack/stackube/pkg/openstack/types"
"git.openstack.org/openstack/stackube/pkg/util"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/wait"
)
const (
@ -17,22 +16,26 @@ const (
subnetSuffix = "subnet"
)
func (c *NetworkController) addNetworkToNeutron(kubeNetwork *crv1.Network) {
func (c *NetworkController) addNetworkToDriver(kubeNetwork *crv1.Network) {
// The tenant name is the same with namespace, let's get tenantID by tenantName
tenantName := kubeNetwork.GetNamespace()
tenantID, err := c.Driver.GetTenantIDFromName(tenantName)
if err != nil {
// Retry for a while if failed
tenantID, err := c.driver.GetTenantIDFromName(tenantName)
// Retry for a while if fetch tenantID failed or tenantID not found,
// this is normally caused by cloud provider processing
if err != nil || tenantID == "" {
err = wait.Poll(2*time.Second, 10*time.Second, func() (bool, error) {
glog.V(3).Infof("failed to fetch tenantID for tenantName: %v, retrying\n", tenantName)
if tenantID, err = c.Driver.GetTenantIDFromName(kubeNetwork.GetNamespace()); err != nil {
tenantID, err = c.driver.GetTenantIDFromName(kubeNetwork.GetNamespace())
if err != nil || tenantID == "" {
glog.V(5).Infof("failed to fetch tenantID for tenantName: %v, retrying\n", tenantName)
return false, nil
}
return true, nil
})
}
if err != nil {
if err != nil || tenantID == "" {
glog.Errorf("failed to fetch tenantID for tenantName: %v, abort! \n", tenantName)
return
} else {
glog.V(3).Infof("Got tenantID: %v for tenantName: %v", tenantID, tenantName)
}
@ -59,7 +62,7 @@ func (c *NetworkController) addNetworkToNeutron(kubeNetwork *crv1.Network) {
glog.V(4).Infof("[NetworkController]: adding network %s", driverNetwork.Name)
// Check if tenant id exist
check, err := c.Driver.CheckTenantID(driverNetwork.TenantID)
check, err := c.driver.CheckTenantID(driverNetwork.TenantID)
if err != nil {
glog.Errorf("[NetworkController]: check tenantID failed: %v", err)
}
@ -72,7 +75,7 @@ func (c *NetworkController) addNetworkToNeutron(kubeNetwork *crv1.Network) {
// Check if provider network id exist
if kubeNetwork.Spec.NetworkID != "" {
_, err := c.Driver.GetNetworkByID(kubeNetwork.Spec.NetworkID)
_, err := c.driver.GetNetworkByID(kubeNetwork.Spec.NetworkID)
if err != nil {
glog.Warningf("[NetworkController]: network %s doesn't exit in network provider", kubeNetwork.Spec.NetworkID)
newNetworkStatus = crv1.NetworkFailed
@ -83,12 +86,12 @@ func (c *NetworkController) addNetworkToNeutron(kubeNetwork *crv1.Network) {
newNetworkStatus = crv1.NetworkFailed
} else {
// Check if provider network has already created
_, err := c.Driver.GetNetwork(networkName)
_, err := c.driver.GetNetwork(networkName)
if err == nil {
glog.Infof("[NetworkController]: network %s has already created", networkName)
} else if err.Error() == util.ErrNotFound.Error() {
// Create a new network by network provider
err := c.Driver.CreateNetwork(driverNetwork)
err := c.driver.CreateNetwork(driverNetwork)
if err != nil {
glog.Warningf("[NetworkController]: create network %s failed: %v", driverNetwork.Name, err)
newNetworkStatus = crv1.NetworkFailed
@ -106,9 +109,9 @@ func (c *NetworkController) addNetworkToNeutron(kubeNetwork *crv1.Network) {
// updateNetwork updates Network CRD object by given object
func (c *NetworkController) updateNetwork(network *crv1.Network) {
err := c.NetworkClient.Put().
Name(network.ObjectMeta.Name).
Namespace(network.ObjectMeta.Namespace).
err := c.networkClient.Put().
Name(network.Name).
Namespace(network.Namespace).
Resource(crv1.NetworkResourcePlural).
Body(network).
Do().

View File

@ -144,6 +144,9 @@ func readConfig(config string) (Config, error) {
}
func (c *Client) GetTenantIDFromName(tenantName string) (string, error) {
if util.IsSystemNamespace(tenantName) {
tenantName = util.SystemTenant
}
var tenantID string
err := tenants.List(c.Identity, nil).EachPage(func(page pagination.Page) (bool, error) {
tenantList, err1 := tenants.ExtractTenants(page)

View File

@ -2,23 +2,50 @@ package util
import (
"errors"
apiv1 "k8s.io/client-go/pkg/api/v1"
)
const (
namePrefix = "kube"
SystemTenant = apiv1.NamespaceDefault
SystemPassword = "password"
SystemNetwork = apiv1.NamespaceDefault
)
var ErrNotFound = errors.New("NotFound")
var ErrMultipleResults = errors.New("MultipleResults")
func BuildNetworkName(namespace, name string) string {
return namePrefix + "_" + namespace + "_" + name
if IsSystemNamespace(namespace) {
namespace = SystemTenant
}
return namePrefix + "-" + namespace + "-" + name
}
func BuildLoadBalancerName(namespace, name string) string {
return namePrefix + "_" + namespace + "_" + name
if IsSystemNamespace(namespace) {
namespace = SystemTenant
}
return namePrefix + "-" + namespace + "-" + name
}
func BuildPortName(namespace, podName string) string {
return namePrefix + "_" + namespace + "_" + podName
if IsSystemNamespace(namespace) {
namespace = SystemTenant
}
return namePrefix + "-" + namespace + "-" + podName
}
func IsSystemNamespace(ns string) bool {
switch ns {
case
"default",
"kube-system",
"kube-public":
return true
}
return false
}