Support Router Advertisement Daemon (radvd) for IPv6

Launch radvd from inside l3 agent when any router port has an IPv6 address. If
slaac is used for IPv6 addresses, advertise the prefix associated with the port;
otherwise, advertise default route only.

Change-Id: Ib8b0b3e71f7af9afa769c41357c66f88f4326807
Implements: blueprint neutron-ipv6-radvd-ra
Co-Authored-By: Henry Gessau <gessau@cisco.com>
This commit is contained in:
Robert Li 2014-07-11 11:02:19 -04:00 committed by Henry Gessau
parent 8ca05af369
commit b092899b44
8 changed files with 344 additions and 88 deletions

View File

@ -14,6 +14,7 @@ arping: CommandFilter, arping, root
# l3_agent
sysctl: CommandFilter, sysctl, root
route: CommandFilter, route, root
radvd: CommandFilter, radvd, root
# metadata proxy
metadata_proxy: CommandFilter, neutron-ns-metadata-proxy, root
@ -26,6 +27,8 @@ metadata_proxy_local_quantum: CommandFilter, /usr/local/bin/quantum-ns-metadata-
kill_metadata: KillFilter, root, /usr/bin/python, -9
kill_metadata7: KillFilter, root, /usr/bin/python2.7, -9
kill_metadata6: KillFilter, root, /usr/bin/python2.6, -9
kill_radvd_usr: KillFilter, root, /usr/sbin/radvd, -9, -HUP
kill_radvd: KillFilter, root, /sbin/radvd, -9, -HUP
# ip_lib
ip: IpFilter, ip, root

View File

@ -29,6 +29,7 @@ from neutron.agent.linux import interface
from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_manager
from neutron.agent.linux import ovs_lib # noqa
from neutron.agent.linux import ra
from neutron.agent import rpc as agent_rpc
from neutron.common import config as common_config
from neutron.common import constants as l3_constants
@ -427,6 +428,7 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
if self.conf.enable_metadata_proxy:
self._destroy_metadata_proxy(ns[len(NS_PREFIX):], ns)
ra.disable_ipv6_ra(ns[len(NS_PREFIX):], ns, self.root_helper)
try:
self._destroy_router_namespace(ns)
except RuntimeError:
@ -579,15 +581,31 @@ class L3NATAgent(firewall_l3_agent.FWaaSL3AgentRpcCallback, manager.Manager):
p['id'] not in existing_port_ids]
old_ports = [p for p in ri.internal_ports if
p['id'] not in current_port_ids]
new_ipv6_port = False
old_ipv6_port = False
for p in new_ports:
self._set_subnet_info(p)
self.internal_network_added(ri, p['network_id'], p['id'],
p['ip_cidr'], p['mac_address'])
ri.internal_ports.append(p)
if (not new_ipv6_port and
netaddr.IPNetwork(p['subnet']['cidr']).version == 6):
new_ipv6_port = True
for p in old_ports:
self.internal_network_removed(ri, p['id'], p['ip_cidr'])
ri.internal_ports.remove(p)
if (not old_ipv6_port and
netaddr.IPNetwork(p['subnet']['cidr']).version == 6):
old_ipv6_port = True
if new_ipv6_port or old_ipv6_port:
ra.enable_ipv6_ra(ri.router_id,
ri.ns_name,
internal_ports,
self.get_internal_device_name,
self.root_helper)
existing_devices = self._get_existing_devices(ri)
current_internal_devs = set([n for n in existing_devices

View File

@ -14,8 +14,6 @@
#
# @author: Mark McClain, DreamHost
import os
from oslo.config import cfg
from neutron.agent.linux import ip_lib
@ -38,25 +36,40 @@ class ProcessManager(object):
Note: The manager expects uuid to be in cmdline.
"""
def __init__(self, conf, uuid, root_helper='sudo', namespace=None):
def __init__(self, conf, uuid, root_helper='sudo',
namespace=None, service=None):
self.conf = conf
self.uuid = uuid
self.root_helper = root_helper
self.namespace = namespace
if service:
self.service_pid_fname = 'pid.' + service
else:
self.service_pid_fname = 'pid'
def enable(self, cmd_callback):
def enable(self, cmd_callback, reload_cfg=False):
if not self.active:
cmd = cmd_callback(self.get_pid_file_name(ensure_pids_dir=True))
ip_wrapper = ip_lib.IPWrapper(self.root_helper, self.namespace)
ip_wrapper.netns.execute(cmd)
elif reload_cfg:
self.reload_cfg()
def disable(self):
def reload_cfg(self):
self.disable('HUP')
def disable(self, sig='9'):
pid = self.pid
if self.active:
cmd = ['kill', '-9', pid]
cmd = ['kill', '-%s' % (sig), pid]
utils.execute(cmd, self.root_helper)
# In the case of shutting down, remove the pid file
if sig == '9':
utils.remove_conf_file(self.conf.external_pids,
self.uuid,
self.service_pid_fname)
elif pid:
LOG.debug(_('Process for %(uuid)s pid %(pid)d is stale, ignoring '
'command'), {'uuid': self.uuid, 'pid': pid})
@ -65,28 +78,18 @@ class ProcessManager(object):
def get_pid_file_name(self, ensure_pids_dir=False):
"""Returns the file name for a given kind of config file."""
pids_dir = os.path.abspath(os.path.normpath(self.conf.external_pids))
if ensure_pids_dir and not os.path.isdir(pids_dir):
os.makedirs(pids_dir, 0o755)
return os.path.join(pids_dir, self.uuid + '.pid')
return utils.get_conf_file_name(self.conf.external_pids,
self.uuid,
self.service_pid_fname,
ensure_pids_dir)
@property
def pid(self):
"""Last known pid for this external process spawned for this uuid."""
file_name = self.get_pid_file_name()
msg = _('Error while reading %s')
try:
with open(file_name, 'r') as f:
return int(f.read())
except IOError:
msg = _('Unable to access %s')
except ValueError:
msg = _('Unable to convert value in %s')
LOG.debug(msg, file_name)
return None
return utils.get_value_from_conf_file(self.conf.external_pids,
self.uuid,
self.service_pid_fname,
int)
@property
def active(self):

122
neutron/agent/linux/ra.py Normal file
View File

@ -0,0 +1,122 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from oslo.config import cfg
import six
from neutron.agent.linux import external_process
from neutron.agent.linux import utils
from neutron.common import constants
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
OPTS = [
cfg.StrOpt('ra_confs',
default='$state_path/ra',
help=_('Location to store IPv6 RA config files')),
]
cfg.CONF.register_opts(OPTS)
prefix_fmt = """interface %s
{
AdvSendAdvert on;
MinRtrAdvInterval 3;
MaxRtrAdvInterval 10;
prefix %s
{
AdvOnLink on;
AdvAutonomous on;
};
};
"""
default_fmt = """interface %s
{
AdvSendAdvert on;
MinRtrAdvInterval 3;
MaxRtrAdvInterval 10;
};
"""
def _is_slaac(ra_mode):
return (ra_mode == constants.IPV6_SLAAC or
ra_mode == constants.DHCPV6_STATELESS)
def _generate_radvd_conf(router_id, router_ports, dev_name_helper):
radvd_conf = utils.get_conf_file_name(cfg.CONF.ra_confs,
router_id,
'radvd.conf',
True)
buf = six.StringIO()
for p in router_ports:
if netaddr.IPNetwork(p['subnet']['cidr']).version == 6:
interface_name = dev_name_helper(p['id'])
if _is_slaac(p['subnet']['ipv6_ra_mode']):
conf_str = prefix_fmt % (interface_name,
p['subnet']['cidr'])
else:
conf_str = default_fmt % interface_name
buf.write('%s' % conf_str)
utils.replace_file(radvd_conf, buf.getvalue())
return radvd_conf
def _spawn_radvd(router_id, radvd_conf, router_ns, root_helper):
def callback(pid_file):
radvd_cmd = ['radvd',
'-C', '%s' % radvd_conf,
'-p', '%s' % pid_file]
return radvd_cmd
radvd = external_process.ProcessManager(cfg.CONF,
router_id,
root_helper,
router_ns,
'radvd')
radvd.enable(callback, True)
LOG.debug("radvd enabled for router %s", router_id)
def enable_ipv6_ra(router_id, router_ns, router_ports,
dev_name_helper, root_helper):
for p in router_ports:
if netaddr.IPNetwork(p['subnet']['cidr']).version == 6:
break
else:
# Kill the daemon if it's running
disable_ipv6_ra(router_id, router_ns, root_helper)
return
LOG.debug("Enable IPv6 RA for router %s", router_id)
radvd_conf = _generate_radvd_conf(router_id, router_ports, dev_name_helper)
_spawn_radvd(router_id, radvd_conf, router_ns, root_helper)
def disable_ipv6_ra(router_id, router_ns, root_helper):
radvd = external_process.ProcessManager(cfg.CONF,
router_id,
root_helper,
router_ns,
'radvd')
radvd.disable()
utils.remove_conf_files(cfg.CONF.ra_confs, router_id)
LOG.debug("radvd disabled for router %s", router_id)

View File

@ -18,6 +18,7 @@
import fcntl
import os
import shlex
import shutil
import socket
import struct
import tempfile
@ -126,3 +127,51 @@ def find_child_pids(pid):
ctxt.reraise = False
return []
return [x.strip() for x in raw_pids.split('\n') if x.strip()]
def _get_conf_dir(cfg_root, uuid, ensure_conf_dir):
confs_dir = os.path.abspath(os.path.normpath(cfg_root))
conf_dir = os.path.join(confs_dir, uuid)
if ensure_conf_dir:
if not os.path.isdir(conf_dir):
os.makedirs(conf_dir, 0o755)
return conf_dir
def get_conf_file_name(cfg_root, uuid, cfg_file, ensure_conf_dir=False):
"""Returns the file name for a given kind of config file."""
conf_dir = _get_conf_dir(cfg_root, uuid, ensure_conf_dir)
return os.path.join(conf_dir, cfg_file)
def get_value_from_conf_file(cfg_root, uuid, cfg_file, converter=None):
"""A helper function to read a value from one of a config file."""
file_name = get_conf_file_name(cfg_root, uuid, cfg_file)
msg = _('Error while reading %s')
try:
with open(file_name, 'r') as f:
try:
return converter and converter(f.read()) or f.read()
except ValueError:
msg = _('Unable to convert value in %s')
except IOError:
msg = _('Unable to access %s')
LOG.debug(msg % file_name)
return None
def remove_conf_files(cfg_root, uuid):
conf_dir = _get_conf_dir(cfg_root, uuid, False)
shutil.rmtree(conf_dir, ignore_errors=True)
def remove_conf_file(cfg_root, uuid, cfg_file):
"""Remove a config file. Remove the directory if this is the last file."""
conf_file = get_conf_file_name(cfg_root, uuid, cfg_file)
if os.path.exists(conf_file):
os.unlink(conf_file)
conf_dir = _get_conf_dir(cfg_root, uuid, False)
if not os.listdir(conf_dir):
shutil.rmtree(conf_dir, ignore_errors=True)

View File

@ -1007,7 +1007,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
network_ids = set(p['network_id'] for p, _ in each_port_with_ip())
filters = {'network_id': [id for id in network_ids]}
fields = ['id', 'cidr', 'gateway_ip', 'network_id']
fields = ['id', 'cidr', 'gateway_ip',
'network_id', 'ipv6_ra_mode']
subnets_by_network = dict((id, []) for id in network_ids)
for subnet in self._core_plugin.get_subnets(context, filters, fields):
@ -1018,7 +1019,8 @@ class L3_NAT_db_mixin(l3.RouterPluginBase):
for subnet in subnets_by_network[port['network_id']]:
subnet_info = {'id': subnet['id'],
'cidr': subnet['cidr'],
'gateway_ip': subnet['gateway_ip']}
'gateway_ip': subnet['gateway_ip'],
'ipv6_ra_mode': subnet['ipv6_ra_mode']}
if subnet['id'] == fixed_ip['subnet_id']:
port['subnet'] = subnet_info

View File

@ -121,7 +121,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
def setUp(self):
super(TestBasicRouterOperations, self).setUp()
self.conf = cfg.ConfigOpts()
self.conf = agent_config.setup_conf()
self.conf.register_opts(base_config.core_opts)
self.conf.register_opts(l3_agent.L3NATAgent.OPTS)
agent_config.register_interface_driver_opts_helper(self.conf)
@ -141,6 +141,10 @@ class TestBasicRouterOperations(base.BaseTestCase):
'neutron.agent.linux.utils.execute')
self.utils_exec = self.utils_exec_p.start()
self.utils_replace_file_p = mock.patch(
'neutron.agent.linux.utils.replace_file')
self.utils_replace_file = self.utils_replace_file_p.start()
self.external_process_p = mock.patch(
'neutron.agent.linux.external_process.ProcessManager')
self.external_process = self.external_process_p.start()
@ -441,6 +445,38 @@ class TestBasicRouterOperations(base.BaseTestCase):
else:
self.assertIn(r.rule, expected_rules)
@staticmethod
def _router_append_interface(router, count=1, ip_version=4,
ra_mode=None, addr_mode=None):
if ip_version == 4:
ip_pool = '35.4.%i.4'
cidr_pool = '35.4.%i.0/24'
gw_pool = '35.4.%i.1'
elif ip_version == 6:
ip_pool = 'fd01:%x::6'
cidr_pool = 'fd01:%x::/64'
gw_pool = 'fd01:%x::1'
else:
raise ValueError("Invalid ip_version: %s" % ip_version)
interfaces = router[l3_constants.INTERFACE_KEY]
current = sum(
[netaddr.IPNetwork(p['subnet']['cidr']).version == ip_version
for p in interfaces])
for i in range(current, current + count):
interfaces.append(
{'id': _uuid(),
'network_id': _uuid(),
'admin_state_up': True,
'fixed_ips': [{'ip_address': ip_pool % i,
'subnet_id': _uuid()}],
'mac_address': 'ca:fe:de:ad:be:ef',
'subnet': {'cidr': cidr_pool % i,
'gateway_ip': gw_pool % i,
'ipv6_ra_mode': ra_mode,
'ipv6_address_mode': addr_mode}})
def _prepare_router_data(self, ip_version=4,
enable_snat=None, num_internal_ports=1):
if ip_version == 4:
@ -451,6 +487,8 @@ class TestBasicRouterOperations(base.BaseTestCase):
ip_addr = 'fd00::4'
cidr = 'fd00::/64'
gateway_ip = 'fd00::1'
else:
raise ValueError("Invalid ip_version: %s" % ip_version)
router_id = _uuid()
ex_gw_port = {'id': _uuid(),
@ -459,22 +497,15 @@ class TestBasicRouterOperations(base.BaseTestCase):
'subnet_id': _uuid()}],
'subnet': {'cidr': cidr,
'gateway_ip': gateway_ip}}
int_ports = []
for i in range(num_internal_ports):
int_ports.append({'id': _uuid(),
'network_id': _uuid(),
'admin_state_up': True,
'fixed_ips': [{'ip_address': '35.4.%s.4' % i,
'subnet_id': _uuid()}],
'mac_address': 'ca:fe:de:ad:be:ef',
'subnet': {'cidr': '35.4.%s.0/24' % i,
'gateway_ip': '35.4.%s.1' % i}})
router = {
'id': router_id,
l3_constants.INTERFACE_KEY: int_ports,
l3_constants.INTERFACE_KEY: [],
'routes': [],
'gw_port': ex_gw_port}
self._router_append_interface(router, count=num_internal_ports,
ip_version=ip_version)
if enable_snat is not None:
router['enable_snat'] = enable_snat
return router
@ -725,15 +756,7 @@ class TestBasicRouterOperations(base.BaseTestCase):
agent.process_router(ri)
orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
# Add an interface and reprocess
router[l3_constants.INTERFACE_KEY].append(
{'id': _uuid(),
'network_id': _uuid(),
'admin_state_up': True,
'fixed_ips': [{'ip_address': '35.4.1.4',
'subnet_id': _uuid()}],
'mac_address': 'ca:fe:de:ad:be:ef',
'subnet': {'cidr': '35.4.1.0/24',
'gateway_ip': '35.4.1.1'}})
self._router_append_interface(router)
# Reassign the router object to RouterInfo
ri.router = router
agent.process_router(ri)
@ -772,9 +795,9 @@ class TestBasicRouterOperations(base.BaseTestCase):
self.assertFalse(external_gateway_nat_rules.called)
self.assertEqual(orig_nat_rules, new_nat_rules)
def test_process_router_ipv6_interface_added(self):
def _process_router_ipv6_interface_added(
self, router, ra_mode=None, addr_mode=None):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
router = self._prepare_router_data()
ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
self.conf.use_namespaces, router=router)
agent.external_gateway_added = mock.Mock()
@ -782,23 +805,53 @@ class TestBasicRouterOperations(base.BaseTestCase):
agent.process_router(ri)
orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
# Add an IPv6 interface and reprocess
router[l3_constants.INTERFACE_KEY].append(
{'id': _uuid(),
'network_id': _uuid(),
'admin_state_up': True,
'fixed_ips': [{'ip_address': 'fd00::2',
'subnet_id': _uuid()}],
'mac_address': 'ca:fe:de:ad:be:ef',
'subnet': {'cidr': 'fd00::/64',
'gateway_ip': 'fd00::1'}})
self._router_append_interface(router, count=1, ip_version=6,
ra_mode=ra_mode, addr_mode=addr_mode)
# Reassign the router object to RouterInfo
ri.router = router
agent.process_router(ri)
# For some reason set logic does not work well with
# IpTablesRule instances
# IPv4 NAT rules should not be changed by adding an IPv6 interface
nat_rules_delta = [r for r in ri.iptables_manager.ipv4['nat'].rules
if r not in orig_nat_rules]
self.assertFalse(nat_rules_delta)
return ri
def _expected_call_lookup_ri_process(self, ri, process):
"""Expected call if a process is looked up in a router instance."""
return [mock.call(cfg.CONF,
ri.router['id'],
self.conf.root_helper,
ri.ns_name,
process)]
def _assert_ri_process_enabled(self, ri, process):
"""Verify that process was enabled for a router instance."""
expected_calls = self._expected_call_lookup_ri_process(ri, process)
expected_calls.append(mock.call().enable(mock.ANY, True))
self.assertEqual(expected_calls, self.external_process.mock_calls)
def _assert_ri_process_disabled(self, ri, process):
"""Verify that process was disabled for a router instance."""
expected_calls = self._expected_call_lookup_ri_process(ri, process)
expected_calls.append(mock.call().disable())
self.assertEqual(expected_calls, self.external_process.mock_calls)
def test_process_router_ipv6_interface_added(self):
router = self._prepare_router_data()
ri = self._process_router_ipv6_interface_added(router)
self._assert_ri_process_enabled(ri, 'radvd')
# Expect radvd configured without prefix
self.assertNotIn('prefix',
self.utils_replace_file.call_args[0][1].split())
def test_process_router_ipv6_slaac_interface_added(self):
router = self._prepare_router_data()
ri = self._process_router_ipv6_interface_added(
router, ra_mode=l3_constants.IPV6_SLAAC)
self._assert_ri_process_enabled(ri, 'radvd')
# Expect radvd configured with prefix
self.assertIn('prefix',
self.utils_replace_file.call_args[0][1].split())
def test_process_router_ipv6v4_interface_added(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
@ -810,28 +863,12 @@ class TestBasicRouterOperations(base.BaseTestCase):
agent.process_router(ri)
orig_nat_rules = ri.iptables_manager.ipv4['nat'].rules[:]
# Add an IPv4 and IPv6 interface and reprocess
router[l3_constants.INTERFACE_KEY].append(
{'id': _uuid(),
'network_id': _uuid(),
'admin_state_up': True,
'fixed_ips': [{'ip_address': '35.4.1.4',
'subnet_id': _uuid()}],
'mac_address': 'ca:fe:de:ad:be:ef',
'subnet': {'cidr': '35.4.1.0/24',
'gateway_ip': '35.4.1.1'}})
router[l3_constants.INTERFACE_KEY].append(
{'id': _uuid(),
'network_id': _uuid(),
'admin_state_up': True,
'fixed_ips': [{'ip_address': 'fd00::2',
'subnet_id': _uuid()}],
'mac_address': 'ca:fe:de:ad:be:ef',
'subnet': {'cidr': 'fd00::/64',
'gateway_ip': 'fd00::1'}})
self._router_append_interface(router, count=1, ip_version=4)
self._router_append_interface(router, count=1, ip_version=6)
# Reassign the router object to RouterInfo
ri.router = router
agent.process_router(ri)
self._assert_ri_process_enabled(ri, 'radvd')
# For some reason set logic does not work well with
# IpTablesRule instances
nat_rules_delta = [r for r in ri.iptables_manager.ipv4['nat'].rules
@ -862,6 +899,25 @@ class TestBasicRouterOperations(base.BaseTestCase):
# send_arp is called both times process_router is called
self.assertEqual(self.send_arp.call_count, 2)
def test_process_router_ipv6_interface_removed(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
router = self._prepare_router_data()
ri = l3_agent.RouterInfo(router['id'], self.conf.root_helper,
self.conf.use_namespaces, router=router)
agent.external_gateway_added = mock.Mock()
ri.router = router
agent.process_router(ri)
# Add an IPv6 interface and reprocess
self._router_append_interface(router, count=1, ip_version=6)
agent.process_router(ri)
self._assert_ri_process_enabled(ri, 'radvd')
# Reset the calls so we can check for disable radvd
self.external_process.reset_mock()
# Remove the IPv6 interface and reprocess
del router[l3_constants.INTERFACE_KEY][1]
agent.process_router(ri)
self._assert_ri_process_disabled(ri, 'radvd')
def test_process_router_internal_network_added_unexpected_error(self):
agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
router = self._prepare_router_data()
@ -1358,7 +1414,10 @@ class TestBasicRouterOperations(base.BaseTestCase):
ns_list = agent._list_namespaces()
agent._cleanup_namespaces(ns_list, [r['id'] for r in router_list])
self.assertEqual(pm.disable.call_count, len(stale_namespace_list))
# Expect process manager to disable two processes (metadata_proxy
# and radvd) per stale namespace.
expected_pm_disables = 2 * len(stale_namespace_list)
self.assertEqual(expected_pm_disables, pm.disable.call_count)
self.assertEqual(agent._destroy_router_namespace.call_count,
len(stale_namespace_list))
expected_args = [mock.call(ns) for ns in stale_namespace_list]

View File

@ -120,27 +120,27 @@ class TestProcessManager(base.BaseTestCase):
debug.assert_called_once_with(mock.ANY, mock.ANY)
def test_get_pid_file_name_existing(self):
with mock.patch.object(ep.os.path, 'isdir') as isdir:
with mock.patch.object(ep.utils.os.path, 'isdir') as isdir:
isdir.return_value = True
manager = ep.ProcessManager(self.conf, 'uuid')
retval = manager.get_pid_file_name(ensure_pids_dir=True)
self.assertEqual(retval, '/var/path/uuid.pid')
self.assertEqual(retval, '/var/path/uuid/pid')
def test_get_pid_file_name_not_existing(self):
with mock.patch.object(ep.os.path, 'isdir') as isdir:
with mock.patch.object(ep.os, 'makedirs') as makedirs:
with mock.patch.object(ep.utils.os.path, 'isdir') as isdir:
with mock.patch.object(ep.utils.os, 'makedirs') as makedirs:
isdir.return_value = False
manager = ep.ProcessManager(self.conf, 'uuid')
retval = manager.get_pid_file_name(ensure_pids_dir=True)
self.assertEqual(retval, '/var/path/uuid.pid')
makedirs.assert_called_once_with('/var/path', 0o755)
self.assertEqual(retval, '/var/path/uuid/pid')
makedirs.assert_called_once_with('/var/path/uuid', 0o755)
def test_get_pid_file_name_default(self):
with mock.patch.object(ep.os.path, 'isdir') as isdir:
with mock.patch.object(ep.utils.os.path, 'isdir') as isdir:
isdir.return_value = True
manager = ep.ProcessManager(self.conf, 'uuid')
retval = manager.get_pid_file_name(ensure_pids_dir=False)
self.assertEqual(retval, '/var/path/uuid.pid')
self.assertEqual(retval, '/var/path/uuid/pid')
self.assertFalse(isdir.called)
def test_pid(self):