Add kube-dns

Adds kube-dns deployments for each tenant. And also adds loadbalancer
for cluster DNS in each namespace.

Implements: blueprint dns
Change-Id: I2ee00806431cc4a3dfdf4c3c49b54892d4c2c98b
Signed-off-by: Pengfei Ni <feiskyer@gmail.com>
This commit is contained in:
Pengfei Ni 2017-08-03 16:35:55 +08:00
parent ea45b75fbc
commit 50322401cd
14 changed files with 441 additions and 24 deletions

3
.gitignore vendored
View File

@ -1,2 +1,5 @@
# binaries # binaries
_output _output
deployment/kubestack/kubestack
deployment/stackube-controller/stackube-controller
deployment/stackube-proxy/stackube-proxy

View File

@ -77,6 +77,12 @@ docker: depend
cp _output/stackube-proxy deployment/stackube-proxy cp _output/stackube-proxy deployment/stackube-proxy
sudo docker build -t stackube/stackube-proxy:v$(STACKUBE_PROXY_VERSION) ./deployment/stackube-proxy/ sudo docker build -t stackube/stackube-proxy:v$(STACKUBE_PROXY_VERSION) ./deployment/stackube-proxy/
.PHONY: push
push:
sudo docker push stackube/kubestack:v$(KUBESTACK_VERSION)
sudo docker push stackube/stackube-controller:v$(STACKUBE_VERSION)
sudo docker push stackube/stackube-proxy:v$(STACKUBE_PROXY_VERSION)
.PHONY: test .PHONY: test
test: test-unit test: test-unit

View File

@ -56,7 +56,7 @@ func startControllers(kubeClient *kubernetes.Clientset,
} }
// Creates a new Network controller // Creates a new Network controller
networkController, err := network.NewNetworkController(osClient, kubeExtClient) networkController, err := network.NewNetworkController(kubeClient, osClient, kubeExtClient)
if err != nil { if err != nil {
return err return err
} }

View File

@ -50,4 +50,4 @@ if [ -z $USER_GATEWAY ];then
USER_GATEWAY='10.244.0.1' USER_GATEWAY='10.244.0.1'
fi fi
./stackube-controller --kubeconfig="" --user-cidr=${USER_CIDR} --user-gateway=${USER_GATEWAY} ./stackube-controller --v=3 --kubeconfig="" --user-cidr=${USER_CIDR} --user-gateway=${USER_GATEWAY}

View File

@ -45,6 +45,7 @@ spec:
{"key":"CriticalAddonsOnly", "operator":"Exists"}] {"key":"CriticalAddonsOnly", "operator":"Exists"}]
spec: spec:
hostNetwork: true hostNetwork: true
hostPID: true
serviceAccountName: stackube-proxy serviceAccountName: stackube-proxy
dnsPolicy: ClusterFirst dnsPolicy: ClusterFirst
restartPolicy: Always restartPolicy: Always

View File

@ -41,4 +41,4 @@ mv $TMP_CONF $STACKUBE_CONFIG_PATH
echo "Wrote stackube config: $(cat ${STACKUBE_CONFIG_PATH})" echo "Wrote stackube config: $(cat ${STACKUBE_CONFIG_PATH})"
# Start stackube-proxy in-cluster. # Start stackube-proxy in-cluster.
./stackube-proxy --kubeconfig="" ./stackube-proxy --kubeconfig="" --v=3

View File

View File

@ -66,6 +66,32 @@ func GenerateRoleBinding(namespace, tenant string) *v1beta1.RoleBinding {
return roleBinding return roleBinding
} }
func GenerateServiceAccountRoleBinding(namespace, tenant string) *v1beta1.RoleBinding {
subject := v1beta1.Subject{
Kind: "ServiceAccount",
Name: "default",
Namespace: namespace,
}
roleRef := v1beta1.RoleRef{
APIGroup: "rbac.authorization.k8s.io",
Kind: "Role",
Name: "default-role",
}
roleBinding := &v1beta1.RoleBinding{
TypeMeta: metav1.TypeMeta{
Kind: "RoleBinding",
APIVersion: "rbac.authorization.k8s.io/v1beta1",
},
ObjectMeta: metav1.ObjectMeta{
Name: tenant + "-rolebinding-sa",
Namespace: namespace,
},
Subjects: []v1beta1.Subject{subject},
RoleRef: roleRef,
}
return roleBinding
}
func GenerateClusterRole() *v1beta1.ClusterRole { func GenerateClusterRole() *v1beta1.ClusterRole {
policyRule := v1beta1.PolicyRule{ policyRule := v1beta1.PolicyRule{
Verbs: []string{v1beta1.VerbAll}, Verbs: []string{v1beta1.VerbAll},

View File

@ -183,6 +183,7 @@ func (c *Controller) syncRBAC(ns *apiv1.Namespace) error {
return nil return nil
} }
rbacClient := c.k8sclient.Rbac() rbacClient := c.k8sclient.Rbac()
// Create role for tenant // Create role for tenant
role := rbac.GenerateRoleByNamespace(ns.Name) role := rbac.GenerateRoleByNamespace(ns.Name)
_, err := rbacClient.Roles(ns.Name).Create(role) _, err := rbacClient.Roles(ns.Name).Create(role)
@ -191,6 +192,7 @@ func (c *Controller) syncRBAC(ns *apiv1.Namespace) error {
return err return err
} }
glog.V(4).Infof("Created default-role in namespace %s for tenant %s", ns.Name, ns.Name) glog.V(4).Infof("Created default-role in namespace %s for tenant %s", ns.Name, ns.Name)
// Create rolebinding for tenant // Create rolebinding for tenant
roleBinding := rbac.GenerateRoleBinding(ns.Name, ns.Name) roleBinding := rbac.GenerateRoleBinding(ns.Name, ns.Name)
_, err = rbacClient.RoleBindings(ns.Name).Create(roleBinding) _, err = rbacClient.RoleBindings(ns.Name).Create(roleBinding)
@ -198,6 +200,13 @@ func (c *Controller) syncRBAC(ns *apiv1.Namespace) error {
glog.Errorf("Failed create %s-rolebindings in namespace %s for tenant %s: %v", ns.Name, ns.Name, ns.Name, err) glog.Errorf("Failed create %s-rolebindings in namespace %s for tenant %s: %v", ns.Name, ns.Name, ns.Name, err)
return err return err
} }
saRoleBinding := rbac.GenerateServiceAccountRoleBinding(ns.Name, ns.Name)
_, err = rbacClient.RoleBindings(ns.Name).Create(saRoleBinding)
if err != nil && !apierrors.IsAlreadyExists(err) {
glog.Errorf("Failed create %s-rolebindings-sa in namespace %s for tenant %s: %v", ns.Name, ns.Name, ns.Name, err)
return err
}
glog.V(4).Infof("Created %s-rolebindings in namespace %s for tenant %s", ns.Name, ns.Name, ns.Name) glog.V(4).Infof("Created %s-rolebindings in namespace %s for tenant %s", ns.Name, ns.Name, ns.Name)
return nil return nil
} }

View File

@ -0,0 +1,184 @@
/*
Copyright (c) 2017 OpenStack Foundation.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package network
const (
kubeDNSDeployment = `
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
labels:
k8s-app: kube-dns
name: kube-dns
namespace: {{ .Namespace }}
spec:
replicas: 1
selector:
matchLabels:
k8s-app: kube-dns
strategy:
rollingUpdate:
maxSurge: 10%
maxUnavailable: 0
type: RollingUpdate
template:
metadata:
annotations:
scheduler.alpha.kubernetes.io/critical-pod: ""
labels:
k8s-app: kube-dns
spec:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: beta.kubernetes.io/arch
operator: In
values:
- amd64
containers:
- args:
- --domain={{ .DNSDomain }}.
- --dns-port=10053
- --namespace=$(POD_NAMESPACE)
- --config-dir=/kube-dns-config
- --v=2
env:
- name: PROMETHEUS_PORT
value: "10055"
- name: KUBERNETES_SERVICE_HOST
value: "{{ .KubernetesHost }}"
- name: KUBERNETES_SERVICE_PORT
value: "{{ .KubernetesPort }}"
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
image: {{ .KubeDNSImage }}
imagePullPolicy: IfNotPresent
name: kubedns
ports:
- containerPort: 10053
name: dns-local
protocol: UDP
- containerPort: 10053
name: dns-tcp-local
protocol: TCP
- containerPort: 10055
name: metrics
protocol: TCP
resources:
limits:
cpu: 100m
memory: 64Mi
volumeMounts:
- mountPath: /kube-dns-config
name: kube-dns-config
- args:
- -v=2
- -logtostderr
- -configDir=/etc/k8s/dns/dnsmasq-nanny
- -restartDnsmasq=true
- --
- -k
- --cache-size=1000
- --log-facility=-
- --server=/{{ .DNSDomain }}/127.0.0.1#10053
- --server=/in-addr.arpa/127.0.0.1#10053
- --server=/ip6.arpa/127.0.0.1#10053
image: {{ .DNSMasqImage }}
imagePullPolicy: IfNotPresent
name: dnsmasq
ports:
- containerPort: 53
name: dns
protocol: UDP
- containerPort: 53
name: dns-tcp
protocol: TCP
resources:
limits:
cpu: 150m
memory: 64Mi
volumeMounts:
- mountPath: /etc/k8s/dns/dnsmasq-nanny
name: kube-dns-config
- command:
- /bin/sh
- -c
- /sidecar --v=2 --logtostderr --probe=kubedns,127.0.0.1:10053,$(hostname -i | tr '.' '-').$(POD_NAMESPACE).pod.{{ .DNSDomain }},5,A --probe=dnsmasq,127.0.0.1:53,$(hostname -i | tr '.' '-').$(POD_NAMESPACE).pod.{{ .DNSDomain }},5,A
env:
- name: POD_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
image: {{ .SidecarImage }}
imagePullPolicy: IfNotPresent
name: sidecar
ports:
- containerPort: 10054
name: metrics
protocol: TCP
resources:
limits:
cpu: 150m
memory: 128Mi
requests:
cpu: 10m
memory: 20Mi
dnsPolicy: Default
restartPolicy: Always
terminationGracePeriodSeconds: 30
tolerations:
- effect: NoSchedule
key: node-role.kubernetes.io/master
- key: CriticalAddonsOnly
operator: Exists
volumes:
- configMap:
defaultMode: 420
name: kube-dns
optional: true
name: kube-dns-config
`
kubeDNSService = `
apiVersion: v1
kind: Service
metadata:
labels:
k8s-app: kube-dns
kubernetes.io/name: KubeDNS
name: kube-dns
namespace: {{ .Namespace }}
spec:
ports:
- name: dns
port: 53
protocol: UDP
targetPort: 53
- name: dns-tcp
port: 53
protocol: TCP
targetPort: 53
selector:
k8s-app: kube-dns
sessionAffinity: None
type: ClusterIP
`
)

View File

@ -18,13 +18,21 @@ package network
import ( import (
"fmt" "fmt"
"os"
"time"
"github.com/golang/glog" "github.com/golang/glog"
apiv1 "k8s.io/api/core/v1" apiv1 "k8s.io/api/core/v1"
v1beta1 "k8s.io/api/extensions/v1beta1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
apismetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
kuberuntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1" crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1"
@ -33,8 +41,16 @@ import (
"git.openstack.org/openstack/stackube/pkg/util" "git.openstack.org/openstack/stackube/pkg/util"
) )
const (
defaultDNSDomain = "cluster.local"
defaultKubeDNSImage = "stackube/k8s-dns-kube-dns-amd64:1.14.4"
defaultDNSMasqImage = "stackube/k8s-dns-dnsmasq-nanny-amd64:1.14.4"
defaultSideCarImage = "stackube/k8s-dns-sidecar-amd64:1.14.4"
)
// Watcher is an network of watching on resource create/update/delete events // Watcher is an network of watching on resource create/update/delete events
type NetworkController struct { type NetworkController struct {
k8sclient *kubernetes.Clientset
kubeCRDClient *kubecrd.CRDClient kubeCRDClient *kubecrd.CRDClient
driver *openstack.Client driver *openstack.Client
networkInformer cache.Controller networkInformer cache.Controller
@ -53,7 +69,7 @@ func (c *NetworkController) Run(stopCh <-chan struct{}) error {
return nil return nil
} }
func NewNetworkController(osClient *openstack.Client, kubeExtClient *apiextensionsclient.Clientset) (*NetworkController, error) { func NewNetworkController(kubeClient *kubernetes.Clientset, osClient *openstack.Client, kubeExtClient *apiextensionsclient.Clientset) (*NetworkController, error) {
// initialize CRD if it does not exist // initialize CRD if it does not exist
_, err := kubecrd.CreateNetworkCRD(kubeExtClient) _, err := kubecrd.CreateNetworkCRD(kubeExtClient)
if err != nil && !apierrors.IsAlreadyExists(err) { if err != nil && !apierrors.IsAlreadyExists(err) {
@ -66,6 +82,7 @@ func NewNetworkController(osClient *openstack.Client, kubeExtClient *apiextensio
apiv1.NamespaceAll, apiv1.NamespaceAll,
fields.Everything()) fields.Everything())
networkController := &NetworkController{ networkController := &NetworkController{
k8sclient: kubeClient,
kubeCRDClient: osClient.CRDClient, kubeCRDClient: osClient.CRDClient,
driver: osClient, driver: osClient,
} }
@ -102,7 +119,23 @@ func (c *NetworkController) onAdd(obj interface{}) {
// This will: // This will:
// 1. Create Network in Neutron // 1. Create Network in Neutron
// 2. Update Network CRD object status to Active or Failed // 2. Update Network CRD object status to Active or Failed
c.addNetworkToDriver(networkCopy) err = c.addNetworkToDriver(networkCopy)
if err != nil {
glog.Errorf("Add network to driver failed: %v", err)
return
}
// create kube-dns in this namespace.
namespace := networkCopy.Namespace
if err := c.createKubeDNSDeployment(namespace); err != nil {
glog.Errorf("Create kube-dns deployment failed: %v", err)
return
}
if err := c.createKubeDNSService(namespace); err != nil {
glog.Errorf("Create kube-dns service failed: %v", err)
return
}
} }
func (c *NetworkController) onUpdate(oldObj, newObj interface{}) { func (c *NetworkController) onUpdate(oldObj, newObj interface{}) {
@ -110,16 +143,113 @@ func (c *NetworkController) onUpdate(oldObj, newObj interface{}) {
} }
func (c *NetworkController) onDelete(obj interface{}) { func (c *NetworkController) onDelete(obj interface{}) {
if net, ok := obj.(*crv1.Network); ok { net, ok := obj.(*crv1.Network)
glog.V(4).Infof("NetworkController: network %s deleted", net.Name) if !ok {
if net.Spec.NetworkID == "" { glog.Warningf("Receiving an unkown object: %v", obj)
networkName := util.BuildNetworkName(net.GetNamespace(), net.GetName()) return
err := c.driver.DeleteNetwork(networkName) }
if err != nil {
glog.Errorf("NetworkController: delete network %s failed in networkprovider: %v", networkName, err) glog.V(4).Infof("NetworkController: network %s deleted", net.Name)
} else {
glog.V(4).Infof("NetworkController: network %s deleted in networkprovider", networkName) // Delete kube-dns deployment.
} if err := c.deleteDeployment(net.Namespace, "kube-dns"); err != nil {
glog.Warningf("error on deleting kube-dns deployment: %v", err)
}
// Delete kube-dns services for non-system namespaces.
if !util.IsSystemNamespace(net.Namespace) {
if err := c.k8sclient.CoreV1Client.Services(net.Namespace).Delete("kube-dns", apismetav1.NewDeleteOptions(0)); err != nil {
glog.Warningf("error on deleting kube-dns service: %v", err)
}
}
// Delete neutron network created by stackube.
if net.Spec.NetworkID == "" {
networkName := util.BuildNetworkName(net.GetNamespace(), net.GetName())
err := c.driver.DeleteNetwork(networkName)
if err != nil {
glog.Errorf("NetworkController: delete network %s failed in networkprovider: %v", networkName, err)
} else {
glog.V(4).Infof("NetworkController: network %s deleted in networkprovider", networkName)
} }
} }
} }
func (c *NetworkController) createKubeDNSDeployment(namespace string) error {
tempArgs := struct{ Namespace, DNSDomain, KubeDNSImage, DNSMasqImage, SidecarImage, KubernetesHost, KubernetesPort string }{
Namespace: namespace,
DNSDomain: defaultDNSDomain,
KubeDNSImage: defaultKubeDNSImage,
DNSMasqImage: defaultDNSMasqImage,
SidecarImage: defaultSideCarImage,
}
if host := os.Getenv("KUBERNETES_SERVICE_HOST"); host != "" {
tempArgs.KubernetesHost = host
}
if port := os.Getenv("KUBERNETES_SERVICE_PORT"); port != "" {
tempArgs.KubernetesPort = port
}
dnsDeploymentBytes, err := parseTemplate(kubeDNSDeployment, tempArgs)
kubeDNSDeploy := &v1beta1.Deployment{}
if err = kuberuntime.DecodeInto(scheme.Codecs.UniversalDecoder(), dnsDeploymentBytes, kubeDNSDeploy); err != nil {
return fmt.Errorf("unable to decode kube-dns deployment %v", err)
}
_, err = c.k8sclient.ExtensionsV1beta1Client.Deployments(namespace).Create(kubeDNSDeploy)
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("unable to create a new kube-dns deployment: %v", err)
}
if _, err = c.k8sclient.ExtensionsV1beta1Client.Deployments(namespace).Update(kubeDNSDeploy); err != nil {
return fmt.Errorf("unable to update the kube-dns deployment: %v", err)
}
}
return nil
}
func (c *NetworkController) deleteDeployment(namespace, name string) error {
if err := c.k8sclient.ExtensionsV1beta1Client.Deployments(namespace).Delete(name, apismetav1.NewDeleteOptions(0)); err != nil {
return err
}
err := wait.Poll(500*time.Millisecond, 60*time.Second, func() (bool, error) {
_, err := c.k8sclient.ExtensionsV1beta1Client.Deployments(namespace).Get(name, apismetav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return true, nil
}
return false, err
}
return false, nil
})
if err != nil {
return err
}
return nil
}
func (c *NetworkController) createKubeDNSService(namespace string) error {
tempArgs := struct{ Namespace string }{
Namespace: namespace,
}
dnsServiceBytes, err := parseTemplate(kubeDNSService, tempArgs)
dnsService := &apiv1.Service{}
if err = kuberuntime.DecodeInto(scheme.Codecs.UniversalDecoder(), dnsServiceBytes, dnsService); err != nil {
return fmt.Errorf("unable to decode kube-dns service %v", err)
}
_, err = c.k8sclient.CoreV1Client.Services(namespace).Create(dnsService)
if err != nil {
if !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("unable to create a new kube-dns service: %v", err)
}
if _, err = c.k8sclient.CoreV1Client.Services(namespace).Update(dnsService); err != nil {
return fmt.Errorf("unable to update the kube-dns service: %v", err)
}
}
return nil
}

View File

@ -17,6 +17,9 @@ limitations under the License.
package network package network
import ( import (
"bytes"
"fmt"
"html/template"
"time" "time"
crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1" crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1"
@ -32,7 +35,7 @@ const (
subnetSuffix = "subnet" subnetSuffix = "subnet"
) )
func (c *NetworkController) addNetworkToDriver(kubeNetwork *crv1.Network) { func (c *NetworkController) addNetworkToDriver(kubeNetwork *crv1.Network) error {
// The tenant name is the same with namespace, let's get tenantID by tenantName // The tenant name is the same with namespace, let's get tenantID by tenantName
tenantName := kubeNetwork.GetNamespace() tenantName := kubeNetwork.GetNamespace()
tenantID, err := c.driver.GetTenantIDFromName(tenantName) tenantID, err := c.driver.GetTenantIDFromName(tenantName)
@ -55,8 +58,7 @@ func (c *NetworkController) addNetworkToDriver(kubeNetwork *crv1.Network) {
}) })
} }
if err != nil || tenantID == "" { if err != nil || tenantID == "" {
glog.Errorf("failed to fetch tenantID for tenantName: %v, error: %v abort! \n", tenantName, err) return fmt.Errorf("failed to fetch tenantID for tenantName: %v, error: %v abort! \n", tenantName, err)
return
} }
networkName := util.BuildNetworkName(tenantName, kubeNetwork.GetName()) networkName := util.BuildNetworkName(tenantName, kubeNetwork.GetName())
@ -84,12 +86,12 @@ func (c *NetworkController) addNetworkToDriver(kubeNetwork *crv1.Network) {
check, err := c.driver.CheckTenantID(driverNetwork.TenantID) check, err := c.driver.CheckTenantID(driverNetwork.TenantID)
if err != nil { if err != nil {
glog.Errorf("[NetworkController]: check tenantID failed: %v", err) glog.Errorf("[NetworkController]: check tenantID failed: %v", err)
return err
} }
if !check { if !check {
glog.Warningf("[NetworkController]: tenantID %s doesn't exist in network provider", driverNetwork.TenantID)
kubeNetwork.Status.State = crv1.NetworkFailed kubeNetwork.Status.State = crv1.NetworkFailed
c.kubeCRDClient.UpdateNetwork(kubeNetwork) c.kubeCRDClient.UpdateNetwork(kubeNetwork)
return return fmt.Errorf("tenantID %s doesn't exist in network provider", driverNetwork.TenantID)
} }
// Check if provider network id exist // Check if provider network id exist
@ -124,4 +126,18 @@ func (c *NetworkController) addNetworkToDriver(kubeNetwork *crv1.Network) {
kubeNetwork.Status.State = newNetworkStatus kubeNetwork.Status.State = newNetworkStatus
c.kubeCRDClient.UpdateNetwork(kubeNetwork) c.kubeCRDClient.UpdateNetwork(kubeNetwork)
return nil
}
func parseTemplate(strtmpl string, obj interface{}) ([]byte, error) {
var buf bytes.Buffer
tmpl, err := template.New("template").Parse(strtmpl)
if err != nil {
return nil, fmt.Errorf("error when parsing template: %v", err)
}
err = tmpl.Execute(&buf, obj)
if err != nil {
return nil, fmt.Errorf("error when executing template: %v", err)
}
return buf.Bytes(), nil
} }

View File

@ -19,6 +19,7 @@ package proxy
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"net"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -26,6 +27,8 @@ import (
"time" "time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
@ -47,6 +50,7 @@ const (
) )
type Proxier struct { type Proxier struct {
clusterDNS string
kubeClientset *kubernetes.Clientset kubeClientset *kubernetes.Clientset
osClient *openstack.Client osClient *openstack.Client
factory informers.SharedInformerFactory factory informers.SharedInformerFactory
@ -97,11 +101,17 @@ func NewProxier(kubeConfig, openstackConfig string) (*Proxier, error) {
return nil, fmt.Errorf("failed to build clientset: %v", err) return nil, fmt.Errorf("failed to build clientset: %v", err)
} }
clusterDNS, err := getClusterDNS(clientset)
if err != nil {
return nil, fmt.Errorf("failed to get cluster DNS: %v", err)
}
factory := informers.NewSharedInformerFactory(clientset, defaultResyncPeriod) factory := informers.NewSharedInformerFactory(clientset, defaultResyncPeriod)
proxier := &Proxier{ proxier := &Proxier{
kubeClientset: clientset, kubeClientset: clientset,
osClient: osClient, osClient: osClient,
factory: factory, factory: factory,
clusterDNS: clusterDNS,
endpointsChanges: newEndpointsChangeMap(""), endpointsChanges: newEndpointsChangeMap(""),
serviceChanges: newServiceChangeMap(), serviceChanges: newServiceChangeMap(),
namespaceChanges: newNamespaceChangeMap(), namespaceChanges: newNamespaceChangeMap(),
@ -562,7 +572,7 @@ func (p *Proxier) syncProxyRules() {
"-A", ChainSKPrerouting, "-A", ChainSKPrerouting,
"-m", "comment", "--comment", svcNameString, "-m", "comment", "--comment", svcNameString,
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), "-d", fmt.Sprintf("%s/32", p.getServiceIP(svcInfo)),
"--dport", strconv.Itoa(svcInfo.port), "--dport", strconv.Itoa(svcInfo.port),
} }
@ -589,3 +599,36 @@ func (p *Proxier) syncProxyRules() {
} }
} }
} }
func (p *Proxier) getServiceIP(serviceInfo *serviceInfo) string {
if serviceInfo.name == "kube-dns" {
return p.clusterDNS
}
return serviceInfo.clusterIP.String()
}
func getClusterDNS(client *kubernetes.Clientset) (string, error) {
dnssvc, err := client.CoreV1().Services(metav1.NamespaceSystem).Get("kube-dns", metav1.GetOptions{})
if err == nil && len(dnssvc.Spec.ClusterIP) > 0 {
return dnssvc.Spec.ClusterIP, nil
}
if apierrors.IsNotFound(err) {
// get from default namespace.
k8ssvc, err := client.CoreV1().Services(metav1.NamespaceDefault).Get("kubernetes", metav1.GetOptions{})
if err != nil {
return "", fmt.Errorf("couldn't fetch information about the kubernetes service: %v", err)
}
// Build an IP by taking the kubernetes service's clusterIP and appending a "0" and checking that it's valid
dnsIP := net.ParseIP(fmt.Sprintf("%s0", k8ssvc.Spec.ClusterIP))
if dnsIP == nil {
return "", fmt.Errorf("could not parse dns ip %q: %v", dnsIP, err)
}
return dnsIP.String(), nil
}
return "", err
}

View File

@ -44,6 +44,7 @@ func (spn servicePortName) String() string {
// internal struct for string service information // internal struct for string service information
type serviceInfo struct { type serviceInfo struct {
name string
clusterIP net.IP clusterIP net.IP
port int port int
protocol v1.Protocol protocol v1.Protocol
@ -104,6 +105,7 @@ func (e *endpointsInfo) String() string {
func newServiceInfo(svcPortName servicePortName, port *v1.ServicePort, service *v1.Service) *serviceInfo { func newServiceInfo(svcPortName servicePortName, port *v1.ServicePort, service *v1.Service) *serviceInfo {
onlyNodeLocalEndpoints := false onlyNodeLocalEndpoints := false
info := &serviceInfo{ info := &serviceInfo{
name: service.Name,
clusterIP: net.ParseIP(service.Spec.ClusterIP), clusterIP: net.ParseIP(service.Spec.ClusterIP),
port: int(port.Port), port: int(port.Port),
protocol: port.Protocol, protocol: port.Protocol,
@ -254,9 +256,6 @@ func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts sets.Str
_, exists := (*sm)[svcPortName] _, exists := (*sm)[svcPortName]
if exists { if exists {
glog.V(1).Infof("Removing service port %q", svcPortName) glog.V(1).Infof("Removing service port %q", svcPortName)
//if info.protocol == v1.ProtocolUDP {
// staleServices.Insert(info.clusterIP.String())
//}
delete(*sm, svcPortName) delete(*sm, svcPortName)
} else { } else {
glog.Errorf("Service port %q removed, but doesn't exists", svcPortName) glog.Errorf("Service port %q removed, but doesn't exists", svcPortName)