parent
7d0fdfc24d
commit
85a33cec25
@ -1,4 +1,4 @@
|
|||||||
branch: lp:charm-helpers
|
branch: lp:~hopem/charm-helpers/lp1497517
|
||||||
destination: hooks/charmhelpers
|
destination: hooks/charmhelpers
|
||||||
include:
|
include:
|
||||||
- core
|
- core
|
||||||
|
@ -23,7 +23,7 @@ import socket
|
|||||||
from functools import partial
|
from functools import partial
|
||||||
|
|
||||||
from charmhelpers.core.hookenv import unit_get
|
from charmhelpers.core.hookenv import unit_get
|
||||||
from charmhelpers.fetch import apt_install
|
from charmhelpers.fetch import apt_install, apt_update
|
||||||
from charmhelpers.core.hookenv import (
|
from charmhelpers.core.hookenv import (
|
||||||
log,
|
log,
|
||||||
WARNING,
|
WARNING,
|
||||||
@ -32,13 +32,15 @@ from charmhelpers.core.hookenv import (
|
|||||||
try:
|
try:
|
||||||
import netifaces
|
import netifaces
|
||||||
except ImportError:
|
except ImportError:
|
||||||
apt_install('python-netifaces')
|
apt_update(fatal=True)
|
||||||
|
apt_install('python-netifaces', fatal=True)
|
||||||
import netifaces
|
import netifaces
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import netaddr
|
import netaddr
|
||||||
except ImportError:
|
except ImportError:
|
||||||
apt_install('python-netaddr')
|
apt_update(fatal=True)
|
||||||
|
apt_install('python-netaddr', fatal=True)
|
||||||
import netaddr
|
import netaddr
|
||||||
|
|
||||||
|
|
||||||
|
@ -44,20 +44,31 @@ class OpenStackAmuletDeployment(AmuletDeployment):
|
|||||||
Determine if the local branch being tested is derived from its
|
Determine if the local branch being tested is derived from its
|
||||||
stable or next (dev) branch, and based on this, use the corresonding
|
stable or next (dev) branch, and based on this, use the corresonding
|
||||||
stable or next branches for the other_services."""
|
stable or next branches for the other_services."""
|
||||||
|
|
||||||
|
# Charms outside the lp:~openstack-charmers namespace
|
||||||
base_charms = ['mysql', 'mongodb', 'nrpe']
|
base_charms = ['mysql', 'mongodb', 'nrpe']
|
||||||
|
|
||||||
|
# Force these charms to current series even when using an older series.
|
||||||
|
# ie. Use trusty/nrpe even when series is precise, as the P charm
|
||||||
|
# does not possess the necessary external master config and hooks.
|
||||||
|
force_series_current = ['nrpe']
|
||||||
|
|
||||||
if self.series in ['precise', 'trusty']:
|
if self.series in ['precise', 'trusty']:
|
||||||
base_series = self.series
|
base_series = self.series
|
||||||
else:
|
else:
|
||||||
base_series = self.current_next
|
base_series = self.current_next
|
||||||
|
|
||||||
if self.stable:
|
for svc in other_services:
|
||||||
for svc in other_services:
|
if svc['name'] in force_series_current:
|
||||||
|
base_series = self.current_next
|
||||||
|
# If a location has been explicitly set, use it
|
||||||
|
if svc.get('location'):
|
||||||
|
continue
|
||||||
|
if self.stable:
|
||||||
temp = 'lp:charms/{}/{}'
|
temp = 'lp:charms/{}/{}'
|
||||||
svc['location'] = temp.format(base_series,
|
svc['location'] = temp.format(base_series,
|
||||||
svc['name'])
|
svc['name'])
|
||||||
else:
|
else:
|
||||||
for svc in other_services:
|
|
||||||
if svc['name'] in base_charms:
|
if svc['name'] in base_charms:
|
||||||
temp = 'lp:charms/{}/{}'
|
temp = 'lp:charms/{}/{}'
|
||||||
svc['location'] = temp.format(base_series,
|
svc['location'] = temp.format(base_series,
|
||||||
@ -66,6 +77,7 @@ class OpenStackAmuletDeployment(AmuletDeployment):
|
|||||||
temp = 'lp:~openstack-charmers/charms/{}/{}/next'
|
temp = 'lp:~openstack-charmers/charms/{}/{}/next'
|
||||||
svc['location'] = temp.format(self.current_next,
|
svc['location'] = temp.format(self.current_next,
|
||||||
svc['name'])
|
svc['name'])
|
||||||
|
|
||||||
return other_services
|
return other_services
|
||||||
|
|
||||||
def _add_services(self, this_service, other_services):
|
def _add_services(self, this_service, other_services):
|
||||||
@ -77,21 +89,23 @@ class OpenStackAmuletDeployment(AmuletDeployment):
|
|||||||
|
|
||||||
services = other_services
|
services = other_services
|
||||||
services.append(this_service)
|
services.append(this_service)
|
||||||
|
|
||||||
|
# Charms which should use the source config option
|
||||||
use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
|
use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
|
||||||
'ceph-osd', 'ceph-radosgw']
|
'ceph-osd', 'ceph-radosgw']
|
||||||
# Most OpenStack subordinate charms do not expose an origin option
|
|
||||||
# as that is controlled by the principle.
|
# Charms which can not use openstack-origin, ie. many subordinates
|
||||||
ignore = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
|
no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
|
||||||
|
|
||||||
if self.openstack:
|
if self.openstack:
|
||||||
for svc in services:
|
for svc in services:
|
||||||
if svc['name'] not in use_source + ignore:
|
if svc['name'] not in use_source + no_origin:
|
||||||
config = {'openstack-origin': self.openstack}
|
config = {'openstack-origin': self.openstack}
|
||||||
self.d.configure(svc['name'], config)
|
self.d.configure(svc['name'], config)
|
||||||
|
|
||||||
if self.source:
|
if self.source:
|
||||||
for svc in services:
|
for svc in services:
|
||||||
if svc['name'] in use_source and svc['name'] not in ignore:
|
if svc['name'] in use_source and svc['name'] not in no_origin:
|
||||||
config = {'source': self.source}
|
config = {'source': self.source}
|
||||||
self.d.configure(svc['name'], config)
|
self.d.configure(svc['name'], config)
|
||||||
|
|
||||||
|
@ -27,6 +27,7 @@ import glanceclient.v1.client as glance_client
|
|||||||
import heatclient.v1.client as heat_client
|
import heatclient.v1.client as heat_client
|
||||||
import keystoneclient.v2_0 as keystone_client
|
import keystoneclient.v2_0 as keystone_client
|
||||||
import novaclient.v1_1.client as nova_client
|
import novaclient.v1_1.client as nova_client
|
||||||
|
import pika
|
||||||
import swiftclient
|
import swiftclient
|
||||||
|
|
||||||
from charmhelpers.contrib.amulet.utils import (
|
from charmhelpers.contrib.amulet.utils import (
|
||||||
@ -602,3 +603,361 @@ class OpenStackAmuletUtils(AmuletUtils):
|
|||||||
self.log.debug('Ceph {} samples (OK): '
|
self.log.debug('Ceph {} samples (OK): '
|
||||||
'{}'.format(sample_type, samples))
|
'{}'.format(sample_type, samples))
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
# rabbitmq/amqp specific helpers:
|
||||||
|
def add_rmq_test_user(self, sentry_units,
|
||||||
|
username="testuser1", password="changeme"):
|
||||||
|
"""Add a test user via the first rmq juju unit, check connection as
|
||||||
|
the new user against all sentry units.
|
||||||
|
|
||||||
|
:param sentry_units: list of sentry unit pointers
|
||||||
|
:param username: amqp user name, default to testuser1
|
||||||
|
:param password: amqp user password
|
||||||
|
:returns: None if successful. Raise on error.
|
||||||
|
"""
|
||||||
|
self.log.debug('Adding rmq user ({})...'.format(username))
|
||||||
|
|
||||||
|
# Check that user does not already exist
|
||||||
|
cmd_user_list = 'rabbitmqctl list_users'
|
||||||
|
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
|
||||||
|
if username in output:
|
||||||
|
self.log.warning('User ({}) already exists, returning '
|
||||||
|
'gracefully.'.format(username))
|
||||||
|
return
|
||||||
|
|
||||||
|
perms = '".*" ".*" ".*"'
|
||||||
|
cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
|
||||||
|
'rabbitmqctl set_permissions {} {}'.format(username, perms)]
|
||||||
|
|
||||||
|
# Add user via first unit
|
||||||
|
for cmd in cmds:
|
||||||
|
output, _ = self.run_cmd_unit(sentry_units[0], cmd)
|
||||||
|
|
||||||
|
# Check connection against the other sentry_units
|
||||||
|
self.log.debug('Checking user connect against units...')
|
||||||
|
for sentry_unit in sentry_units:
|
||||||
|
connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
|
||||||
|
username=username,
|
||||||
|
password=password)
|
||||||
|
connection.close()
|
||||||
|
|
||||||
|
def delete_rmq_test_user(self, sentry_units, username="testuser1"):
|
||||||
|
"""Delete a rabbitmq user via the first rmq juju unit.
|
||||||
|
|
||||||
|
:param sentry_units: list of sentry unit pointers
|
||||||
|
:param username: amqp user name, default to testuser1
|
||||||
|
:param password: amqp user password
|
||||||
|
:returns: None if successful or no such user.
|
||||||
|
"""
|
||||||
|
self.log.debug('Deleting rmq user ({})...'.format(username))
|
||||||
|
|
||||||
|
# Check that the user exists
|
||||||
|
cmd_user_list = 'rabbitmqctl list_users'
|
||||||
|
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
|
||||||
|
|
||||||
|
if username not in output:
|
||||||
|
self.log.warning('User ({}) does not exist, returning '
|
||||||
|
'gracefully.'.format(username))
|
||||||
|
return
|
||||||
|
|
||||||
|
# Delete the user
|
||||||
|
cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
|
||||||
|
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
|
||||||
|
|
||||||
|
def get_rmq_cluster_status(self, sentry_unit):
|
||||||
|
"""Execute rabbitmq cluster status command on a unit and return
|
||||||
|
the full output.
|
||||||
|
|
||||||
|
:param unit: sentry unit
|
||||||
|
:returns: String containing console output of cluster status command
|
||||||
|
"""
|
||||||
|
cmd = 'rabbitmqctl cluster_status'
|
||||||
|
output, _ = self.run_cmd_unit(sentry_unit, cmd)
|
||||||
|
self.log.debug('{} cluster_status:\n{}'.format(
|
||||||
|
sentry_unit.info['unit_name'], output))
|
||||||
|
return str(output)
|
||||||
|
|
||||||
|
def get_rmq_cluster_running_nodes(self, sentry_unit):
|
||||||
|
"""Parse rabbitmqctl cluster_status output string, return list of
|
||||||
|
running rabbitmq cluster nodes.
|
||||||
|
|
||||||
|
:param unit: sentry unit
|
||||||
|
:returns: List containing node names of running nodes
|
||||||
|
"""
|
||||||
|
# NOTE(beisner): rabbitmqctl cluster_status output is not
|
||||||
|
# json-parsable, do string chop foo, then json.loads that.
|
||||||
|
str_stat = self.get_rmq_cluster_status(sentry_unit)
|
||||||
|
if 'running_nodes' in str_stat:
|
||||||
|
pos_start = str_stat.find("{running_nodes,") + 15
|
||||||
|
pos_end = str_stat.find("]},", pos_start) + 1
|
||||||
|
str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
|
||||||
|
run_nodes = json.loads(str_run_nodes)
|
||||||
|
return run_nodes
|
||||||
|
else:
|
||||||
|
return []
|
||||||
|
|
||||||
|
def validate_rmq_cluster_running_nodes(self, sentry_units):
|
||||||
|
"""Check that all rmq unit hostnames are represented in the
|
||||||
|
cluster_status output of all units.
|
||||||
|
|
||||||
|
:param host_names: dict of juju unit names to host names
|
||||||
|
:param units: list of sentry unit pointers (all rmq units)
|
||||||
|
:returns: None if successful, otherwise return error message
|
||||||
|
"""
|
||||||
|
host_names = self.get_unit_hostnames(sentry_units)
|
||||||
|
errors = []
|
||||||
|
|
||||||
|
# Query every unit for cluster_status running nodes
|
||||||
|
for query_unit in sentry_units:
|
||||||
|
query_unit_name = query_unit.info['unit_name']
|
||||||
|
running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
|
||||||
|
|
||||||
|
# Confirm that every unit is represented in the queried unit's
|
||||||
|
# cluster_status running nodes output.
|
||||||
|
for validate_unit in sentry_units:
|
||||||
|
val_host_name = host_names[validate_unit.info['unit_name']]
|
||||||
|
val_node_name = 'rabbit@{}'.format(val_host_name)
|
||||||
|
|
||||||
|
if val_node_name not in running_nodes:
|
||||||
|
errors.append('Cluster member check failed on {}: {} not '
|
||||||
|
'in {}\n'.format(query_unit_name,
|
||||||
|
val_node_name,
|
||||||
|
running_nodes))
|
||||||
|
if errors:
|
||||||
|
return ''.join(errors)
|
||||||
|
|
||||||
|
def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
|
||||||
|
"""Check a single juju rmq unit for ssl and port in the config file."""
|
||||||
|
host = sentry_unit.info['public-address']
|
||||||
|
unit_name = sentry_unit.info['unit_name']
|
||||||
|
|
||||||
|
conf_file = '/etc/rabbitmq/rabbitmq.config'
|
||||||
|
conf_contents = str(self.file_contents_safe(sentry_unit,
|
||||||
|
conf_file, max_wait=16))
|
||||||
|
# Checks
|
||||||
|
conf_ssl = 'ssl' in conf_contents
|
||||||
|
conf_port = str(port) in conf_contents
|
||||||
|
|
||||||
|
# Port explicitly checked in config
|
||||||
|
if port and conf_port and conf_ssl:
|
||||||
|
self.log.debug('SSL is enabled @{}:{} '
|
||||||
|
'({})'.format(host, port, unit_name))
|
||||||
|
return True
|
||||||
|
elif port and not conf_port and conf_ssl:
|
||||||
|
self.log.debug('SSL is enabled @{} but not on port {} '
|
||||||
|
'({})'.format(host, port, unit_name))
|
||||||
|
return False
|
||||||
|
# Port not checked (useful when checking that ssl is disabled)
|
||||||
|
elif not port and conf_ssl:
|
||||||
|
self.log.debug('SSL is enabled @{}:{} '
|
||||||
|
'({})'.format(host, port, unit_name))
|
||||||
|
return True
|
||||||
|
elif not port and not conf_ssl:
|
||||||
|
self.log.debug('SSL not enabled @{}:{} '
|
||||||
|
'({})'.format(host, port, unit_name))
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
msg = ('Unknown condition when checking SSL status @{}:{} '
|
||||||
|
'({})'.format(host, port, unit_name))
|
||||||
|
amulet.raise_status(amulet.FAIL, msg)
|
||||||
|
|
||||||
|
def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
|
||||||
|
"""Check that ssl is enabled on rmq juju sentry units.
|
||||||
|
|
||||||
|
:param sentry_units: list of all rmq sentry units
|
||||||
|
:param port: optional ssl port override to validate
|
||||||
|
:returns: None if successful, otherwise return error message
|
||||||
|
"""
|
||||||
|
for sentry_unit in sentry_units:
|
||||||
|
if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
|
||||||
|
return ('Unexpected condition: ssl is disabled on unit '
|
||||||
|
'({})'.format(sentry_unit.info['unit_name']))
|
||||||
|
return None
|
||||||
|
|
||||||
|
def validate_rmq_ssl_disabled_units(self, sentry_units):
|
||||||
|
"""Check that ssl is enabled on listed rmq juju sentry units.
|
||||||
|
|
||||||
|
:param sentry_units: list of all rmq sentry units
|
||||||
|
:returns: True if successful. Raise on error.
|
||||||
|
"""
|
||||||
|
for sentry_unit in sentry_units:
|
||||||
|
if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
|
||||||
|
return ('Unexpected condition: ssl is enabled on unit '
|
||||||
|
'({})'.format(sentry_unit.info['unit_name']))
|
||||||
|
return None
|
||||||
|
|
||||||
|
def configure_rmq_ssl_on(self, sentry_units, deployment,
|
||||||
|
port=None, max_wait=60):
|
||||||
|
"""Turn ssl charm config option on, with optional non-default
|
||||||
|
ssl port specification. Confirm that it is enabled on every
|
||||||
|
unit.
|
||||||
|
|
||||||
|
:param sentry_units: list of sentry units
|
||||||
|
:param deployment: amulet deployment object pointer
|
||||||
|
:param port: amqp port, use defaults if None
|
||||||
|
:param max_wait: maximum time to wait in seconds to confirm
|
||||||
|
:returns: None if successful. Raise on error.
|
||||||
|
"""
|
||||||
|
self.log.debug('Setting ssl charm config option: on')
|
||||||
|
|
||||||
|
# Enable RMQ SSL
|
||||||
|
config = {'ssl': 'on'}
|
||||||
|
if port:
|
||||||
|
config['ssl_port'] = port
|
||||||
|
|
||||||
|
deployment.configure('rabbitmq-server', config)
|
||||||
|
|
||||||
|
# Confirm
|
||||||
|
tries = 0
|
||||||
|
ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
|
||||||
|
while ret and tries < (max_wait / 4):
|
||||||
|
time.sleep(4)
|
||||||
|
self.log.debug('Attempt {}: {}'.format(tries, ret))
|
||||||
|
ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
|
||||||
|
tries += 1
|
||||||
|
|
||||||
|
if ret:
|
||||||
|
amulet.raise_status(amulet.FAIL, ret)
|
||||||
|
|
||||||
|
def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
|
||||||
|
"""Turn ssl charm config option off, confirm that it is disabled
|
||||||
|
on every unit.
|
||||||
|
|
||||||
|
:param sentry_units: list of sentry units
|
||||||
|
:param deployment: amulet deployment object pointer
|
||||||
|
:param max_wait: maximum time to wait in seconds to confirm
|
||||||
|
:returns: None if successful. Raise on error.
|
||||||
|
"""
|
||||||
|
self.log.debug('Setting ssl charm config option: off')
|
||||||
|
|
||||||
|
# Disable RMQ SSL
|
||||||
|
config = {'ssl': 'off'}
|
||||||
|
deployment.configure('rabbitmq-server', config)
|
||||||
|
|
||||||
|
# Confirm
|
||||||
|
tries = 0
|
||||||
|
ret = self.validate_rmq_ssl_disabled_units(sentry_units)
|
||||||
|
while ret and tries < (max_wait / 4):
|
||||||
|
time.sleep(4)
|
||||||
|
self.log.debug('Attempt {}: {}'.format(tries, ret))
|
||||||
|
ret = self.validate_rmq_ssl_disabled_units(sentry_units)
|
||||||
|
tries += 1
|
||||||
|
|
||||||
|
if ret:
|
||||||
|
amulet.raise_status(amulet.FAIL, ret)
|
||||||
|
|
||||||
|
def connect_amqp_by_unit(self, sentry_unit, ssl=False,
|
||||||
|
port=None, fatal=True,
|
||||||
|
username="testuser1", password="changeme"):
|
||||||
|
"""Establish and return a pika amqp connection to the rabbitmq service
|
||||||
|
running on a rmq juju unit.
|
||||||
|
|
||||||
|
:param sentry_unit: sentry unit pointer
|
||||||
|
:param ssl: boolean, default to False
|
||||||
|
:param port: amqp port, use defaults if None
|
||||||
|
:param fatal: boolean, default to True (raises on connect error)
|
||||||
|
:param username: amqp user name, default to testuser1
|
||||||
|
:param password: amqp user password
|
||||||
|
:returns: pika amqp connection pointer or None if failed and non-fatal
|
||||||
|
"""
|
||||||
|
host = sentry_unit.info['public-address']
|
||||||
|
unit_name = sentry_unit.info['unit_name']
|
||||||
|
|
||||||
|
# Default port logic if port is not specified
|
||||||
|
if ssl and not port:
|
||||||
|
port = 5671
|
||||||
|
elif not ssl and not port:
|
||||||
|
port = 5672
|
||||||
|
|
||||||
|
self.log.debug('Connecting to amqp on {}:{} ({}) as '
|
||||||
|
'{}...'.format(host, port, unit_name, username))
|
||||||
|
|
||||||
|
try:
|
||||||
|
credentials = pika.PlainCredentials(username, password)
|
||||||
|
parameters = pika.ConnectionParameters(host=host, port=port,
|
||||||
|
credentials=credentials,
|
||||||
|
ssl=ssl,
|
||||||
|
connection_attempts=3,
|
||||||
|
retry_delay=5,
|
||||||
|
socket_timeout=1)
|
||||||
|
connection = pika.BlockingConnection(parameters)
|
||||||
|
assert connection.server_properties['product'] == 'RabbitMQ'
|
||||||
|
self.log.debug('Connect OK')
|
||||||
|
return connection
|
||||||
|
except Exception as e:
|
||||||
|
msg = ('amqp connection failed to {}:{} as '
|
||||||
|
'{} ({})'.format(host, port, username, str(e)))
|
||||||
|
if fatal:
|
||||||
|
amulet.raise_status(amulet.FAIL, msg)
|
||||||
|
else:
|
||||||
|
self.log.warn(msg)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def publish_amqp_message_by_unit(self, sentry_unit, message,
|
||||||
|
queue="test", ssl=False,
|
||||||
|
username="testuser1",
|
||||||
|
password="changeme",
|
||||||
|
port=None):
|
||||||
|
"""Publish an amqp message to a rmq juju unit.
|
||||||
|
|
||||||
|
:param sentry_unit: sentry unit pointer
|
||||||
|
:param message: amqp message string
|
||||||
|
:param queue: message queue, default to test
|
||||||
|
:param username: amqp user name, default to testuser1
|
||||||
|
:param password: amqp user password
|
||||||
|
:param ssl: boolean, default to False
|
||||||
|
:param port: amqp port, use defaults if None
|
||||||
|
:returns: None. Raises exception if publish failed.
|
||||||
|
"""
|
||||||
|
self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
|
||||||
|
message))
|
||||||
|
connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
|
||||||
|
port=port,
|
||||||
|
username=username,
|
||||||
|
password=password)
|
||||||
|
|
||||||
|
# NOTE(beisner): extra debug here re: pika hang potential:
|
||||||
|
# https://github.com/pika/pika/issues/297
|
||||||
|
# https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
|
||||||
|
self.log.debug('Defining channel...')
|
||||||
|
channel = connection.channel()
|
||||||
|
self.log.debug('Declaring queue...')
|
||||||
|
channel.queue_declare(queue=queue, auto_delete=False, durable=True)
|
||||||
|
self.log.debug('Publishing message...')
|
||||||
|
channel.basic_publish(exchange='', routing_key=queue, body=message)
|
||||||
|
self.log.debug('Closing channel...')
|
||||||
|
channel.close()
|
||||||
|
self.log.debug('Closing connection...')
|
||||||
|
connection.close()
|
||||||
|
|
||||||
|
def get_amqp_message_by_unit(self, sentry_unit, queue="test",
|
||||||
|
username="testuser1",
|
||||||
|
password="changeme",
|
||||||
|
ssl=False, port=None):
|
||||||
|
"""Get an amqp message from a rmq juju unit.
|
||||||
|
|
||||||
|
:param sentry_unit: sentry unit pointer
|
||||||
|
:param queue: message queue, default to test
|
||||||
|
:param username: amqp user name, default to testuser1
|
||||||
|
:param password: amqp user password
|
||||||
|
:param ssl: boolean, default to False
|
||||||
|
:param port: amqp port, use defaults if None
|
||||||
|
:returns: amqp message body as string. Raise if get fails.
|
||||||
|
"""
|
||||||
|
connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
|
||||||
|
port=port,
|
||||||
|
username=username,
|
||||||
|
password=password)
|
||||||
|
channel = connection.channel()
|
||||||
|
method_frame, _, body = channel.basic_get(queue)
|
||||||
|
|
||||||
|
if method_frame:
|
||||||
|
self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
|
||||||
|
body))
|
||||||
|
channel.basic_ack(method_frame.delivery_tag)
|
||||||
|
channel.close()
|
||||||
|
connection.close()
|
||||||
|
return body
|
||||||
|
else:
|
||||||
|
msg = 'No message retrieved.'
|
||||||
|
amulet.raise_status(amulet.FAIL, msg)
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
# You should have received a copy of the GNU Lesser General Public License
|
# You should have received a copy of the GNU Lesser General Public License
|
||||||
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
|
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import glob
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
@ -194,10 +195,50 @@ def config_flags_parser(config_flags):
|
|||||||
class OSContextGenerator(object):
|
class OSContextGenerator(object):
|
||||||
"""Base class for all context generators."""
|
"""Base class for all context generators."""
|
||||||
interfaces = []
|
interfaces = []
|
||||||
|
related = False
|
||||||
|
complete = False
|
||||||
|
missing_data = []
|
||||||
|
|
||||||
def __call__(self):
|
def __call__(self):
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def context_complete(self, ctxt):
|
||||||
|
"""Check for missing data for the required context data.
|
||||||
|
Set self.missing_data if it exists and return False.
|
||||||
|
Set self.complete if no missing data and return True.
|
||||||
|
"""
|
||||||
|
# Fresh start
|
||||||
|
self.complete = False
|
||||||
|
self.missing_data = []
|
||||||
|
for k, v in six.iteritems(ctxt):
|
||||||
|
if v is None or v == '':
|
||||||
|
if k not in self.missing_data:
|
||||||
|
self.missing_data.append(k)
|
||||||
|
|
||||||
|
if self.missing_data:
|
||||||
|
self.complete = False
|
||||||
|
log('Missing required data: %s' % ' '.join(self.missing_data), level=INFO)
|
||||||
|
else:
|
||||||
|
self.complete = True
|
||||||
|
return self.complete
|
||||||
|
|
||||||
|
def get_related(self):
|
||||||
|
"""Check if any of the context interfaces have relation ids.
|
||||||
|
Set self.related and return True if one of the interfaces
|
||||||
|
has relation ids.
|
||||||
|
"""
|
||||||
|
# Fresh start
|
||||||
|
self.related = False
|
||||||
|
try:
|
||||||
|
for interface in self.interfaces:
|
||||||
|
if relation_ids(interface):
|
||||||
|
self.related = True
|
||||||
|
return self.related
|
||||||
|
except AttributeError as e:
|
||||||
|
log("{} {}"
|
||||||
|
"".format(self, e), 'INFO')
|
||||||
|
return self.related
|
||||||
|
|
||||||
|
|
||||||
class SharedDBContext(OSContextGenerator):
|
class SharedDBContext(OSContextGenerator):
|
||||||
interfaces = ['shared-db']
|
interfaces = ['shared-db']
|
||||||
@ -213,6 +254,7 @@ class SharedDBContext(OSContextGenerator):
|
|||||||
self.database = database
|
self.database = database
|
||||||
self.user = user
|
self.user = user
|
||||||
self.ssl_dir = ssl_dir
|
self.ssl_dir = ssl_dir
|
||||||
|
self.rel_name = self.interfaces[0]
|
||||||
|
|
||||||
def __call__(self):
|
def __call__(self):
|
||||||
self.database = self.database or config('database')
|
self.database = self.database or config('database')
|
||||||
@ -246,6 +288,7 @@ class SharedDBContext(OSContextGenerator):
|
|||||||
password_setting = self.relation_prefix + '_password'
|
password_setting = self.relation_prefix + '_password'
|
||||||
|
|
||||||
for rid in relation_ids(self.interfaces[0]):
|
for rid in relation_ids(self.interfaces[0]):
|
||||||
|
self.related = True
|
||||||
for unit in related_units(rid):
|
for unit in related_units(rid):
|
||||||
rdata = relation_get(rid=rid, unit=unit)
|
rdata = relation_get(rid=rid, unit=unit)
|
||||||
host = rdata.get('db_host')
|
host = rdata.get('db_host')
|
||||||
@ -257,7 +300,7 @@ class SharedDBContext(OSContextGenerator):
|
|||||||
'database_password': rdata.get(password_setting),
|
'database_password': rdata.get(password_setting),
|
||||||
'database_type': 'mysql'
|
'database_type': 'mysql'
|
||||||
}
|
}
|
||||||
if context_complete(ctxt):
|
if self.context_complete(ctxt):
|
||||||
db_ssl(rdata, ctxt, self.ssl_dir)
|
db_ssl(rdata, ctxt, self.ssl_dir)
|
||||||
return ctxt
|
return ctxt
|
||||||
return {}
|
return {}
|
||||||
@ -278,6 +321,7 @@ class PostgresqlDBContext(OSContextGenerator):
|
|||||||
|
|
||||||
ctxt = {}
|
ctxt = {}
|
||||||
for rid in relation_ids(self.interfaces[0]):
|
for rid in relation_ids(self.interfaces[0]):
|
||||||
|
self.related = True
|
||||||
for unit in related_units(rid):
|
for unit in related_units(rid):
|
||||||
rel_host = relation_get('host', rid=rid, unit=unit)
|
rel_host = relation_get('host', rid=rid, unit=unit)
|
||||||
rel_user = relation_get('user', rid=rid, unit=unit)
|
rel_user = relation_get('user', rid=rid, unit=unit)
|
||||||
@ -287,7 +331,7 @@ class PostgresqlDBContext(OSContextGenerator):
|
|||||||
'database_user': rel_user,
|
'database_user': rel_user,
|
||||||
'database_password': rel_passwd,
|
'database_password': rel_passwd,
|
||||||
'database_type': 'postgresql'}
|
'database_type': 'postgresql'}
|
||||||
if context_complete(ctxt):
|
if self.context_complete(ctxt):
|
||||||
return ctxt
|
return ctxt
|
||||||
|
|
||||||
return {}
|
return {}
|
||||||
@ -348,6 +392,7 @@ class IdentityServiceContext(OSContextGenerator):
|
|||||||
ctxt['signing_dir'] = cachedir
|
ctxt['signing_dir'] = cachedir
|
||||||
|
|
||||||
for rid in relation_ids(self.rel_name):
|
for rid in relation_ids(self.rel_name):
|
||||||
|
self.related = True
|
||||||
for unit in related_units(rid):
|
for unit in related_units(rid):
|
||||||
rdata = relation_get(rid=rid, unit=unit)
|
rdata = relation_get(rid=rid, unit=unit)
|
||||||
serv_host = rdata.get('service_host')
|
serv_host = rdata.get('service_host')
|
||||||
@ -366,7 +411,7 @@ class IdentityServiceContext(OSContextGenerator):
|
|||||||
'service_protocol': svc_protocol,
|
'service_protocol': svc_protocol,
|
||||||
'auth_protocol': auth_protocol})
|
'auth_protocol': auth_protocol})
|
||||||
|
|
||||||
if context_complete(ctxt):
|
if self.context_complete(ctxt):
|
||||||
# NOTE(jamespage) this is required for >= icehouse
|
# NOTE(jamespage) this is required for >= icehouse
|
||||||
# so a missing value just indicates keystone needs
|
# so a missing value just indicates keystone needs
|
||||||
# upgrading
|
# upgrading
|
||||||
@ -405,6 +450,7 @@ class AMQPContext(OSContextGenerator):
|
|||||||
ctxt = {}
|
ctxt = {}
|
||||||
for rid in relation_ids(self.rel_name):
|
for rid in relation_ids(self.rel_name):
|
||||||
ha_vip_only = False
|
ha_vip_only = False
|
||||||
|
self.related = True
|
||||||
for unit in related_units(rid):
|
for unit in related_units(rid):
|
||||||
if relation_get('clustered', rid=rid, unit=unit):
|
if relation_get('clustered', rid=rid, unit=unit):
|
||||||
ctxt['clustered'] = True
|
ctxt['clustered'] = True
|
||||||
@ -437,7 +483,7 @@ class AMQPContext(OSContextGenerator):
|
|||||||
ha_vip_only = relation_get('ha-vip-only',
|
ha_vip_only = relation_get('ha-vip-only',
|
||||||
rid=rid, unit=unit) is not None
|
rid=rid, unit=unit) is not None
|
||||||
|
|
||||||
if context_complete(ctxt):
|
if self.context_complete(ctxt):
|
||||||
if 'rabbit_ssl_ca' in ctxt:
|
if 'rabbit_ssl_ca' in ctxt:
|
||||||
if not self.ssl_dir:
|
if not self.ssl_dir:
|
||||||
log("Charm not setup for ssl support but ssl ca "
|
log("Charm not setup for ssl support but ssl ca "
|
||||||
@ -469,7 +515,7 @@ class AMQPContext(OSContextGenerator):
|
|||||||
ctxt['oslo_messaging_flags'] = config_flags_parser(
|
ctxt['oslo_messaging_flags'] = config_flags_parser(
|
||||||
oslo_messaging_flags)
|
oslo_messaging_flags)
|
||||||
|
|
||||||
if not context_complete(ctxt):
|
if not self.complete:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
return ctxt
|
return ctxt
|
||||||
@ -485,13 +531,15 @@ class CephContext(OSContextGenerator):
|
|||||||
|
|
||||||
log('Generating template context for ceph', level=DEBUG)
|
log('Generating template context for ceph', level=DEBUG)
|
||||||
mon_hosts = []
|
mon_hosts = []
|
||||||
auth = None
|
ctxt = {
|
||||||
key = None
|
'use_syslog': str(config('use-syslog')).lower()
|
||||||
use_syslog = str(config('use-syslog')).lower()
|
}
|
||||||
for rid in relation_ids('ceph'):
|
for rid in relation_ids('ceph'):
|
||||||
for unit in related_units(rid):
|
for unit in related_units(rid):
|
||||||
auth = relation_get('auth', rid=rid, unit=unit)
|
if not ctxt.get('auth'):
|
||||||
key = relation_get('key', rid=rid, unit=unit)
|
ctxt['auth'] = relation_get('auth', rid=rid, unit=unit)
|
||||||
|
if not ctxt.get('key'):
|
||||||
|
ctxt['key'] = relation_get('key', rid=rid, unit=unit)
|
||||||
ceph_pub_addr = relation_get('ceph-public-address', rid=rid,
|
ceph_pub_addr = relation_get('ceph-public-address', rid=rid,
|
||||||
unit=unit)
|
unit=unit)
|
||||||
unit_priv_addr = relation_get('private-address', rid=rid,
|
unit_priv_addr = relation_get('private-address', rid=rid,
|
||||||
@ -500,15 +548,12 @@ class CephContext(OSContextGenerator):
|
|||||||
ceph_addr = format_ipv6_addr(ceph_addr) or ceph_addr
|
ceph_addr = format_ipv6_addr(ceph_addr) or ceph_addr
|
||||||
mon_hosts.append(ceph_addr)
|
mon_hosts.append(ceph_addr)
|
||||||
|
|
||||||
ctxt = {'mon_hosts': ' '.join(sorted(mon_hosts)),
|
ctxt['mon_hosts'] = ' '.join(sorted(mon_hosts))
|
||||||
'auth': auth,
|
|
||||||
'key': key,
|
|
||||||
'use_syslog': use_syslog}
|
|
||||||
|
|
||||||
if not os.path.isdir('/etc/ceph'):
|
if not os.path.isdir('/etc/ceph'):
|
||||||
os.mkdir('/etc/ceph')
|
os.mkdir('/etc/ceph')
|
||||||
|
|
||||||
if not context_complete(ctxt):
|
if not self.context_complete(ctxt):
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
ensure_packages(['ceph-common'])
|
ensure_packages(['ceph-common'])
|
||||||
@ -1334,8 +1379,17 @@ class PhyNICMTUContext(DataPortContext):
|
|||||||
ports = mappings.values()
|
ports = mappings.values()
|
||||||
napi_settings = NeutronAPIContext()()
|
napi_settings = NeutronAPIContext()()
|
||||||
mtu = napi_settings.get('network_device_mtu')
|
mtu = napi_settings.get('network_device_mtu')
|
||||||
|
all_ports = []
|
||||||
|
# If any of ports is a vlan device, its underlying device must have
|
||||||
|
# mtu applied first.
|
||||||
|
for port in ports:
|
||||||
|
for lport in glob.glob("/sys/class/net/%s/lower_*" % port):
|
||||||
|
lport = os.path.basename(lport)
|
||||||
|
all_ports.append(lport.split('_')[1])
|
||||||
|
|
||||||
|
all_ports.extend(ports)
|
||||||
if mtu:
|
if mtu:
|
||||||
ctxt["devs"] = '\\n'.join(ports)
|
ctxt["devs"] = '\\n'.join(all_ports)
|
||||||
ctxt['mtu'] = mtu
|
ctxt['mtu'] = mtu
|
||||||
|
|
||||||
return ctxt
|
return ctxt
|
||||||
@ -1367,6 +1421,6 @@ class NetworkServiceContext(OSContextGenerator):
|
|||||||
'auth_protocol':
|
'auth_protocol':
|
||||||
rdata.get('auth_protocol') or 'http',
|
rdata.get('auth_protocol') or 'http',
|
||||||
}
|
}
|
||||||
if context_complete(ctxt):
|
if self.context_complete(ctxt):
|
||||||
return ctxt
|
return ctxt
|
||||||
return {}
|
return {}
|
||||||
|
@ -18,7 +18,7 @@ import os
|
|||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
from charmhelpers.fetch import apt_install
|
from charmhelpers.fetch import apt_install, apt_update
|
||||||
from charmhelpers.core.hookenv import (
|
from charmhelpers.core.hookenv import (
|
||||||
log,
|
log,
|
||||||
ERROR,
|
ERROR,
|
||||||
@ -29,6 +29,7 @@ from charmhelpers.contrib.openstack.utils import OPENSTACK_CODENAMES
|
|||||||
try:
|
try:
|
||||||
from jinja2 import FileSystemLoader, ChoiceLoader, Environment, exceptions
|
from jinja2 import FileSystemLoader, ChoiceLoader, Environment, exceptions
|
||||||
except ImportError:
|
except ImportError:
|
||||||
|
apt_update(fatal=True)
|
||||||
apt_install('python-jinja2', fatal=True)
|
apt_install('python-jinja2', fatal=True)
|
||||||
from jinja2 import FileSystemLoader, ChoiceLoader, Environment, exceptions
|
from jinja2 import FileSystemLoader, ChoiceLoader, Environment, exceptions
|
||||||
|
|
||||||
@ -112,7 +113,7 @@ class OSConfigTemplate(object):
|
|||||||
|
|
||||||
def complete_contexts(self):
|
def complete_contexts(self):
|
||||||
'''
|
'''
|
||||||
Return a list of interfaces that have atisfied contexts.
|
Return a list of interfaces that have satisfied contexts.
|
||||||
'''
|
'''
|
||||||
if self._complete_contexts:
|
if self._complete_contexts:
|
||||||
return self._complete_contexts
|
return self._complete_contexts
|
||||||
@ -293,3 +294,30 @@ class OSConfigRenderer(object):
|
|||||||
[interfaces.extend(i.complete_contexts())
|
[interfaces.extend(i.complete_contexts())
|
||||||
for i in six.itervalues(self.templates)]
|
for i in six.itervalues(self.templates)]
|
||||||
return interfaces
|
return interfaces
|
||||||
|
|
||||||
|
def get_incomplete_context_data(self, interfaces):
|
||||||
|
'''
|
||||||
|
Return dictionary of relation status of interfaces and any missing
|
||||||
|
required context data. Example:
|
||||||
|
{'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
|
||||||
|
'zeromq-configuration': {'related': False}}
|
||||||
|
'''
|
||||||
|
incomplete_context_data = {}
|
||||||
|
|
||||||
|
for i in six.itervalues(self.templates):
|
||||||
|
for context in i.contexts:
|
||||||
|
for interface in interfaces:
|
||||||
|
related = False
|
||||||
|
if interface in context.interfaces:
|
||||||
|
related = context.get_related()
|
||||||
|
missing_data = context.missing_data
|
||||||
|
if missing_data:
|
||||||
|
incomplete_context_data[interface] = {'missing_data': missing_data}
|
||||||
|
if related:
|
||||||
|
if incomplete_context_data.get(interface):
|
||||||
|
incomplete_context_data[interface].update({'related': True})
|
||||||
|
else:
|
||||||
|
incomplete_context_data[interface] = {'related': True}
|
||||||
|
else:
|
||||||
|
incomplete_context_data[interface] = {'related': False}
|
||||||
|
return incomplete_context_data
|
||||||
|
@ -25,6 +25,7 @@ import sys
|
|||||||
import re
|
import re
|
||||||
|
|
||||||
import six
|
import six
|
||||||
|
import traceback
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from charmhelpers.contrib.network import ip
|
from charmhelpers.contrib.network import ip
|
||||||
@ -34,12 +35,16 @@ from charmhelpers.core import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
from charmhelpers.core.hookenv import (
|
from charmhelpers.core.hookenv import (
|
||||||
|
action_fail,
|
||||||
|
action_set,
|
||||||
config,
|
config,
|
||||||
log as juju_log,
|
log as juju_log,
|
||||||
charm_dir,
|
charm_dir,
|
||||||
INFO,
|
INFO,
|
||||||
relation_ids,
|
relation_ids,
|
||||||
relation_set
|
relation_set,
|
||||||
|
status_set,
|
||||||
|
hook_name
|
||||||
)
|
)
|
||||||
|
|
||||||
from charmhelpers.contrib.storage.linux.lvm import (
|
from charmhelpers.contrib.storage.linux.lvm import (
|
||||||
@ -114,6 +119,7 @@ SWIFT_CODENAMES = OrderedDict([
|
|||||||
('2.2.1', 'kilo'),
|
('2.2.1', 'kilo'),
|
||||||
('2.2.2', 'kilo'),
|
('2.2.2', 'kilo'),
|
||||||
('2.3.0', 'liberty'),
|
('2.3.0', 'liberty'),
|
||||||
|
('2.4.0', 'liberty'),
|
||||||
])
|
])
|
||||||
|
|
||||||
# >= Liberty version->codename mapping
|
# >= Liberty version->codename mapping
|
||||||
@ -142,6 +148,9 @@ PACKAGE_CODENAMES = {
|
|||||||
'glance-common': OrderedDict([
|
'glance-common': OrderedDict([
|
||||||
('11.0.0', 'liberty'),
|
('11.0.0', 'liberty'),
|
||||||
]),
|
]),
|
||||||
|
'openstack-dashboard': OrderedDict([
|
||||||
|
('8.0.0', 'liberty'),
|
||||||
|
]),
|
||||||
}
|
}
|
||||||
|
|
||||||
DEFAULT_LOOPBACK_SIZE = '5G'
|
DEFAULT_LOOPBACK_SIZE = '5G'
|
||||||
@ -745,3 +754,217 @@ def git_yaml_value(projects_yaml, key):
|
|||||||
return projects[key]
|
return projects[key]
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def os_workload_status(configs, required_interfaces, charm_func=None):
|
||||||
|
"""
|
||||||
|
Decorator to set workload status based on complete contexts
|
||||||
|
"""
|
||||||
|
def wrap(f):
|
||||||
|
@wraps(f)
|
||||||
|
def wrapped_f(*args, **kwargs):
|
||||||
|
# Run the original function first
|
||||||
|
f(*args, **kwargs)
|
||||||
|
# Set workload status now that contexts have been
|
||||||
|
# acted on
|
||||||
|
set_os_workload_status(configs, required_interfaces, charm_func)
|
||||||
|
return wrapped_f
|
||||||
|
return wrap
|
||||||
|
|
||||||
|
|
||||||
|
def set_os_workload_status(configs, required_interfaces, charm_func=None):
|
||||||
|
"""
|
||||||
|
Set workload status based on complete contexts.
|
||||||
|
status-set missing or incomplete contexts
|
||||||
|
and juju-log details of missing required data.
|
||||||
|
charm_func is a charm specific function to run checking
|
||||||
|
for charm specific requirements such as a VIP setting.
|
||||||
|
"""
|
||||||
|
incomplete_rel_data = incomplete_relation_data(configs, required_interfaces)
|
||||||
|
state = 'active'
|
||||||
|
missing_relations = []
|
||||||
|
incomplete_relations = []
|
||||||
|
message = None
|
||||||
|
charm_state = None
|
||||||
|
charm_message = None
|
||||||
|
|
||||||
|
for generic_interface in incomplete_rel_data.keys():
|
||||||
|
related_interface = None
|
||||||
|
missing_data = {}
|
||||||
|
# Related or not?
|
||||||
|
for interface in incomplete_rel_data[generic_interface]:
|
||||||
|
if incomplete_rel_data[generic_interface][interface].get('related'):
|
||||||
|
related_interface = interface
|
||||||
|
missing_data = incomplete_rel_data[generic_interface][interface].get('missing_data')
|
||||||
|
# No relation ID for the generic_interface
|
||||||
|
if not related_interface:
|
||||||
|
juju_log("{} relation is missing and must be related for "
|
||||||
|
"functionality. ".format(generic_interface), 'WARN')
|
||||||
|
state = 'blocked'
|
||||||
|
if generic_interface not in missing_relations:
|
||||||
|
missing_relations.append(generic_interface)
|
||||||
|
else:
|
||||||
|
# Relation ID exists but no related unit
|
||||||
|
if not missing_data:
|
||||||
|
# Edge case relation ID exists but departing
|
||||||
|
if ('departed' in hook_name() or 'broken' in hook_name()) \
|
||||||
|
and related_interface in hook_name():
|
||||||
|
state = 'blocked'
|
||||||
|
if generic_interface not in missing_relations:
|
||||||
|
missing_relations.append(generic_interface)
|
||||||
|
juju_log("{} relation's interface, {}, "
|
||||||
|
"relationship is departed or broken "
|
||||||
|
"and is required for functionality."
|
||||||
|
"".format(generic_interface, related_interface), "WARN")
|
||||||
|
# Normal case relation ID exists but no related unit
|
||||||
|
# (joining)
|
||||||
|
else:
|
||||||
|
juju_log("{} relations's interface, {}, is related but has "
|
||||||
|
"no units in the relation."
|
||||||
|
"".format(generic_interface, related_interface), "INFO")
|
||||||
|
# Related unit exists and data missing on the relation
|
||||||
|
else:
|
||||||
|
juju_log("{} relation's interface, {}, is related awaiting "
|
||||||
|
"the following data from the relationship: {}. "
|
||||||
|
"".format(generic_interface, related_interface,
|
||||||
|
", ".join(missing_data)), "INFO")
|
||||||
|
if state != 'blocked':
|
||||||
|
state = 'waiting'
|
||||||
|
if generic_interface not in incomplete_relations \
|
||||||
|
and generic_interface not in missing_relations:
|
||||||
|
incomplete_relations.append(generic_interface)
|
||||||
|
|
||||||
|
if missing_relations:
|
||||||
|
message = "Missing relations: {}".format(", ".join(missing_relations))
|
||||||
|
if incomplete_relations:
|
||||||
|
message += "; incomplete relations: {}" \
|
||||||
|
"".format(", ".join(incomplete_relations))
|
||||||
|
state = 'blocked'
|
||||||
|
elif incomplete_relations:
|
||||||
|
message = "Incomplete relations: {}" \
|
||||||
|
"".format(", ".join(incomplete_relations))
|
||||||
|
state = 'waiting'
|
||||||
|
|
||||||
|
# Run charm specific checks
|
||||||
|
if charm_func:
|
||||||
|
charm_state, charm_message = charm_func(configs)
|
||||||
|
if charm_state != 'active' and charm_state != 'unknown':
|
||||||
|
state = workload_state_compare(state, charm_state)
|
||||||
|
if message:
|
||||||
|
message = "{} {}".format(message, charm_message)
|
||||||
|
else:
|
||||||
|
message = charm_message
|
||||||
|
|
||||||
|
# Set to active if all requirements have been met
|
||||||
|
if state == 'active':
|
||||||
|
message = "Unit is ready"
|
||||||
|
juju_log(message, "INFO")
|
||||||
|
|
||||||
|
status_set(state, message)
|
||||||
|
|
||||||
|
|
||||||
|
def workload_state_compare(current_workload_state, workload_state):
|
||||||
|
""" Return highest priority of two states"""
|
||||||
|
hierarchy = {'unknown': -1,
|
||||||
|
'active': 0,
|
||||||
|
'maintenance': 1,
|
||||||
|
'waiting': 2,
|
||||||
|
'blocked': 3,
|
||||||
|
}
|
||||||
|
|
||||||
|
if hierarchy.get(workload_state) is None:
|
||||||
|
workload_state = 'unknown'
|
||||||
|
if hierarchy.get(current_workload_state) is None:
|
||||||
|
current_workload_state = 'unknown'
|
||||||
|
|
||||||
|
# Set workload_state based on hierarchy of statuses
|
||||||
|
if hierarchy.get(current_workload_state) > hierarchy.get(workload_state):
|
||||||
|
return current_workload_state
|
||||||
|
else:
|
||||||
|
return workload_state
|
||||||
|
|
||||||
|
|
||||||
|
def incomplete_relation_data(configs, required_interfaces):
|
||||||
|
"""
|
||||||
|
Check complete contexts against required_interfaces
|
||||||
|
Return dictionary of incomplete relation data.
|
||||||
|
|
||||||
|
configs is an OSConfigRenderer object with configs registered
|
||||||
|
|
||||||
|
required_interfaces is a dictionary of required general interfaces
|
||||||
|
with dictionary values of possible specific interfaces.
|
||||||
|
Example:
|
||||||
|
required_interfaces = {'database': ['shared-db', 'pgsql-db']}
|
||||||
|
|
||||||
|
The interface is said to be satisfied if anyone of the interfaces in the
|
||||||
|
list has a complete context.
|
||||||
|
|
||||||
|
Return dictionary of incomplete or missing required contexts with relation
|
||||||
|
status of interfaces and any missing data points. Example:
|
||||||
|
{'message':
|
||||||
|
{'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
|
||||||
|
'zeromq-configuration': {'related': False}},
|
||||||
|
'identity':
|
||||||
|
{'identity-service': {'related': False}},
|
||||||
|
'database':
|
||||||
|
{'pgsql-db': {'related': False},
|
||||||
|
'shared-db': {'related': True}}}
|
||||||
|
"""
|
||||||
|
complete_ctxts = configs.complete_contexts()
|
||||||
|
incomplete_relations = []
|
||||||
|
for svc_type in required_interfaces.keys():
|
||||||
|
# Avoid duplicates
|
||||||
|
found_ctxt = False
|
||||||
|
for interface in required_interfaces[svc_type]:
|
||||||
|
if interface in complete_ctxts:
|
||||||
|
found_ctxt = True
|
||||||
|
if not found_ctxt:
|
||||||
|
incomplete_relations.append(svc_type)
|
||||||
|
incomplete_context_data = {}
|
||||||
|
for i in incomplete_relations:
|
||||||
|
incomplete_context_data[i] = configs.get_incomplete_context_data(required_interfaces[i])
|
||||||
|
return incomplete_context_data
|
||||||
|
|
||||||
|
|
||||||
|
def do_action_openstack_upgrade(package, upgrade_callback, configs):
|
||||||
|
"""Perform action-managed OpenStack upgrade.
|
||||||
|
|
||||||
|
Upgrades packages to the configured openstack-origin version and sets
|
||||||
|
the corresponding action status as a result.
|
||||||
|
|
||||||
|
If the charm was installed from source we cannot upgrade it.
|
||||||
|
For backwards compatibility a config flag (action-managed-upgrade) must
|
||||||
|
be set for this code to run, otherwise a full service level upgrade will
|
||||||
|
fire on config-changed.
|
||||||
|
|
||||||
|
@param package: package name for determining if upgrade available
|
||||||
|
@param upgrade_callback: function callback to charm's upgrade function
|
||||||
|
@param configs: templating object derived from OSConfigRenderer class
|
||||||
|
|
||||||
|
@return: True if upgrade successful; False if upgrade failed or skipped
|
||||||
|
"""
|
||||||
|
ret = False
|
||||||
|
|
||||||
|
if git_install_requested():
|
||||||
|
action_set({'outcome': 'installed from source, skipped upgrade.'})
|
||||||
|
else:
|
||||||
|
if openstack_upgrade_available(package):
|
||||||
|
if config('action-managed-upgrade'):
|
||||||
|
juju_log('Upgrading OpenStack release')
|
||||||
|
|
||||||
|
try:
|
||||||
|
upgrade_callback(configs=configs)
|
||||||
|
action_set({'outcome': 'success, upgrade completed.'})
|
||||||
|
ret = True
|
||||||
|
except:
|
||||||
|
action_set({'outcome': 'upgrade failed, see traceback.'})
|
||||||
|
action_set({'traceback': traceback.format_exc()})
|
||||||
|
action_fail('do_openstack_upgrade resulted in an '
|
||||||
|
'unexpected error')
|
||||||
|
else:
|
||||||
|
action_set({'outcome': 'action-managed-upgrade config is '
|
||||||
|
'False, skipped upgrade.'})
|
||||||
|
else:
|
||||||
|
action_set({'outcome': 'no upgrade available.'})
|
||||||
|
|
||||||
|
return ret
|
||||||
|
@ -28,6 +28,7 @@ import os
|
|||||||
import shutil
|
import shutil
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
|
import uuid
|
||||||
|
|
||||||
from subprocess import (
|
from subprocess import (
|
||||||
check_call,
|
check_call,
|
||||||
@ -35,8 +36,10 @@ from subprocess import (
|
|||||||
CalledProcessError,
|
CalledProcessError,
|
||||||
)
|
)
|
||||||
from charmhelpers.core.hookenv import (
|
from charmhelpers.core.hookenv import (
|
||||||
|
local_unit,
|
||||||
relation_get,
|
relation_get,
|
||||||
relation_ids,
|
relation_ids,
|
||||||
|
relation_set,
|
||||||
related_units,
|
related_units,
|
||||||
log,
|
log,
|
||||||
DEBUG,
|
DEBUG,
|
||||||
@ -56,6 +59,8 @@ from charmhelpers.fetch import (
|
|||||||
apt_install,
|
apt_install,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from charmhelpers.core.kernel import modprobe
|
||||||
|
|
||||||
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
|
KEYRING = '/etc/ceph/ceph.client.{}.keyring'
|
||||||
KEYFILE = '/etc/ceph/ceph.client.{}.key'
|
KEYFILE = '/etc/ceph/ceph.client.{}.key'
|
||||||
|
|
||||||
@ -288,17 +293,6 @@ def place_data_on_block_device(blk_device, data_src_dst):
|
|||||||
os.chown(data_src_dst, uid, gid)
|
os.chown(data_src_dst, uid, gid)
|
||||||
|
|
||||||
|
|
||||||
# TODO: re-use
|
|
||||||
def modprobe(module):
|
|
||||||
"""Load a kernel module and configure for auto-load on reboot."""
|
|
||||||
log('Loading kernel module', level=INFO)
|
|
||||||
cmd = ['modprobe', module]
|
|
||||||
check_call(cmd)
|
|
||||||
with open('/etc/modules', 'r+') as modules:
|
|
||||||
if module not in modules.read():
|
|
||||||
modules.write(module)
|
|
||||||
|
|
||||||
|
|
||||||
def copy_files(src, dst, symlinks=False, ignore=None):
|
def copy_files(src, dst, symlinks=False, ignore=None):
|
||||||
"""Copy files from src to dst."""
|
"""Copy files from src to dst."""
|
||||||
for item in os.listdir(src):
|
for item in os.listdir(src):
|
||||||
@ -411,17 +405,52 @@ class CephBrokerRq(object):
|
|||||||
|
|
||||||
The API is versioned and defaults to version 1.
|
The API is versioned and defaults to version 1.
|
||||||
"""
|
"""
|
||||||
def __init__(self, api_version=1):
|
def __init__(self, api_version=1, request_id=None):
|
||||||
self.api_version = api_version
|
self.api_version = api_version
|
||||||
|
if request_id:
|
||||||
|
self.request_id = request_id
|
||||||
|
else:
|
||||||
|
self.request_id = str(uuid.uuid1())
|
||||||
self.ops = []
|
self.ops = []
|
||||||
|
|
||||||
def add_op_create_pool(self, name, replica_count=3):
|
def add_op_create_pool(self, name, replica_count=3):
|
||||||
self.ops.append({'op': 'create-pool', 'name': name,
|
self.ops.append({'op': 'create-pool', 'name': name,
|
||||||
'replicas': replica_count})
|
'replicas': replica_count})
|
||||||
|
|
||||||
|
def set_ops(self, ops):
|
||||||
|
"""Set request ops to provided value.
|
||||||
|
|
||||||
|
Useful for injecting ops that come from a previous request
|
||||||
|
to allow comparisons to ensure validity.
|
||||||
|
"""
|
||||||
|
self.ops = ops
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def request(self):
|
def request(self):
|
||||||
return json.dumps({'api-version': self.api_version, 'ops': self.ops})
|
return json.dumps({'api-version': self.api_version, 'ops': self.ops,
|
||||||
|
'request-id': self.request_id})
|
||||||
|
|
||||||
|
def _ops_equal(self, other):
|
||||||
|
if len(self.ops) == len(other.ops):
|
||||||
|
for req_no in range(0, len(self.ops)):
|
||||||
|
for key in ['replicas', 'name', 'op']:
|
||||||
|
if self.ops[req_no][key] != other.ops[req_no][key]:
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
if not isinstance(other, self.__class__):
|
||||||
|
return False
|
||||||
|
if self.api_version == other.api_version and \
|
||||||
|
self._ops_equal(other):
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def __ne__(self, other):
|
||||||
|
return not self.__eq__(other)
|
||||||
|
|
||||||
|
|
||||||
class CephBrokerRsp(object):
|
class CephBrokerRsp(object):
|
||||||
@ -431,10 +460,15 @@ class CephBrokerRsp(object):
|
|||||||
|
|
||||||
The API is versioned and defaults to version 1.
|
The API is versioned and defaults to version 1.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, encoded_rsp):
|
def __init__(self, encoded_rsp):
|
||||||
self.api_version = None
|
self.api_version = None
|
||||||
self.rsp = json.loads(encoded_rsp)
|
self.rsp = json.loads(encoded_rsp)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def request_id(self):
|
||||||
|
return self.rsp.get('request-id')
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def exit_code(self):
|
def exit_code(self):
|
||||||
return self.rsp.get('exit-code')
|
return self.rsp.get('exit-code')
|
||||||
@ -442,3 +476,182 @@ class CephBrokerRsp(object):
|
|||||||
@property
|
@property
|
||||||
def exit_msg(self):
|
def exit_msg(self):
|
||||||
return self.rsp.get('stderr')
|
return self.rsp.get('stderr')
|
||||||
|
|
||||||
|
|
||||||
|
# Ceph Broker Conversation:
|
||||||
|
# If a charm needs an action to be taken by ceph it can create a CephBrokerRq
|
||||||
|
# and send that request to ceph via the ceph relation. The CephBrokerRq has a
|
||||||
|
# unique id so that the client can identity which CephBrokerRsp is associated
|
||||||
|
# with the request. Ceph will also respond to each client unit individually
|
||||||
|
# creating a response key per client unit eg glance/0 will get a CephBrokerRsp
|
||||||
|
# via key broker-rsp-glance-0
|
||||||
|
#
|
||||||
|
# To use this the charm can just do something like:
|
||||||
|
#
|
||||||
|
# from charmhelpers.contrib.storage.linux.ceph import (
|
||||||
|
# send_request_if_needed,
|
||||||
|
# is_request_complete,
|
||||||
|
# CephBrokerRq,
|
||||||
|
# )
|
||||||
|
#
|
||||||
|
# @hooks.hook('ceph-relation-changed')
|
||||||
|
# def ceph_changed():
|
||||||
|
# rq = CephBrokerRq()
|
||||||
|
# rq.add_op_create_pool(name='poolname', replica_count=3)
|
||||||
|
#
|
||||||
|
# if is_request_complete(rq):
|
||||||
|
# <Request complete actions>
|
||||||
|
# else:
|
||||||
|
# send_request_if_needed(get_ceph_request())
|
||||||
|
#
|
||||||
|
# CephBrokerRq and CephBrokerRsp are serialized into JSON. Below is an example
|
||||||
|
# of glance having sent a request to ceph which ceph has successfully processed
|
||||||
|
# 'ceph:8': {
|
||||||
|
# 'ceph/0': {
|
||||||
|
# 'auth': 'cephx',
|
||||||
|
# 'broker-rsp-glance-0': '{"request-id": "0bc7dc54", "exit-code": 0}',
|
||||||
|
# 'broker_rsp': '{"request-id": "0da543b8", "exit-code": 0}',
|
||||||
|
# 'ceph-public-address': '10.5.44.103',
|
||||||
|
# 'key': 'AQCLDttVuHXINhAAvI144CB09dYchhHyTUY9BQ==',
|
||||||
|
# 'private-address': '10.5.44.103',
|
||||||
|
# },
|
||||||
|
# 'glance/0': {
|
||||||
|
# 'broker_req': ('{"api-version": 1, "request-id": "0bc7dc54", '
|
||||||
|
# '"ops": [{"replicas": 3, "name": "glance", '
|
||||||
|
# '"op": "create-pool"}]}'),
|
||||||
|
# 'private-address': '10.5.44.109',
|
||||||
|
# },
|
||||||
|
# }
|
||||||
|
|
||||||
|
def get_previous_request(rid):
|
||||||
|
"""Return the last ceph broker request sent on a given relation
|
||||||
|
|
||||||
|
@param rid: Relation id to query for request
|
||||||
|
"""
|
||||||
|
request = None
|
||||||
|
broker_req = relation_get(attribute='broker_req', rid=rid,
|
||||||
|
unit=local_unit())
|
||||||
|
if broker_req:
|
||||||
|
request_data = json.loads(broker_req)
|
||||||
|
request = CephBrokerRq(api_version=request_data['api-version'],
|
||||||
|
request_id=request_data['request-id'])
|
||||||
|
request.set_ops(request_data['ops'])
|
||||||
|
|
||||||
|
return request
|
||||||
|
|
||||||
|
|
||||||
|
def get_request_states(request):
|
||||||
|
"""Return a dict of requests per relation id with their corresponding
|
||||||
|
completion state.
|
||||||
|
|
||||||
|
This allows a charm, which has a request for ceph, to see whether there is
|
||||||
|
an equivalent request already being processed and if so what state that
|
||||||
|
request is in.
|
||||||
|
|
||||||
|
@param request: A CephBrokerRq object
|
||||||
|
"""
|
||||||
|
complete = []
|
||||||
|
requests = {}
|
||||||
|
for rid in relation_ids('ceph'):
|
||||||
|
complete = False
|
||||||
|
previous_request = get_previous_request(rid)
|
||||||
|
if request == previous_request:
|
||||||
|
sent = True
|
||||||
|
complete = is_request_complete_for_rid(previous_request, rid)
|
||||||
|
else:
|
||||||
|
sent = False
|
||||||
|
complete = False
|
||||||
|
|
||||||
|
requests[rid] = {
|
||||||
|
'sent': sent,
|
||||||
|
'complete': complete,
|
||||||
|
}
|
||||||
|
|
||||||
|
return requests
|
||||||
|
|
||||||
|
|
||||||
|
def is_request_sent(request):
|
||||||
|
"""Check to see if a functionally equivalent request has already been sent
|
||||||
|
|
||||||
|
Returns True if a similair request has been sent
|
||||||
|
|
||||||
|
@param request: A CephBrokerRq object
|
||||||
|
"""
|
||||||
|
states = get_request_states(request)
|
||||||
|
for rid in states.keys():
|
||||||
|
if not states[rid]['sent']:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def is_request_complete(request):
|
||||||
|
"""Check to see if a functionally equivalent request has already been
|
||||||
|
completed
|
||||||
|
|
||||||
|
Returns True if a similair request has been completed
|
||||||
|
|
||||||
|
@param request: A CephBrokerRq object
|
||||||
|
"""
|
||||||
|
states = get_request_states(request)
|
||||||
|
for rid in states.keys():
|
||||||
|
if not states[rid]['complete']:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def is_request_complete_for_rid(request, rid):
|
||||||
|
"""Check if a given request has been completed on the given relation
|
||||||
|
|
||||||
|
@param request: A CephBrokerRq object
|
||||||
|
@param rid: Relation ID
|
||||||
|
"""
|
||||||
|
broker_key = get_broker_rsp_key()
|
||||||
|
for unit in related_units(rid):
|
||||||
|
rdata = relation_get(rid=rid, unit=unit)
|
||||||
|
if rdata.get(broker_key):
|
||||||
|
rsp = CephBrokerRsp(rdata.get(broker_key))
|
||||||
|
if rsp.request_id == request.request_id:
|
||||||
|
if not rsp.exit_code:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
# The remote unit sent no reply targeted at this unit so either the
|
||||||
|
# remote ceph cluster does not support unit targeted replies or it
|
||||||
|
# has not processed our request yet.
|
||||||
|
if rdata.get('broker_rsp'):
|
||||||
|
request_data = json.loads(rdata['broker_rsp'])
|
||||||
|
if request_data.get('request-id'):
|
||||||
|
log('Ignoring legacy broker_rsp without unit key as remote '
|
||||||
|
'service supports unit specific replies', level=DEBUG)
|
||||||
|
else:
|
||||||
|
log('Using legacy broker_rsp as remote service does not '
|
||||||
|
'supports unit specific replies', level=DEBUG)
|
||||||
|
rsp = CephBrokerRsp(rdata['broker_rsp'])
|
||||||
|
if not rsp.exit_code:
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def get_broker_rsp_key():
|
||||||
|
"""Return broker response key for this unit
|
||||||
|
|
||||||
|
This is the key that ceph is going to use to pass request status
|
||||||
|
information back to this unit
|
||||||
|
"""
|
||||||
|
return 'broker-rsp-' + local_unit().replace('/', '-')
|
||||||
|
|
||||||
|
|
||||||
|
def send_request_if_needed(request):
|
||||||
|
"""Send broker request if an equivalent request has not already been sent
|
||||||
|
|
||||||
|
@param request: A CephBrokerRq object
|
||||||
|
"""
|
||||||
|
if is_request_sent(request):
|
||||||
|
log('Request already sent but not complete, not sending new request',
|
||||||
|
level=DEBUG)
|
||||||
|
else:
|
||||||
|
for rid in relation_ids('ceph'):
|
||||||
|
log('Sending request {}'.format(request.request_id), level=DEBUG)
|
||||||
|
relation_set(relation_id=rid, broker_req=request.request)
|
||||||
|
@ -63,32 +63,48 @@ def service_reload(service_name, restart_on_failure=False):
|
|||||||
return service_result
|
return service_result
|
||||||
|
|
||||||
|
|
||||||
def service_pause(service_name, init_dir=None):
|
def service_pause(service_name, init_dir="/etc/init", initd_dir="/etc/init.d"):
|
||||||
"""Pause a system service.
|
"""Pause a system service.
|
||||||
|
|
||||||
Stop it, and prevent it from starting again at boot."""
|
Stop it, and prevent it from starting again at boot."""
|
||||||
if init_dir is None:
|
|
||||||
init_dir = "/etc/init"
|
|
||||||
stopped = service_stop(service_name)
|
stopped = service_stop(service_name)
|
||||||
# XXX: Support systemd too
|
upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
|
||||||
override_path = os.path.join(
|
sysv_file = os.path.join(initd_dir, service_name)
|
||||||
init_dir, '{}.override'.format(service_name))
|
if os.path.exists(upstart_file):
|
||||||
with open(override_path, 'w') as fh:
|
override_path = os.path.join(
|
||||||
fh.write("manual\n")
|
init_dir, '{}.override'.format(service_name))
|
||||||
|
with open(override_path, 'w') as fh:
|
||||||
|
fh.write("manual\n")
|
||||||
|
elif os.path.exists(sysv_file):
|
||||||
|
subprocess.check_call(["update-rc.d", service_name, "disable"])
|
||||||
|
else:
|
||||||
|
# XXX: Support SystemD too
|
||||||
|
raise ValueError(
|
||||||
|
"Unable to detect {0} as either Upstart {1} or SysV {2}".format(
|
||||||
|
service_name, upstart_file, sysv_file))
|
||||||
return stopped
|
return stopped
|
||||||
|
|
||||||
|
|
||||||
def service_resume(service_name, init_dir=None):
|
def service_resume(service_name, init_dir="/etc/init",
|
||||||
|
initd_dir="/etc/init.d"):
|
||||||
"""Resume a system service.
|
"""Resume a system service.
|
||||||
|
|
||||||
Reenable starting again at boot. Start the service"""
|
Reenable starting again at boot. Start the service"""
|
||||||
# XXX: Support systemd too
|
upstart_file = os.path.join(init_dir, "{}.conf".format(service_name))
|
||||||
if init_dir is None:
|
sysv_file = os.path.join(initd_dir, service_name)
|
||||||
init_dir = "/etc/init"
|
if os.path.exists(upstart_file):
|
||||||
override_path = os.path.join(
|
override_path = os.path.join(
|
||||||
init_dir, '{}.override'.format(service_name))
|
init_dir, '{}.override'.format(service_name))
|
||||||
if os.path.exists(override_path):
|
if os.path.exists(override_path):
|
||||||
os.unlink(override_path)
|
os.unlink(override_path)
|
||||||
|
elif os.path.exists(sysv_file):
|
||||||
|
subprocess.check_call(["update-rc.d", service_name, "enable"])
|
||||||
|
else:
|
||||||
|
# XXX: Support SystemD too
|
||||||
|
raise ValueError(
|
||||||
|
"Unable to detect {0} as either Upstart {1} or SysV {2}".format(
|
||||||
|
service_name, upstart_file, sysv_file))
|
||||||
|
|
||||||
started = service_start(service_name)
|
started = service_start(service_name)
|
||||||
return started
|
return started
|
||||||
|
|
||||||
|
@ -25,11 +25,13 @@ from charmhelpers.core.host import (
|
|||||||
fstab_mount,
|
fstab_mount,
|
||||||
mkdir,
|
mkdir,
|
||||||
)
|
)
|
||||||
|
from charmhelpers.core.strutils import bytes_from_string
|
||||||
|
from subprocess import check_output
|
||||||
|
|
||||||
|
|
||||||
def hugepage_support(user, group='hugetlb', nr_hugepages=256,
|
def hugepage_support(user, group='hugetlb', nr_hugepages=256,
|
||||||
max_map_count=65536, mnt_point='/run/hugepages/kvm',
|
max_map_count=65536, mnt_point='/run/hugepages/kvm',
|
||||||
pagesize='2MB', mount=True):
|
pagesize='2MB', mount=True, set_shmmax=False):
|
||||||
"""Enable hugepages on system.
|
"""Enable hugepages on system.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -49,6 +51,11 @@ def hugepage_support(user, group='hugetlb', nr_hugepages=256,
|
|||||||
'vm.max_map_count': max_map_count,
|
'vm.max_map_count': max_map_count,
|
||||||
'vm.hugetlb_shm_group': gid,
|
'vm.hugetlb_shm_group': gid,
|
||||||
}
|
}
|
||||||
|
if set_shmmax:
|
||||||
|
shmmax_current = int(check_output(['sysctl', '-n', 'kernel.shmmax']))
|
||||||
|
shmmax_minsize = bytes_from_string(pagesize) * nr_hugepages
|
||||||
|
if shmmax_minsize > shmmax_current:
|
||||||
|
sysctl_settings['kernel.shmmax'] = shmmax_minsize
|
||||||
sysctl.create(yaml.dump(sysctl_settings), '/etc/sysctl.d/10-hugepage.conf')
|
sysctl.create(yaml.dump(sysctl_settings), '/etc/sysctl.d/10-hugepage.conf')
|
||||||
mkdir(mnt_point, owner='root', group='root', perms=0o755, force=False)
|
mkdir(mnt_point, owner='root', group='root', perms=0o755, force=False)
|
||||||
lfstab = fstab.Fstab()
|
lfstab = fstab.Fstab()
|
||||||
|
68
hooks/charmhelpers/core/kernel.py
Normal file
68
hooks/charmhelpers/core/kernel.py
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
# Copyright 2014-2015 Canonical Limited.
|
||||||
|
#
|
||||||
|
# This file is part of charm-helpers.
|
||||||
|
#
|
||||||
|
# charm-helpers is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Lesser General Public License version 3 as
|
||||||
|
# published by the Free Software Foundation.
|
||||||
|
#
|
||||||
|
# charm-helpers is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Lesser General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Lesser General Public License
|
||||||
|
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
__author__ = "Jorge Niedbalski <jorge.niedbalski@canonical.com>"
|
||||||
|
|
||||||
|
from charmhelpers.core.hookenv import (
|
||||||
|
log,
|
||||||
|
INFO
|
||||||
|
)
|
||||||
|
|
||||||
|
from subprocess import check_call, check_output
|
||||||
|
import re
|
||||||
|
|
||||||
|
|
||||||
|
def modprobe(module, persist=True):
|
||||||
|
"""Load a kernel module and configure for auto-load on reboot."""
|
||||||
|
cmd = ['modprobe', module]
|
||||||
|
|
||||||
|
log('Loading kernel module %s' % module, level=INFO)
|
||||||
|
|
||||||
|
check_call(cmd)
|
||||||
|
if persist:
|
||||||
|
with open('/etc/modules', 'r+') as modules:
|
||||||
|
if module not in modules.read():
|
||||||
|
modules.write(module)
|
||||||
|
|
||||||
|
|
||||||
|
def rmmod(module, force=False):
|
||||||
|
"""Remove a module from the linux kernel"""
|
||||||
|
cmd = ['rmmod']
|
||||||
|
if force:
|
||||||
|
cmd.append('-f')
|
||||||
|
cmd.append(module)
|
||||||
|
log('Removing kernel module %s' % module, level=INFO)
|
||||||
|
return check_call(cmd)
|
||||||
|
|
||||||
|
|
||||||
|
def lsmod():
|
||||||
|
"""Shows what kernel modules are currently loaded"""
|
||||||
|
return check_output(['lsmod'],
|
||||||
|
universal_newlines=True)
|
||||||
|
|
||||||
|
|
||||||
|
def is_module_loaded(module):
|
||||||
|
"""Checks if a kernel module is already loaded"""
|
||||||
|
matches = re.findall('^%s[ ]+' % module, lsmod(), re.M)
|
||||||
|
return len(matches) > 0
|
||||||
|
|
||||||
|
|
||||||
|
def update_initramfs(version='all'):
|
||||||
|
"""Updates an initramfs image"""
|
||||||
|
return check_call(["update-initramfs", "-k", version, "-u"])
|
@ -18,6 +18,7 @@
|
|||||||
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
|
# along with charm-helpers. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
import six
|
import six
|
||||||
|
import re
|
||||||
|
|
||||||
|
|
||||||
def bool_from_string(value):
|
def bool_from_string(value):
|
||||||
@ -40,3 +41,32 @@ def bool_from_string(value):
|
|||||||
|
|
||||||
msg = "Unable to interpret string value '%s' as boolean" % (value)
|
msg = "Unable to interpret string value '%s' as boolean" % (value)
|
||||||
raise ValueError(msg)
|
raise ValueError(msg)
|
||||||
|
|
||||||
|
|
||||||
|
def bytes_from_string(value):
|
||||||
|
"""Interpret human readable string value as bytes.
|
||||||
|
|
||||||
|
Returns int
|
||||||
|
"""
|
||||||
|
BYTE_POWER = {
|
||||||
|
'K': 1,
|
||||||
|
'KB': 1,
|
||||||
|
'M': 2,
|
||||||
|
'MB': 2,
|
||||||
|
'G': 3,
|
||||||
|
'GB': 3,
|
||||||
|
'T': 4,
|
||||||
|
'TB': 4,
|
||||||
|
'P': 5,
|
||||||
|
'PB': 5,
|
||||||
|
}
|
||||||
|
if isinstance(value, six.string_types):
|
||||||
|
value = six.text_type(value)
|
||||||
|
else:
|
||||||
|
msg = "Unable to interpret non-string value '%s' as boolean" % (value)
|
||||||
|
raise ValueError(msg)
|
||||||
|
matches = re.match("([0-9]+)([a-zA-Z]+)", value)
|
||||||
|
if not matches:
|
||||||
|
msg = "Unable to interpret string value '%s' as bytes" % (value)
|
||||||
|
raise ValueError(msg)
|
||||||
|
return int(matches.group(1)) * (1024 ** BYTE_POWER[matches.group(2)])
|
||||||
|
@ -51,7 +51,8 @@ class AmuletDeployment(object):
|
|||||||
if 'units' not in this_service:
|
if 'units' not in this_service:
|
||||||
this_service['units'] = 1
|
this_service['units'] = 1
|
||||||
|
|
||||||
self.d.add(this_service['name'], units=this_service['units'])
|
self.d.add(this_service['name'], units=this_service['units'],
|
||||||
|
constraints=this_service.get('constraints'))
|
||||||
|
|
||||||
for svc in other_services:
|
for svc in other_services:
|
||||||
if 'location' in svc:
|
if 'location' in svc:
|
||||||
@ -64,7 +65,8 @@ class AmuletDeployment(object):
|
|||||||
if 'units' not in svc:
|
if 'units' not in svc:
|
||||||
svc['units'] = 1
|
svc['units'] = 1
|
||||||
|
|
||||||
self.d.add(svc['name'], charm=branch_location, units=svc['units'])
|
self.d.add(svc['name'], charm=branch_location, units=svc['units'],
|
||||||
|
constraints=svc.get('constraints'))
|
||||||
|
|
||||||
def _add_relations(self, relations):
|
def _add_relations(self, relations):
|
||||||
"""Add all of the relations for the services."""
|
"""Add all of the relations for the services."""
|
||||||
|
@ -19,9 +19,11 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
import socket
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
import uuid
|
||||||
|
|
||||||
import amulet
|
import amulet
|
||||||
import distro_info
|
import distro_info
|
||||||
@ -114,7 +116,7 @@ class AmuletUtils(object):
|
|||||||
# /!\ DEPRECATION WARNING (beisner):
|
# /!\ DEPRECATION WARNING (beisner):
|
||||||
# New and existing tests should be rewritten to use
|
# New and existing tests should be rewritten to use
|
||||||
# validate_services_by_name() as it is aware of init systems.
|
# validate_services_by_name() as it is aware of init systems.
|
||||||
self.log.warn('/!\\ DEPRECATION WARNING: use '
|
self.log.warn('DEPRECATION WARNING: use '
|
||||||
'validate_services_by_name instead of validate_services '
|
'validate_services_by_name instead of validate_services '
|
||||||
'due to init system differences.')
|
'due to init system differences.')
|
||||||
|
|
||||||
@ -269,33 +271,52 @@ class AmuletUtils(object):
|
|||||||
"""Get last modification time of directory."""
|
"""Get last modification time of directory."""
|
||||||
return sentry_unit.directory_stat(directory)['mtime']
|
return sentry_unit.directory_stat(directory)['mtime']
|
||||||
|
|
||||||
def _get_proc_start_time(self, sentry_unit, service, pgrep_full=False):
|
def _get_proc_start_time(self, sentry_unit, service, pgrep_full=None):
|
||||||
"""Get process' start time.
|
"""Get start time of a process based on the last modification time
|
||||||
|
of the /proc/pid directory.
|
||||||
|
|
||||||
Determine start time of the process based on the last modification
|
:sentry_unit: The sentry unit to check for the service on
|
||||||
time of the /proc/pid directory. If pgrep_full is True, the process
|
:service: service name to look for in process table
|
||||||
name is matched against the full command line.
|
:pgrep_full: [Deprecated] Use full command line search mode with pgrep
|
||||||
"""
|
:returns: epoch time of service process start
|
||||||
if pgrep_full:
|
:param commands: list of bash commands
|
||||||
cmd = 'pgrep -o -f {}'.format(service)
|
:param sentry_units: list of sentry unit pointers
|
||||||
else:
|
:returns: None if successful; Failure message otherwise
|
||||||
cmd = 'pgrep -o {}'.format(service)
|
"""
|
||||||
cmd = cmd + ' | grep -v pgrep || exit 0'
|
if pgrep_full is not None:
|
||||||
cmd_out = sentry_unit.run(cmd)
|
# /!\ DEPRECATION WARNING (beisner):
|
||||||
self.log.debug('CMDout: ' + str(cmd_out))
|
# No longer implemented, as pidof is now used instead of pgrep.
|
||||||
if cmd_out[0]:
|
# https://bugs.launchpad.net/charm-helpers/+bug/1474030
|
||||||
self.log.debug('Pid for %s %s' % (service, str(cmd_out[0])))
|
self.log.warn('DEPRECATION WARNING: pgrep_full bool is no '
|
||||||
proc_dir = '/proc/{}'.format(cmd_out[0].strip())
|
'longer implemented re: lp 1474030.')
|
||||||
return self._get_dir_mtime(sentry_unit, proc_dir)
|
|
||||||
|
pid_list = self.get_process_id_list(sentry_unit, service)
|
||||||
|
pid = pid_list[0]
|
||||||
|
proc_dir = '/proc/{}'.format(pid)
|
||||||
|
self.log.debug('Pid for {} on {}: {}'.format(
|
||||||
|
service, sentry_unit.info['unit_name'], pid))
|
||||||
|
|
||||||
|
return self._get_dir_mtime(sentry_unit, proc_dir)
|
||||||
|
|
||||||
def service_restarted(self, sentry_unit, service, filename,
|
def service_restarted(self, sentry_unit, service, filename,
|
||||||
pgrep_full=False, sleep_time=20):
|
pgrep_full=None, sleep_time=20):
|
||||||
"""Check if service was restarted.
|
"""Check if service was restarted.
|
||||||
|
|
||||||
Compare a service's start time vs a file's last modification time
|
Compare a service's start time vs a file's last modification time
|
||||||
(such as a config file for that service) to determine if the service
|
(such as a config file for that service) to determine if the service
|
||||||
has been restarted.
|
has been restarted.
|
||||||
"""
|
"""
|
||||||
|
# /!\ DEPRECATION WARNING (beisner):
|
||||||
|
# This method is prone to races in that no before-time is known.
|
||||||
|
# Use validate_service_config_changed instead.
|
||||||
|
|
||||||
|
# NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
|
||||||
|
# used instead of pgrep. pgrep_full is still passed through to ensure
|
||||||
|
# deprecation WARNS. lp1474030
|
||||||
|
self.log.warn('DEPRECATION WARNING: use '
|
||||||
|
'validate_service_config_changed instead of '
|
||||||
|
'service_restarted due to known races.')
|
||||||
|
|
||||||
time.sleep(sleep_time)
|
time.sleep(sleep_time)
|
||||||
if (self._get_proc_start_time(sentry_unit, service, pgrep_full) >=
|
if (self._get_proc_start_time(sentry_unit, service, pgrep_full) >=
|
||||||
self._get_file_mtime(sentry_unit, filename)):
|
self._get_file_mtime(sentry_unit, filename)):
|
||||||
@ -304,15 +325,15 @@ class AmuletUtils(object):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def service_restarted_since(self, sentry_unit, mtime, service,
|
def service_restarted_since(self, sentry_unit, mtime, service,
|
||||||
pgrep_full=False, sleep_time=20,
|
pgrep_full=None, sleep_time=20,
|
||||||
retry_count=2):
|
retry_count=2, retry_sleep_time=30):
|
||||||
"""Check if service was been started after a given time.
|
"""Check if service was been started after a given time.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
sentry_unit (sentry): The sentry unit to check for the service on
|
sentry_unit (sentry): The sentry unit to check for the service on
|
||||||
mtime (float): The epoch time to check against
|
mtime (float): The epoch time to check against
|
||||||
service (string): service name to look for in process table
|
service (string): service name to look for in process table
|
||||||
pgrep_full (boolean): Use full command line search mode with pgrep
|
pgrep_full: [Deprecated] Use full command line search mode with pgrep
|
||||||
sleep_time (int): Seconds to sleep before looking for process
|
sleep_time (int): Seconds to sleep before looking for process
|
||||||
retry_count (int): If service is not found, how many times to retry
|
retry_count (int): If service is not found, how many times to retry
|
||||||
|
|
||||||
@ -321,30 +342,44 @@ class AmuletUtils(object):
|
|||||||
False if service is older than mtime or if service was
|
False if service is older than mtime or if service was
|
||||||
not found.
|
not found.
|
||||||
"""
|
"""
|
||||||
self.log.debug('Checking %s restarted since %s' % (service, mtime))
|
# NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
|
||||||
|
# used instead of pgrep. pgrep_full is still passed through to ensure
|
||||||
|
# deprecation WARNS. lp1474030
|
||||||
|
|
||||||
|
unit_name = sentry_unit.info['unit_name']
|
||||||
|
self.log.debug('Checking that %s service restarted since %s on '
|
||||||
|
'%s' % (service, mtime, unit_name))
|
||||||
time.sleep(sleep_time)
|
time.sleep(sleep_time)
|
||||||
proc_start_time = self._get_proc_start_time(sentry_unit, service,
|
proc_start_time = None
|
||||||
pgrep_full)
|
tries = 0
|
||||||
while retry_count > 0 and not proc_start_time:
|
while tries <= retry_count and not proc_start_time:
|
||||||
self.log.debug('No pid file found for service %s, will retry %i '
|
try:
|
||||||
'more times' % (service, retry_count))
|
proc_start_time = self._get_proc_start_time(sentry_unit,
|
||||||
time.sleep(30)
|
service,
|
||||||
proc_start_time = self._get_proc_start_time(sentry_unit, service,
|
pgrep_full)
|
||||||
pgrep_full)
|
self.log.debug('Attempt {} to get {} proc start time on {} '
|
||||||
retry_count = retry_count - 1
|
'OK'.format(tries, service, unit_name))
|
||||||
|
except IOError:
|
||||||
|
# NOTE(beisner) - race avoidance, proc may not exist yet.
|
||||||
|
# https://bugs.launchpad.net/charm-helpers/+bug/1474030
|
||||||
|
self.log.debug('Attempt {} to get {} proc start time on {} '
|
||||||
|
'failed'.format(tries, service, unit_name))
|
||||||
|
time.sleep(retry_sleep_time)
|
||||||
|
tries += 1
|
||||||
|
|
||||||
if not proc_start_time:
|
if not proc_start_time:
|
||||||
self.log.warn('No proc start time found, assuming service did '
|
self.log.warn('No proc start time found, assuming service did '
|
||||||
'not start')
|
'not start')
|
||||||
return False
|
return False
|
||||||
if proc_start_time >= mtime:
|
if proc_start_time >= mtime:
|
||||||
self.log.debug('proc start time is newer than provided mtime'
|
self.log.debug('Proc start time is newer than provided mtime'
|
||||||
'(%s >= %s)' % (proc_start_time, mtime))
|
'(%s >= %s) on %s (OK)' % (proc_start_time,
|
||||||
|
mtime, unit_name))
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
self.log.warn('proc start time (%s) is older than provided mtime '
|
self.log.warn('Proc start time (%s) is older than provided mtime '
|
||||||
'(%s), service did not restart' % (proc_start_time,
|
'(%s) on %s, service did not '
|
||||||
mtime))
|
'restart' % (proc_start_time, mtime, unit_name))
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def config_updated_since(self, sentry_unit, filename, mtime,
|
def config_updated_since(self, sentry_unit, filename, mtime,
|
||||||
@ -374,8 +409,9 @@ class AmuletUtils(object):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def validate_service_config_changed(self, sentry_unit, mtime, service,
|
def validate_service_config_changed(self, sentry_unit, mtime, service,
|
||||||
filename, pgrep_full=False,
|
filename, pgrep_full=None,
|
||||||
sleep_time=20, retry_count=2):
|
sleep_time=20, retry_count=2,
|
||||||
|
retry_sleep_time=30):
|
||||||
"""Check service and file were updated after mtime
|
"""Check service and file were updated after mtime
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -383,9 +419,10 @@ class AmuletUtils(object):
|
|||||||
mtime (float): The epoch time to check against
|
mtime (float): The epoch time to check against
|
||||||
service (string): service name to look for in process table
|
service (string): service name to look for in process table
|
||||||
filename (string): The file to check mtime of
|
filename (string): The file to check mtime of
|
||||||
pgrep_full (boolean): Use full command line search mode with pgrep
|
pgrep_full: [Deprecated] Use full command line search mode with pgrep
|
||||||
sleep_time (int): Seconds to sleep before looking for process
|
sleep_time (int): Initial sleep in seconds to pass to test helpers
|
||||||
retry_count (int): If service is not found, how many times to retry
|
retry_count (int): If service is not found, how many times to retry
|
||||||
|
retry_sleep_time (int): Time in seconds to wait between retries
|
||||||
|
|
||||||
Typical Usage:
|
Typical Usage:
|
||||||
u = OpenStackAmuletUtils(ERROR)
|
u = OpenStackAmuletUtils(ERROR)
|
||||||
@ -402,15 +439,25 @@ class AmuletUtils(object):
|
|||||||
mtime, False if service is older than mtime or if service was
|
mtime, False if service is older than mtime or if service was
|
||||||
not found or if filename was modified before mtime.
|
not found or if filename was modified before mtime.
|
||||||
"""
|
"""
|
||||||
self.log.debug('Checking %s restarted since %s' % (service, mtime))
|
|
||||||
time.sleep(sleep_time)
|
# NOTE(beisner) pgrep_full is no longer implemented, as pidof is now
|
||||||
service_restart = self.service_restarted_since(sentry_unit, mtime,
|
# used instead of pgrep. pgrep_full is still passed through to ensure
|
||||||
service,
|
# deprecation WARNS. lp1474030
|
||||||
pgrep_full=pgrep_full,
|
|
||||||
sleep_time=0,
|
service_restart = self.service_restarted_since(
|
||||||
retry_count=retry_count)
|
sentry_unit, mtime,
|
||||||
config_update = self.config_updated_since(sentry_unit, filename, mtime,
|
service,
|
||||||
sleep_time=0)
|
pgrep_full=pgrep_full,
|
||||||
|
sleep_time=sleep_time,
|
||||||
|
retry_count=retry_count,
|
||||||
|
retry_sleep_time=retry_sleep_time)
|
||||||
|
|
||||||
|
config_update = self.config_updated_since(
|
||||||
|
sentry_unit,
|
||||||
|
filename,
|
||||||
|
mtime,
|
||||||
|
sleep_time=0)
|
||||||
|
|
||||||
return service_restart and config_update
|
return service_restart and config_update
|
||||||
|
|
||||||
def get_sentry_time(self, sentry_unit):
|
def get_sentry_time(self, sentry_unit):
|
||||||
@ -428,7 +475,6 @@ class AmuletUtils(object):
|
|||||||
"""Return a list of all Ubuntu releases in order of release."""
|
"""Return a list of all Ubuntu releases in order of release."""
|
||||||
_d = distro_info.UbuntuDistroInfo()
|
_d = distro_info.UbuntuDistroInfo()
|
||||||
_release_list = _d.all
|
_release_list = _d.all
|
||||||
self.log.debug('Ubuntu release list: {}'.format(_release_list))
|
|
||||||
return _release_list
|
return _release_list
|
||||||
|
|
||||||
def file_to_url(self, file_rel_path):
|
def file_to_url(self, file_rel_path):
|
||||||
@ -568,6 +614,142 @@ class AmuletUtils(object):
|
|||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def validate_sectionless_conf(self, file_contents, expected):
|
||||||
|
"""A crude conf parser. Useful to inspect configuration files which
|
||||||
|
do not have section headers (as would be necessary in order to use
|
||||||
|
the configparser). Such as openstack-dashboard or rabbitmq confs."""
|
||||||
|
for line in file_contents.split('\n'):
|
||||||
|
if '=' in line:
|
||||||
|
args = line.split('=')
|
||||||
|
if len(args) <= 1:
|
||||||
|
continue
|
||||||
|
key = args[0].strip()
|
||||||
|
value = args[1].strip()
|
||||||
|
if key in expected.keys():
|
||||||
|
if expected[key] != value:
|
||||||
|
msg = ('Config mismatch. Expected, actual: {}, '
|
||||||
|
'{}'.format(expected[key], value))
|
||||||
|
amulet.raise_status(amulet.FAIL, msg=msg)
|
||||||
|
|
||||||
|
def get_unit_hostnames(self, units):
|
||||||
|
"""Return a dict of juju unit names to hostnames."""
|
||||||
|
host_names = {}
|
||||||
|
for unit in units:
|
||||||
|
host_names[unit.info['unit_name']] = \
|
||||||
|
str(unit.file_contents('/etc/hostname').strip())
|
||||||
|
self.log.debug('Unit host names: {}'.format(host_names))
|
||||||
|
return host_names
|
||||||
|
|
||||||
|
def run_cmd_unit(self, sentry_unit, cmd):
|
||||||
|
"""Run a command on a unit, return the output and exit code."""
|
||||||
|
output, code = sentry_unit.run(cmd)
|
||||||
|
if code == 0:
|
||||||
|
self.log.debug('{} `{}` command returned {} '
|
||||||
|
'(OK)'.format(sentry_unit.info['unit_name'],
|
||||||
|
cmd, code))
|
||||||
|
else:
|
||||||
|
msg = ('{} `{}` command returned {} '
|
||||||
|
'{}'.format(sentry_unit.info['unit_name'],
|
||||||
|
cmd, code, output))
|
||||||
|
amulet.raise_status(amulet.FAIL, msg=msg)
|
||||||
|
return str(output), code
|
||||||
|
|
||||||
|
def file_exists_on_unit(self, sentry_unit, file_name):
|
||||||
|
"""Check if a file exists on a unit."""
|
||||||
|
try:
|
||||||
|
sentry_unit.file_stat(file_name)
|
||||||
|
return True
|
||||||
|
except IOError:
|
||||||
|
return False
|
||||||
|
except Exception as e:
|
||||||
|
msg = 'Error checking file {}: {}'.format(file_name, e)
|
||||||
|
amulet.raise_status(amulet.FAIL, msg=msg)
|
||||||
|
|
||||||
|
def file_contents_safe(self, sentry_unit, file_name,
|
||||||
|
max_wait=60, fatal=False):
|
||||||
|
"""Get file contents from a sentry unit. Wrap amulet file_contents
|
||||||
|
with retry logic to address races where a file checks as existing,
|
||||||
|
but no longer exists by the time file_contents is called.
|
||||||
|
Return None if file not found. Optionally raise if fatal is True."""
|
||||||
|
unit_name = sentry_unit.info['unit_name']
|
||||||
|
file_contents = False
|
||||||
|
tries = 0
|
||||||
|
while not file_contents and tries < (max_wait / 4):
|
||||||
|
try:
|
||||||
|
file_contents = sentry_unit.file_contents(file_name)
|
||||||
|
except IOError:
|
||||||
|
self.log.debug('Attempt {} to open file {} from {} '
|
||||||
|
'failed'.format(tries, file_name,
|
||||||
|
unit_name))
|
||||||
|
time.sleep(4)
|
||||||
|
tries += 1
|
||||||
|
|
||||||
|
if file_contents:
|
||||||
|
return file_contents
|
||||||
|
elif not fatal:
|
||||||
|
return None
|
||||||
|
elif fatal:
|
||||||
|
msg = 'Failed to get file contents from unit.'
|
||||||
|
amulet.raise_status(amulet.FAIL, msg)
|
||||||
|
|
||||||
|
def port_knock_tcp(self, host="localhost", port=22, timeout=15):
|
||||||
|
"""Open a TCP socket to check for a listening sevice on a host.
|
||||||
|
|
||||||
|
:param host: host name or IP address, default to localhost
|
||||||
|
:param port: TCP port number, default to 22
|
||||||
|
:param timeout: Connect timeout, default to 15 seconds
|
||||||
|
:returns: True if successful, False if connect failed
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Resolve host name if possible
|
||||||
|
try:
|
||||||
|
connect_host = socket.gethostbyname(host)
|
||||||
|
host_human = "{} ({})".format(connect_host, host)
|
||||||
|
except socket.error as e:
|
||||||
|
self.log.warn('Unable to resolve address: '
|
||||||
|
'{} ({}) Trying anyway!'.format(host, e))
|
||||||
|
connect_host = host
|
||||||
|
host_human = connect_host
|
||||||
|
|
||||||
|
# Attempt socket connection
|
||||||
|
try:
|
||||||
|
knock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
knock.settimeout(timeout)
|
||||||
|
knock.connect((connect_host, port))
|
||||||
|
knock.close()
|
||||||
|
self.log.debug('Socket connect OK for host '
|
||||||
|
'{} on port {}.'.format(host_human, port))
|
||||||
|
return True
|
||||||
|
except socket.error as e:
|
||||||
|
self.log.debug('Socket connect FAIL for'
|
||||||
|
' {} port {} ({})'.format(host_human, port, e))
|
||||||
|
return False
|
||||||
|
|
||||||
|
def port_knock_units(self, sentry_units, port=22,
|
||||||
|
timeout=15, expect_success=True):
|
||||||
|
"""Open a TCP socket to check for a listening sevice on each
|
||||||
|
listed juju unit.
|
||||||
|
|
||||||
|
:param sentry_units: list of sentry unit pointers
|
||||||
|
:param port: TCP port number, default to 22
|
||||||
|
:param timeout: Connect timeout, default to 15 seconds
|
||||||
|
:expect_success: True by default, set False to invert logic
|
||||||
|
:returns: None if successful, Failure message otherwise
|
||||||
|
"""
|
||||||
|
for unit in sentry_units:
|
||||||
|
host = unit.info['public-address']
|
||||||
|
connected = self.port_knock_tcp(host, port, timeout)
|
||||||
|
if not connected and expect_success:
|
||||||
|
return 'Socket connect failed.'
|
||||||
|
elif connected and not expect_success:
|
||||||
|
return 'Socket connected unexpectedly.'
|
||||||
|
|
||||||
|
def get_uuid_epoch_stamp(self):
|
||||||
|
"""Returns a stamp string based on uuid4 and epoch time. Useful in
|
||||||
|
generating test messages which need to be unique-ish."""
|
||||||
|
return '[{}-{}]'.format(uuid.uuid4(), time.time())
|
||||||
|
|
||||||
|
# amulet juju action helpers:
|
||||||
def run_action(self, unit_sentry, action,
|
def run_action(self, unit_sentry, action,
|
||||||
_check_output=subprocess.check_output):
|
_check_output=subprocess.check_output):
|
||||||
"""Run the named action on a given unit sentry.
|
"""Run the named action on a given unit sentry.
|
||||||
@ -594,3 +776,12 @@ class AmuletUtils(object):
|
|||||||
output = _check_output(command, universal_newlines=True)
|
output = _check_output(command, universal_newlines=True)
|
||||||
data = json.loads(output)
|
data = json.loads(output)
|
||||||
return data.get(u"status") == "completed"
|
return data.get(u"status") == "completed"
|
||||||
|
|
||||||
|
def status_get(self, unit):
|
||||||
|
"""Return the current service status of this unit."""
|
||||||
|
raw_status, return_code = unit.run(
|
||||||
|
"status-get --format=json --include-data")
|
||||||
|
if return_code != 0:
|
||||||
|
return ("unknown", "")
|
||||||
|
status = json.loads(raw_status)
|
||||||
|
return (status["status"], status["message"])
|
||||||
|
@ -44,20 +44,31 @@ class OpenStackAmuletDeployment(AmuletDeployment):
|
|||||||
Determine if the local branch being tested is derived from its
|
Determine if the local branch being tested is derived from its
|
||||||
stable or next (dev) branch, and based on this, use the corresonding
|
stable or next (dev) branch, and based on this, use the corresonding
|
||||||
stable or next branches for the other_services."""
|
stable or next branches for the other_services."""
|
||||||
|
|
||||||
|
# Charms outside the lp:~openstack-charmers namespace
|
||||||
base_charms = ['mysql', 'mongodb', 'nrpe']
|
base_charms = ['mysql', 'mongodb', 'nrpe']
|
||||||
|
|
||||||
|
# Force these charms to current series even when using an older series.
|
||||||
|
# ie. Use trusty/nrpe even when series is precise, as the P charm
|
||||||
|
# does not possess the necessary external master config and hooks.
|
||||||
|
force_series_current = ['nrpe']
|
||||||
|
|
||||||
if self.series in ['precise', 'trusty']:
|
if self.series in ['precise', 'trusty']:
|
||||||
base_series = self.series
|
base_series = self.series
|
||||||
else:
|
else:
|
||||||
base_series = self.current_next
|
base_series = self.current_next
|
||||||
|
|
||||||
if self.stable:
|
for svc in other_services:
|
||||||
for svc in other_services:
|
if svc['name'] in force_series_current:
|
||||||
|
base_series = self.current_next
|
||||||
|
# If a location has been explicitly set, use it
|
||||||
|
if svc.get('location'):
|
||||||
|
continue
|
||||||
|
if self.stable:
|
||||||
temp = 'lp:charms/{}/{}'
|
temp = 'lp:charms/{}/{}'
|
||||||
svc['location'] = temp.format(base_series,
|
svc['location'] = temp.format(base_series,
|
||||||
svc['name'])
|
svc['name'])
|
||||||
else:
|
else:
|
||||||
for svc in other_services:
|
|
||||||
if svc['name'] in base_charms:
|
if svc['name'] in base_charms:
|
||||||
temp = 'lp:charms/{}/{}'
|
temp = 'lp:charms/{}/{}'
|
||||||
svc['location'] = temp.format(base_series,
|
svc['location'] = temp.format(base_series,
|
||||||
@ -66,6 +77,7 @@ class OpenStackAmuletDeployment(AmuletDeployment):
|
|||||||
temp = 'lp:~openstack-charmers/charms/{}/{}/next'
|
temp = 'lp:~openstack-charmers/charms/{}/{}/next'
|
||||||
svc['location'] = temp.format(self.current_next,
|
svc['location'] = temp.format(self.current_next,
|
||||||
svc['name'])
|
svc['name'])
|
||||||
|
|
||||||
return other_services
|
return other_services
|
||||||
|
|
||||||
def _add_services(self, this_service, other_services):
|
def _add_services(self, this_service, other_services):
|
||||||
@ -77,21 +89,23 @@ class OpenStackAmuletDeployment(AmuletDeployment):
|
|||||||
|
|
||||||
services = other_services
|
services = other_services
|
||||||
services.append(this_service)
|
services.append(this_service)
|
||||||
|
|
||||||
|
# Charms which should use the source config option
|
||||||
use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
|
use_source = ['mysql', 'mongodb', 'rabbitmq-server', 'ceph',
|
||||||
'ceph-osd', 'ceph-radosgw']
|
'ceph-osd', 'ceph-radosgw']
|
||||||
# Most OpenStack subordinate charms do not expose an origin option
|
|
||||||
# as that is controlled by the principle.
|
# Charms which can not use openstack-origin, ie. many subordinates
|
||||||
ignore = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
|
no_origin = ['cinder-ceph', 'hacluster', 'neutron-openvswitch', 'nrpe']
|
||||||
|
|
||||||
if self.openstack:
|
if self.openstack:
|
||||||
for svc in services:
|
for svc in services:
|
||||||
if svc['name'] not in use_source + ignore:
|
if svc['name'] not in use_source + no_origin:
|
||||||
config = {'openstack-origin': self.openstack}
|
config = {'openstack-origin': self.openstack}
|
||||||
self.d.configure(svc['name'], config)
|
self.d.configure(svc['name'], config)
|
||||||
|
|
||||||
if self.source:
|
if self.source:
|
||||||
for svc in services:
|
for svc in services:
|
||||||
if svc['name'] in use_source and svc['name'] not in ignore:
|
if svc['name'] in use_source and svc['name'] not in no_origin:
|
||||||
config = {'source': self.source}
|
config = {'source': self.source}
|
||||||
self.d.configure(svc['name'], config)
|
self.d.configure(svc['name'], config)
|
||||||
|
|
||||||
|
@ -27,6 +27,7 @@ import glanceclient.v1.client as glance_client
|
|||||||
import heatclient.v1.client as heat_client
|
import heatclient.v1.client as heat_client
|
||||||
import keystoneclient.v2_0 as keystone_client
|
import keystoneclient.v2_0 as keystone_client
|
||||||
import novaclient.v1_1.client as nova_client
|
import novaclient.v1_1.client as nova_client
|
||||||
|
import pika
|
||||||
import swiftclient
|
import swiftclient
|
||||||
|
|
||||||
from charmhelpers.contrib.amulet.utils import (
|
from charmhelpers.contrib.amulet.utils import (
|
||||||
@ -602,3 +603,361 @@ class OpenStackAmuletUtils(AmuletUtils):
|
|||||||
self.log.debug('Ceph {} samples (OK): '
|
self.log.debug('Ceph {} samples (OK): '
|
||||||
'{}'.format(sample_type, samples))
|
'{}'.format(sample_type, samples))
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
# rabbitmq/amqp specific helpers:
|
||||||
|
def add_rmq_test_user(self, sentry_units,
|
||||||
|
username="testuser1", password="changeme"):
|
||||||
|
"""Add a test user via the first rmq juju unit, check connection as
|
||||||
|
the new user against all sentry units.
|
||||||
|
|
||||||
|
:param sentry_units: list of sentry unit pointers
|
||||||
|
:param username: amqp user name, default to testuser1
|
||||||
|
:param password: amqp user password
|
||||||
|
:returns: None if successful. Raise on error.
|
||||||
|
"""
|
||||||
|
self.log.debug('Adding rmq user ({})...'.format(username))
|
||||||
|
|
||||||
|
# Check that user does not already exist
|
||||||
|
cmd_user_list = 'rabbitmqctl list_users'
|
||||||
|
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
|
||||||
|
if username in output:
|
||||||
|
self.log.warning('User ({}) already exists, returning '
|
||||||
|
'gracefully.'.format(username))
|
||||||
|
return
|
||||||
|
|
||||||
|
perms = '".*" ".*" ".*"'
|
||||||
|
cmds = ['rabbitmqctl add_user {} {}'.format(username, password),
|
||||||
|
'rabbitmqctl set_permissions {} {}'.format(username, perms)]
|
||||||
|
|
||||||
|
# Add user via first unit
|
||||||
|
for cmd in cmds:
|
||||||
|
output, _ = self.run_cmd_unit(sentry_units[0], cmd)
|
||||||
|
|
||||||
|
# Check connection against the other sentry_units
|
||||||
|
self.log.debug('Checking user connect against units...')
|
||||||
|
for sentry_unit in sentry_units:
|
||||||
|
connection = self.connect_amqp_by_unit(sentry_unit, ssl=False,
|
||||||
|
username=username,
|
||||||
|
password=password)
|
||||||
|
connection.close()
|
||||||
|
|
||||||
|
def delete_rmq_test_user(self, sentry_units, username="testuser1"):
|
||||||
|
"""Delete a rabbitmq user via the first rmq juju unit.
|
||||||
|
|
||||||
|
:param sentry_units: list of sentry unit pointers
|
||||||
|
:param username: amqp user name, default to testuser1
|
||||||
|
:param password: amqp user password
|
||||||
|
:returns: None if successful or no such user.
|
||||||
|
"""
|
||||||
|
self.log.debug('Deleting rmq user ({})...'.format(username))
|
||||||
|
|
||||||
|
# Check that the user exists
|
||||||
|
cmd_user_list = 'rabbitmqctl list_users'
|
||||||
|
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_list)
|
||||||
|
|
||||||
|
if username not in output:
|
||||||
|
self.log.warning('User ({}) does not exist, returning '
|
||||||
|
'gracefully.'.format(username))
|
||||||
|
return
|
||||||
|
|
||||||
|
# Delete the user
|
||||||
|
cmd_user_del = 'rabbitmqctl delete_user {}'.format(username)
|
||||||
|
output, _ = self.run_cmd_unit(sentry_units[0], cmd_user_del)
|
||||||
|
|
||||||
|
def get_rmq_cluster_status(self, sentry_unit):
|
||||||
|
"""Execute rabbitmq cluster status command on a unit and return
|
||||||
|
the full output.
|
||||||
|
|
||||||
|
:param unit: sentry unit
|
||||||
|
:returns: String containing console output of cluster status command
|
||||||
|
"""
|
||||||
|
cmd = 'rabbitmqctl cluster_status'
|
||||||
|
output, _ = self.run_cmd_unit(sentry_unit, cmd)
|
||||||
|
self.log.debug('{} cluster_status:\n{}'.format(
|
||||||
|
sentry_unit.info['unit_name'], output))
|
||||||
|
return str(output)
|
||||||
|
|
||||||
|
def get_rmq_cluster_running_nodes(self, sentry_unit):
|
||||||
|
"""Parse rabbitmqctl cluster_status output string, return list of
|
||||||
|
running rabbitmq cluster nodes.
|
||||||
|
|
||||||
|
:param unit: sentry unit
|
||||||
|
:returns: List containing node names of running nodes
|
||||||
|
"""
|
||||||
|
# NOTE(beisner): rabbitmqctl cluster_status output is not
|
||||||
|
# json-parsable, do string chop foo, then json.loads that.
|
||||||
|
str_stat = self.get_rmq_cluster_status(sentry_unit)
|
||||||
|
if 'running_nodes' in str_stat:
|
||||||
|
pos_start = str_stat.find("{running_nodes,") + 15
|
||||||
|
pos_end = str_stat.find("]},", pos_start) + 1
|
||||||
|
str_run_nodes = str_stat[pos_start:pos_end].replace("'", '"')
|
||||||
|
run_nodes = json.loads(str_run_nodes)
|
||||||
|
return run_nodes
|
||||||
|
else:
|
||||||
|
return []
|
||||||
|
|
||||||
|
def validate_rmq_cluster_running_nodes(self, sentry_units):
|
||||||
|
"""Check that all rmq unit hostnames are represented in the
|
||||||
|
cluster_status output of all units.
|
||||||
|
|
||||||
|
:param host_names: dict of juju unit names to host names
|
||||||
|
:param units: list of sentry unit pointers (all rmq units)
|
||||||
|
:returns: None if successful, otherwise return error message
|
||||||
|
"""
|
||||||
|
host_names = self.get_unit_hostnames(sentry_units)
|
||||||
|
errors = []
|
||||||
|
|
||||||
|
# Query every unit for cluster_status running nodes
|
||||||
|
for query_unit in sentry_units:
|
||||||
|
query_unit_name = query_unit.info['unit_name']
|
||||||
|
running_nodes = self.get_rmq_cluster_running_nodes(query_unit)
|
||||||
|
|
||||||
|
# Confirm that every unit is represented in the queried unit's
|
||||||
|
# cluster_status running nodes output.
|
||||||
|
for validate_unit in sentry_units:
|
||||||
|
val_host_name = host_names[validate_unit.info['unit_name']]
|
||||||
|
val_node_name = 'rabbit@{}'.format(val_host_name)
|
||||||
|
|
||||||
|
if val_node_name not in running_nodes:
|
||||||
|
errors.append('Cluster member check failed on {}: {} not '
|
||||||
|
'in {}\n'.format(query_unit_name,
|
||||||
|
val_node_name,
|
||||||
|
running_nodes))
|
||||||
|
if errors:
|
||||||
|
return ''.join(errors)
|
||||||
|
|
||||||
|
def rmq_ssl_is_enabled_on_unit(self, sentry_unit, port=None):
|
||||||
|
"""Check a single juju rmq unit for ssl and port in the config file."""
|
||||||
|
host = sentry_unit.info['public-address']
|
||||||
|
unit_name = sentry_unit.info['unit_name']
|
||||||
|
|
||||||
|
conf_file = '/etc/rabbitmq/rabbitmq.config'
|
||||||
|
conf_contents = str(self.file_contents_safe(sentry_unit,
|
||||||
|
conf_file, max_wait=16))
|
||||||
|
# Checks
|
||||||
|
conf_ssl = 'ssl' in conf_contents
|
||||||
|
conf_port = str(port) in conf_contents
|
||||||
|
|
||||||
|
# Port explicitly checked in config
|
||||||
|
if port and conf_port and conf_ssl:
|
||||||
|
self.log.debug('SSL is enabled @{}:{} '
|
||||||
|
'({})'.format(host, port, unit_name))
|
||||||
|
return True
|
||||||
|
elif port and not conf_port and conf_ssl:
|
||||||
|
self.log.debug('SSL is enabled @{} but not on port {} '
|
||||||
|
'({})'.format(host, port, unit_name))
|
||||||
|
return False
|
||||||
|
# Port not checked (useful when checking that ssl is disabled)
|
||||||
|
elif not port and conf_ssl:
|
||||||
|
self.log.debug('SSL is enabled @{}:{} '
|
||||||
|
'({})'.format(host, port, unit_name))
|
||||||
|
return True
|
||||||
|
elif not port and not conf_ssl:
|
||||||
|
self.log.debug('SSL not enabled @{}:{} '
|
||||||
|
'({})'.format(host, port, unit_name))
|
||||||
|
return False
|
||||||
|
else:
|
||||||
|
msg = ('Unknown condition when checking SSL status @{}:{} '
|
||||||
|
'({})'.format(host, port, unit_name))
|
||||||
|
amulet.raise_status(amulet.FAIL, msg)
|
||||||
|
|
||||||
|
def validate_rmq_ssl_enabled_units(self, sentry_units, port=None):
|
||||||
|
"""Check that ssl is enabled on rmq juju sentry units.
|
||||||
|
|
||||||
|
:param sentry_units: list of all rmq sentry units
|
||||||
|
:param port: optional ssl port override to validate
|
||||||
|
:returns: None if successful, otherwise return error message
|
||||||
|
"""
|
||||||
|
for sentry_unit in sentry_units:
|
||||||
|
if not self.rmq_ssl_is_enabled_on_unit(sentry_unit, port=port):
|
||||||
|
return ('Unexpected condition: ssl is disabled on unit '
|
||||||
|
'({})'.format(sentry_unit.info['unit_name']))
|
||||||
|
return None
|
||||||
|
|
||||||
|
def validate_rmq_ssl_disabled_units(self, sentry_units):
|
||||||
|
"""Check that ssl is enabled on listed rmq juju sentry units.
|
||||||
|
|
||||||
|
:param sentry_units: list of all rmq sentry units
|
||||||
|
:returns: True if successful. Raise on error.
|
||||||
|
"""
|
||||||
|
for sentry_unit in sentry_units:
|
||||||
|
if self.rmq_ssl_is_enabled_on_unit(sentry_unit):
|
||||||
|
return ('Unexpected condition: ssl is enabled on unit '
|
||||||
|
'({})'.format(sentry_unit.info['unit_name']))
|
||||||
|
return None
|
||||||
|
|
||||||
|
def configure_rmq_ssl_on(self, sentry_units, deployment,
|
||||||
|
port=None, max_wait=60):
|
||||||
|
"""Turn ssl charm config option on, with optional non-default
|
||||||
|
ssl port specification. Confirm that it is enabled on every
|
||||||
|
unit.
|
||||||
|
|
||||||
|
:param sentry_units: list of sentry units
|
||||||
|
:param deployment: amulet deployment object pointer
|
||||||
|
:param port: amqp port, use defaults if None
|
||||||
|
:param max_wait: maximum time to wait in seconds to confirm
|
||||||
|
:returns: None if successful. Raise on error.
|
||||||
|
"""
|
||||||
|
self.log.debug('Setting ssl charm config option: on')
|
||||||
|
|
||||||
|
# Enable RMQ SSL
|
||||||
|
config = {'ssl': 'on'}
|
||||||
|
if port:
|
||||||
|
config['ssl_port'] = port
|
||||||
|
|
||||||
|
deployment.configure('rabbitmq-server', config)
|
||||||
|
|
||||||
|
# Confirm
|
||||||
|
tries = 0
|
||||||
|
ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
|
||||||
|
while ret and tries < (max_wait / 4):
|
||||||
|
time.sleep(4)
|
||||||
|
self.log.debug('Attempt {}: {}'.format(tries, ret))
|
||||||
|
ret = self.validate_rmq_ssl_enabled_units(sentry_units, port=port)
|
||||||
|
tries += 1
|
||||||
|
|
||||||
|
if ret:
|
||||||
|
amulet.raise_status(amulet.FAIL, ret)
|
||||||
|
|
||||||
|
def configure_rmq_ssl_off(self, sentry_units, deployment, max_wait=60):
|
||||||
|
"""Turn ssl charm config option off, confirm that it is disabled
|
||||||
|
on every unit.
|
||||||
|
|
||||||
|
:param sentry_units: list of sentry units
|
||||||
|
:param deployment: amulet deployment object pointer
|
||||||
|
:param max_wait: maximum time to wait in seconds to confirm
|
||||||
|
:returns: None if successful. Raise on error.
|
||||||
|
"""
|
||||||
|
self.log.debug('Setting ssl charm config option: off')
|
||||||
|
|
||||||
|
# Disable RMQ SSL
|
||||||
|
config = {'ssl': 'off'}
|
||||||
|
deployment.configure('rabbitmq-server', config)
|
||||||
|
|
||||||
|
# Confirm
|
||||||
|
tries = 0
|
||||||
|
ret = self.validate_rmq_ssl_disabled_units(sentry_units)
|
||||||
|
while ret and tries < (max_wait / 4):
|
||||||
|
time.sleep(4)
|
||||||
|
self.log.debug('Attempt {}: {}'.format(tries, ret))
|
||||||
|
ret = self.validate_rmq_ssl_disabled_units(sentry_units)
|
||||||
|
tries += 1
|
||||||
|
|
||||||
|
if ret:
|
||||||
|
amulet.raise_status(amulet.FAIL, ret)
|
||||||
|
|
||||||
|
def connect_amqp_by_unit(self, sentry_unit, ssl=False,
|
||||||
|
port=None, fatal=True,
|
||||||
|
username="testuser1", password="changeme"):
|
||||||
|
"""Establish and return a pika amqp connection to the rabbitmq service
|
||||||
|
running on a rmq juju unit.
|
||||||
|
|
||||||
|
:param sentry_unit: sentry unit pointer
|
||||||
|
:param ssl: boolean, default to False
|
||||||
|
:param port: amqp port, use defaults if None
|
||||||
|
:param fatal: boolean, default to True (raises on connect error)
|
||||||
|
:param username: amqp user name, default to testuser1
|
||||||
|
:param password: amqp user password
|
||||||
|
:returns: pika amqp connection pointer or None if failed and non-fatal
|
||||||
|
"""
|
||||||
|
host = sentry_unit.info['public-address']
|
||||||
|
unit_name = sentry_unit.info['unit_name']
|
||||||
|
|
||||||
|
# Default port logic if port is not specified
|
||||||
|
if ssl and not port:
|
||||||
|
port = 5671
|
||||||
|
elif not ssl and not port:
|
||||||
|
port = 5672
|
||||||
|
|
||||||
|
self.log.debug('Connecting to amqp on {}:{} ({}) as '
|
||||||
|
'{}...'.format(host, port, unit_name, username))
|
||||||
|
|
||||||
|
try:
|
||||||
|
credentials = pika.PlainCredentials(username, password)
|
||||||
|
parameters = pika.ConnectionParameters(host=host, port=port,
|
||||||
|
credentials=credentials,
|
||||||
|
ssl=ssl,
|
||||||
|
connection_attempts=3,
|
||||||
|
retry_delay=5,
|
||||||
|
socket_timeout=1)
|
||||||
|
connection = pika.BlockingConnection(parameters)
|
||||||
|
assert connection.server_properties['product'] == 'RabbitMQ'
|
||||||
|
self.log.debug('Connect OK')
|
||||||
|
return connection
|
||||||
|
except Exception as e:
|
||||||
|
msg = ('amqp connection failed to {}:{} as '
|
||||||
|
'{} ({})'.format(host, port, username, str(e)))
|
||||||
|
if fatal:
|
||||||
|
amulet.raise_status(amulet.FAIL, msg)
|
||||||
|
else:
|
||||||
|
self.log.warn(msg)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def publish_amqp_message_by_unit(self, sentry_unit, message,
|
||||||
|
queue="test", ssl=False,
|
||||||
|
username="testuser1",
|
||||||
|
password="changeme",
|
||||||
|
port=None):
|
||||||
|
"""Publish an amqp message to a rmq juju unit.
|
||||||
|
|
||||||
|
:param sentry_unit: sentry unit pointer
|
||||||
|
:param message: amqp message string
|
||||||
|
:param queue: message queue, default to test
|
||||||
|
:param username: amqp user name, default to testuser1
|
||||||
|
:param password: amqp user password
|
||||||
|
:param ssl: boolean, default to False
|
||||||
|
:param port: amqp port, use defaults if None
|
||||||
|
:returns: None. Raises exception if publish failed.
|
||||||
|
"""
|
||||||
|
self.log.debug('Publishing message to {} queue:\n{}'.format(queue,
|
||||||
|
message))
|
||||||
|
connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
|
||||||
|
port=port,
|
||||||
|
username=username,
|
||||||
|
password=password)
|
||||||
|
|
||||||
|
# NOTE(beisner): extra debug here re: pika hang potential:
|
||||||
|
# https://github.com/pika/pika/issues/297
|
||||||
|
# https://groups.google.com/forum/#!topic/rabbitmq-users/Ja0iyfF0Szw
|
||||||
|
self.log.debug('Defining channel...')
|
||||||
|
channel = connection.channel()
|
||||||
|
self.log.debug('Declaring queue...')
|
||||||
|
channel.queue_declare(queue=queue, auto_delete=False, durable=True)
|
||||||
|
self.log.debug('Publishing message...')
|
||||||
|
channel.basic_publish(exchange='', routing_key=queue, body=message)
|
||||||
|
self.log.debug('Closing channel...')
|
||||||
|
channel.close()
|
||||||
|
self.log.debug('Closing connection...')
|
||||||
|
connection.close()
|
||||||
|
|
||||||
|
def get_amqp_message_by_unit(self, sentry_unit, queue="test",
|
||||||
|
username="testuser1",
|
||||||
|
password="changeme",
|
||||||
|
ssl=False, port=None):
|
||||||
|
"""Get an amqp message from a rmq juju unit.
|
||||||
|
|
||||||
|
:param sentry_unit: sentry unit pointer
|
||||||
|
:param queue: message queue, default to test
|
||||||
|
:param username: amqp user name, default to testuser1
|
||||||
|
:param password: amqp user password
|
||||||
|
:param ssl: boolean, default to False
|
||||||
|
:param port: amqp port, use defaults if None
|
||||||
|
:returns: amqp message body as string. Raise if get fails.
|
||||||
|
"""
|
||||||
|
connection = self.connect_amqp_by_unit(sentry_unit, ssl=ssl,
|
||||||
|
port=port,
|
||||||
|
username=username,
|
||||||
|
password=password)
|
||||||
|
channel = connection.channel()
|
||||||
|
method_frame, _, body = channel.basic_get(queue)
|
||||||
|
|
||||||
|
if method_frame:
|
||||||
|
self.log.debug('Retreived message from {} queue:\n{}'.format(queue,
|
||||||
|
body))
|
||||||
|
channel.basic_ack(method_frame.delivery_tag)
|
||||||
|
channel.close()
|
||||||
|
connection.close()
|
||||||
|
return body
|
||||||
|
else:
|
||||||
|
msg = 'No message retrieved.'
|
||||||
|
amulet.raise_status(amulet.FAIL, msg)
|
||||||
|
Loading…
Reference in New Issue
Block a user