Initial support for single instance Cassandra Database

Reasons
- This code adds ability to create/delete instance
with mongodb service type. No other operations supported.
- Trove should support multiple extensions. One of the is NoSQL support.
Main goal is to add support for Apache Cassandra NoSQL database.

Changes
This code gives ability to create/delete instance
with cassandra datastire type. No other operations supported.

Change-Id: Ib48f6392765af49c2df1447c3df46675825d210f
Implements: blueprint cassandra-db-support
This commit is contained in:
Denis Makogon 2014-01-23 13:00:43 +02:00
parent c1c9bdbb8f
commit 39f4c789b3
14 changed files with 872 additions and 28 deletions

View File

@ -215,7 +215,10 @@ common_opts = [
cfg.IntOpt('exists_notification_ticks', default=360,
help='Number of report_intervals to wait between pushing '
'events (see report_interval).'),
cfg.DictOpt('notification_service_id', default={},
cfg.DictOpt('notification_service_id',
default={'mysql': '2f3ff068-2bfb-4f70-9a9d-a6bb65bc084b',
'redis': 'b216ffc5-1947-456c-a4cf-70f94c05f7d0',
'cassandra': '459a230d-4e97-4344-9067-2a54a310b0ed'},
help='Unique ID to tag notification events.'),
cfg.StrOpt('nova_proxy_admin_user', default='',
help="Admin username used to connect to Nova.", secret=True),

View File

@ -15,6 +15,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import fcntl
import struct
import socket
REDHAT = 'redhat'
DEBIAN = 'debian'
@ -68,3 +71,20 @@ def service_discovery(service_candidates):
result['cmd_disable'] = "sudo systemctl disable %s" % service
break
return result
#Uses the Linux SIOCGIFADDR ioctl to find the IP address associated
# with a network interface, given the name of that interface,
# e.g. "eth0". The address is returned as a string containing a dotted quad.
def get_ip_address(ifname='eth0'):
"""
Retrieves IP address which assigned to given network interface
@parameter ifname network interface (ethX, wlanX, etc.)
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
return socket.inet_ntoa(fcntl.ioctl(
s.fileno(),
0x8915, # SIOCGIFADDR
struct.pack('256s', ifname[:15])
)[20:24])

View File

@ -0,0 +1,158 @@
# Copyright 2013 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import os
from trove.common import cfg
from trove.common import exception
from trove.guestagent import volume
from trove.guestagent.datastore.cassandra import service
from trove.guestagent.datastore.cassandra import system
from trove.openstack.common import periodic_task
from trove.openstack.common import log as logging
from trove.openstack.common.gettextutils import _
from trove.guestagent import dbaas
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
USAGE_SLEEP_TIME = CONF.usage_sleep_time # seconds.
USAGE_TIMEOUT = CONF.usage_timeout # seconds.
ERROR_MSG = _("Not supported")
class Manager(periodic_task.PeriodicTasks):
def __init__(self):
self.appStatus = service.CassandraAppStatus()
self.app = service.CassandraApp(self.appStatus)
@periodic_task.periodic_task(ticks_between_runs=3)
def update_status(self, context):
"""Update the status of the Cassandra service"""
self.appStatus.update()
def restart(self, context):
self.app.restart()
def get_filesystem_stats(self, context, fs_path):
"""Gets the filesystem stats for the path given. """
return dbaas.get_filesystem_volume_stats(
system.CASSANDRA_MOUNT_POINT)
def start_db_with_conf_changes(self, context, config_contents):
self.app.start_db_with_conf_changes(config_contents)
def stop_db(self, context, do_not_start_on_reboot=False):
self.app.stop_db(do_not_start_on_reboot=do_not_start_on_reboot)
def reset_configuration(self, context, configuration):
self.app.reset_configuration(configuration)
def prepare(self, context, packages, databases, memory_mb, users,
device_path=None, mount_point=None, backup_info=None,
config_contents=None, root_password=None, overrides=None):
LOG.info(_("Setting status BUILDING"))
self.appStatus.begin_install()
LOG.info("Installing cassandra")
self.app.install_if_needed(packages)
self.app.init_storage_structure()
if config_contents:
LOG.info(_("Config processing"))
self.app.write_config(config_contents)
self.app.make_host_reachable()
if device_path:
device = volume.VolumeDevice(device_path)
device.format()
if os.path.exists(system.CASSANDRA_MOUNT_POINT):
#rsync exiting data
device.migrate_data(system.CASSANDRA_MOUNT_POINT)
#mount the volume
device.mount(system.CASSANDRA_MOUNT_POINT)
LOG.debug(_("Mounting new volume."))
self.app.restart()
self.appStatus.end_install_or_restart()
LOG.info(_('"prepare" call has finished.'))
def change_passwords(self, context, users):
raise exception.TroveError(ERROR_MSG)
def update_attributes(self, context, username, hostname, user_attrs):
raise exception.TroveError(ERROR_MSG)
def create_database(self, context, databases):
raise exception.TroveError(ERROR_MSG)
def create_user(self, context, users):
raise exception.TroveError(ERROR_MSG)
def delete_database(self, context, database):
raise exception.TroveError(ERROR_MSG)
def delete_user(self, context, user):
raise exception.TroveError(ERROR_MSG)
def get_user(self, context, username, hostname):
raise exception.TroveError(ERROR_MSG)
def grant_access(self, context, username, hostname, databases):
raise exception.TroveError(ERROR_MSG)
def revoke_access(self, context, username, hostname, database):
raise exception.TroveError(ERROR_MSG)
def list_access(self, context, username, hostname):
raise exception.TroveError(ERROR_MSG)
def list_databases(self, context, limit=None, marker=None,
include_marker=False):
raise exception.TroveError(ERROR_MSG)
def list_users(self, context, limit=None, marker=None,
include_marker=False):
raise exception.TroveError(ERROR_MSG)
def enable_root(self, context):
raise exception.TroveError(ERROR_MSG)
def is_root_enabled(self, context):
raise exception.TroveError(ERROR_MSG)
def _perform_restore(self, backup_info, context, restore_location, app):
raise exception.TroveError(ERROR_MSG)
def create_backup(self, context, backup_info):
raise exception.TroveError(ERROR_MSG)
def mount_volume(self, context, device_path=None, mount_point=None):
device = volume.VolumeDevice(device_path)
device.mount(mount_point, write_to_fstab=False)
LOG.debug(_("Mounted the volume."))
def unmount_volume(self, context, device_path=None, mount_point=None):
device = volume.VolumeDevice(device_path)
device.unmount(mount_point)
LOG.debug(_("Unmounted the volume."))
def resize_fs(self, context, device_path=None, mount_point=None):
device = volume.VolumeDevice(device_path)
device.resize_fs(mount_point)
LOG.debug(_("Resized the filesystem"))
def update_overrides(self, context, overrides, remove=False):
raise exception.TroveError(ERROR_MSG)
def apply_overrides(self, context, overrides):
raise exception.TroveError(ERROR_MSG)

View File

@ -0,0 +1,205 @@
# Copyright 2013 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import yaml
from trove.common import cfg
from trove.common import utils
from trove.common import exception
from trove.common import instance as rd_instance
from trove.guestagent.common import operating_system
from trove.guestagent.datastore.cassandra import system
from trove.guestagent.datastore import service
from trove.guestagent import pkg
from trove.openstack.common import log as logging
from trove.openstack.common.gettextutils import _
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
packager = pkg.Package()
class CassandraApp(object):
"""Prepares DBaaS on a Guest container."""
def __init__(self, status):
"""By default login with root no password for initial setup. """
self.state_change_wait_time = CONF.state_change_wait_time
self.status = status
def install_if_needed(self, packages):
"""Prepare the guest machine with a cassandra server installation"""
LOG.info(_("Preparing Guest as Cassandra Server"))
if not packager.pkg_is_installed(packages):
self._install_db(packages)
LOG.info(_("Dbaas install_if_needed complete"))
def complete_install_or_restart(self):
self.status.end_install_or_restart()
def _enable_db_on_boot(self):
utils.execute_with_timeout(system.ENABLE_CASSANDRA_ON_BOOT,
shell=True)
def _disable_db_on_boot(self):
utils.execute_with_timeout(system.DISABLE_CASSANDRA_ON_BOOT,
shell=True)
def init_storage_structure(self):
utils.execute_with_timeout(system.INIT_FS, shell=True)
def start_db(self, update_db=False):
self._enable_db_on_boot()
try:
utils.execute_with_timeout(system.START_CASSANDRA,
shell=True)
except exception.ProcessExecutionError:
pass
if not (self.status.
wait_for_real_status_to_change_to(
rd_instance.ServiceStatuses.RUNNING,
self.state_change_wait_time,
update_db)):
try:
utils.execute_with_timeout(system.CASSANDRA_KILL,
shell=True)
except exception.ProcessExecutionError as p:
LOG.error(_("Error killing stalled Cassandra start command."))
LOG.error(p)
self.status.end_install_or_restart()
raise RuntimeError(_("Could not start Cassandra"))
def stop_db(self, update_db=False, do_not_start_on_reboot=False):
if do_not_start_on_reboot:
self._disable_db_on_boot()
utils.execute_with_timeout(system.STOP_CASSANDRA,
shell=True)
if not (self.status.wait_for_real_status_to_change_to(
rd_instance.ServiceStatuses.SHUTDOWN,
self.state_change_wait_time, update_db)):
LOG.error(_("Could not stop Cassandra"))
self.status.end_install_or_restart()
raise RuntimeError(_("Could not stop Cassandra"))
def restart(self):
try:
self.status.begin_restart()
LOG.info(_("Restarting DB"))
self.stop_db()
self.start_db()
finally:
self.status.end_install_or_restart()
def _install_db(self, packages):
"""Install cassandra server"""
LOG.debug(_("Installing cassandra server"))
packager.pkg_install(packages, None, system.TIME_OUT)
LOG.debug(_("Finished installing cassandra server"))
def write_config(self, config_contents):
LOG.info(_('Defining temp config holder at '
'%s') % system.CASSANDRA_TEMP_CONF)
with open(system.CASSANDRA_TEMP_CONF, 'w+') as conf:
conf.write(config_contents)
LOG.info(_('Writing new config'))
utils.execute_with_timeout("sudo", "mv",
system.CASSANDRA_TEMP_CONF,
system.CASSANDRA_CONF)
LOG.info(_('Overriding old config'))
def read_conf(self):
"""Returns cassandra.yaml in dict structure"""
LOG.info(_("Opening cassandra.yaml"))
with open(system.CASSANDRA_CONF, 'r') as config:
LOG.info(_("Preparing YAML object from cassandra.yaml"))
yamled = yaml.load(config.read())
return yamled
def update_config_with_single(self, key, value):
"""Updates single key:value in cassandra.yaml"""
yamled = self.read_conf()
yamled.update({key: value})
LOG.info(_("Updating cassandra.yaml with %(key)s: %(value)s")
% {'key': key, 'value': value})
dump = yaml.dump(yamled, default_flow_style=False)
LOG.info(_("Dumping YAML to stream"))
self.write_config(dump)
def update_conf_with_group(self, group):
"""Updates group of key:value in cassandra.yaml"""
yamled = self.read_conf()
for key, value in group.iteritems():
if key == 'seed':
(yamled.get('seed_provider')[0].
get('parameters')[0].
update({'seeds': value}))
else:
yamled.update({key: value})
LOG.info(_("Updating cassandra.yaml with %(key)s: %(value)s")
% {'key': key, 'value': value})
dump = yaml.dump(yamled, default_flow_style=False)
LOG.info(_("Dumping YAML to stream"))
self.write_config(dump)
def make_host_reachable(self):
updates = {
'rpc_address': "0.0.0.0",
'listen_address': operating_system.get_ip_address(),
'seed': operating_system.get_ip_address()
}
self.update_conf_with_group(updates)
def start_db_with_conf_changes(self, config_contents):
LOG.info(_("Starting cassandra with conf changes..."))
LOG.info(_("inside the guest - cassandra is running %s...")
% self.status.is_running)
if self.status.is_running:
LOG.error(_("Cannot execute start_db_with_conf_changes because "
"cassandra state == %s!") % self.status)
raise RuntimeError("Cassandra not stopped.")
LOG.info(_("Initiating config."))
self.write_config(config_contents)
self.start_db(True)
def reset_configuration(self, configuration):
config_contents = configuration['config_contents']
LOG.info(_("Resetting configuration"))
self.write_config(config_contents)
class CassandraAppStatus(service.BaseDbStatus):
def _get_actual_db_status(self):
try:
# If status check would be successful,
# bot stdin and stdout would contain nothing
out, err = utils.execute_with_timeout(system.CASSANDRA_STATUS,
shell=True)
if "Connection error. Could not connect to" not in err:
return rd_instance.ServiceStatuses.RUNNING
else:
return rd_instance.ServiceStatuses.SHUTDOWN
except exception.ProcessExecutionError as e:
LOG.error(_("Process execution %s") % e)
return rd_instance.ServiceStatuses.SHUTDOWN
except OSError as e:
LOG.error(_("OS Error %s") % e)
return rd_instance.ServiceStatuses.SHUTDOWN

View File

@ -0,0 +1,41 @@
# Copyright 2013 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from trove.common import cfg
from trove.openstack.common import log as logging
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
CASSANDRA_DATA_DIR = "/var/lib/cassandra/data"
CASSANDRA_MOUNT_POINT = "/var/lib/cassandra"
CASSANDRA_CONF = "/etc/cassandra/cassandra.yaml"
CASSANDRA_TEMP_CONF = "/tmp/cassandra.yaml"
CASSANDRA_TEMP_DIR = "/tmp/cassandra"
INIT_FS = "sudo mkdir -p %s" % CASSANDRA_MOUNT_POINT
ENABLE_CASSANDRA_ON_BOOT = "sudo update-rc.d cassandra enable"
DISABLE_CASSANDRA_ON_BOOT = "sudo update-rc.d cassandra disable"
# cassandra binary stored at /usr/sbin/
START_CASSANDRA = "sudo /usr/sbin/cassandra"
STOP_CASSANDRA = "sudo killall java 2> /dev/null || true"
CASSANDRA_STATUS = """echo "use system;" > /tmp/check; cqlsh -f /tmp/check"""
CASSANDRA_KILL = "sudo killall java || true"
TIME_OUT = 10000

View File

@ -37,6 +37,7 @@ defaults = {
'mysql': 'trove.guestagent.datastore.mysql.manager.Manager',
'percona': 'trove.guestagent.datastore.mysql.manager.Manager',
'redis': 'trove.guestagent.datastore.redis.manager.Manager',
'cassandra': 'trove.guestagent.datastore.cassandra.manager.Manager',
}
CONF = cfg.CONF

View File

@ -347,7 +347,8 @@ class DebianPackagerMixin(BasePackagerMixin):
# even after successful install, packages can stay unconfigured
# config_opts - is dict with name/value for questions asked by
# interactive configure script
self._fix_package_selections(packages, config_opts)
if config_opts:
self._fix_package_selections(packages, config_opts)
def pkg_version(self, package_name):
p = commands.getstatusoutput("apt-cache policy %s" % package_name)

View File

@ -0,0 +1,77 @@
cluster_name: 'Test Cluster'
num_tokens: 256
hinted_handoff_enabled: true
max_hint_window_in_ms: 10800000 # 3 hours
hinted_handoff_throttle_in_kb: 1024
max_hints_delivery_threads: 2
authenticator: AllowAllAuthenticator
authorizer: AllowAllAuthorizer
permissions_validity_in_ms: 2000
partitioner: org.apache.cassandra.dht.Murmur3Partitioner
data_file_directories:
- /var/lib/cassandra/data
commitlog_directory: /var/lib/cassandra/commitlog
disk_failure_policy: stop
key_cache_size_in_mb:
key_cache_save_period: 14400
row_cache_size_in_mb: 0
row_cache_save_period: 0
saved_caches_directory: /var/lib/cassandra/saved_caches
commitlog_sync: periodic
commitlog_sync_period_in_ms: 10000
commitlog_segment_size_in_mb: 32
seed_provider:
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
- seeds: "127.0.0.1"
concurrent_reads: 32
concurrent_writes: 32
memtable_flush_queue_size: 4
trickle_fsync: false
trickle_fsync_interval_in_kb: 10240
storage_port: 7000
ssl_storage_port: 7001
listen_address: localhost
start_native_transport: true
native_transport_port: 9042
start_rpc: true
rpc_address: localhost
rpc_port: 9160
rpc_keepalive: true
rpc_server_type: sync
thrift_framed_transport_size_in_mb: 15
incremental_backups: false
snapshot_before_compaction: false
auto_snapshot: true
tombstone_warn_threshold: 1000
tombstone_failure_threshold: 100000
column_index_size_in_kb: 64
in_memory_compaction_limit_in_mb: 64
multithreaded_compaction: false
compaction_throughput_mb_per_sec: 16
compaction_preheat_key_cache: true
read_request_timeout_in_ms: 5000
range_request_timeout_in_ms: 10000
write_request_timeout_in_ms: 2000
cas_contention_timeout_in_ms: 1000
truncate_request_timeout_in_ms: 60000
request_timeout_in_ms: 10000
cross_node_timeout: false
endpoint_snitch: SimpleSnitch
dynamic_snitch_update_interval_in_ms: 100
dynamic_snitch_reset_interval_in_ms: 600000
dynamic_snitch_badness_threshold: 0.1
request_scheduler: org.apache.cassandra.scheduler.NoScheduler
server_encryption_options:
internode_encryption: none
keystore: conf/.keystore
keystore_password: cassandra
truststore: conf/.truststore
truststore_password: cassandra
client_encryption_options:
enabled: false
keystore: conf/.keystore
keystore_password: cassandra
internode_compression: all
inter_dc_tcp_nodelay: false
preheat_kernel_page_cache: false

View File

@ -0,0 +1,79 @@
HeatTemplateFormatVersion: '2012-12-12'
Description: Instance creation template for cassandra
Parameters:
Flavor:
Type: String
VolumeSize:
Type: Number
Default : '1'
InstanceId:
Type: String
ImageId:
Type: String
DatastoreManager:
Type: String
AvailabilityZone:
Type: String
Default: nova
TenantId:
Type: String
Resources:
BaseInstance:
Type: AWS::EC2::Instance
Metadata:
AWS::CloudFormation::Init:
config:
files:
/etc/guest_info:
content:
Fn::Join:
- ''
- ["[DEFAULT]\nguest_id=", {Ref: InstanceId},
"\ndatastore_manager=", {Ref: DatastoreManager},
"\ntenant_id=", {Ref: TenantId}]
mode: '000644'
owner: root
group: root
Properties:
ImageId: {Ref: ImageId}
InstanceType: {Ref: Flavor}
AvailabilityZone: {Ref: AvailabilityZone}
SecurityGroups : [{Ref: CassandraDbaasSG}]
UserData:
Fn::Base64:
Fn::Join:
- ''
- ["#!/bin/bash -v\n",
"/opt/aws/bin/cfn-init\n",
"sudo service trove-guest start\n"]
{% if volume_support %}
DataVolume:
Type: AWS::EC2::Volume
Properties:
Size: {Ref: VolumeSize}
AvailabilityZone: {Ref: AvailabilityZone}
Tags:
- {Key: Usage, Value: Test}
MountPoint:
Type: AWS::EC2::VolumeAttachment
Properties:
InstanceId: {Ref: BaseInstance}
VolumeId: {Ref: DataVolume}
Device: /dev/vdb
{% endif %}
CassandraDbaasSG:
Type: AWS::EC2::SecurityGroup
Properties:
GroupDescription: Default Security group for Cassandra
SecurityGroupIngress:
- IpProtocol: "tcp"
FromPort: "9160"
ToPort: "9160"
CidrIp: "0.0.0.0/0"
DatabaseIPAddress:
Type: AWS::EC2::EIP
DatabaseIPAssoc :
Type: AWS::EC2::EIPAssociation
Properties:
InstanceId: {Ref: BaseInstance}
EIP: {Ref: DatabaseIPAddress}

View File

@ -0,0 +1 @@

View File

@ -85,5 +85,7 @@ class HeatTemplateLoadTest(testtools.TestCase):
'mysql-blah')
def test_heat_template_load_success(self):
htmpl = template.load_heat_template('mysql')
self.assertNotEqual(None, htmpl)
mysql_tmpl = template.load_heat_template('mysql')
cassandra_tmpl = template.load_heat_template('cassandra')
self.assertIsNotNone(mysql_tmpl)
self.assertIsNotNone(cassandra_tmpl)

View File

@ -31,6 +31,7 @@ import testtools
from testtools.matchers import Is
from testtools.matchers import Equals
from testtools.matchers import Not
from trove.common.exception import ProcessExecutionError
from trove.common import utils
from trove.common import instance as rd_instance
from trove.conductor import api as conductor_api
@ -43,6 +44,7 @@ from trove.guestagent.datastore.service import BaseDbStatus
from trove.guestagent.datastore.redis import service as rservice
from trove.guestagent.datastore.redis.service import RedisApp
from trove.guestagent.datastore.redis import system as RedisSystem
from trove.guestagent.datastore.cassandra import service as cass_service
from trove.guestagent.datastore.mysql.service import MySqlAdmin
from trove.guestagent.datastore.mysql.service import MySqlRootAccess
from trove.guestagent.datastore.mysql.service import MySqlApp
@ -134,7 +136,6 @@ class DbaasTest(testtools.TestCase):
def test_load_mysqld_options_error(self):
from trove.common.exception import ProcessExecutionError
dbaas.utils.execute = Mock(side_effect=ProcessExecutionError())
self.assertFalse(dbaas.load_mysqld_options())
@ -543,7 +544,9 @@ class MySqlAppTest(testtools.TestCase):
self.mySqlApp.stop_db(True)
self.assertTrue(conductor_api.API.heartbeat.called_once_with(
self.FAKE_ID, {'service_status': 'shutdown'}))
self.FAKE_ID,
{'service_status':
rd_instance.ServiceStatuses.SHUTDOWN.description}))
def test_stop_mysql_error(self):
@ -564,7 +567,9 @@ class MySqlAppTest(testtools.TestCase):
self.assertTrue(self.mySqlApp.stop_db.called)
self.assertTrue(self.mySqlApp.start_mysql.called)
self.assertTrue(conductor_api.API.heartbeat.called_once_with(
self.FAKE_ID, {'service_status': 'running'}))
self.FAKE_ID,
{'service_status':
rd_instance.ServiceStatuses.RUNNING.description}))
def test_restart_mysql_wont_start_up(self):
@ -581,7 +586,6 @@ class MySqlAppTest(testtools.TestCase):
def test_wipe_ib_logfiles_no_file(self):
from trove.common.exception import ProcessExecutionError
processexecerror = ProcessExecutionError('No such file or directory')
dbaas.utils.execute_with_timeout = Mock(side_effect=processexecerror)
@ -589,7 +593,6 @@ class MySqlAppTest(testtools.TestCase):
def test_wipe_ib_logfiles_error(self):
from trove.common.exception import ProcessExecutionError
mocked = Mock(side_effect=ProcessExecutionError('Error'))
dbaas.utils.execute_with_timeout = mocked
@ -612,7 +615,9 @@ class MySqlAppTest(testtools.TestCase):
self.mySqlApp.start_mysql(update_db=True)
self.assertTrue(conductor_api.API.heartbeat.called_once_with(
self.FAKE_ID, {'service_status': 'running'}))
self.FAKE_ID,
{'service_status':
rd_instance.ServiceStatuses.RUNNING.description}))
def test_start_mysql_runs_forever(self):
@ -623,12 +628,13 @@ class MySqlAppTest(testtools.TestCase):
self.assertRaises(RuntimeError, self.mySqlApp.start_mysql)
self.assertTrue(conductor_api.API.heartbeat.called_once_with(
self.FAKE_ID, {'service_status': 'shutdown'}))
self.FAKE_ID,
{'service_status':
rd_instance.ServiceStatuses.SHUTDOWN.description}))
def test_start_mysql_error(self):
self.mySqlApp._enable_mysql_on_boot = Mock()
from trove.common.exception import ProcessExecutionError
mocked = Mock(side_effect=ProcessExecutionError('Error'))
dbaas.utils.execute_with_timeout = mocked
@ -924,7 +930,6 @@ class ServiceRegistryTest(testtools.TestCase):
dbaas_sr.get_custom_managers = Mock(return_value=
datastore_registry_ext_test)
test_dict = dbaas_sr.datastore_registry()
self.assertEqual(4, len(test_dict))
self.assertEqual(test_dict.get('test'),
datastore_registry_ext_test.get('test', None))
self.assertEqual(test_dict.get('mysql'),
@ -936,6 +941,9 @@ class ServiceRegistryTest(testtools.TestCase):
self.assertEqual(test_dict.get('redis'),
'trove.guestagent.datastore.redis.'
'manager.Manager')
self.assertEqual(test_dict.get('cassandra'),
'trove.guestagent.datastore.cassandra.'
'manager.Manager')
def test_datastore_registry_with_existing_manager(self):
datastore_registry_ext_test = {
@ -945,7 +953,6 @@ class ServiceRegistryTest(testtools.TestCase):
dbaas_sr.get_custom_managers = Mock(return_value=
datastore_registry_ext_test)
test_dict = dbaas_sr.datastore_registry()
self.assertEqual(3, len(test_dict))
self.assertEqual(test_dict.get('mysql'),
'trove.guestagent.datastore.mysql.'
'manager.Manager123')
@ -954,13 +961,15 @@ class ServiceRegistryTest(testtools.TestCase):
'manager.Manager')
self.assertEqual(test_dict.get('redis'),
'trove.guestagent.datastore.redis.manager.Manager')
self.assertEqual(test_dict.get('cassandra'),
'trove.guestagent.datastore.cassandra.'
'manager.Manager')
def test_datastore_registry_with_blank_dict(self):
datastore_registry_ext_test = dict()
dbaas_sr.get_custom_managers = Mock(return_value=
datastore_registry_ext_test)
test_dict = dbaas_sr.datastore_registry()
self.assertEqual(3, len(test_dict))
self.assertEqual(test_dict.get('mysql'),
'trove.guestagent.datastore.mysql.'
'manager.Manager')
@ -969,6 +978,9 @@ class ServiceRegistryTest(testtools.TestCase):
'manager.Manager')
self.assertEqual(test_dict.get('redis'),
'trove.guestagent.datastore.redis.manager.Manager')
self.assertEqual(test_dict.get('cassandra'),
'trove.guestagent.datastore.cassandra.'
'manager.Manager')
class KeepAliveConnectionTest(testtools.TestCase):
@ -1171,7 +1183,6 @@ class MySqlAppStatusTest(testtools.TestCase):
def test_get_actual_db_status_error_shutdown(self):
from trove.common.exception import ProcessExecutionError
mocked = Mock(side_effect=ProcessExecutionError())
dbaas.utils.execute_with_timeout = mocked
dbaas.load_mysqld_options = Mock()
@ -1184,7 +1195,6 @@ class MySqlAppStatusTest(testtools.TestCase):
def test_get_actual_db_status_error_crashed(self):
from trove.common.exception import ProcessExecutionError
dbaas.utils.execute_with_timeout = MagicMock(
side_effect=[ProcessExecutionError(), ("some output", None)])
dbaas.load_mysqld_options = Mock()
@ -1377,3 +1387,143 @@ class TestRedisApp(testtools.TestCase):
run_as_root=True,
root_helper='sudo')
verify(mock_status).end_install_or_restart()
class CassandraDBAppTest(testtools.TestCase):
def setUp(self):
super(CassandraDBAppTest, self).setUp()
self.utils_execute_with_timeout = (cass_service .
utils.execute_with_timeout)
self.sleep = time.sleep
self.pkg_version = cass_service.packager.pkg_version
self.pkg = cass_service.packager
util.init_db()
self.FAKE_ID = str(uuid4())
InstanceServiceStatus.create(instance_id=self.FAKE_ID,
status=rd_instance.ServiceStatuses.NEW)
self.appStatus = FakeAppStatus(self.FAKE_ID,
rd_instance.ServiceStatuses.NEW)
self.cassandra = cass_service.CassandraApp(self.appStatus)
def tearDown(self):
super(CassandraDBAppTest, self).tearDown()
cass_service.utils.execute_with_timeout = (self.
utils_execute_with_timeout)
time.sleep = self.sleep
cass_service.packager.pkg_version = self.pkg_version
cass_service.packager = self.pkg
InstanceServiceStatus.find_by(instance_id=self.FAKE_ID).delete()
def assert_reported_status(self, expected_status):
service_status = InstanceServiceStatus.find_by(
instance_id=self.FAKE_ID)
self.assertEqual(expected_status, service_status.status)
def test_stop_db(self):
cass_service.utils.execute_with_timeout = Mock()
self.appStatus.set_next_status(
rd_instance.ServiceStatuses.SHUTDOWN)
self.cassandra.stop_db()
self.assert_reported_status(rd_instance.ServiceStatuses.NEW)
def test_stop_db_with_db_update(self):
cass_service.utils.execute_with_timeout = Mock()
self.appStatus.set_next_status(
rd_instance.ServiceStatuses.SHUTDOWN)
self.cassandra.stop_db(True)
self.assertTrue(conductor_api.API.heartbeat.called_once_with(
self.FAKE_ID,
{'service_status':
rd_instance.ServiceStatuses.SHUTDOWN.description}))
def test_stop_db_error(self):
cass_service.utils.execute_with_timeout = Mock()
self.appStatus.set_next_status(rd_instance.ServiceStatuses.RUNNING)
self.cassandra.state_change_wait_time = 1
self.assertRaises(RuntimeError, self.cassandra.stop_db)
def test_restart(self):
self.cassandra.stop_db = Mock()
self.cassandra.start_db = Mock()
self.appStatus.set_next_status(rd_instance.ServiceStatuses.RUNNING)
self.cassandra.restart()
self.assertTrue(conductor_api.API.heartbeat.called_once_with(
self.FAKE_ID,
{'service_status':
rd_instance.ServiceStatuses.RUNNING.description}))
self.assert_reported_status(rd_instance.ServiceStatuses.NEW)
def test_start_cassandra(self):
cass_service.utils.execute_with_timeout = Mock()
self.appStatus.set_next_status(rd_instance.ServiceStatuses.RUNNING)
self.cassandra.start_db()
self.assert_reported_status(rd_instance.ServiceStatuses.NEW)
def test_start_cassandra_runs_forever(self):
cass_service.utils.execute_with_timeout = Mock()
(self.cassandra.status.
wait_for_real_status_to_change_to) = Mock(return_value=False)
self.appStatus.set_next_status(rd_instance.ServiceStatuses.SHUTDOWN)
self.assertRaises(RuntimeError, self.cassandra.stop_db)
self.assertTrue(conductor_api.API.heartbeat.called_once_with(
self.FAKE_ID,
{'service_status':
rd_instance.ServiceStatuses.SHUTDOWN.description}))
def test_start_db_with_db_update(self):
cass_service.utils.execute_with_timeout = Mock()
self.appStatus.set_next_status(
rd_instance.ServiceStatuses.RUNNING)
self.cassandra.start_db(True)
self.assertTrue(conductor_api.API.heartbeat.called_once_with(
self.FAKE_ID,
{'service_status':
rd_instance.ServiceStatuses.RUNNING.description}))
self.assert_reported_status(rd_instance.ServiceStatuses.NEW)
def test_start_cassandra_error(self):
self.cassandra._enable_db_on_boot = Mock()
self.cassandra.state_change_wait_time = 1
cass_service.utils.execute_with_timeout = Mock(
side_effect=ProcessExecutionError('Error'))
self.assertRaises(RuntimeError, self.cassandra.start_db)
def test_install(self):
self.cassandra._install_db = Mock()
self.pkg.pkg_is_installed = Mock(return_value=False)
self.cassandra.install_if_needed(['cassandra'])
self.assertTrue(self.cassandra._install_db.called)
self.assert_reported_status(rd_instance.ServiceStatuses.NEW)
def test_install_install_error(self):
from trove.guestagent import pkg
self.cassandra.start_db = Mock()
self.cassandra.stop_db = Mock()
self.pkg.pkg_is_installed = Mock(return_value=False)
self.cassandra._install_db = Mock(
side_effect=pkg.PkgPackageStateError("Install error"))
self.assertRaises(pkg.PkgPackageStateError,
self.cassandra.install_if_needed,
['cassandra=1.2.10'])
self.assert_reported_status(rd_instance.ServiceStatuses.NEW)

View File

@ -15,11 +15,15 @@
import os
import testtools
from mock import Mock
from mockito import verify, when, unstub, any, mock, never
from testtools.matchers import Is, Equals, Not
from trove.common.context import TroveContext
from trove.common.instance import ServiceStatuses
from trove.guestagent import volume
from trove.guestagent.common import operating_system
from trove.guestagent.datastore.cassandra import service as cass_service
from trove.guestagent.datastore.cassandra import manager as cass_manager
from trove.guestagent.datastore.mysql.manager import Manager
import trove.guestagent.datastore.mysql.service as dbaas
from trove.guestagent.datastore.redis.manager import Manager as RedisManager
@ -31,7 +35,6 @@ from trove.guestagent import pkg
class GuestAgentManagerTest(testtools.TestCase):
def setUp(self):
super(GuestAgentManagerTest, self).setUp()
self.context = TroveContext()
@ -165,7 +168,6 @@ class GuestAgentManagerTest(testtools.TestCase):
def _prepare_dynamic(self, device_path='/dev/vdb', is_mysql_installed=True,
backup_id=None, is_root_enabled=False,
overrides=None):
# covering all outcomes is starting to cause trouble here
COUNT = 1 if device_path else 0
backup_info = None
@ -233,10 +235,6 @@ class RedisGuestAgentManagerTest(testtools.TestCase):
self.manager = RedisManager()
self.packages = 'redis-server'
self.origin_RedisAppStatus = redis_service.RedisAppStatus
self.origin_os_path_exists = os.path.exists
self.origin_format = volume.VolumeDevice.format
self.origin_migrate_data = volume.VolumeDevice.migrate_data
self.origin_mount = volume.VolumeDevice.mount
self.origin_stop_redis = redis_service.RedisApp.stop_db
self.origin_start_redis = redis_service.RedisApp.start_redis
self.origin_install_redis = redis_service.RedisApp._install_redis
@ -244,10 +242,6 @@ class RedisGuestAgentManagerTest(testtools.TestCase):
def tearDown(self):
super(RedisGuestAgentManagerTest, self).tearDown()
redis_service.RedisAppStatus = self.origin_RedisAppStatus
os.path.exists = self.origin_os_path_exists
volume.VolumeDevice.format = self.origin_format
volume.VolumeDevice.migrate_data = self.origin_migrate_data
volume.VolumeDevice.mount = self.origin_mount
redis_service.RedisApp.stop_db = self.origin_stop_redis
redis_service.RedisApp.start_redis = self.origin_start_redis
redis_service.RedisApp._install_redis = self.origin_install_redis
@ -314,3 +308,115 @@ class RedisGuestAgentManagerTest(testtools.TestCase):
self.manager.stop_db(self.context)
verify(redis_service.RedisAppStatus).get()
verify(redis_service.RedisApp).stop_db(do_not_start_on_reboot=False)
class GuestAgentCassandraDBManagerTest(testtools.TestCase):
def setUp(self):
super(GuestAgentCassandraDBManagerTest, self).setUp()
self.real_status = cass_service.CassandraAppStatus.set_status
class FakeInstanceServiceStatus(object):
status = ServiceStatuses.NEW
def save(self):
pass
cass_service.CassandraAppStatus.set_status = Mock(
return_value=FakeInstanceServiceStatus())
self.context = TroveContext()
self.manager = cass_manager.Manager()
self.pkg = cass_service.packager
self.real_db_app_status = cass_service.CassandraAppStatus
self.origin_os_path_exists = os.path.exists
self.origin_format = volume.VolumeDevice.format
self.origin_migrate_data = volume.VolumeDevice.migrate_data
self.origin_mount = volume.VolumeDevice.mount
self.origin_stop_db = cass_service.CassandraApp.stop_db
self.origin_start_db = cass_service.CassandraApp.start_db
self.origin_install_db = cass_service.CassandraApp._install_db
self.original_get_ip = operating_system.get_ip_address
self.orig_make_host_reachable = (
cass_service.CassandraApp.make_host_reachable)
def tearDown(self):
super(GuestAgentCassandraDBManagerTest, self).tearDown()
cass_service.packager = self.pkg
cass_service.CassandraAppStatus.set_status = self.real_db_app_status
os.path.exists = self.origin_os_path_exists
volume.VolumeDevice.format = self.origin_format
volume.VolumeDevice.migrate_data = self.origin_migrate_data
volume.VolumeDevice.mount = self.origin_mount
cass_service.CassandraApp.stop_db = self.origin_stop_db
cass_service.CassandraApp.start_db = self.origin_start_db
cass_service.CassandraApp._install_db = self.origin_install_db
operating_system.get_ip_address = self.original_get_ip
cass_service.CassandraApp.make_host_reachable = (
self.orig_make_host_reachable)
def test_update_status(self):
mock_status = mock()
self.manager.appStatus = mock_status
self.manager.update_status(self.context)
verify(mock_status).update()
def test_prepare_pkg(self):
self._prepare_dynamic(['cassandra'])
def test_prepare_no_pkg(self):
self._prepare_dynamic([])
def test_prepare_db_not_installed(self):
self._prepare_dynamic([], is_db_installed=False)
def test_prepare_db_not_installed_no_package(self):
self._prepare_dynamic([],
is_db_installed=True)
def _prepare_dynamic(self, packages,
config_content=any(), device_path='/dev/vdb',
is_db_installed=True, backup_id=None,
is_root_enabled=False,
overrides=None):
# covering all outcomes is starting to cause trouble here
if not backup_id:
backup_info = {'id': backup_id,
'location': 'fake-location',
'type': 'InnoBackupEx',
'checksum': 'fake-checksum',
}
mock_status = mock()
self.manager.appStatus = mock_status
when(mock_status).begin_install().thenReturn(None)
mock_app = mock()
self.manager.app = mock_app
when(mock_app).install_if_needed(packages).thenReturn(None)
(when(pkg.Package).pkg_is_installed(any()).
thenReturn(is_db_installed))
when(mock_app).init_storage_structure().thenReturn(None)
when(mock_app).write_config(config_content).thenReturn(None)
when(mock_app).make_host_reachable().thenReturn(None)
when(mock_app).restart().thenReturn(None)
when(os.path).exists(any()).thenReturn(True)
when(volume.VolumeDevice).format().thenReturn(None)
when(volume.VolumeDevice).migrate_data(any()).thenReturn(None)
when(volume.VolumeDevice).mount().thenReturn(None)
# invocation
self.manager.prepare(context=self.context, packages=packages,
config_contents=config_content,
databases=None,
memory_mb='2048', users=None,
device_path=device_path,
mount_point="/var/lib/cassandra",
backup_info=backup_info,
overrides=None)
# verification/assertion
verify(mock_status).begin_install()
verify(mock_app).install_if_needed(packages)
verify(mock_app).init_storage_structure()
verify(mock_app).make_host_reachable()
verify(mock_app).restart()