ba51dc0665
Add functional testing to the ipset_manager module to verify it works as expected in combination with iptables matching. Implements: blueprint add-ipset-to-security Change-Id: Iec791ec30f87f6c00805f1d52c23b84aa7bc19de
83 lines
3.1 KiB
Python
83 lines
3.1 KiB
Python
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from neutron.agent.linux import utils as linux_utils
|
|
from neutron.common import utils
|
|
|
|
|
|
class IpsetManager(object):
    """Wrapper for ipset.

    Builds ``ipset`` command lines and runs them through ``self.execute``,
    prefixing them with ``ip netns exec <namespace>`` when a namespace was
    given. The public methods are decorated with
    ``utils.synchronized('ipset', external=True)``.
    """

    def __init__(self, execute=None, root_helper=None, namespace=None):
        """Store the command runner and its execution context.

        :param execute: callable used to run commands; it is invoked as
            ``execute(cmd, root_helper=..., process_input=...)``. Falls
            back to ``linux_utils.execute`` when not supplied.
        :param root_helper: passed through unchanged to ``execute``.
        :param namespace: when truthy, every command is wrapped in
            ``ip netns exec <namespace>``.
        """
        self.execute = execute or linux_utils.execute
        self.root_helper = root_helper
        self.namespace = namespace

    @utils.synchronized('ipset', external=True)
    def create_ipset_chain(self, chain_name, ethertype):
        """Create a ``hash:ip`` set called ``chain_name``.

        :param chain_name: name of the set to create.
        :param ethertype: ``'IPv6'`` selects family ``inet6``; any other
            value selects ``inet``.
        """
        cmd = ['ipset', 'create', '-exist', chain_name, 'hash:ip', 'family',
               self._get_ipset_chain_type(ethertype)]
        self._apply(cmd)

    @utils.synchronized('ipset', external=True)
    def add_member_to_ipset_chain(self, chain_name, member_ip):
        """Add ``member_ip`` to the set ``chain_name`` (with ``-exist``)."""
        cmd = ['ipset', 'add', '-exist', chain_name, member_ip]
        self._apply(cmd)

    @utils.synchronized('ipset', external=True)
    def refresh_ipset_chain_by_name(self, chain_name, member_ips, ethertype):
        """Replace the contents of ``chain_name`` with ``member_ips``.

        A temporary set named ``<chain_name>-new`` is populated through
        ``ipset restore``, swapped with ``chain_name`` and then destroyed.

        :param chain_name: name of the set to refresh.
        :param member_ips: iterable of addresses the set should contain.
        :param ethertype: ``'IPv6'`` selects family ``inet6``; any other
            value selects ``inet``.
        """
        new_chain_name = chain_name + '-new'
        chain_type = self._get_ipset_chain_type(ethertype)
        process_input = ["create %s hash:ip family %s" % (new_chain_name,
                                                          chain_type)]
        process_input.extend("add %s %s" % (new_chain_name, ip)
                             for ip in member_ips)

        try:
            self._restore_ipset_chains(process_input)
            self._swap_ipset_chains(new_chain_name, chain_name)
        finally:
            # Always drop the temporary set, even when restore or swap
            # raised. Because the restore runs with '-exist', a set left
            # behind by a failed refresh would otherwise be reused by the
            # next refresh together with whatever members it still holds.
            # NOTE: if this cleanup itself raises, its error is the one
            # that propagates to the caller.
            self._destroy_ipset_chain(new_chain_name)

    @utils.synchronized('ipset', external=True)
    def del_ipset_chain_member(self, chain_name, member_ip):
        """Delete ``member_ip`` from the set ``chain_name``.

        Unlike the add/create commands, ``-exist`` is not passed here.
        """
        cmd = ['ipset', 'del', chain_name, member_ip]
        self._apply(cmd)

    @utils.synchronized('ipset', external=True)
    def destroy_ipset_chain_by_name(self, chain_name):
        """Destroy the set ``chain_name``."""
        self._destroy_ipset_chain(chain_name)

    def _apply(self, cmd, input=None):
        """Run ``cmd`` through ``self.execute``.

        :param cmd: command as a list of arguments.
        :param input: optional iterable of strings; when non-empty the
            items are joined with newlines and passed as ``process_input``,
            otherwise ``process_input`` is ``None``.
        """
        # The parameter keeps its original name for keyword callers; it is
        # no longer rebound inside the method.
        process_input = '\n'.join(input) if input else None
        full_cmd = []
        if self.namespace:
            full_cmd.extend(['ip', 'netns', 'exec', self.namespace])
        full_cmd.extend(cmd)
        self.execute(full_cmd,
                     root_helper=self.root_helper,
                     process_input=process_input)

    def _get_ipset_chain_type(self, ethertype):
        """Return ``'inet6'`` for ethertype ``'IPv6'``, else ``'inet'``."""
        return 'inet6' if ethertype == 'IPv6' else 'inet'

    def _restore_ipset_chains(self, process_input):
        """Feed the ``process_input`` lines to ``ipset restore -exist``."""
        cmd = ['ipset', 'restore', '-exist']
        self._apply(cmd, process_input)

    def _swap_ipset_chains(self, src_chain, dest_chain):
        """Run ``ipset swap`` on ``src_chain`` and ``dest_chain``."""
        cmd = ['ipset', 'swap', src_chain, dest_chain]
        self._apply(cmd)

    def _destroy_ipset_chain(self, chain_name):
        """Run ``ipset destroy`` on ``chain_name``."""
        cmd = ['ipset', 'destroy', chain_name]
        self._apply(cmd)
|