![Pengfei Ni](/assets/img/avatar_default.png)
Adds kube-dns deployments for each tenant. And also adds loadbalancer for cluster DNS in each namespace. Implements: blueprint dns Change-Id: I2ee00806431cc4a3dfdf4c3c49b54892d4c2c98b Signed-off-by: Pengfei Ni <feiskyer@gmail.com>
303 lines
8.5 KiB
Go
303 lines
8.5 KiB
Go
/*
|
|
Copyright (c) 2017 OpenStack Foundation.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package proxy
|
|
|
|
import (
|
|
"fmt"
|
|
"net"
|
|
"reflect"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/golang/glog"
|
|
|
|
"git.openstack.org/openstack/stackube/pkg/util"
|
|
"k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
)
|
|
|
|
// servicePortName carries a namespace + name + portname. This is the unique
|
|
// identfier for a load-balanced service.
|
|
type servicePortName struct {
|
|
types.NamespacedName
|
|
Port string
|
|
}
|
|
|
|
func (spn servicePortName) String() string {
|
|
return fmt.Sprintf("%s:%s", spn.NamespacedName.String(), spn.Port)
|
|
}
|
|
|
|
// internal struct for string service information
|
|
type serviceInfo struct {
|
|
name string
|
|
clusterIP net.IP
|
|
port int
|
|
protocol v1.Protocol
|
|
nodePort int
|
|
serviceType v1.ServiceType
|
|
loadBalancerStatus v1.LoadBalancerStatus
|
|
sessionAffinityType v1.ServiceAffinity
|
|
stickyMaxAgeMinutes int
|
|
externalIPs []string
|
|
loadBalancerSourceRanges []string
|
|
onlyNodeLocalEndpoints bool
|
|
healthCheckNodePort int
|
|
// The following fields are computed and stored for performance reasons.
|
|
serviceNameString string
|
|
servicePortChainName string
|
|
serviceFirewallChainName string
|
|
serviceLBChainName string
|
|
}
|
|
|
|
// internal struct for endpoints information
|
|
type endpointsInfo struct {
|
|
endpoint string
|
|
isLocal bool
|
|
// The following fields we lazily compute and store here for performance
|
|
// reasons. If the protocol is the same as you expect it to be, then the
|
|
// chainName can be reused, otherwise it should be recomputed.
|
|
protocol string
|
|
chainName string
|
|
}
|
|
|
|
type namespaceInfo struct {
|
|
network string
|
|
router string
|
|
}
|
|
|
|
// Returns just the IP part of the endpoint.
|
|
func (e *endpointsInfo) IPPart() string {
|
|
if index := strings.Index(e.endpoint, ":"); index != -1 {
|
|
return e.endpoint[0:index]
|
|
}
|
|
return e.endpoint
|
|
}
|
|
|
|
// Returns the endpoint chain name for a given endpointsInfo.
|
|
func (e *endpointsInfo) endpointChain(svcNameString, protocol string) string {
|
|
if e.protocol != protocol {
|
|
e.protocol = protocol
|
|
e.chainName = servicePortEndpointChainName(svcNameString, protocol, e.endpoint)
|
|
}
|
|
return e.chainName
|
|
}
|
|
|
|
func (e *endpointsInfo) String() string {
|
|
return fmt.Sprintf("%v", *e)
|
|
}
|
|
|
|
// returns a new serviceInfo struct
|
|
func newServiceInfo(svcPortName servicePortName, port *v1.ServicePort, service *v1.Service) *serviceInfo {
|
|
onlyNodeLocalEndpoints := false
|
|
info := &serviceInfo{
|
|
name: service.Name,
|
|
clusterIP: net.ParseIP(service.Spec.ClusterIP),
|
|
port: int(port.Port),
|
|
protocol: port.Protocol,
|
|
nodePort: int(port.NodePort),
|
|
serviceType: service.Spec.Type,
|
|
// Deep-copy in case the service instance changes
|
|
loadBalancerStatus: *util.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer),
|
|
sessionAffinityType: service.Spec.SessionAffinity,
|
|
stickyMaxAgeMinutes: 180,
|
|
externalIPs: make([]string, len(service.Spec.ExternalIPs)),
|
|
loadBalancerSourceRanges: make([]string, len(service.Spec.LoadBalancerSourceRanges)),
|
|
onlyNodeLocalEndpoints: onlyNodeLocalEndpoints,
|
|
}
|
|
copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges)
|
|
copy(info.externalIPs, service.Spec.ExternalIPs)
|
|
|
|
if needsHealthCheck(service) {
|
|
p := getServiceHealthCheckNodePort(service)
|
|
if p == 0 {
|
|
glog.Errorf("Service %q has no healthcheck nodeport", svcPortName.NamespacedName.String())
|
|
} else {
|
|
info.healthCheckNodePort = int(p)
|
|
}
|
|
}
|
|
|
|
// Store the following for performance reasons.
|
|
protocol := strings.ToLower(string(info.protocol))
|
|
info.serviceNameString = svcPortName.String()
|
|
info.servicePortChainName = servicePortChainName(info.serviceNameString, protocol)
|
|
info.serviceFirewallChainName = serviceFirewallChainName(info.serviceNameString, protocol)
|
|
info.serviceLBChainName = serviceLBChainName(info.serviceNameString, protocol)
|
|
|
|
return info
|
|
}
|
|
|
|
type endpointsChange struct {
|
|
previous proxyEndpointsMap
|
|
current proxyEndpointsMap
|
|
}
|
|
|
|
type endpointsChangeMap struct {
|
|
lock sync.Mutex
|
|
hostname string
|
|
items map[types.NamespacedName]*endpointsChange
|
|
}
|
|
|
|
type serviceChange struct {
|
|
previous proxyServiceMap
|
|
current proxyServiceMap
|
|
}
|
|
|
|
type serviceChangeMap struct {
|
|
lock sync.Mutex
|
|
items map[types.NamespacedName]*serviceChange
|
|
}
|
|
|
|
type namespaceChange struct {
|
|
previous *namespaceInfo
|
|
current *namespaceInfo
|
|
}
|
|
|
|
type namespaceChangeMap struct {
|
|
lock sync.Mutex
|
|
items map[string]*namespaceChange
|
|
}
|
|
|
|
type updateEndpointMapResult struct {
|
|
hcEndpoints map[types.NamespacedName]int
|
|
staleEndpoints map[endpointServicePair]bool
|
|
staleServiceNames map[servicePortName]bool
|
|
}
|
|
|
|
type updateServiceMapResult struct {
|
|
hcServices map[types.NamespacedName]uint16
|
|
staleServices sets.String
|
|
}
|
|
|
|
type proxyServiceMap map[servicePortName]*serviceInfo
|
|
type proxyEndpointsMap map[servicePortName][]*endpointsInfo
|
|
|
|
func newEndpointsChangeMap(hostname string) endpointsChangeMap {
|
|
return endpointsChangeMap{
|
|
hostname: hostname,
|
|
items: make(map[types.NamespacedName]*endpointsChange),
|
|
}
|
|
}
|
|
|
|
func (ecm *endpointsChangeMap) update(namespacedName *types.NamespacedName, previous, current *v1.Endpoints) bool {
|
|
ecm.lock.Lock()
|
|
defer ecm.lock.Unlock()
|
|
|
|
change, exists := ecm.items[*namespacedName]
|
|
if !exists {
|
|
change = &endpointsChange{}
|
|
change.previous = endpointsToEndpointsMap(previous, ecm.hostname)
|
|
ecm.items[*namespacedName] = change
|
|
}
|
|
change.current = endpointsToEndpointsMap(current, ecm.hostname)
|
|
if reflect.DeepEqual(change.previous, change.current) {
|
|
delete(ecm.items, *namespacedName)
|
|
}
|
|
return len(ecm.items) > 0
|
|
}
|
|
|
|
func newServiceChangeMap() serviceChangeMap {
|
|
return serviceChangeMap{
|
|
items: make(map[types.NamespacedName]*serviceChange),
|
|
}
|
|
}
|
|
|
|
func (scm *serviceChangeMap) update(namespacedName *types.NamespacedName, previous, current *v1.Service) bool {
|
|
scm.lock.Lock()
|
|
defer scm.lock.Unlock()
|
|
|
|
change, exists := scm.items[*namespacedName]
|
|
if !exists {
|
|
change = &serviceChange{}
|
|
change.previous = serviceToServiceMap(previous)
|
|
scm.items[*namespacedName] = change
|
|
}
|
|
change.current = serviceToServiceMap(current)
|
|
if reflect.DeepEqual(change.previous, change.current) {
|
|
delete(scm.items, *namespacedName)
|
|
}
|
|
return len(scm.items) > 0
|
|
}
|
|
|
|
func (sm *proxyServiceMap) merge(other proxyServiceMap) sets.String {
|
|
existingPorts := sets.NewString()
|
|
for svcPortName, info := range other {
|
|
existingPorts.Insert(svcPortName.Port)
|
|
_, exists := (*sm)[svcPortName]
|
|
if !exists {
|
|
glog.V(1).Infof("Adding new service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol)
|
|
} else {
|
|
glog.V(1).Infof("Updating existing service port %q at %s:%d/%s", svcPortName, info.clusterIP, info.port, info.protocol)
|
|
}
|
|
(*sm)[svcPortName] = info
|
|
}
|
|
return existingPorts
|
|
}
|
|
|
|
func (sm *proxyServiceMap) unmerge(other proxyServiceMap, existingPorts sets.String) {
|
|
for svcPortName := range other {
|
|
if existingPorts.Has(svcPortName.Port) {
|
|
continue
|
|
}
|
|
_, exists := (*sm)[svcPortName]
|
|
if exists {
|
|
glog.V(1).Infof("Removing service port %q", svcPortName)
|
|
delete(*sm, svcPortName)
|
|
} else {
|
|
glog.Errorf("Service port %q removed, but doesn't exists", svcPortName)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (em proxyEndpointsMap) merge(other proxyEndpointsMap) {
|
|
for svcPortName := range other {
|
|
em[svcPortName] = other[svcPortName]
|
|
}
|
|
}
|
|
|
|
func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) {
|
|
for svcPortName := range other {
|
|
delete(em, svcPortName)
|
|
}
|
|
}
|
|
|
|
func newNamespaceChangeMap() namespaceChangeMap {
|
|
return namespaceChangeMap{
|
|
items: make(map[string]*namespaceChange),
|
|
}
|
|
}
|
|
|
|
func (ncm *namespaceChangeMap) update(name string, previous, current *v1.Namespace) bool {
|
|
ncm.lock.Lock()
|
|
defer ncm.lock.Unlock()
|
|
|
|
change, exists := ncm.items[name]
|
|
if !exists {
|
|
change = &namespaceChange{}
|
|
change.previous = &namespaceInfo{network: name}
|
|
ncm.items[name] = change
|
|
}
|
|
if current != nil {
|
|
change.current = &namespaceInfo{network: name, router: change.previous.router}
|
|
}
|
|
|
|
if previous != nil && current != nil {
|
|
delete(ncm.items, name)
|
|
}
|
|
return len(ncm.items) > 0
|
|
}
|