diff --git a/.gitignore b/.gitignore index bb0cd97..1502faa 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,5 @@ # binaries _output +deployment/kubestack/kubestack +deployment/stackube-controller/stackube-controller +deployment/stackube-proxy/stackube-proxy \ No newline at end of file diff --git a/Makefile b/Makefile index ca93f1d..c8eb32a 100644 --- a/Makefile +++ b/Makefile @@ -77,6 +77,12 @@ docker: depend cp _output/stackube-proxy deployment/stackube-proxy sudo docker build -t stackube/stackube-proxy:v$(STACKUBE_PROXY_VERSION) ./deployment/stackube-proxy/ +.PHONY: push +push: + sudo docker push stackube/kubestack:v$(KUBESTACK_VERSION) + sudo docker push stackube/stackube-controller:v$(STACKUBE_VERSION) + sudo docker push stackube/stackube-proxy:v$(STACKUBE_PROXY_VERSION) + .PHONY: test test: test-unit diff --git a/cmd/stackube-controller/stackube-controller.go b/cmd/stackube-controller/stackube-controller.go index bc569c7..78dfb17 100644 --- a/cmd/stackube-controller/stackube-controller.go +++ b/cmd/stackube-controller/stackube-controller.go @@ -56,7 +56,7 @@ func startControllers(kubeClient *kubernetes.Clientset, } // Creates a new Network controller - networkController, err := network.NewNetworkController(osClient, kubeExtClient) + networkController, err := network.NewNetworkController(kubeClient, osClient, kubeExtClient) if err != nil { return err } diff --git a/deployment/stackube-controller/start.sh b/deployment/stackube-controller/start.sh index 903d917..c50b50d 100755 --- a/deployment/stackube-controller/start.sh +++ b/deployment/stackube-controller/start.sh @@ -50,4 +50,4 @@ if [ -z $USER_GATEWAY ];then USER_GATEWAY='10.244.0.1' fi -./stackube-controller --kubeconfig="" --user-cidr=${USER_CIDR} --user-gateway=${USER_GATEWAY} \ No newline at end of file +./stackube-controller --v=3 --kubeconfig="" --user-cidr=${USER_CIDR} --user-gateway=${USER_GATEWAY} \ No newline at end of file diff --git a/deployment/stackube-proxy.yaml b/deployment/stackube-proxy.yaml index 7acc710..a763043 100644 --- a/deployment/stackube-proxy.yaml +++ b/deployment/stackube-proxy.yaml @@ -45,6 +45,7 @@ spec: {"key":"CriticalAddonsOnly", "operator":"Exists"}] spec: hostNetwork: true + hostPID: true serviceAccountName: stackube-proxy dnsPolicy: ClusterFirst restartPolicy: Always diff --git a/deployment/stackube-proxy/start.sh b/deployment/stackube-proxy/start.sh index d0592f1..2853d49 100755 --- a/deployment/stackube-proxy/start.sh +++ b/deployment/stackube-proxy/start.sh @@ -41,4 +41,4 @@ mv $TMP_CONF $STACKUBE_CONFIG_PATH echo "Wrote stackube config: $(cat ${STACKUBE_CONFIG_PATH})" # Start stackube-proxy in-cluster. -./stackube-proxy --kubeconfig="" \ No newline at end of file +./stackube-proxy --kubeconfig="" --v=3 \ No newline at end of file diff --git a/doc/source/architecture.rst b/doc/source/architecture.rst new file mode 100644 index 0000000..e69de29 diff --git a/pkg/auth-controller/rbacmanager/rbac/rbac.go b/pkg/auth-controller/rbacmanager/rbac/rbac.go index af66f15..4e94378 100644 --- a/pkg/auth-controller/rbacmanager/rbac/rbac.go +++ b/pkg/auth-controller/rbacmanager/rbac/rbac.go @@ -66,6 +66,32 @@ func GenerateRoleBinding(namespace, tenant string) *v1beta1.RoleBinding { return roleBinding } +func GenerateServiceAccountRoleBinding(namespace, tenant string) *v1beta1.RoleBinding { + subject := v1beta1.Subject{ + Kind: "ServiceAccount", + Name: "default", + Namespace: namespace, + } + roleRef := v1beta1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "Role", + Name: "default-role", + } + roleBinding := &v1beta1.RoleBinding{ + TypeMeta: metav1.TypeMeta{ + Kind: "RoleBinding", + APIVersion: "rbac.authorization.k8s.io/v1beta1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: tenant + "-rolebinding-sa", + Namespace: namespace, + }, + Subjects: []v1beta1.Subject{subject}, + RoleRef: roleRef, + } + return roleBinding +} + func GenerateClusterRole() *v1beta1.ClusterRole { policyRule := v1beta1.PolicyRule{ Verbs: []string{v1beta1.VerbAll}, diff --git a/pkg/auth-controller/rbacmanager/rbac_controller.go b/pkg/auth-controller/rbacmanager/rbac_controller.go index ba00965..f7e3613 100644 --- a/pkg/auth-controller/rbacmanager/rbac_controller.go +++ b/pkg/auth-controller/rbacmanager/rbac_controller.go @@ -183,6 +183,7 @@ func (c *Controller) syncRBAC(ns *apiv1.Namespace) error { return nil } rbacClient := c.k8sclient.Rbac() + // Create role for tenant role := rbac.GenerateRoleByNamespace(ns.Name) _, err := rbacClient.Roles(ns.Name).Create(role) @@ -191,6 +192,7 @@ func (c *Controller) syncRBAC(ns *apiv1.Namespace) error { return err } glog.V(4).Infof("Created default-role in namespace %s for tenant %s", ns.Name, ns.Name) + // Create rolebinding for tenant roleBinding := rbac.GenerateRoleBinding(ns.Name, ns.Name) _, err = rbacClient.RoleBindings(ns.Name).Create(roleBinding) @@ -198,6 +200,13 @@ func (c *Controller) syncRBAC(ns *apiv1.Namespace) error { glog.Errorf("Failed create %s-rolebindings in namespace %s for tenant %s: %v", ns.Name, ns.Name, ns.Name, err) return err } + saRoleBinding := rbac.GenerateServiceAccountRoleBinding(ns.Name, ns.Name) + _, err = rbacClient.RoleBindings(ns.Name).Create(saRoleBinding) + if err != nil && !apierrors.IsAlreadyExists(err) { + glog.Errorf("Failed create %s-rolebindings-sa in namespace %s for tenant %s: %v", ns.Name, ns.Name, ns.Name, err) + return err + } + glog.V(4).Infof("Created %s-rolebindings in namespace %s for tenant %s", ns.Name, ns.Name, ns.Name) return nil } diff --git a/pkg/network-controller/manifests.go b/pkg/network-controller/manifests.go new file mode 100644 index 0000000..7de64ae --- /dev/null +++ b/pkg/network-controller/manifests.go @@ -0,0 +1,184 @@ +/* +Copyright (c) 2017 OpenStack Foundation. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package network + +const ( + kubeDNSDeployment = ` +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + labels: + k8s-app: kube-dns + name: kube-dns + namespace: {{ .Namespace }} +spec: + replicas: 1 + selector: + matchLabels: + k8s-app: kube-dns + strategy: + rollingUpdate: + maxSurge: 10% + maxUnavailable: 0 + type: RollingUpdate + template: + metadata: + annotations: + scheduler.alpha.kubernetes.io/critical-pod: "" + labels: + k8s-app: kube-dns + spec: + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: beta.kubernetes.io/arch + operator: In + values: + - amd64 + containers: + - args: + - --domain={{ .DNSDomain }}. + - --dns-port=10053 + - --namespace=$(POD_NAMESPACE) + - --config-dir=/kube-dns-config + - --v=2 + env: + - name: PROMETHEUS_PORT + value: "10055" + - name: KUBERNETES_SERVICE_HOST + value: "{{ .KubernetesHost }}" + - name: KUBERNETES_SERVICE_PORT + value: "{{ .KubernetesPort }}" + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + image: {{ .KubeDNSImage }} + imagePullPolicy: IfNotPresent + name: kubedns + ports: + - containerPort: 10053 + name: dns-local + protocol: UDP + - containerPort: 10053 + name: dns-tcp-local + protocol: TCP + - containerPort: 10055 + name: metrics + protocol: TCP + resources: + limits: + cpu: 100m + memory: 64Mi + volumeMounts: + - mountPath: /kube-dns-config + name: kube-dns-config + - args: + - -v=2 + - -logtostderr + - -configDir=/etc/k8s/dns/dnsmasq-nanny + - -restartDnsmasq=true + - -- + - -k + - --cache-size=1000 + - --log-facility=- + - --server=/{{ .DNSDomain }}/127.0.0.1#10053 + - --server=/in-addr.arpa/127.0.0.1#10053 + - --server=/ip6.arpa/127.0.0.1#10053 + image: {{ .DNSMasqImage }} + imagePullPolicy: IfNotPresent + name: dnsmasq + ports: + - containerPort: 53 + name: dns + protocol: UDP + - containerPort: 53 + name: dns-tcp + protocol: TCP + resources: + limits: + cpu: 150m + memory: 64Mi + volumeMounts: + - mountPath: /etc/k8s/dns/dnsmasq-nanny + name: kube-dns-config + - command: + - /bin/sh + - -c + - /sidecar --v=2 --logtostderr --probe=kubedns,127.0.0.1:10053,$(hostname -i | tr '.' '-').$(POD_NAMESPACE).pod.{{ .DNSDomain }},5,A --probe=dnsmasq,127.0.0.1:53,$(hostname -i | tr '.' '-').$(POD_NAMESPACE).pod.{{ .DNSDomain }},5,A + env: + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + image: {{ .SidecarImage }} + imagePullPolicy: IfNotPresent + name: sidecar + ports: + - containerPort: 10054 + name: metrics + protocol: TCP + resources: + limits: + cpu: 150m + memory: 128Mi + requests: + cpu: 10m + memory: 20Mi + dnsPolicy: Default + restartPolicy: Always + terminationGracePeriodSeconds: 30 + tolerations: + - effect: NoSchedule + key: node-role.kubernetes.io/master + - key: CriticalAddonsOnly + operator: Exists + volumes: + - configMap: + defaultMode: 420 + name: kube-dns + optional: true + name: kube-dns-config +` + + kubeDNSService = ` +apiVersion: v1 +kind: Service +metadata: + labels: + k8s-app: kube-dns + kubernetes.io/name: KubeDNS + name: kube-dns + namespace: {{ .Namespace }} +spec: + ports: + - name: dns + port: 53 + protocol: UDP + targetPort: 53 + - name: dns-tcp + port: 53 + protocol: TCP + targetPort: 53 + selector: + k8s-app: kube-dns + sessionAffinity: None + type: ClusterIP +` +) diff --git a/pkg/network-controller/network_controller.go b/pkg/network-controller/network_controller.go index 80d4c50..5c220b4 100644 --- a/pkg/network-controller/network_controller.go +++ b/pkg/network-controller/network_controller.go @@ -18,13 +18,21 @@ package network import ( "fmt" + "os" + "time" "github.com/golang/glog" apiv1 "k8s.io/api/core/v1" + v1beta1 "k8s.io/api/extensions/v1beta1" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apierrors "k8s.io/apimachinery/pkg/api/errors" + apismetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" + kuberuntime "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1" @@ -33,8 +41,16 @@ import ( "git.openstack.org/openstack/stackube/pkg/util" ) +const ( + defaultDNSDomain = "cluster.local" + defaultKubeDNSImage = "stackube/k8s-dns-kube-dns-amd64:1.14.4" + defaultDNSMasqImage = "stackube/k8s-dns-dnsmasq-nanny-amd64:1.14.4" + defaultSideCarImage = "stackube/k8s-dns-sidecar-amd64:1.14.4" +) + // Watcher is an network of watching on resource create/update/delete events type NetworkController struct { + k8sclient *kubernetes.Clientset kubeCRDClient *kubecrd.CRDClient driver *openstack.Client networkInformer cache.Controller @@ -53,7 +69,7 @@ func (c *NetworkController) Run(stopCh <-chan struct{}) error { return nil } -func NewNetworkController(osClient *openstack.Client, kubeExtClient *apiextensionsclient.Clientset) (*NetworkController, error) { +func NewNetworkController(kubeClient *kubernetes.Clientset, osClient *openstack.Client, kubeExtClient *apiextensionsclient.Clientset) (*NetworkController, error) { // initialize CRD if it does not exist _, err := kubecrd.CreateNetworkCRD(kubeExtClient) if err != nil && !apierrors.IsAlreadyExists(err) { @@ -66,6 +82,7 @@ func NewNetworkController(osClient *openstack.Client, kubeExtClient *apiextensio apiv1.NamespaceAll, fields.Everything()) networkController := &NetworkController{ + k8sclient: kubeClient, kubeCRDClient: osClient.CRDClient, driver: osClient, } @@ -102,7 +119,23 @@ func (c *NetworkController) onAdd(obj interface{}) { // This will: // 1. Create Network in Neutron // 2. Update Network CRD object status to Active or Failed - c.addNetworkToDriver(networkCopy) + err = c.addNetworkToDriver(networkCopy) + if err != nil { + glog.Errorf("Add network to driver failed: %v", err) + return + } + + // create kube-dns in this namespace. + namespace := networkCopy.Namespace + if err := c.createKubeDNSDeployment(namespace); err != nil { + glog.Errorf("Create kube-dns deployment failed: %v", err) + return + } + + if err := c.createKubeDNSService(namespace); err != nil { + glog.Errorf("Create kube-dns service failed: %v", err) + return + } } func (c *NetworkController) onUpdate(oldObj, newObj interface{}) { @@ -110,16 +143,113 @@ func (c *NetworkController) onUpdate(oldObj, newObj interface{}) { } func (c *NetworkController) onDelete(obj interface{}) { - if net, ok := obj.(*crv1.Network); ok { - glog.V(4).Infof("NetworkController: network %s deleted", net.Name) - if net.Spec.NetworkID == "" { - networkName := util.BuildNetworkName(net.GetNamespace(), net.GetName()) - err := c.driver.DeleteNetwork(networkName) - if err != nil { - glog.Errorf("NetworkController: delete network %s failed in networkprovider: %v", networkName, err) - } else { - glog.V(4).Infof("NetworkController: network %s deleted in networkprovider", networkName) - } + net, ok := obj.(*crv1.Network) + if !ok { + glog.Warningf("Receiving an unkown object: %v", obj) + return + } + + glog.V(4).Infof("NetworkController: network %s deleted", net.Name) + + // Delete kube-dns deployment. + if err := c.deleteDeployment(net.Namespace, "kube-dns"); err != nil { + glog.Warningf("error on deleting kube-dns deployment: %v", err) + } + // Delete kube-dns services for non-system namespaces. + if !util.IsSystemNamespace(net.Namespace) { + if err := c.k8sclient.CoreV1Client.Services(net.Namespace).Delete("kube-dns", apismetav1.NewDeleteOptions(0)); err != nil { + glog.Warningf("error on deleting kube-dns service: %v", err) + } + } + + // Delete neutron network created by stackube. + if net.Spec.NetworkID == "" { + networkName := util.BuildNetworkName(net.GetNamespace(), net.GetName()) + err := c.driver.DeleteNetwork(networkName) + if err != nil { + glog.Errorf("NetworkController: delete network %s failed in networkprovider: %v", networkName, err) + } else { + glog.V(4).Infof("NetworkController: network %s deleted in networkprovider", networkName) } } } + +func (c *NetworkController) createKubeDNSDeployment(namespace string) error { + tempArgs := struct{ Namespace, DNSDomain, KubeDNSImage, DNSMasqImage, SidecarImage, KubernetesHost, KubernetesPort string }{ + Namespace: namespace, + DNSDomain: defaultDNSDomain, + KubeDNSImage: defaultKubeDNSImage, + DNSMasqImage: defaultDNSMasqImage, + SidecarImage: defaultSideCarImage, + } + if host := os.Getenv("KUBERNETES_SERVICE_HOST"); host != "" { + tempArgs.KubernetesHost = host + } + if port := os.Getenv("KUBERNETES_SERVICE_PORT"); port != "" { + tempArgs.KubernetesPort = port + } + dnsDeploymentBytes, err := parseTemplate(kubeDNSDeployment, tempArgs) + kubeDNSDeploy := &v1beta1.Deployment{} + if err = kuberuntime.DecodeInto(scheme.Codecs.UniversalDecoder(), dnsDeploymentBytes, kubeDNSDeploy); err != nil { + return fmt.Errorf("unable to decode kube-dns deployment %v", err) + } + _, err = c.k8sclient.ExtensionsV1beta1Client.Deployments(namespace).Create(kubeDNSDeploy) + if err != nil { + if !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("unable to create a new kube-dns deployment: %v", err) + } + + if _, err = c.k8sclient.ExtensionsV1beta1Client.Deployments(namespace).Update(kubeDNSDeploy); err != nil { + return fmt.Errorf("unable to update the kube-dns deployment: %v", err) + } + } + + return nil +} + +func (c *NetworkController) deleteDeployment(namespace, name string) error { + if err := c.k8sclient.ExtensionsV1beta1Client.Deployments(namespace).Delete(name, apismetav1.NewDeleteOptions(0)); err != nil { + return err + } + + err := wait.Poll(500*time.Millisecond, 60*time.Second, func() (bool, error) { + _, err := c.k8sclient.ExtensionsV1beta1Client.Deployments(namespace).Get(name, apismetav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return true, nil + } + + return false, err + } + + return false, nil + }) + if err != nil { + return err + } + + return nil +} + +func (c *NetworkController) createKubeDNSService(namespace string) error { + tempArgs := struct{ Namespace string }{ + Namespace: namespace, + } + dnsServiceBytes, err := parseTemplate(kubeDNSService, tempArgs) + dnsService := &apiv1.Service{} + if err = kuberuntime.DecodeInto(scheme.Codecs.UniversalDecoder(), dnsServiceBytes, dnsService); err != nil { + return fmt.Errorf("unable to decode kube-dns service %v", err) + } + _, err = c.k8sclient.CoreV1Client.Services(namespace).Create(dnsService) + if err != nil { + if !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("unable to create a new kube-dns service: %v", err) + } + + if _, err = c.k8sclient.CoreV1Client.Services(namespace).Update(dnsService); err != nil { + return fmt.Errorf("unable to update the kube-dns service: %v", err) + } + } + + return nil +} diff --git a/pkg/network-controller/network_controller_helper.go b/pkg/network-controller/network_controller_helper.go index 0d3d4d7..a93b8df 100644 --- a/pkg/network-controller/network_controller_helper.go +++ b/pkg/network-controller/network_controller_helper.go @@ -17,6 +17,9 @@ limitations under the License. package network import ( + "bytes" + "fmt" + "html/template" "time" crv1 "git.openstack.org/openstack/stackube/pkg/apis/v1" @@ -32,7 +35,7 @@ const ( subnetSuffix = "subnet" ) -func (c *NetworkController) addNetworkToDriver(kubeNetwork *crv1.Network) { +func (c *NetworkController) addNetworkToDriver(kubeNetwork *crv1.Network) error { // The tenant name is the same with namespace, let's get tenantID by tenantName tenantName := kubeNetwork.GetNamespace() tenantID, err := c.driver.GetTenantIDFromName(tenantName) @@ -55,8 +58,7 @@ func (c *NetworkController) addNetworkToDriver(kubeNetwork *crv1.Network) { }) } if err != nil || tenantID == "" { - glog.Errorf("failed to fetch tenantID for tenantName: %v, error: %v abort! \n", tenantName, err) - return + return fmt.Errorf("failed to fetch tenantID for tenantName: %v, error: %v abort! \n", tenantName, err) } networkName := util.BuildNetworkName(tenantName, kubeNetwork.GetName()) @@ -84,12 +86,12 @@ func (c *NetworkController) addNetworkToDriver(kubeNetwork *crv1.Network) { check, err := c.driver.CheckTenantID(driverNetwork.TenantID) if err != nil { glog.Errorf("[NetworkController]: check tenantID failed: %v", err) + return err } if !check { - glog.Warningf("[NetworkController]: tenantID %s doesn't exist in network provider", driverNetwork.TenantID) kubeNetwork.Status.State = crv1.NetworkFailed c.kubeCRDClient.UpdateNetwork(kubeNetwork) - return + return fmt.Errorf("tenantID %s doesn't exist in network provider", driverNetwork.TenantID) } // Check if provider network id exist @@ -124,4 +126,18 @@ func (c *NetworkController) addNetworkToDriver(kubeNetwork *crv1.Network) { kubeNetwork.Status.State = newNetworkStatus c.kubeCRDClient.UpdateNetwork(kubeNetwork) + return nil +} + +func parseTemplate(strtmpl string, obj interface{}) ([]byte, error) { + var buf bytes.Buffer + tmpl, err := template.New("template").Parse(strtmpl) + if err != nil { + return nil, fmt.Errorf("error when parsing template: %v", err) + } + err = tmpl.Execute(&buf, obj) + if err != nil { + return nil, fmt.Errorf("error when executing template: %v", err) + } + return buf.Bytes(), nil } diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 0915f0b..0fadb5f 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -19,6 +19,7 @@ package proxy import ( "bytes" "fmt" + "net" "strconv" "strings" "sync" @@ -26,6 +27,8 @@ import ( "time" "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" @@ -47,6 +50,7 @@ const ( ) type Proxier struct { + clusterDNS string kubeClientset *kubernetes.Clientset osClient *openstack.Client factory informers.SharedInformerFactory @@ -97,11 +101,17 @@ func NewProxier(kubeConfig, openstackConfig string) (*Proxier, error) { return nil, fmt.Errorf("failed to build clientset: %v", err) } + clusterDNS, err := getClusterDNS(clientset) + if err != nil { + return nil, fmt.Errorf("failed to get cluster DNS: %v", err) + } + factory := informers.NewSharedInformerFactory(clientset, defaultResyncPeriod) proxier := &Proxier{ kubeClientset: clientset, osClient: osClient, factory: factory, + clusterDNS: clusterDNS, endpointsChanges: newEndpointsChangeMap(""), serviceChanges: newServiceChangeMap(), namespaceChanges: newNamespaceChangeMap(), @@ -562,7 +572,7 @@ func (p *Proxier) syncProxyRules() { "-A", ChainSKPrerouting, "-m", "comment", "--comment", svcNameString, "-m", protocol, "-p", protocol, - "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), + "-d", fmt.Sprintf("%s/32", p.getServiceIP(svcInfo)), "--dport", strconv.Itoa(svcInfo.port), } @@ -589,3 +599,36 @@ func (p *Proxier) syncProxyRules() { } } } + +func (p *Proxier) getServiceIP(serviceInfo *serviceInfo) string { + if serviceInfo.name == "kube-dns" { + return p.clusterDNS + } + + return serviceInfo.clusterIP.String() +} + +func getClusterDNS(client *kubernetes.Clientset) (string, error) { + dnssvc, err := client.CoreV1().Services(metav1.NamespaceSystem).Get("kube-dns", metav1.GetOptions{}) + if err == nil && len(dnssvc.Spec.ClusterIP) > 0 { + return dnssvc.Spec.ClusterIP, nil + } + + if apierrors.IsNotFound(err) { + // get from default namespace. + k8ssvc, err := client.CoreV1().Services(metav1.NamespaceDefault).Get("kubernetes", metav1.GetOptions{}) + if err != nil { + return "", fmt.Errorf("couldn't fetch information about the kubernetes service: %v", err) + } + + // Build an IP by taking the kubernetes service's clusterIP and appending a "0" and checking that it's valid + dnsIP := net.ParseIP(fmt.Sprintf("%s0", k8ssvc.Spec.ClusterIP)) + if dnsIP == nil { + return "", fmt.Errorf("could not parse dns ip %q: %v", dnsIP, err) + } + + return dnsIP.String(), nil + } + + return "", err +} diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index f3d8ee5..fb9b4b2 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -44,6 +44,7 @@ func (spn servicePortName) String() string { // internal struct for string service information type serviceInfo struct { + name string clusterIP net.IP port int protocol v1.Protocol @@ -104,6 +105,7 @@ func (e *endpointsInfo) String() string { func newServiceInfo(svcPortName servicePortName, port *v1.ServicePort, service *v1.Service) *serviceInfo { onlyNodeLocalEndpoints := false info := &serviceInfo{ + name: service.Name, clusterIP: net.ParseIP(service.Spec.ClusterIP), port: int(port.Port), protocol: port.Protocol, @@ -254,9 +256,6 @@ func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts sets.Str _, exists := (*sm)[svcPortName] if exists { glog.V(1).Infof("Removing service port %q", svcPortName) - //if info.protocol == v1.ProtocolUDP { - // staleServices.Insert(info.clusterIP.String()) - //} delete(*sm, svcPortName) } else { glog.Errorf("Service port %q removed, but doesn't exists", svcPortName)