From 39f4c789b307a150c1e95a0468bf502ec170b84f Mon Sep 17 00:00:00 2001 From: Denis Makogon Date: Thu, 23 Jan 2014 13:00:43 +0200 Subject: [PATCH] Initial support for single instance Cassandra Database Reasons - This code adds ability to create/delete instance with mongodb service type. No other operations supported. - Trove should support multiple extensions. One of the is NoSQL support. Main goal is to add support for Apache Cassandra NoSQL database. Changes This code gives ability to create/delete instance with cassandra datastire type. No other operations supported. Change-Id: Ib48f6392765af49c2df1447c3df46675825d210f Implements: blueprint cassandra-db-support --- trove/common/cfg.py | 5 +- trove/guestagent/common/operating_system.py | 20 ++ .../datastore/cassandra/__init__.py | 0 .../guestagent/datastore/cassandra/manager.py | 158 ++++++++++++++ .../guestagent/datastore/cassandra/service.py | 205 ++++++++++++++++++ .../guestagent/datastore/cassandra/system.py | 41 ++++ trove/guestagent/dbaas.py | 1 + trove/guestagent/pkg.py | 3 +- trove/templates/cassandra/config.template | 77 +++++++ trove/templates/cassandra/heat.template | 79 +++++++ .../cassandra/override.config.template | 1 + trove/tests/unittests/common/test_template.py | 6 +- .../tests/unittests/guestagent/test_dbaas.py | 176 +++++++++++++-- .../unittests/guestagent/test_manager.py | 128 ++++++++++- 14 files changed, 872 insertions(+), 28 deletions(-) create mode 100644 trove/guestagent/datastore/cassandra/__init__.py create mode 100644 trove/guestagent/datastore/cassandra/manager.py create mode 100644 trove/guestagent/datastore/cassandra/service.py create mode 100644 trove/guestagent/datastore/cassandra/system.py create mode 100644 trove/templates/cassandra/config.template create mode 100644 trove/templates/cassandra/heat.template create mode 100644 trove/templates/cassandra/override.config.template diff --git a/trove/common/cfg.py b/trove/common/cfg.py index 1a9688e1e7..c04174c27a 100644 --- a/trove/common/cfg.py +++ b/trove/common/cfg.py @@ -215,7 +215,10 @@ common_opts = [ cfg.IntOpt('exists_notification_ticks', default=360, help='Number of report_intervals to wait between pushing ' 'events (see report_interval).'), - cfg.DictOpt('notification_service_id', default={}, + cfg.DictOpt('notification_service_id', + default={'mysql': '2f3ff068-2bfb-4f70-9a9d-a6bb65bc084b', + 'redis': 'b216ffc5-1947-456c-a4cf-70f94c05f7d0', + 'cassandra': '459a230d-4e97-4344-9067-2a54a310b0ed'}, help='Unique ID to tag notification events.'), cfg.StrOpt('nova_proxy_admin_user', default='', help="Admin username used to connect to Nova.", secret=True), diff --git a/trove/guestagent/common/operating_system.py b/trove/guestagent/common/operating_system.py index e591db4830..3b444b7c52 100644 --- a/trove/guestagent/common/operating_system.py +++ b/trove/guestagent/common/operating_system.py @@ -15,6 +15,9 @@ # License for the specific language governing permissions and limitations # under the License. import os +import fcntl +import struct +import socket REDHAT = 'redhat' DEBIAN = 'debian' @@ -68,3 +71,20 @@ def service_discovery(service_candidates): result['cmd_disable'] = "sudo systemctl disable %s" % service break return result + + +#Uses the Linux SIOCGIFADDR ioctl to find the IP address associated +# with a network interface, given the name of that interface, +# e.g. "eth0". The address is returned as a string containing a dotted quad. +def get_ip_address(ifname='eth0'): + """ + Retrieves IP address which assigned to given network interface + + @parameter ifname network interface (ethX, wlanX, etc.) + """ + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + return socket.inet_ntoa(fcntl.ioctl( + s.fileno(), + 0x8915, # SIOCGIFADDR + struct.pack('256s', ifname[:15]) + )[20:24]) diff --git a/trove/guestagent/datastore/cassandra/__init__.py b/trove/guestagent/datastore/cassandra/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/trove/guestagent/datastore/cassandra/manager.py b/trove/guestagent/datastore/cassandra/manager.py new file mode 100644 index 0000000000..dc7aa0180f --- /dev/null +++ b/trove/guestagent/datastore/cassandra/manager.py @@ -0,0 +1,158 @@ +# Copyright 2013 Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import os +from trove.common import cfg +from trove.common import exception +from trove.guestagent import volume +from trove.guestagent.datastore.cassandra import service +from trove.guestagent.datastore.cassandra import system +from trove.openstack.common import periodic_task +from trove.openstack.common import log as logging +from trove.openstack.common.gettextutils import _ +from trove.guestagent import dbaas + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) +USAGE_SLEEP_TIME = CONF.usage_sleep_time # seconds. +USAGE_TIMEOUT = CONF.usage_timeout # seconds. +ERROR_MSG = _("Not supported") + + +class Manager(periodic_task.PeriodicTasks): + + def __init__(self): + self.appStatus = service.CassandraAppStatus() + self.app = service.CassandraApp(self.appStatus) + + @periodic_task.periodic_task(ticks_between_runs=3) + def update_status(self, context): + """Update the status of the Cassandra service""" + self.appStatus.update() + + def restart(self, context): + self.app.restart() + + def get_filesystem_stats(self, context, fs_path): + """Gets the filesystem stats for the path given. """ + return dbaas.get_filesystem_volume_stats( + system.CASSANDRA_MOUNT_POINT) + + def start_db_with_conf_changes(self, context, config_contents): + self.app.start_db_with_conf_changes(config_contents) + + def stop_db(self, context, do_not_start_on_reboot=False): + self.app.stop_db(do_not_start_on_reboot=do_not_start_on_reboot) + + def reset_configuration(self, context, configuration): + self.app.reset_configuration(configuration) + + def prepare(self, context, packages, databases, memory_mb, users, + device_path=None, mount_point=None, backup_info=None, + config_contents=None, root_password=None, overrides=None): + LOG.info(_("Setting status BUILDING")) + self.appStatus.begin_install() + LOG.info("Installing cassandra") + self.app.install_if_needed(packages) + self.app.init_storage_structure() + if config_contents: + LOG.info(_("Config processing")) + self.app.write_config(config_contents) + self.app.make_host_reachable() + if device_path: + device = volume.VolumeDevice(device_path) + device.format() + if os.path.exists(system.CASSANDRA_MOUNT_POINT): + #rsync exiting data + device.migrate_data(system.CASSANDRA_MOUNT_POINT) + #mount the volume + device.mount(system.CASSANDRA_MOUNT_POINT) + LOG.debug(_("Mounting new volume.")) + self.app.restart() + + self.appStatus.end_install_or_restart() + LOG.info(_('"prepare" call has finished.')) + + def change_passwords(self, context, users): + raise exception.TroveError(ERROR_MSG) + + def update_attributes(self, context, username, hostname, user_attrs): + raise exception.TroveError(ERROR_MSG) + + def create_database(self, context, databases): + raise exception.TroveError(ERROR_MSG) + + def create_user(self, context, users): + raise exception.TroveError(ERROR_MSG) + + def delete_database(self, context, database): + raise exception.TroveError(ERROR_MSG) + + def delete_user(self, context, user): + raise exception.TroveError(ERROR_MSG) + + def get_user(self, context, username, hostname): + raise exception.TroveError(ERROR_MSG) + + def grant_access(self, context, username, hostname, databases): + raise exception.TroveError(ERROR_MSG) + + def revoke_access(self, context, username, hostname, database): + raise exception.TroveError(ERROR_MSG) + + def list_access(self, context, username, hostname): + raise exception.TroveError(ERROR_MSG) + + def list_databases(self, context, limit=None, marker=None, + include_marker=False): + raise exception.TroveError(ERROR_MSG) + + def list_users(self, context, limit=None, marker=None, + include_marker=False): + raise exception.TroveError(ERROR_MSG) + + def enable_root(self, context): + raise exception.TroveError(ERROR_MSG) + + def is_root_enabled(self, context): + raise exception.TroveError(ERROR_MSG) + + def _perform_restore(self, backup_info, context, restore_location, app): + raise exception.TroveError(ERROR_MSG) + + def create_backup(self, context, backup_info): + raise exception.TroveError(ERROR_MSG) + + def mount_volume(self, context, device_path=None, mount_point=None): + device = volume.VolumeDevice(device_path) + device.mount(mount_point, write_to_fstab=False) + LOG.debug(_("Mounted the volume.")) + + def unmount_volume(self, context, device_path=None, mount_point=None): + device = volume.VolumeDevice(device_path) + device.unmount(mount_point) + LOG.debug(_("Unmounted the volume.")) + + def resize_fs(self, context, device_path=None, mount_point=None): + device = volume.VolumeDevice(device_path) + device.resize_fs(mount_point) + LOG.debug(_("Resized the filesystem")) + + def update_overrides(self, context, overrides, remove=False): + raise exception.TroveError(ERROR_MSG) + + def apply_overrides(self, context, overrides): + raise exception.TroveError(ERROR_MSG) diff --git a/trove/guestagent/datastore/cassandra/service.py b/trove/guestagent/datastore/cassandra/service.py new file mode 100644 index 0000000000..e910a74f8c --- /dev/null +++ b/trove/guestagent/datastore/cassandra/service.py @@ -0,0 +1,205 @@ +# Copyright 2013 Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import yaml +from trove.common import cfg +from trove.common import utils +from trove.common import exception +from trove.common import instance as rd_instance +from trove.guestagent.common import operating_system +from trove.guestagent.datastore.cassandra import system +from trove.guestagent.datastore import service +from trove.guestagent import pkg +from trove.openstack.common import log as logging +from trove.openstack.common.gettextutils import _ + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + +packager = pkg.Package() + + +class CassandraApp(object): + """Prepares DBaaS on a Guest container.""" + + def __init__(self, status): + """By default login with root no password for initial setup. """ + self.state_change_wait_time = CONF.state_change_wait_time + self.status = status + + def install_if_needed(self, packages): + """Prepare the guest machine with a cassandra server installation""" + LOG.info(_("Preparing Guest as Cassandra Server")) + if not packager.pkg_is_installed(packages): + self._install_db(packages) + LOG.info(_("Dbaas install_if_needed complete")) + + def complete_install_or_restart(self): + self.status.end_install_or_restart() + + def _enable_db_on_boot(self): + utils.execute_with_timeout(system.ENABLE_CASSANDRA_ON_BOOT, + shell=True) + + def _disable_db_on_boot(self): + utils.execute_with_timeout(system.DISABLE_CASSANDRA_ON_BOOT, + shell=True) + + def init_storage_structure(self): + utils.execute_with_timeout(system.INIT_FS, shell=True) + + def start_db(self, update_db=False): + self._enable_db_on_boot() + try: + utils.execute_with_timeout(system.START_CASSANDRA, + shell=True) + except exception.ProcessExecutionError: + pass + + if not (self.status. + wait_for_real_status_to_change_to( + rd_instance.ServiceStatuses.RUNNING, + self.state_change_wait_time, + update_db)): + try: + utils.execute_with_timeout(system.CASSANDRA_KILL, + shell=True) + except exception.ProcessExecutionError as p: + LOG.error(_("Error killing stalled Cassandra start command.")) + LOG.error(p) + self.status.end_install_or_restart() + raise RuntimeError(_("Could not start Cassandra")) + + def stop_db(self, update_db=False, do_not_start_on_reboot=False): + if do_not_start_on_reboot: + self._disable_db_on_boot() + utils.execute_with_timeout(system.STOP_CASSANDRA, + shell=True) + + if not (self.status.wait_for_real_status_to_change_to( + rd_instance.ServiceStatuses.SHUTDOWN, + self.state_change_wait_time, update_db)): + LOG.error(_("Could not stop Cassandra")) + self.status.end_install_or_restart() + raise RuntimeError(_("Could not stop Cassandra")) + + def restart(self): + try: + self.status.begin_restart() + LOG.info(_("Restarting DB")) + self.stop_db() + self.start_db() + finally: + self.status.end_install_or_restart() + + def _install_db(self, packages): + """Install cassandra server""" + LOG.debug(_("Installing cassandra server")) + packager.pkg_install(packages, None, system.TIME_OUT) + LOG.debug(_("Finished installing cassandra server")) + + def write_config(self, config_contents): + LOG.info(_('Defining temp config holder at ' + '%s') % system.CASSANDRA_TEMP_CONF) + with open(system.CASSANDRA_TEMP_CONF, 'w+') as conf: + conf.write(config_contents) + LOG.info(_('Writing new config')) + utils.execute_with_timeout("sudo", "mv", + system.CASSANDRA_TEMP_CONF, + system.CASSANDRA_CONF) + LOG.info(_('Overriding old config')) + + def read_conf(self): + """Returns cassandra.yaml in dict structure""" + + LOG.info(_("Opening cassandra.yaml")) + with open(system.CASSANDRA_CONF, 'r') as config: + LOG.info(_("Preparing YAML object from cassandra.yaml")) + yamled = yaml.load(config.read()) + return yamled + + def update_config_with_single(self, key, value): + """Updates single key:value in cassandra.yaml""" + + yamled = self.read_conf() + yamled.update({key: value}) + LOG.info(_("Updating cassandra.yaml with %(key)s: %(value)s") + % {'key': key, 'value': value}) + dump = yaml.dump(yamled, default_flow_style=False) + LOG.info(_("Dumping YAML to stream")) + self.write_config(dump) + + def update_conf_with_group(self, group): + """Updates group of key:value in cassandra.yaml""" + + yamled = self.read_conf() + for key, value in group.iteritems(): + if key == 'seed': + (yamled.get('seed_provider')[0]. + get('parameters')[0]. + update({'seeds': value})) + else: + yamled.update({key: value}) + LOG.info(_("Updating cassandra.yaml with %(key)s: %(value)s") + % {'key': key, 'value': value}) + dump = yaml.dump(yamled, default_flow_style=False) + LOG.info(_("Dumping YAML to stream")) + self.write_config(dump) + + def make_host_reachable(self): + updates = { + 'rpc_address': "0.0.0.0", + 'listen_address': operating_system.get_ip_address(), + 'seed': operating_system.get_ip_address() + } + self.update_conf_with_group(updates) + + def start_db_with_conf_changes(self, config_contents): + LOG.info(_("Starting cassandra with conf changes...")) + LOG.info(_("inside the guest - cassandra is running %s...") + % self.status.is_running) + if self.status.is_running: + LOG.error(_("Cannot execute start_db_with_conf_changes because " + "cassandra state == %s!") % self.status) + raise RuntimeError("Cassandra not stopped.") + LOG.info(_("Initiating config.")) + self.write_config(config_contents) + self.start_db(True) + + def reset_configuration(self, configuration): + config_contents = configuration['config_contents'] + LOG.info(_("Resetting configuration")) + self.write_config(config_contents) + + +class CassandraAppStatus(service.BaseDbStatus): + + def _get_actual_db_status(self): + try: + # If status check would be successful, + # bot stdin and stdout would contain nothing + out, err = utils.execute_with_timeout(system.CASSANDRA_STATUS, + shell=True) + if "Connection error. Could not connect to" not in err: + return rd_instance.ServiceStatuses.RUNNING + else: + return rd_instance.ServiceStatuses.SHUTDOWN + except exception.ProcessExecutionError as e: + LOG.error(_("Process execution %s") % e) + return rd_instance.ServiceStatuses.SHUTDOWN + except OSError as e: + LOG.error(_("OS Error %s") % e) + return rd_instance.ServiceStatuses.SHUTDOWN diff --git a/trove/guestagent/datastore/cassandra/system.py b/trove/guestagent/datastore/cassandra/system.py new file mode 100644 index 0000000000..3783f9c89e --- /dev/null +++ b/trove/guestagent/datastore/cassandra/system.py @@ -0,0 +1,41 @@ +# Copyright 2013 Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from trove.common import cfg +from trove.openstack.common import log as logging + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + +CASSANDRA_DATA_DIR = "/var/lib/cassandra/data" +CASSANDRA_MOUNT_POINT = "/var/lib/cassandra" +CASSANDRA_CONF = "/etc/cassandra/cassandra.yaml" +CASSANDRA_TEMP_CONF = "/tmp/cassandra.yaml" +CASSANDRA_TEMP_DIR = "/tmp/cassandra" + +INIT_FS = "sudo mkdir -p %s" % CASSANDRA_MOUNT_POINT + +ENABLE_CASSANDRA_ON_BOOT = "sudo update-rc.d cassandra enable" +DISABLE_CASSANDRA_ON_BOOT = "sudo update-rc.d cassandra disable" + +# cassandra binary stored at /usr/sbin/ +START_CASSANDRA = "sudo /usr/sbin/cassandra" +STOP_CASSANDRA = "sudo killall java 2> /dev/null || true" + +CASSANDRA_STATUS = """echo "use system;" > /tmp/check; cqlsh -f /tmp/check""" + +CASSANDRA_KILL = "sudo killall java || true" + +TIME_OUT = 10000 diff --git a/trove/guestagent/dbaas.py b/trove/guestagent/dbaas.py index 0cecaa52f4..646c0155f3 100644 --- a/trove/guestagent/dbaas.py +++ b/trove/guestagent/dbaas.py @@ -37,6 +37,7 @@ defaults = { 'mysql': 'trove.guestagent.datastore.mysql.manager.Manager', 'percona': 'trove.guestagent.datastore.mysql.manager.Manager', 'redis': 'trove.guestagent.datastore.redis.manager.Manager', + 'cassandra': 'trove.guestagent.datastore.cassandra.manager.Manager', } CONF = cfg.CONF diff --git a/trove/guestagent/pkg.py b/trove/guestagent/pkg.py index e5bec3171e..d676ea847a 100644 --- a/trove/guestagent/pkg.py +++ b/trove/guestagent/pkg.py @@ -347,7 +347,8 @@ class DebianPackagerMixin(BasePackagerMixin): # even after successful install, packages can stay unconfigured # config_opts - is dict with name/value for questions asked by # interactive configure script - self._fix_package_selections(packages, config_opts) + if config_opts: + self._fix_package_selections(packages, config_opts) def pkg_version(self, package_name): p = commands.getstatusoutput("apt-cache policy %s" % package_name) diff --git a/trove/templates/cassandra/config.template b/trove/templates/cassandra/config.template new file mode 100644 index 0000000000..48a87f8b0d --- /dev/null +++ b/trove/templates/cassandra/config.template @@ -0,0 +1,77 @@ +cluster_name: 'Test Cluster' +num_tokens: 256 +hinted_handoff_enabled: true +max_hint_window_in_ms: 10800000 # 3 hours +hinted_handoff_throttle_in_kb: 1024 +max_hints_delivery_threads: 2 +authenticator: AllowAllAuthenticator +authorizer: AllowAllAuthorizer +permissions_validity_in_ms: 2000 +partitioner: org.apache.cassandra.dht.Murmur3Partitioner +data_file_directories: + - /var/lib/cassandra/data +commitlog_directory: /var/lib/cassandra/commitlog +disk_failure_policy: stop +key_cache_size_in_mb: +key_cache_save_period: 14400 +row_cache_size_in_mb: 0 +row_cache_save_period: 0 +saved_caches_directory: /var/lib/cassandra/saved_caches +commitlog_sync: periodic +commitlog_sync_period_in_ms: 10000 +commitlog_segment_size_in_mb: 32 +seed_provider: + - class_name: org.apache.cassandra.locator.SimpleSeedProvider + parameters: + - seeds: "127.0.0.1" +concurrent_reads: 32 +concurrent_writes: 32 +memtable_flush_queue_size: 4 +trickle_fsync: false +trickle_fsync_interval_in_kb: 10240 +storage_port: 7000 +ssl_storage_port: 7001 +listen_address: localhost +start_native_transport: true +native_transport_port: 9042 +start_rpc: true +rpc_address: localhost +rpc_port: 9160 +rpc_keepalive: true +rpc_server_type: sync +thrift_framed_transport_size_in_mb: 15 +incremental_backups: false +snapshot_before_compaction: false +auto_snapshot: true +tombstone_warn_threshold: 1000 +tombstone_failure_threshold: 100000 +column_index_size_in_kb: 64 +in_memory_compaction_limit_in_mb: 64 +multithreaded_compaction: false +compaction_throughput_mb_per_sec: 16 +compaction_preheat_key_cache: true +read_request_timeout_in_ms: 5000 +range_request_timeout_in_ms: 10000 +write_request_timeout_in_ms: 2000 +cas_contention_timeout_in_ms: 1000 +truncate_request_timeout_in_ms: 60000 +request_timeout_in_ms: 10000 +cross_node_timeout: false +endpoint_snitch: SimpleSnitch +dynamic_snitch_update_interval_in_ms: 100 +dynamic_snitch_reset_interval_in_ms: 600000 +dynamic_snitch_badness_threshold: 0.1 +request_scheduler: org.apache.cassandra.scheduler.NoScheduler +server_encryption_options: + internode_encryption: none + keystore: conf/.keystore + keystore_password: cassandra + truststore: conf/.truststore + truststore_password: cassandra +client_encryption_options: + enabled: false + keystore: conf/.keystore + keystore_password: cassandra +internode_compression: all +inter_dc_tcp_nodelay: false +preheat_kernel_page_cache: false \ No newline at end of file diff --git a/trove/templates/cassandra/heat.template b/trove/templates/cassandra/heat.template new file mode 100644 index 0000000000..1e5694a213 --- /dev/null +++ b/trove/templates/cassandra/heat.template @@ -0,0 +1,79 @@ +HeatTemplateFormatVersion: '2012-12-12' +Description: Instance creation template for cassandra +Parameters: + Flavor: + Type: String + VolumeSize: + Type: Number + Default : '1' + InstanceId: + Type: String + ImageId: + Type: String + DatastoreManager: + Type: String + AvailabilityZone: + Type: String + Default: nova + TenantId: + Type: String +Resources: + BaseInstance: + Type: AWS::EC2::Instance + Metadata: + AWS::CloudFormation::Init: + config: + files: + /etc/guest_info: + content: + Fn::Join: + - '' + - ["[DEFAULT]\nguest_id=", {Ref: InstanceId}, + "\ndatastore_manager=", {Ref: DatastoreManager}, + "\ntenant_id=", {Ref: TenantId}] + mode: '000644' + owner: root + group: root + Properties: + ImageId: {Ref: ImageId} + InstanceType: {Ref: Flavor} + AvailabilityZone: {Ref: AvailabilityZone} + SecurityGroups : [{Ref: CassandraDbaasSG}] + UserData: + Fn::Base64: + Fn::Join: + - '' + - ["#!/bin/bash -v\n", + "/opt/aws/bin/cfn-init\n", + "sudo service trove-guest start\n"] +{% if volume_support %} + DataVolume: + Type: AWS::EC2::Volume + Properties: + Size: {Ref: VolumeSize} + AvailabilityZone: {Ref: AvailabilityZone} + Tags: + - {Key: Usage, Value: Test} + MountPoint: + Type: AWS::EC2::VolumeAttachment + Properties: + InstanceId: {Ref: BaseInstance} + VolumeId: {Ref: DataVolume} + Device: /dev/vdb +{% endif %} + CassandraDbaasSG: + Type: AWS::EC2::SecurityGroup + Properties: + GroupDescription: Default Security group for Cassandra + SecurityGroupIngress: + - IpProtocol: "tcp" + FromPort: "9160" + ToPort: "9160" + CidrIp: "0.0.0.0/0" + DatabaseIPAddress: + Type: AWS::EC2::EIP + DatabaseIPAssoc : + Type: AWS::EC2::EIPAssociation + Properties: + InstanceId: {Ref: BaseInstance} + EIP: {Ref: DatabaseIPAddress} diff --git a/trove/templates/cassandra/override.config.template b/trove/templates/cassandra/override.config.template new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/trove/templates/cassandra/override.config.template @@ -0,0 +1 @@ + diff --git a/trove/tests/unittests/common/test_template.py b/trove/tests/unittests/common/test_template.py index 39f697e330..a13c2d7124 100644 --- a/trove/tests/unittests/common/test_template.py +++ b/trove/tests/unittests/common/test_template.py @@ -85,5 +85,7 @@ class HeatTemplateLoadTest(testtools.TestCase): 'mysql-blah') def test_heat_template_load_success(self): - htmpl = template.load_heat_template('mysql') - self.assertNotEqual(None, htmpl) + mysql_tmpl = template.load_heat_template('mysql') + cassandra_tmpl = template.load_heat_template('cassandra') + self.assertIsNotNone(mysql_tmpl) + self.assertIsNotNone(cassandra_tmpl) diff --git a/trove/tests/unittests/guestagent/test_dbaas.py b/trove/tests/unittests/guestagent/test_dbaas.py index 5b8269c21e..86879b1505 100644 --- a/trove/tests/unittests/guestagent/test_dbaas.py +++ b/trove/tests/unittests/guestagent/test_dbaas.py @@ -31,6 +31,7 @@ import testtools from testtools.matchers import Is from testtools.matchers import Equals from testtools.matchers import Not +from trove.common.exception import ProcessExecutionError from trove.common import utils from trove.common import instance as rd_instance from trove.conductor import api as conductor_api @@ -43,6 +44,7 @@ from trove.guestagent.datastore.service import BaseDbStatus from trove.guestagent.datastore.redis import service as rservice from trove.guestagent.datastore.redis.service import RedisApp from trove.guestagent.datastore.redis import system as RedisSystem +from trove.guestagent.datastore.cassandra import service as cass_service from trove.guestagent.datastore.mysql.service import MySqlAdmin from trove.guestagent.datastore.mysql.service import MySqlRootAccess from trove.guestagent.datastore.mysql.service import MySqlApp @@ -134,7 +136,6 @@ class DbaasTest(testtools.TestCase): def test_load_mysqld_options_error(self): - from trove.common.exception import ProcessExecutionError dbaas.utils.execute = Mock(side_effect=ProcessExecutionError()) self.assertFalse(dbaas.load_mysqld_options()) @@ -543,7 +544,9 @@ class MySqlAppTest(testtools.TestCase): self.mySqlApp.stop_db(True) self.assertTrue(conductor_api.API.heartbeat.called_once_with( - self.FAKE_ID, {'service_status': 'shutdown'})) + self.FAKE_ID, + {'service_status': + rd_instance.ServiceStatuses.SHUTDOWN.description})) def test_stop_mysql_error(self): @@ -564,7 +567,9 @@ class MySqlAppTest(testtools.TestCase): self.assertTrue(self.mySqlApp.stop_db.called) self.assertTrue(self.mySqlApp.start_mysql.called) self.assertTrue(conductor_api.API.heartbeat.called_once_with( - self.FAKE_ID, {'service_status': 'running'})) + self.FAKE_ID, + {'service_status': + rd_instance.ServiceStatuses.RUNNING.description})) def test_restart_mysql_wont_start_up(self): @@ -581,7 +586,6 @@ class MySqlAppTest(testtools.TestCase): def test_wipe_ib_logfiles_no_file(self): - from trove.common.exception import ProcessExecutionError processexecerror = ProcessExecutionError('No such file or directory') dbaas.utils.execute_with_timeout = Mock(side_effect=processexecerror) @@ -589,7 +593,6 @@ class MySqlAppTest(testtools.TestCase): def test_wipe_ib_logfiles_error(self): - from trove.common.exception import ProcessExecutionError mocked = Mock(side_effect=ProcessExecutionError('Error')) dbaas.utils.execute_with_timeout = mocked @@ -612,7 +615,9 @@ class MySqlAppTest(testtools.TestCase): self.mySqlApp.start_mysql(update_db=True) self.assertTrue(conductor_api.API.heartbeat.called_once_with( - self.FAKE_ID, {'service_status': 'running'})) + self.FAKE_ID, + {'service_status': + rd_instance.ServiceStatuses.RUNNING.description})) def test_start_mysql_runs_forever(self): @@ -623,12 +628,13 @@ class MySqlAppTest(testtools.TestCase): self.assertRaises(RuntimeError, self.mySqlApp.start_mysql) self.assertTrue(conductor_api.API.heartbeat.called_once_with( - self.FAKE_ID, {'service_status': 'shutdown'})) + self.FAKE_ID, + {'service_status': + rd_instance.ServiceStatuses.SHUTDOWN.description})) def test_start_mysql_error(self): self.mySqlApp._enable_mysql_on_boot = Mock() - from trove.common.exception import ProcessExecutionError mocked = Mock(side_effect=ProcessExecutionError('Error')) dbaas.utils.execute_with_timeout = mocked @@ -924,7 +930,6 @@ class ServiceRegistryTest(testtools.TestCase): dbaas_sr.get_custom_managers = Mock(return_value= datastore_registry_ext_test) test_dict = dbaas_sr.datastore_registry() - self.assertEqual(4, len(test_dict)) self.assertEqual(test_dict.get('test'), datastore_registry_ext_test.get('test', None)) self.assertEqual(test_dict.get('mysql'), @@ -936,6 +941,9 @@ class ServiceRegistryTest(testtools.TestCase): self.assertEqual(test_dict.get('redis'), 'trove.guestagent.datastore.redis.' 'manager.Manager') + self.assertEqual(test_dict.get('cassandra'), + 'trove.guestagent.datastore.cassandra.' + 'manager.Manager') def test_datastore_registry_with_existing_manager(self): datastore_registry_ext_test = { @@ -945,7 +953,6 @@ class ServiceRegistryTest(testtools.TestCase): dbaas_sr.get_custom_managers = Mock(return_value= datastore_registry_ext_test) test_dict = dbaas_sr.datastore_registry() - self.assertEqual(3, len(test_dict)) self.assertEqual(test_dict.get('mysql'), 'trove.guestagent.datastore.mysql.' 'manager.Manager123') @@ -954,13 +961,15 @@ class ServiceRegistryTest(testtools.TestCase): 'manager.Manager') self.assertEqual(test_dict.get('redis'), 'trove.guestagent.datastore.redis.manager.Manager') + self.assertEqual(test_dict.get('cassandra'), + 'trove.guestagent.datastore.cassandra.' + 'manager.Manager') def test_datastore_registry_with_blank_dict(self): datastore_registry_ext_test = dict() dbaas_sr.get_custom_managers = Mock(return_value= datastore_registry_ext_test) test_dict = dbaas_sr.datastore_registry() - self.assertEqual(3, len(test_dict)) self.assertEqual(test_dict.get('mysql'), 'trove.guestagent.datastore.mysql.' 'manager.Manager') @@ -969,6 +978,9 @@ class ServiceRegistryTest(testtools.TestCase): 'manager.Manager') self.assertEqual(test_dict.get('redis'), 'trove.guestagent.datastore.redis.manager.Manager') + self.assertEqual(test_dict.get('cassandra'), + 'trove.guestagent.datastore.cassandra.' + 'manager.Manager') class KeepAliveConnectionTest(testtools.TestCase): @@ -1171,7 +1183,6 @@ class MySqlAppStatusTest(testtools.TestCase): def test_get_actual_db_status_error_shutdown(self): - from trove.common.exception import ProcessExecutionError mocked = Mock(side_effect=ProcessExecutionError()) dbaas.utils.execute_with_timeout = mocked dbaas.load_mysqld_options = Mock() @@ -1184,7 +1195,6 @@ class MySqlAppStatusTest(testtools.TestCase): def test_get_actual_db_status_error_crashed(self): - from trove.common.exception import ProcessExecutionError dbaas.utils.execute_with_timeout = MagicMock( side_effect=[ProcessExecutionError(), ("some output", None)]) dbaas.load_mysqld_options = Mock() @@ -1377,3 +1387,143 @@ class TestRedisApp(testtools.TestCase): run_as_root=True, root_helper='sudo') verify(mock_status).end_install_or_restart() + + +class CassandraDBAppTest(testtools.TestCase): + + def setUp(self): + super(CassandraDBAppTest, self).setUp() + self.utils_execute_with_timeout = (cass_service . + utils.execute_with_timeout) + self.sleep = time.sleep + self.pkg_version = cass_service.packager.pkg_version + self.pkg = cass_service.packager + util.init_db() + self.FAKE_ID = str(uuid4()) + InstanceServiceStatus.create(instance_id=self.FAKE_ID, + status=rd_instance.ServiceStatuses.NEW) + self.appStatus = FakeAppStatus(self.FAKE_ID, + rd_instance.ServiceStatuses.NEW) + self.cassandra = cass_service.CassandraApp(self.appStatus) + + def tearDown(self): + + super(CassandraDBAppTest, self).tearDown() + cass_service.utils.execute_with_timeout = (self. + utils_execute_with_timeout) + time.sleep = self.sleep + cass_service.packager.pkg_version = self.pkg_version + cass_service.packager = self.pkg + InstanceServiceStatus.find_by(instance_id=self.FAKE_ID).delete() + + def assert_reported_status(self, expected_status): + service_status = InstanceServiceStatus.find_by( + instance_id=self.FAKE_ID) + self.assertEqual(expected_status, service_status.status) + + def test_stop_db(self): + + cass_service.utils.execute_with_timeout = Mock() + self.appStatus.set_next_status( + rd_instance.ServiceStatuses.SHUTDOWN) + + self.cassandra.stop_db() + self.assert_reported_status(rd_instance.ServiceStatuses.NEW) + + def test_stop_db_with_db_update(self): + + cass_service.utils.execute_with_timeout = Mock() + self.appStatus.set_next_status( + rd_instance.ServiceStatuses.SHUTDOWN) + + self.cassandra.stop_db(True) + self.assertTrue(conductor_api.API.heartbeat.called_once_with( + self.FAKE_ID, + {'service_status': + rd_instance.ServiceStatuses.SHUTDOWN.description})) + + def test_stop_db_error(self): + + cass_service.utils.execute_with_timeout = Mock() + self.appStatus.set_next_status(rd_instance.ServiceStatuses.RUNNING) + self.cassandra.state_change_wait_time = 1 + self.assertRaises(RuntimeError, self.cassandra.stop_db) + + def test_restart(self): + + self.cassandra.stop_db = Mock() + self.cassandra.start_db = Mock() + self.appStatus.set_next_status(rd_instance.ServiceStatuses.RUNNING) + + self.cassandra.restart() + + self.assertTrue(conductor_api.API.heartbeat.called_once_with( + self.FAKE_ID, + {'service_status': + rd_instance.ServiceStatuses.RUNNING.description})) + self.assert_reported_status(rd_instance.ServiceStatuses.NEW) + + def test_start_cassandra(self): + + cass_service.utils.execute_with_timeout = Mock() + self.appStatus.set_next_status(rd_instance.ServiceStatuses.RUNNING) + + self.cassandra.start_db() + self.assert_reported_status(rd_instance.ServiceStatuses.NEW) + + def test_start_cassandra_runs_forever(self): + + cass_service.utils.execute_with_timeout = Mock() + (self.cassandra.status. + wait_for_real_status_to_change_to) = Mock(return_value=False) + self.appStatus.set_next_status(rd_instance.ServiceStatuses.SHUTDOWN) + + self.assertRaises(RuntimeError, self.cassandra.stop_db) + self.assertTrue(conductor_api.API.heartbeat.called_once_with( + self.FAKE_ID, + {'service_status': + rd_instance.ServiceStatuses.SHUTDOWN.description})) + + def test_start_db_with_db_update(self): + + cass_service.utils.execute_with_timeout = Mock() + self.appStatus.set_next_status( + rd_instance.ServiceStatuses.RUNNING) + + self.cassandra.start_db(True) + self.assertTrue(conductor_api.API.heartbeat.called_once_with( + self.FAKE_ID, + {'service_status': + rd_instance.ServiceStatuses.RUNNING.description})) + self.assert_reported_status(rd_instance.ServiceStatuses.NEW) + + def test_start_cassandra_error(self): + self.cassandra._enable_db_on_boot = Mock() + self.cassandra.state_change_wait_time = 1 + cass_service.utils.execute_with_timeout = Mock( + side_effect=ProcessExecutionError('Error')) + + self.assertRaises(RuntimeError, self.cassandra.start_db) + + def test_install(self): + + self.cassandra._install_db = Mock() + self.pkg.pkg_is_installed = Mock(return_value=False) + self.cassandra.install_if_needed(['cassandra']) + self.assertTrue(self.cassandra._install_db.called) + self.assert_reported_status(rd_instance.ServiceStatuses.NEW) + + def test_install_install_error(self): + + from trove.guestagent import pkg + self.cassandra.start_db = Mock() + self.cassandra.stop_db = Mock() + self.pkg.pkg_is_installed = Mock(return_value=False) + self.cassandra._install_db = Mock( + side_effect=pkg.PkgPackageStateError("Install error")) + + self.assertRaises(pkg.PkgPackageStateError, + self.cassandra.install_if_needed, + ['cassandra=1.2.10']) + + self.assert_reported_status(rd_instance.ServiceStatuses.NEW) diff --git a/trove/tests/unittests/guestagent/test_manager.py b/trove/tests/unittests/guestagent/test_manager.py index 10db558d9e..3c97dc4d3d 100644 --- a/trove/tests/unittests/guestagent/test_manager.py +++ b/trove/tests/unittests/guestagent/test_manager.py @@ -15,11 +15,15 @@ import os import testtools +from mock import Mock from mockito import verify, when, unstub, any, mock, never from testtools.matchers import Is, Equals, Not - from trove.common.context import TroveContext +from trove.common.instance import ServiceStatuses from trove.guestagent import volume +from trove.guestagent.common import operating_system +from trove.guestagent.datastore.cassandra import service as cass_service +from trove.guestagent.datastore.cassandra import manager as cass_manager from trove.guestagent.datastore.mysql.manager import Manager import trove.guestagent.datastore.mysql.service as dbaas from trove.guestagent.datastore.redis.manager import Manager as RedisManager @@ -31,7 +35,6 @@ from trove.guestagent import pkg class GuestAgentManagerTest(testtools.TestCase): - def setUp(self): super(GuestAgentManagerTest, self).setUp() self.context = TroveContext() @@ -165,7 +168,6 @@ class GuestAgentManagerTest(testtools.TestCase): def _prepare_dynamic(self, device_path='/dev/vdb', is_mysql_installed=True, backup_id=None, is_root_enabled=False, overrides=None): - # covering all outcomes is starting to cause trouble here COUNT = 1 if device_path else 0 backup_info = None @@ -233,10 +235,6 @@ class RedisGuestAgentManagerTest(testtools.TestCase): self.manager = RedisManager() self.packages = 'redis-server' self.origin_RedisAppStatus = redis_service.RedisAppStatus - self.origin_os_path_exists = os.path.exists - self.origin_format = volume.VolumeDevice.format - self.origin_migrate_data = volume.VolumeDevice.migrate_data - self.origin_mount = volume.VolumeDevice.mount self.origin_stop_redis = redis_service.RedisApp.stop_db self.origin_start_redis = redis_service.RedisApp.start_redis self.origin_install_redis = redis_service.RedisApp._install_redis @@ -244,10 +242,6 @@ class RedisGuestAgentManagerTest(testtools.TestCase): def tearDown(self): super(RedisGuestAgentManagerTest, self).tearDown() redis_service.RedisAppStatus = self.origin_RedisAppStatus - os.path.exists = self.origin_os_path_exists - volume.VolumeDevice.format = self.origin_format - volume.VolumeDevice.migrate_data = self.origin_migrate_data - volume.VolumeDevice.mount = self.origin_mount redis_service.RedisApp.stop_db = self.origin_stop_redis redis_service.RedisApp.start_redis = self.origin_start_redis redis_service.RedisApp._install_redis = self.origin_install_redis @@ -314,3 +308,115 @@ class RedisGuestAgentManagerTest(testtools.TestCase): self.manager.stop_db(self.context) verify(redis_service.RedisAppStatus).get() verify(redis_service.RedisApp).stop_db(do_not_start_on_reboot=False) + + +class GuestAgentCassandraDBManagerTest(testtools.TestCase): + def setUp(self): + super(GuestAgentCassandraDBManagerTest, self).setUp() + self.real_status = cass_service.CassandraAppStatus.set_status + + class FakeInstanceServiceStatus(object): + status = ServiceStatuses.NEW + + def save(self): + pass + + cass_service.CassandraAppStatus.set_status = Mock( + return_value=FakeInstanceServiceStatus()) + self.context = TroveContext() + self.manager = cass_manager.Manager() + self.pkg = cass_service.packager + self.real_db_app_status = cass_service.CassandraAppStatus + self.origin_os_path_exists = os.path.exists + self.origin_format = volume.VolumeDevice.format + self.origin_migrate_data = volume.VolumeDevice.migrate_data + self.origin_mount = volume.VolumeDevice.mount + self.origin_stop_db = cass_service.CassandraApp.stop_db + self.origin_start_db = cass_service.CassandraApp.start_db + self.origin_install_db = cass_service.CassandraApp._install_db + self.original_get_ip = operating_system.get_ip_address + self.orig_make_host_reachable = ( + cass_service.CassandraApp.make_host_reachable) + + def tearDown(self): + super(GuestAgentCassandraDBManagerTest, self).tearDown() + cass_service.packager = self.pkg + cass_service.CassandraAppStatus.set_status = self.real_db_app_status + os.path.exists = self.origin_os_path_exists + volume.VolumeDevice.format = self.origin_format + volume.VolumeDevice.migrate_data = self.origin_migrate_data + volume.VolumeDevice.mount = self.origin_mount + cass_service.CassandraApp.stop_db = self.origin_stop_db + cass_service.CassandraApp.start_db = self.origin_start_db + cass_service.CassandraApp._install_db = self.origin_install_db + operating_system.get_ip_address = self.original_get_ip + cass_service.CassandraApp.make_host_reachable = ( + self.orig_make_host_reachable) + + def test_update_status(self): + mock_status = mock() + self.manager.appStatus = mock_status + self.manager.update_status(self.context) + verify(mock_status).update() + + def test_prepare_pkg(self): + self._prepare_dynamic(['cassandra']) + + def test_prepare_no_pkg(self): + self._prepare_dynamic([]) + + def test_prepare_db_not_installed(self): + self._prepare_dynamic([], is_db_installed=False) + + def test_prepare_db_not_installed_no_package(self): + self._prepare_dynamic([], + is_db_installed=True) + + def _prepare_dynamic(self, packages, + config_content=any(), device_path='/dev/vdb', + is_db_installed=True, backup_id=None, + is_root_enabled=False, + overrides=None): + # covering all outcomes is starting to cause trouble here + if not backup_id: + backup_info = {'id': backup_id, + 'location': 'fake-location', + 'type': 'InnoBackupEx', + 'checksum': 'fake-checksum', + } + + mock_status = mock() + self.manager.appStatus = mock_status + when(mock_status).begin_install().thenReturn(None) + + mock_app = mock() + self.manager.app = mock_app + + when(mock_app).install_if_needed(packages).thenReturn(None) + (when(pkg.Package).pkg_is_installed(any()). + thenReturn(is_db_installed)) + when(mock_app).init_storage_structure().thenReturn(None) + when(mock_app).write_config(config_content).thenReturn(None) + when(mock_app).make_host_reachable().thenReturn(None) + when(mock_app).restart().thenReturn(None) + when(os.path).exists(any()).thenReturn(True) + + when(volume.VolumeDevice).format().thenReturn(None) + when(volume.VolumeDevice).migrate_data(any()).thenReturn(None) + when(volume.VolumeDevice).mount().thenReturn(None) + + # invocation + self.manager.prepare(context=self.context, packages=packages, + config_contents=config_content, + databases=None, + memory_mb='2048', users=None, + device_path=device_path, + mount_point="/var/lib/cassandra", + backup_info=backup_info, + overrides=None) + # verification/assertion + verify(mock_status).begin_install() + verify(mock_app).install_if_needed(packages) + verify(mock_app).init_storage_structure() + verify(mock_app).make_host_reachable() + verify(mock_app).restart()