Merge pull request #64 from TimSimpsonR/api-actions4
Implemented all of restart, API code for resize.
This commit is contained in:
commit
3b2d3ac2d2
@ -74,9 +74,16 @@ class GuestError(ReddwarfError):
|
||||
|
||||
class BadRequest(ReddwarfError):
|
||||
|
||||
message = _("The server could not comply with the request since it is "
|
||||
"either malformed or otherwise incorrect.")
|
||||
|
||||
|
||||
class MissingKey(BadRequest):
|
||||
|
||||
message = _("Required element/key - %(key)s was not specified")
|
||||
|
||||
|
||||
class UnprocessableEntity(ReddwarfError):
|
||||
|
||||
message = _("Unable to process the contained request")
|
||||
|
||||
|
@ -17,13 +17,14 @@
|
||||
|
||||
from reddwarf.common import config
|
||||
from novaclient.v1_1.client import Client
|
||||
from reddwarf.guestagent.api import API
|
||||
|
||||
|
||||
|
||||
CONFIG = config.Config
|
||||
|
||||
|
||||
def create_guest_client(context, id):
|
||||
from reddwarf.guestagent.api import API
|
||||
return API(context, id)
|
||||
|
||||
|
||||
|
@ -20,6 +20,7 @@ import datetime
|
||||
import inspect
|
||||
import logging
|
||||
import re
|
||||
import signal
|
||||
import sys
|
||||
import uuid
|
||||
|
||||
@ -27,9 +28,10 @@ from eventlet import event
|
||||
from eventlet import greenthread
|
||||
from eventlet import semaphore
|
||||
from eventlet.green import subprocess
|
||||
from eventlet.timeout import Timeout
|
||||
|
||||
from reddwarf.openstack.common import utils as openstack_utils
|
||||
|
||||
from reddwarf.common import exception
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
import_class = openstack_utils.import_class
|
||||
@ -188,3 +190,38 @@ class LoopingCall(object):
|
||||
|
||||
def wait(self):
|
||||
return self.done.wait()
|
||||
|
||||
|
||||
# Copied from nova.api.openstack.common in the old code.
|
||||
def get_id_from_href(href):
|
||||
"""Return the id or uuid portion of a url.
|
||||
|
||||
Given: 'http://www.foo.com/bar/123?q=4'
|
||||
Returns: '123'
|
||||
|
||||
Given: 'http://www.foo.com/bar/abc123?q=4'
|
||||
Returns: 'abc123'
|
||||
|
||||
"""
|
||||
return urlparse.urlsplit("%s" % href).path.split('/')[-1]
|
||||
|
||||
|
||||
def execute_with_timeout(*args, **kwargs):
|
||||
time = kwargs.get('timeout', 30)
|
||||
def cb_timeout():
|
||||
raise exception.ProcessExecutionError("Time out after waiting "
|
||||
+ str(time) + " seconds when running proc: " + str(args)
|
||||
+ str(kwargs))
|
||||
|
||||
timeout = Timeout(time)
|
||||
try:
|
||||
return execute(*args, **kwargs)
|
||||
except Timeout as t:
|
||||
if t is not timeout:
|
||||
raise
|
||||
else:
|
||||
raise exception.ProcessExecutionError("Time out after waiting "
|
||||
+ str(time) + " seconds when running proc: " + str(args)
|
||||
+ str(kwargs))
|
||||
finally:
|
||||
timeout.cancel()
|
||||
|
@ -88,12 +88,12 @@ class UserController(BaseController):
|
||||
raise exception.BadRequest("The request contains an empty body")
|
||||
|
||||
if not body.get('users', ''):
|
||||
raise exception.BadRequest(key='users')
|
||||
raise exception.MissingKey(key='users')
|
||||
for user in body.get('users'):
|
||||
if not user.get('name'):
|
||||
raise exception.BadRequest(key='name')
|
||||
raise exception.MissingKey(key='name')
|
||||
if not user.get('password'):
|
||||
raise exception.BadRequest(key='password')
|
||||
raise exception.MissingKey(key='password')
|
||||
|
||||
def index(self, req, tenant_id, instance_id):
|
||||
"""Return all users."""
|
||||
@ -134,10 +134,10 @@ class SchemaController(BaseController):
|
||||
if not body:
|
||||
raise exception.BadRequest("The request contains an empty body")
|
||||
if not body.get('databases', ''):
|
||||
raise exception.BadRequest(key='databases')
|
||||
raise exception.MissingKey(key='databases')
|
||||
for database in body.get('databases'):
|
||||
if not database.get('name', ''):
|
||||
raise exception.BadRequest(key='name')
|
||||
raise exception.MissingKey(key='name')
|
||||
|
||||
def index(self, req, tenant_id, instance_id):
|
||||
"""Return all schemas."""
|
||||
|
@ -38,6 +38,31 @@ class API(object):
|
||||
self.context = context
|
||||
self.id = id
|
||||
|
||||
def _call(self, method_name, **kwargs):
|
||||
try:
|
||||
return rpc.call(self.context, self._get_routing_key(),
|
||||
{"method": method_name, "args": kwargs})
|
||||
except Exception as e:
|
||||
LOG.error(e)
|
||||
raise exception.GuestError(original_message=str(e))
|
||||
|
||||
def _cast(self, method_name, **kwargs):
|
||||
try:
|
||||
rpc.cast(self.context, self._get_routing_key(),
|
||||
{"method": method_name,
|
||||
"args": kwargs})
|
||||
except Exception as e:
|
||||
LOG.error(e)
|
||||
raise exception.GuestError(original_message=str(e))
|
||||
|
||||
def _cast_with_consumer(self, method_name, **kwargs):
|
||||
try:
|
||||
rpc.cast_with_consumer(self.context, self._get_routing_key(),
|
||||
{"method": method_name, "args": kwargs})
|
||||
except Exception as e:
|
||||
LOG.error(e)
|
||||
raise exception.GuestError(original_message=str(e))
|
||||
|
||||
def _get_routing_key(self):
|
||||
"""Create the routing key based on the container id"""
|
||||
return "guestagent.%s" % self.id
|
||||
@ -45,122 +70,82 @@ class API(object):
|
||||
def create_user(self, users):
|
||||
"""Make an asynchronous call to create a new database user"""
|
||||
LOG.debug(_("Creating Users for Instance %s"), self.id)
|
||||
rpc.cast(self.context, self._get_routing_key(),
|
||||
{"method": "create_user",
|
||||
"args": {"users": users}
|
||||
})
|
||||
self._cast("create_user", users=users)
|
||||
|
||||
def list_users(self):
|
||||
"""Make an asynchronous call to list database users"""
|
||||
LOG.debug(_("Listing Users for Instance %s"), self.id)
|
||||
return rpc.call(self.context, self._get_routing_key(),
|
||||
{"method": "list_users"})
|
||||
return self._call("list_users")
|
||||
|
||||
def delete_user(self, user):
|
||||
"""Make an asynchronous call to delete an existing database user"""
|
||||
LOG.debug(_("Deleting user %s for Instance %s"),
|
||||
user, self.id)
|
||||
rpc.cast(self.context, self._get_routing_key(),
|
||||
{"method": "delete_user",
|
||||
"args": {"user": user}
|
||||
})
|
||||
LOG.debug(_("Deleting user %s for Instance %s"), user, self.id)
|
||||
return self._cast("delete_user", user=user)
|
||||
|
||||
def create_database(self, databases):
|
||||
"""Make an asynchronous call to create a new database
|
||||
within the specified container"""
|
||||
LOG.debug(_("Creating databases for Instance %s"), self.id)
|
||||
rpc.cast(self.context, self._get_routing_key(),
|
||||
{"method": "create_database",
|
||||
"args": {"databases": databases}
|
||||
})
|
||||
self._cast("create_database", databases=databases)
|
||||
|
||||
def list_databases(self):
|
||||
"""Make an asynchronous call to list database users"""
|
||||
LOG.debug(_("Listing Users for Instance %s"), self.id)
|
||||
return rpc.call(self.context, self._get_routing_key(),
|
||||
{"method": "list_databases"})
|
||||
"""Make an asynchronous call to list databases"""
|
||||
LOG.debug(_("Listing databases for Instance %s"), self.id)
|
||||
return self._call("list_databases")
|
||||
|
||||
def delete_database(self, database):
|
||||
"""Make an asynchronous call to delete an existing database
|
||||
within the specified container"""
|
||||
LOG.debug(_("Deleting database %s for Instance %s"),
|
||||
database, self.id)
|
||||
rpc.cast(self.context, self._get_routing_key(),
|
||||
{"method": "delete_database",
|
||||
"args": {"database": database}
|
||||
})
|
||||
LOG.debug(_("Deleting database %s for Instance %s"), database, self.id)
|
||||
self._cast("delete_database", database=database)
|
||||
|
||||
def enable_root(self):
|
||||
"""Make a synchronous call to enable the root user for
|
||||
access from anywhere"""
|
||||
LOG.debug(_("Enable root user for Instance %s"), self.id)
|
||||
return rpc.call(self.context, self._get_routing_key(),
|
||||
{"method": "enable_root"})
|
||||
return self._call("enable_root")
|
||||
|
||||
def disable_root(self):
|
||||
"""Make a synchronous call to disable the root user for
|
||||
access from anywhere"""
|
||||
LOG.debug(_("Disable root user for Instance %s"), self.id)
|
||||
return rpc.call(self.context, self._get_routing_key(),
|
||||
{"method": "disable_root"})
|
||||
return self._call("disable_root")
|
||||
|
||||
def is_root_enabled(self):
|
||||
"""Make a synchronous call to check if root access is
|
||||
available for the container"""
|
||||
LOG.debug(_("Check root access for Instance %s"), self.id)
|
||||
return rpc.call(self.context, self._get_routing_key(),
|
||||
{"method": "is_root_enabled"})
|
||||
return self._call("is_root_enabled")
|
||||
|
||||
def get_diagnostics(self):
|
||||
"""Make a synchronous call to get diagnostics for the container"""
|
||||
LOG.debug(_("Check diagnostics on Instance %s"), self.id)
|
||||
return rpc.call(self.context, self._get_routing_key(),
|
||||
{"method": "get_diagnostics"})
|
||||
return self._call("get_diagnostics")
|
||||
|
||||
def prepare(self, memory_mb, databases):
|
||||
def prepare(self, memory_mb, databases, users):
|
||||
"""Make an asynchronous call to prepare the guest
|
||||
as a database container"""
|
||||
LOG.debug(_("Sending the call to prepare the Guest"))
|
||||
rpc.cast_with_consumer(self.context, self._get_routing_key(),
|
||||
{"method": "prepare",
|
||||
"args": {"databases": databases,
|
||||
"memory_mb": memory_mb}
|
||||
})
|
||||
self._cast_with_consumer("prepare", databases=databases,
|
||||
memory_mb=memory_mb, users=users)
|
||||
|
||||
def restart(self):
|
||||
"""Restart the MySQL server."""
|
||||
LOG.debug(_("Sending the call to restart MySQL on the Guest."))
|
||||
rpc.call(self.context, self._get_routing_key(),
|
||||
{"method": "restart",
|
||||
"args": {}
|
||||
})
|
||||
self._call("restart")
|
||||
|
||||
def start_mysql_with_conf_changes(self, updated_memory_size):
|
||||
"""Start the MySQL server."""
|
||||
LOG.debug(_("Sending the call to start MySQL on the Guest."))
|
||||
try:
|
||||
rpc.call(self.context, self._get_routing_key(),
|
||||
{"method": "start_mysql_with_conf_changes",
|
||||
"args": {'updated_memory_size': updated_memory_size}
|
||||
})
|
||||
except Exception as e:
|
||||
LOG.error(e)
|
||||
raise exception.GuestError(original_message=str(e))
|
||||
self._call("start_mysql_with_conf_changes",
|
||||
updated_memory_size=updated_memory_size)
|
||||
|
||||
def stop_mysql(self):
|
||||
"""Stop the MySQL server."""
|
||||
LOG.debug(_("Sending the call to stop MySQL on the Guest."))
|
||||
try:
|
||||
rpc.call(self.context, self._get_routing_key(),
|
||||
{"method": "stop_mysql",
|
||||
"args": {}
|
||||
})
|
||||
except Exception as e:
|
||||
LOG.error(e)
|
||||
raise exception.GuestError(original_message=str(e))
|
||||
self._call("stop_mysql")
|
||||
|
||||
def upgrade(self):
|
||||
"""Make an asynchronous call to self upgrade the guest agent"""
|
||||
topic = self._get_routing_key(self.context, self.id)
|
||||
LOG.debug(_("Sending an upgrade call to nova-guest %s"), topic)
|
||||
rpc.cast_with_consumer(self.context, topic, {"method": "upgrade"})
|
||||
self._cast_with_consumer("upgrade")
|
||||
|
@ -30,6 +30,7 @@ import logging
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from datetime import date
|
||||
@ -54,11 +55,25 @@ MYSQLD_ARGS = None
|
||||
PREPARING = False
|
||||
UUID = False
|
||||
|
||||
ORIG_MYCNF = "/etc/mysql/my.cnf"
|
||||
FINAL_MYCNF = "/var/lib/mysql/my.cnf"
|
||||
TMP_MYCNF = "/tmp/my.cnf.tmp"
|
||||
DBAAS_MYCNF = "/etc/dbaas/my.cnf/my.cnf.%dM"
|
||||
MYSQL_BASE_DIR = "/var/lib/mysql"
|
||||
|
||||
|
||||
def generate_random_password():
|
||||
return str(uuid.uuid4())
|
||||
|
||||
|
||||
def get_auth_password():
|
||||
pwd, err = utils.execute_with_timeout("sudo", "awk",
|
||||
"/password\\t=/{print $3}", "/etc/mysql/my.cnf")
|
||||
if err:
|
||||
LOG.err(err)
|
||||
raise RuntimeError("Problem reading my.cnf! : %s" % err)
|
||||
|
||||
|
||||
def get_engine():
|
||||
"""Create the default engine with the updated admin user"""
|
||||
#TODO(rnirmal):Based on permissions issues being resolved we may revert
|
||||
@ -68,8 +83,8 @@ def get_engine():
|
||||
if ENGINE:
|
||||
return ENGINE
|
||||
#ENGINE = create_engine(name_or_url=url)
|
||||
pwd, err = utils.execute("sudo", "awk", "/password\\t=/{print $3}",
|
||||
"/etc/mysql/my.cnf")
|
||||
pwd, err = utils.execute_with_timeout("sudo", "awk",
|
||||
"/password\\t=/{print $3}", "/etc/mysql/my.cnf")
|
||||
if not err:
|
||||
ENGINE = create_engine("mysql://%s:%s@localhost:3306" %
|
||||
(ADMIN_USER_NAME, pwd.strip()),
|
||||
@ -97,8 +112,206 @@ def load_mysqld_options():
|
||||
return None
|
||||
|
||||
|
||||
class DBaaSAgent(object):
|
||||
""" Database as a Service Agent Controller """
|
||||
class MySqlAppStatus(object):
|
||||
"""
|
||||
Answers the question "what is the status of the MySQL application on
|
||||
this box?" The answer can be that the application is not installed, or
|
||||
the state of the application is determined by calling a series of
|
||||
commands.
|
||||
|
||||
This class also handles saving and load the status of the MySQL application
|
||||
in the database.
|
||||
The status is updated whenever the update() method is called, except
|
||||
if the state is changed to building or restart mode using the
|
||||
"begin_mysql_install" and "begin_mysql_restart" methods.
|
||||
The building mode persists in the database while restarting mode does
|
||||
not (so if there is a Python Pete crash update() will set the status to
|
||||
show a failure).
|
||||
These modes are exited and functionality to update() returns when
|
||||
end_install_or_restart() is called, at which point the status again
|
||||
reflects the actual status of the MySQL app.
|
||||
"""
|
||||
|
||||
_instance = None
|
||||
|
||||
def __init__(self):
|
||||
if self._instance is not None:
|
||||
raise RuntimeError("Cannot instantiate twice.")
|
||||
self.status = self._load_status()
|
||||
self.restart_mode = False
|
||||
|
||||
|
||||
def begin_mysql_install(self):
|
||||
"""Called right before MySQL is prepared."""
|
||||
self.set_status(rd_models.ServiceStatuses.BUILDING)
|
||||
|
||||
def begin_mysql_restart(self):
|
||||
"""Called before restarting MySQL."""
|
||||
self.restart_mode = True
|
||||
|
||||
def end_install_or_restart(self):
|
||||
"""Called after MySQL is installed or restarted.
|
||||
|
||||
Updates the database with the actual MySQL status.
|
||||
"""
|
||||
LOG.info("Ending install or restart.")
|
||||
self.restart_mode = False
|
||||
real_status = self._get_actual_db_status()
|
||||
LOG.info("Updating status to %s" % real_status)
|
||||
self.set_status(real_status)
|
||||
|
||||
@classmethod
|
||||
def get(cls):
|
||||
if not cls._instance:
|
||||
cls._instance = MySqlAppStatus()
|
||||
return cls._instance
|
||||
|
||||
def _get_actual_db_status(self):
|
||||
global MYSQLD_ARGS
|
||||
try:
|
||||
out, err = utils.execute_with_timeout("/usr/bin/mysqladmin",
|
||||
"ping", run_as_root=True)
|
||||
LOG.info("Service Status is RUNNING.")
|
||||
return rd_models.ServiceStatuses.RUNNING
|
||||
except ProcessExecutionError as e:
|
||||
LOG.error("Process execution ")
|
||||
try:
|
||||
out, err = utils.execute_with_timeout("/bin/ps", "-C", "mysqld",
|
||||
"h")
|
||||
pid = out.split()[0]
|
||||
# TODO(rnirmal): Need to create new statuses for instances
|
||||
# where the mysql service is up, but unresponsive
|
||||
LOG.info("Service Status is BLOCKED.")
|
||||
return rd_models.ServiceStatuses.BLOCKED
|
||||
except ProcessExecutionError as e:
|
||||
if not MYSQLD_ARGS:
|
||||
MYSQLD_ARGS = load_mysqld_options()
|
||||
pid_file = MYSQLD_ARGS.get('pid-file',
|
||||
'/var/run/mysqld/mysqld.pid')
|
||||
if os.path.exists(pid_file):
|
||||
LOG.info("Service Status is CRASHED.")
|
||||
return rd_models.ServiceStatuses.CRASHED
|
||||
else:
|
||||
LOG.info("Service Status is SHUTDOWN.")
|
||||
return rd_models.ServiceStatuses.SHUTDOWN
|
||||
|
||||
@property
|
||||
def is_mysql_installed(self):
|
||||
"""
|
||||
True if MySQL app should be installed and attempts to ascertain
|
||||
its status won't result in nonsense.
|
||||
"""
|
||||
return self.status is not None and \
|
||||
self.status != rd_models.ServiceStatuses.BUILDING and \
|
||||
self.status != rd_models.ServiceStatuses.FAILED
|
||||
|
||||
@property
|
||||
def _is_mysql_restarting(self):
|
||||
return self.restart_mode
|
||||
|
||||
@property
|
||||
def is_mysql_running(self):
|
||||
"""True if MySQL is running."""
|
||||
return self.status is not None and \
|
||||
self.status == rd_models.ServiceStatuses.RUNNING
|
||||
|
||||
@staticmethod
|
||||
def _load_status():
|
||||
"""Loads the status from the database."""
|
||||
id = config.Config.get('guest_id')
|
||||
return rd_models.InstanceServiceStatus.find_by(instance_id=id)
|
||||
|
||||
def set_status(self, status):
|
||||
"""Changes the status of the MySQL app in the database."""
|
||||
db_status = self._load_status()
|
||||
db_status.set_status(status)
|
||||
db_status.save()
|
||||
self.status = status
|
||||
|
||||
|
||||
def update(self):
|
||||
"""Find and report status of MySQL on this machine.
|
||||
|
||||
The database is update and the status is also returned.
|
||||
"""
|
||||
if self.is_mysql_installed and not self._is_mysql_restarting:
|
||||
LOG.info("Determining status of MySQL app...")
|
||||
status = self._get_actual_db_status()
|
||||
self.set_status(status)
|
||||
else:
|
||||
LOG.info("MySQL is not installed or is in restart mode, so for "
|
||||
"now we'll skip determining the status of MySQL on this "
|
||||
"box.")
|
||||
|
||||
def wait_for_real_status_to_change_to(self, status, max_time,
|
||||
update_db=False):
|
||||
"""
|
||||
Waits the given time for the real status to change to the one
|
||||
specified. Does not update the publicly viewable status Unless
|
||||
"update_db" is True.
|
||||
"""
|
||||
WAIT_TIME = 3
|
||||
waited_time = 0
|
||||
while(waited_time < max_time):
|
||||
time.sleep(WAIT_TIME)
|
||||
waited_time += WAIT_TIME
|
||||
LOG.info("Waiting for MySQL status to change to %s..." % status)
|
||||
actual_status = self._get_actual_db_status()
|
||||
LOG.info("MySQL status was %s after %d seconds."
|
||||
% (actual_status, waited_time))
|
||||
if actual_status == status:
|
||||
if update_db:
|
||||
self.set_status(actual_status)
|
||||
return True
|
||||
LOG.error("Time out while waiting for MySQL app status to change!")
|
||||
return False
|
||||
|
||||
|
||||
class LocalSqlClient(object):
|
||||
"""A sqlalchemy wrapper to manage transactions"""
|
||||
|
||||
def __init__(self, engine, use_flush=True):
|
||||
self.engine = engine
|
||||
self.use_flush = use_flush
|
||||
|
||||
def __enter__(self):
|
||||
self.conn = self.engine.connect()
|
||||
self.trans = self.conn.begin()
|
||||
return self.conn
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
if self.trans:
|
||||
if type is not None: # An error occurred
|
||||
self.trans.rollback()
|
||||
else:
|
||||
if self.use_flush:
|
||||
self.conn.execute(FLUSH)
|
||||
self.trans.commit()
|
||||
self.conn.close()
|
||||
|
||||
def execute(self, t, **kwargs):
|
||||
try:
|
||||
return self.conn.execute(t, kwargs)
|
||||
except:
|
||||
self.trans.rollback()
|
||||
self.trans = None
|
||||
raise
|
||||
|
||||
|
||||
class MySqlAdmin(object):
|
||||
"""Handles administrative tasks on the MySQL database."""
|
||||
|
||||
def create_database(self, databases):
|
||||
"""Create the list of specified databases"""
|
||||
client = LocalSqlClient(get_engine())
|
||||
with client:
|
||||
for item in databases:
|
||||
mydb = models.MySQLDatabase()
|
||||
mydb.deserialize(item)
|
||||
t = text("""CREATE DATABASE IF NOT EXISTS
|
||||
`%s` CHARACTER SET = %s COLLATE = %s;"""
|
||||
% (mydb.name, mydb.character_set, mydb.collate))
|
||||
client.execute(t)
|
||||
|
||||
def create_user(self, users):
|
||||
"""Create users and grant them privileges for the
|
||||
@ -122,36 +335,14 @@ class DBaaSAgent(object):
|
||||
% (mydb.name, user.name))
|
||||
client.execute(t, host=host)
|
||||
|
||||
def list_users(self):
|
||||
"""List users that have access to the database"""
|
||||
LOG.debug(_("---Listing Users---"))
|
||||
users = []
|
||||
def delete_database(self, database):
|
||||
"""Delete the specified database"""
|
||||
client = LocalSqlClient(get_engine())
|
||||
with client:
|
||||
mysql_user = models.MySQLUser()
|
||||
t = text("""select User from mysql.user where host !=
|
||||
'localhost';""")
|
||||
result = client.execute(t)
|
||||
LOG.debug(_("result = %s") % str(result))
|
||||
for row in result:
|
||||
LOG.debug(_("user = %s") % str(row))
|
||||
mysql_user = models.MySQLUser()
|
||||
mysql_user.name = row['User']
|
||||
# Now get the databases
|
||||
t = text("""SELECT grantee, table_schema
|
||||
from information_schema.SCHEMA_PRIVILEGES
|
||||
group by grantee, table_schema;""")
|
||||
db_result = client.execute(t)
|
||||
for db in db_result:
|
||||
matches = re.match("^'(.+)'@", db['grantee'])
|
||||
if matches is not None and \
|
||||
matches.group(1) == mysql_user.name:
|
||||
mysql_db = models.MySQLDatabase()
|
||||
mysql_db.name = db['table_schema']
|
||||
mysql_user.databases.append(mysql_db.serialize())
|
||||
users.append(mysql_user.serialize())
|
||||
LOG.debug(_("users = %s") % str(users))
|
||||
return users
|
||||
mydb = models.MySQLDatabase()
|
||||
mydb.deserialize(database)
|
||||
t = text("""DROP DATABASE `%s`;""" % mydb.name)
|
||||
client.execute(t)
|
||||
|
||||
def delete_user(self, user):
|
||||
"""Delete the specified users"""
|
||||
@ -162,17 +353,40 @@ class DBaaSAgent(object):
|
||||
t = text("""DROP USER `%s`""" % mysql_user.name)
|
||||
client.execute(t)
|
||||
|
||||
def create_database(self, databases):
|
||||
"""Create the list of specified databases"""
|
||||
def enable_root(self):
|
||||
"""Enable the root user global access and/or reset the root password"""
|
||||
host = "%"
|
||||
user = models.MySQLUser()
|
||||
user.name = "root"
|
||||
user.password = generate_random_password()
|
||||
client = LocalSqlClient(get_engine())
|
||||
with client:
|
||||
for item in databases:
|
||||
mydb = models.MySQLDatabase()
|
||||
mydb.deserialize(item)
|
||||
t = text("""CREATE DATABASE IF NOT EXISTS
|
||||
`%s` CHARACTER SET = %s COLLATE = %s;"""
|
||||
% (mydb.name, mydb.character_set, mydb.collate))
|
||||
client.execute(t)
|
||||
try:
|
||||
t = text("""CREATE USER :user@:host;""")
|
||||
client.execute(t, user=user.name, host=host, pwd=user.password)
|
||||
except exc.OperationalError as err:
|
||||
# Ignore, user is already created, just reset the password
|
||||
# TODO(rnirmal): More fine grained error checking later on
|
||||
LOG.debug(err)
|
||||
with client:
|
||||
t = text("""UPDATE mysql.user SET Password=PASSWORD(:pwd)
|
||||
WHERE User=:user;""")
|
||||
client.execute(t, user=user.name, pwd=user.password)
|
||||
t = text("""GRANT ALL PRIVILEGES ON *.* TO :user@:host
|
||||
WITH GRANT OPTION;""")
|
||||
client.execute(t, user=user.name, host=host)
|
||||
return user.serialize()
|
||||
|
||||
def is_root_enabled(self):
|
||||
"""Return True if root access is enabled; False otherwise."""
|
||||
client = LocalSqlClient(get_engine())
|
||||
with client:
|
||||
mysql_user = models.MySQLUser()
|
||||
t = text("""SELECT User FROM mysql.user where User = 'root'
|
||||
and host != 'localhost';""")
|
||||
result = client.execute(t)
|
||||
LOG.debug("result = " + str(result))
|
||||
return result.rowcount != 0
|
||||
|
||||
def list_databases(self):
|
||||
"""List databases the user created on this mysql instance"""
|
||||
@ -209,144 +423,92 @@ class DBaaSAgent(object):
|
||||
LOG.debug(_("databases = ") + str(databases))
|
||||
return databases
|
||||
|
||||
def delete_database(self, database):
|
||||
"""Delete the specified database"""
|
||||
client = LocalSqlClient(get_engine())
|
||||
with client:
|
||||
mydb = models.MySQLDatabase()
|
||||
mydb.deserialize(database)
|
||||
t = text("""DROP DATABASE `%s`;""" % mydb.name)
|
||||
client.execute(t)
|
||||
|
||||
def enable_root(self):
|
||||
"""Enable the root user global access and/or reset the root password"""
|
||||
host = "%"
|
||||
user = models.MySQLUser()
|
||||
user.name = "root"
|
||||
user.password = generate_random_password()
|
||||
client = LocalSqlClient(get_engine())
|
||||
with client:
|
||||
try:
|
||||
t = text("""CREATE USER :user@:host;""")
|
||||
client.execute(t, user=user.name, host=host, pwd=user.password)
|
||||
except exc.OperationalError as err:
|
||||
# Ignore, user is already created, just reset the password
|
||||
# TODO(rnirmal): More fine grained error checking later on
|
||||
LOG.debug(err)
|
||||
with client:
|
||||
t = text("""UPDATE mysql.user SET Password=PASSWORD(:pwd)
|
||||
WHERE User=:user;""")
|
||||
client.execute(t, user=user.name, pwd=user.password)
|
||||
t = text("""GRANT ALL PRIVILEGES ON *.* TO :user@:host
|
||||
WITH GRANT OPTION;""")
|
||||
client.execute(t, user=user.name, host=host)
|
||||
return user.serialize()
|
||||
|
||||
def disable_root(self):
|
||||
"""Disable root access apart from localhost"""
|
||||
host = "localhost"
|
||||
pwd = generate_random_password()
|
||||
user = "root"
|
||||
client = LocalSqlClient(get_engine())
|
||||
with client:
|
||||
t = text("""DELETE FROM mysql.user where User=:user
|
||||
and Host!=:host""")
|
||||
client.execute(t, user=user, host=host)
|
||||
t = text("""UPDATE mysql.user SET Password=PASSWORD(:pwd)
|
||||
WHERE User=:user;""")
|
||||
client.execute(t, pwd=pwd, user=user)
|
||||
return True
|
||||
|
||||
def is_root_enabled(self):
|
||||
"""Return True if root access is enabled; False otherwise."""
|
||||
def list_users(self):
|
||||
"""List users that have access to the database"""
|
||||
LOG.debug(_("---Listing Users---"))
|
||||
users = []
|
||||
client = LocalSqlClient(get_engine())
|
||||
with client:
|
||||
mysql_user = models.MySQLUser()
|
||||
t = text("""SELECT User FROM mysql.user where User = 'root'
|
||||
and host != 'localhost';""")
|
||||
t = text("""select User from mysql.user where host !=
|
||||
'localhost';""")
|
||||
result = client.execute(t)
|
||||
LOG.debug(_("result = ") + str(result))
|
||||
return result.rowcount != 0
|
||||
LOG.debug("result = " + str(result))
|
||||
for row in result:
|
||||
LOG.debug("user = " + str(row))
|
||||
mysql_user = models.MySQLUser()
|
||||
mysql_user.name = row['User']
|
||||
# Now get the databases
|
||||
t = text("""SELECT grantee, table_schema
|
||||
from information_schema.SCHEMA_PRIVILEGES
|
||||
group by grantee, table_schema;""")
|
||||
db_result = client.execute(t)
|
||||
for db in db_result:
|
||||
matches = re.match("^'(.+)'@", db['grantee'])
|
||||
if matches is not None and \
|
||||
matches.group(1) == mysql_user.name:
|
||||
mysql_db = models.MySQLDatabase()
|
||||
mysql_db.name = db['table_schema']
|
||||
mysql_user.databases.append(mysql_db.serialize())
|
||||
users.append(mysql_user.serialize())
|
||||
LOG.debug("users = " + str(users))
|
||||
return users
|
||||
|
||||
def prepare(self, databases, memory_mb):
|
||||
|
||||
class DBaaSAgent(object):
|
||||
""" Database as a Service Agent Controller """
|
||||
|
||||
def __init__(self):
|
||||
self.status = MySqlAppStatus.get()
|
||||
|
||||
def begin_mysql_restart(self):
|
||||
self.restart_mode = True
|
||||
|
||||
def create_database(self, databases):
|
||||
return MySqlAdmin().create_database(databases)
|
||||
|
||||
def create_user(self, users):
|
||||
MySqlAdmin().create_user(users)
|
||||
|
||||
def delete_database(self, database):
|
||||
return MySqlAdmin().delete_database(database)
|
||||
|
||||
def delete_user(self, user):
|
||||
MySqlAdmin().delete_user(user)
|
||||
|
||||
def list_databases(self):
|
||||
return MySqlAdmin().list_databases()
|
||||
|
||||
def list_users(self):
|
||||
return MySqlAdmin().list_users()
|
||||
|
||||
def enable_root(self):
|
||||
return MySqlAdmin().enable_root()
|
||||
|
||||
def is_root_enabled(self):
|
||||
return MySqlAdmin().is_root_enabled()
|
||||
|
||||
def prepare(self, databases, memory_mb, users):
|
||||
"""Makes ready DBAAS on a Guest container."""
|
||||
global PREPARING
|
||||
PREPARING = True
|
||||
from reddwarf.guestagent.pkg import PkgAgent
|
||||
if not isinstance(self, PkgAgent):
|
||||
raise TypeError("This must also be an instance of Pkg agent.")
|
||||
preparer = DBaaSPreparer(self)
|
||||
preparer.prepare()
|
||||
pkg = self # Python cast.
|
||||
app = MySqlApp(self.status)
|
||||
app.install_and_secure(pkg, memory_mb)
|
||||
LOG.info("Creating initial databases and users following successful "
|
||||
"prepare.")
|
||||
self.create_database(databases)
|
||||
PREPARING = False
|
||||
self.create_user(users)
|
||||
LOG.info('"prepare" call has finished.')
|
||||
|
||||
def restart(self):
|
||||
app = MySqlApp(self.status)
|
||||
app.restart()
|
||||
|
||||
def update_status(self):
|
||||
"""Update the status of the MySQL service"""
|
||||
global MYSQLD_ARGS
|
||||
global PREPARING
|
||||
id = config.Config.get('guest_id')
|
||||
status = rd_models.InstanceServiceStatus.find_by(instance_id=id)
|
||||
|
||||
if PREPARING:
|
||||
status.set_status(rd_models.ServiceStatuses.BUILDING)
|
||||
status.save()
|
||||
return
|
||||
|
||||
try:
|
||||
out, err = utils.execute("/usr/bin/mysqladmin", "ping",
|
||||
run_as_root=True)
|
||||
status.set_status(rd_models.ServiceStatuses.RUNNING)
|
||||
status.save()
|
||||
except ProcessExecutionError as e:
|
||||
try:
|
||||
out, err = utils.execute("ps", "-C", "mysqld", "h")
|
||||
pid = out.split()[0]
|
||||
# TODO(rnirmal): Need to create new statuses for instances
|
||||
# where the mysql service is up, but unresponsive
|
||||
status.set_status(rd_models.ServiceStatuses.BLOCKED)
|
||||
status.save()
|
||||
except ProcessExecutionError as e:
|
||||
if not MYSQLD_ARGS:
|
||||
MYSQLD_ARGS = load_mysqld_options()
|
||||
pid_file = MYSQLD_ARGS.get('pid-file',
|
||||
'/var/run/mysqld/mysqld.pid')
|
||||
if os.path.exists(pid_file):
|
||||
status.set_status(rd_models.ServiceStatuses.CRASHED)
|
||||
status.save()
|
||||
else:
|
||||
status.set_status(rd_models.ServiceStatuses.SHUTDOWN)
|
||||
status.save()
|
||||
|
||||
|
||||
class LocalSqlClient(object):
|
||||
"""A sqlalchemy wrapper to manage transactions"""
|
||||
|
||||
def __init__(self, engine, use_flush=True):
|
||||
self.engine = engine
|
||||
self.use_flush = use_flush
|
||||
|
||||
def __enter__(self):
|
||||
self.conn = self.engine.connect()
|
||||
self.trans = self.conn.begin()
|
||||
return self.conn
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
if self.trans:
|
||||
if type is not None: # An error occurred
|
||||
self.trans.rollback()
|
||||
else:
|
||||
if self.use_flush:
|
||||
self.conn.execute(FLUSH)
|
||||
self.trans.commit()
|
||||
self.conn.close()
|
||||
|
||||
def execute(self, t, **kwargs):
|
||||
try:
|
||||
return self.conn.execute(t, kwargs)
|
||||
except:
|
||||
self.trans.rollback()
|
||||
self.trans = None
|
||||
raise
|
||||
MySqlAppStatus.get().update()
|
||||
|
||||
|
||||
class KeepAliveConnection(interfaces.PoolListener):
|
||||
@ -370,68 +532,16 @@ class KeepAliveConnection(interfaces.PoolListener):
|
||||
raise
|
||||
|
||||
|
||||
class DBaaSPreparer(object):
|
||||
class MySqlApp(object):
|
||||
"""Prepares DBaaS on a Guest container."""
|
||||
|
||||
TIME_OUT = 1000
|
||||
|
||||
def __init__(self, pkg_agent):
|
||||
def __init__(self, status):
|
||||
""" By default login with root no password for initial setup. """
|
||||
self.engine = create_engine("mysql://root:@localhost:3306", echo=True)
|
||||
self.pkg = pkg_agent
|
||||
|
||||
def _generate_root_password(self, client):
|
||||
""" Generate and set a random root password and forget about it. """
|
||||
t = text("""UPDATE mysql.user SET Password=PASSWORD(:pwd)
|
||||
WHERE User='root';""")
|
||||
client.execute(t, pwd=generate_random_password())
|
||||
|
||||
def _init_mycnf(self, password):
|
||||
"""
|
||||
Install the set of mysql my.cnf templates from dbaas-mycnf package.
|
||||
The package generates a template suited for the current
|
||||
container flavor. Update the os_admin user and password
|
||||
to the my.cnf file for direct login from localhost
|
||||
"""
|
||||
orig_mycnf = "/etc/mysql/my.cnf"
|
||||
final_mycnf = "/var/lib/mysql/my.cnf"
|
||||
tmp_mycnf = "/tmp/my.cnf.tmp"
|
||||
dbaas_mycnf = "/etc/dbaas/my.cnf/my.cnf.default"
|
||||
|
||||
LOG.debug(_("Installing my.cnf templates"))
|
||||
self.pkg.pkg_install("dbaas-mycnf", self.TIME_OUT)
|
||||
|
||||
if os.path.isfile(dbaas_mycnf):
|
||||
utils.execute("sudo", "mv", orig_mycnf,
|
||||
"%(name)s.%(date)s"
|
||||
% {'name': orig_mycnf,
|
||||
'date': date.today().isoformat()})
|
||||
utils.execute("sudo", "cp", dbaas_mycnf, orig_mycnf)
|
||||
|
||||
mycnf_file = open(orig_mycnf, 'r')
|
||||
tmp_file = open(tmp_mycnf, 'w')
|
||||
|
||||
for line in mycnf_file:
|
||||
tmp_file.write(line)
|
||||
if "[client]" in line:
|
||||
tmp_file.write("user\t\t= %s\n" % ADMIN_USER_NAME)
|
||||
tmp_file.write("password\t= %s\n" % password)
|
||||
|
||||
mycnf_file.close()
|
||||
tmp_file.close()
|
||||
utils.execute("sudo", "mv", tmp_mycnf, final_mycnf)
|
||||
utils.execute("sudo", "rm", orig_mycnf)
|
||||
utils.execute("sudo", "ln", "-s", final_mycnf, orig_mycnf)
|
||||
|
||||
def _remove_anonymous_user(self, client):
|
||||
t = text("""DELETE FROM mysql.user WHERE User='';""")
|
||||
client.execute(t)
|
||||
|
||||
def _remove_remote_root_access(self, client):
|
||||
t = text("""DELETE FROM mysql.user
|
||||
WHERE User='root'
|
||||
AND Host!='localhost';""")
|
||||
client.execute(t)
|
||||
self.state_change_wait_time = config.Config.get(
|
||||
'state_change_wait_time', 2 * 60)
|
||||
self.status = status
|
||||
|
||||
def _create_admin_user(self, client, password):
|
||||
"""
|
||||
@ -451,54 +561,183 @@ class DBaaSPreparer(object):
|
||||
""")
|
||||
client.execute(t, user=ADMIN_USER_NAME)
|
||||
|
||||
def _install_mysql(self):
|
||||
"""Install mysql server. The current version is 5.1"""
|
||||
LOG.debug(_("Installing mysql server"))
|
||||
self.pkg.pkg_install("mysql-server-5.1", self.TIME_OUT)
|
||||
#TODO(rnirmal): Add checks to make sure the package got installed
|
||||
@staticmethod
|
||||
def _generate_root_password(client):
|
||||
""" Generate and set a random root password and forget about it. """
|
||||
t = text("""UPDATE mysql.user SET Password=PASSWORD(:pwd)
|
||||
WHERE User='root';""")
|
||||
client.execute(t, pwd=generate_random_password())
|
||||
|
||||
def _restart_mysql(self):
|
||||
"""
|
||||
Restart mysql after all the modifications are completed.
|
||||
List of modifications:
|
||||
- Remove existing ib_logfile*
|
||||
"""
|
||||
# TODO(rnirmal): To be replaced by the mounted volume location
|
||||
# FIXME once we have volumes in place, use default till then
|
||||
mysql_base_dir = "/var/lib/mysql"
|
||||
try:
|
||||
LOG.debug(_("Restarting mysql..."))
|
||||
utils.execute("sudo", "service", "mysql", "stop")
|
||||
|
||||
# Remove the ib_logfile, if not mysql won't start.
|
||||
# For some reason wildcards don't seem to work, so
|
||||
# deleting both the files separately
|
||||
utils.execute("sudo", "rm", "%s/ib_logfile0" % mysql_base_dir)
|
||||
utils.execute("sudo", "rm", "%s/ib_logfile1" % mysql_base_dir)
|
||||
|
||||
utils.execute("sudo", "service", "mysql", "start")
|
||||
except ProcessExecutionError:
|
||||
LOG.error(_("Unable to restart mysql server."))
|
||||
|
||||
def prepare(self):
|
||||
def install_and_secure(self, pkg, memory_mb):
|
||||
"""Prepare the guest machine with a secure mysql server installation"""
|
||||
LOG.info(_("Preparing Guest as MySQL Server"))
|
||||
try:
|
||||
utils.execute("apt-get", "update", run_as_root=True)
|
||||
except ProcessExecutionError as e:
|
||||
LOG.error(_("Error updating the apt sources"))
|
||||
|
||||
self._install_mysql()
|
||||
|
||||
#TODO(tim.simpson): Check that MySQL is not already installed.
|
||||
self.status.begin_mysql_install()
|
||||
self._install_mysql(pkg)
|
||||
LOG.info(_("Generating root password..."))
|
||||
admin_password = generate_random_password()
|
||||
|
||||
client = LocalSqlClient(self.engine)
|
||||
engine = create_engine("mysql://root:@localhost:3306", echo=True)
|
||||
client = LocalSqlClient(engine)
|
||||
with client:
|
||||
self._generate_root_password(client)
|
||||
self._remove_anonymous_user(client)
|
||||
self._remove_remote_root_access(client)
|
||||
self._create_admin_user(client, admin_password)
|
||||
|
||||
self._init_mycnf(admin_password)
|
||||
self._restart_mysql()
|
||||
self._internal_stop_mysql()
|
||||
self._write_mycnf(pkg, memory_mb, admin_password)
|
||||
self._start_mysql()
|
||||
|
||||
self.status.end_install_or_restart()
|
||||
LOG.info(_("Dbaas preparation complete."))
|
||||
|
||||
def _install_mysql(self, pkg):
|
||||
"""Install mysql server. The current version is 5.1"""
|
||||
LOG.debug(_("Installing mysql server"))
|
||||
pkg.pkg_install("mysql-server-5.1", self.TIME_OUT)
|
||||
LOG.debug(_("Finished installing mysql server"))
|
||||
#TODO(rnirmal): Add checks to make sure the package got installed
|
||||
|
||||
def _internal_stop_mysql(self, update_db=False):
|
||||
LOG.info(_("Stopping mysql..."))
|
||||
utils.execute_with_timeout("sudo", "/etc/init.d/mysql", "stop")
|
||||
if not self.status.wait_for_real_status_to_change_to(
|
||||
rd_models.ServiceStatuses.SHUTDOWN,
|
||||
self.state_change_wait_time, update_db):
|
||||
LOG.error(_("Could not stop MySQL!"))
|
||||
self.status.end_install_or_restart()
|
||||
raise RuntimeError("Could not stop MySQL!")
|
||||
|
||||
def _remove_anonymous_user(self, client):
|
||||
t = text("""DELETE FROM mysql.user WHERE User='';""")
|
||||
client.execute(t)
|
||||
|
||||
def _remove_remote_root_access(self, client):
|
||||
t = text("""DELETE FROM mysql.user
|
||||
WHERE User='root'
|
||||
AND Host!='localhost';""")
|
||||
client.execute(t)
|
||||
|
||||
def restart(self):
|
||||
try:
|
||||
self.status.begin_mysql_restart()
|
||||
self._internal_stop_mysql()
|
||||
self._start_mysql()
|
||||
finally:
|
||||
self.status.end_install_or_restart()
|
||||
|
||||
# def _restart_mysql_and_wipe_ib_logfiles(self):
|
||||
# """Stops MySQL and restarts it, wiping the ib_logfiles in-between.
|
||||
|
||||
# This should never be done unless the innodb_log_file_size changes.
|
||||
# """
|
||||
# LOG.info("Restarting mysql...")
|
||||
# self._internal_stop_mysql()
|
||||
# self._wipe_ib_logfiles()
|
||||
# self._start_mysql()
|
||||
|
||||
def _replace_mycnf_with_template(self, template_path, original_path):
|
||||
if os.path.isfile(template_path):
|
||||
utils.execute_with_timeout("sudo", "mv", original_path,
|
||||
"%(name)s.%(date)s" % {'name': original_path,
|
||||
'date': date.today().isoformat()})
|
||||
utils.execute_with_timeout("sudo", "cp", template_path,
|
||||
original_path)
|
||||
|
||||
def _write_temp_mycnf_with_admin_account(self, original_file_path,
|
||||
temp_file_path, password):
|
||||
mycnf_file = open(original_file_path, 'r')
|
||||
tmp_file = open(temp_file_path, 'w')
|
||||
for line in mycnf_file:
|
||||
tmp_file.write(line)
|
||||
if "[client]" in line:
|
||||
tmp_file.write("user\t\t= %s\n" % ADMIN_USER_NAME)
|
||||
tmp_file.write("password\t= %s\n" % password)
|
||||
mycnf_file.close()
|
||||
tmp_file.close()
|
||||
|
||||
def wipe_ib_logfiles(self):
|
||||
LOG.info(_("Wiping ib_logfiles..."))
|
||||
utils.execute_with_timeout("sudo", "rm", "%s/ib_logfile0"
|
||||
% MYSQL_BASE_DIR)
|
||||
utils.execute_with_timeout("sudo", "rm", "%s/ib_logfile1"
|
||||
% MYSQL_BASE_DIR)
|
||||
|
||||
def _write_mycnf(self, pkg, update_memory_mb, admin_password):
|
||||
"""
|
||||
Install the set of mysql my.cnf templates from dbaas-mycnf package.
|
||||
The package generates a template suited for the current
|
||||
container flavor. Update the os_admin user and password
|
||||
to the my.cnf file for direct login from localhost
|
||||
"""
|
||||
LOG.info(_("Writing my.cnf templates."))
|
||||
if admin_password is None:
|
||||
admin_password = get_auth_password()
|
||||
|
||||
# As of right here, the admin_password contains the password to be
|
||||
# applied to the my.cnf file, whether it was there before (and we
|
||||
# passed it in) or we generated a new one just now (because we didn't
|
||||
# find it).
|
||||
|
||||
LOG.debug(_("Installing my.cnf templates"))
|
||||
pkg.pkg_install("dbaas-mycnf", self.TIME_OUT)
|
||||
|
||||
LOG.info(_("Replacing my.cnf with template."))
|
||||
template_path = DBAAS_MYCNF % update_memory_mb
|
||||
|
||||
# replace my.cnf with template.
|
||||
self._replace_mycnf_with_template(template_path, ORIG_MYCNF)
|
||||
|
||||
LOG.info(_("Writing new temp my.cnf."))
|
||||
self._write_temp_mycnf_with_admin_account(ORIG_MYCNF, TMP_MYCNF,
|
||||
admin_password)
|
||||
# permissions work-around
|
||||
LOG.info(_("Moving tmp into final."))
|
||||
utils.execute_with_timeout("sudo", "mv", TMP_MYCNF, FINAL_MYCNF)
|
||||
LOG.info(_("Removing original my.cnf."))
|
||||
utils.execute_with_timeout("sudo", "rm", ORIG_MYCNF)
|
||||
LOG.info(_("Symlinking final my.cnf."))
|
||||
utils.execute_with_timeout("sudo", "ln", "-s", FINAL_MYCNF, ORIG_MYCNF)
|
||||
self.wipe_ib_logfiles()
|
||||
|
||||
|
||||
def _start_mysql(self, update_db=False):
|
||||
LOG.info(_("Starting mysql..."))
|
||||
# This is the site of all the trouble in the restart tests.
|
||||
# Essentially what happens is thaty mysql start fails, but does not
|
||||
# die. It is then impossible to kill the original, so
|
||||
|
||||
try:
|
||||
utils.execute_with_timeout("sudo", "/etc/init.d/mysql", "start")
|
||||
except ProcessExecutionError:
|
||||
# If it won't start, but won't die either, kill it by hand so we
|
||||
# don't let a rouge process wander around.
|
||||
try:
|
||||
utils.execute_with_timeout("sudo", "pkill", "-9", "mysql")
|
||||
except ProcessExecutionError, p:
|
||||
LOG.error("Error killing stalled mysql start command.")
|
||||
LOG.error(p)
|
||||
# There's nothing more we can do...
|
||||
raise RuntimeError("Can't start MySQL!")
|
||||
|
||||
if not self.status.wait_for_real_status_to_change_to(
|
||||
rd_models.ServiceStatuses.RUNNING,
|
||||
self.state_change_wait_time, update_db):
|
||||
LOG.error(_("Start up of MySQL failed!"))
|
||||
self.status.end_install_or_restart()
|
||||
raise RuntimeError("Could not start MySQL!")
|
||||
|
||||
def start_mysl_with_conf_changes(self, pkg, updated_memory_mb):
|
||||
LOG.info(_("Starting mysql with conf changes..."))
|
||||
if self.status.is_mysql_running:
|
||||
LOG.error(_("Cannot execute start_mysql_with_conf_changes because "
|
||||
"MySQL state == %s!") % self.status)
|
||||
raise RuntimeError("MySQL not stopped.")
|
||||
LOG.info(_("Initiating config."))
|
||||
self._write_mycnf(pkg, update_memory_mb, None)
|
||||
self._start_mysql(True)
|
||||
|
||||
def stop_mysql(self):
|
||||
self._internal_stop_mysql(True)
|
||||
|
@ -26,6 +26,7 @@ handles RPC calls relating to Platform specific operations.
|
||||
|
||||
import functools
|
||||
import logging
|
||||
import traceback
|
||||
|
||||
from reddwarf.common import exception
|
||||
from reddwarf.common import utils
|
||||
@ -69,12 +70,17 @@ class GuestManager(service.Manager):
|
||||
Right now does the status updates"""
|
||||
status_method = "update_status"
|
||||
try:
|
||||
getattr(self.driver, status_method)()
|
||||
method = getattr(self.driver, status_method)
|
||||
except AttributeError as ae:
|
||||
LOG.error(_("Method %s not found for driver %s"), status_method,
|
||||
self.driver)
|
||||
if raise_on_error:
|
||||
raise ae
|
||||
try:
|
||||
method()
|
||||
except Exception as e:
|
||||
LOG.error("Got an error during periodic tasks!")
|
||||
LOG.debug(traceback.format_exc())
|
||||
|
||||
def upgrade(self, context):
|
||||
"""Upgrade the guest agent and restart the agent"""
|
||||
@ -87,8 +93,14 @@ class GuestManager(service.Manager):
|
||||
def _mapper(self, method, context, *args, **kwargs):
|
||||
""" Tries to call the respective driver method """
|
||||
try:
|
||||
return getattr(self.driver, method)(*args, **kwargs)
|
||||
func = getattr(self.driver, method)
|
||||
except AttributeError:
|
||||
LOG.error(_("Method %s not found for driver %s"), method, self.driver)
|
||||
raise exception.NotFound("Method not available for the "
|
||||
"chosen driver")
|
||||
LOG.error(_("Method %s not found for driver %s"), method,
|
||||
self.driver)
|
||||
raise exception.NotFound("Method %s is not available for the "
|
||||
"chosen driver.")
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except Exception as e:
|
||||
LOG.error("Got an error running %s!" % method)
|
||||
LOG.debug(traceback.format_exc())
|
||||
|
@ -22,6 +22,8 @@ import logging
|
||||
import pexpect
|
||||
|
||||
from reddwarf.common import exception
|
||||
from reddwarf.common.exception import ProcessExecutionError
|
||||
from reddwarf.common import utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -102,10 +104,16 @@ class PkgAgent(object):
|
||||
return RUN_DPKG_FIRST
|
||||
elif i == 4:
|
||||
raise PkgAdminLockError()
|
||||
wait_and_close_proc(child)
|
||||
except pexpect.TIMEOUT:
|
||||
kill_proc(child)
|
||||
raise PkgTimeout("Process timeout after %i seconds." % time_out)
|
||||
try:
|
||||
wait_and_close_proc(child)
|
||||
except pexpect.TIMEOUT as e:
|
||||
LOG.error("wait_and_close_proc failed: %s" % e)
|
||||
#TODO(tim.simpson): As of RDL, and on my machine exclusively (in
|
||||
# both Virtual Box and VmWare!) this fails, but
|
||||
# the package is installed.
|
||||
return OK
|
||||
|
||||
def _remove(self, package_name, time_out):
|
||||
@ -148,6 +156,11 @@ class PkgAgent(object):
|
||||
|
||||
def pkg_install(self, package_name, time_out):
|
||||
"""Installs a package."""
|
||||
try:
|
||||
utils.execute("apt-get", "update", run_as_root=True)
|
||||
except ProcessExecutionError as e:
|
||||
LOG.error(_("Error updating the apt sources"))
|
||||
|
||||
result = self._install(package_name, time_out)
|
||||
if result != OK:
|
||||
if result == RUN_DPKG_FIRST:
|
||||
|
@ -23,7 +23,7 @@ import netaddr
|
||||
from reddwarf import db
|
||||
|
||||
from reddwarf.common import config
|
||||
from reddwarf.guestagent import api as guest_api
|
||||
#from reddwarf.guestagent import api as guest_api
|
||||
from reddwarf.common import exception as rd_exceptions
|
||||
from reddwarf.common import utils
|
||||
from reddwarf.instance.tasks import InstanceTask
|
||||
@ -34,7 +34,6 @@ from novaclient import exceptions as nova_exceptions
|
||||
from reddwarf.common.models import NovaRemoteModelBase
|
||||
from reddwarf.common.remote import create_nova_client
|
||||
from reddwarf.common.remote import create_guest_client
|
||||
from reddwarf.guestagent.db import models as guest_models
|
||||
|
||||
|
||||
CONFIG = config.Config
|
||||
@ -61,6 +60,7 @@ def populate_databases(dbs):
|
||||
Create a serializable request with user provided data
|
||||
for creating new databases.
|
||||
"""
|
||||
from reddwarf.guestagent.db import models as guest_models
|
||||
try:
|
||||
databases = []
|
||||
for database in dbs:
|
||||
@ -83,8 +83,9 @@ class InstanceStatus(object):
|
||||
SHUTDOWN = "SHUTDOWN"
|
||||
|
||||
|
||||
# If the compute server is in any of these states we can't delete it.
|
||||
SERVER_INVALID_DELETE_STATUSES = ["BUILD", "REBOOT", "REBUILD"]
|
||||
# If the compute server is in any of these states we can't perform any
|
||||
# actions (delete, resize, etc).
|
||||
SERVER_INVALID_ACTION_STATUSES = ["BUILD", "REBOOT", "REBUILD"]
|
||||
|
||||
# Statuses in which an instance can have an action performed.
|
||||
VALID_ACTION_STATUSES = ["ACTIVE"]
|
||||
@ -118,8 +119,7 @@ class Instance(object):
|
||||
return Instance(context, db_info, server, service_status)
|
||||
|
||||
def delete(self, force=False):
|
||||
LOG.debug(_("Deleting instance %s...") % self.id)
|
||||
if not force and self.server.status in SERVER_INVALID_DELETE_STATUSES:
|
||||
if not force and self.server.status in SERVER_INVALID_ACTION_STATUSES:
|
||||
raise rd_exceptions.UnprocessableEntity("Instance %s is not ready."
|
||||
% self.id)
|
||||
LOG.debug(_(" ... deleting compute id = %s") %
|
||||
@ -142,7 +142,7 @@ class Instance(object):
|
||||
@classmethod
|
||||
def create(cls, context, name, flavor_ref, image_id, databases):
|
||||
db_info = DBInstance.create(name=name,
|
||||
task_status=InstanceTasks.BUILDING)
|
||||
task_status=InstanceTasks.NONE)
|
||||
LOG.debug(_("Created new Reddwarf instance %s...") % db_info.id)
|
||||
client = create_nova_client(context)
|
||||
server = client.servers.create(name, image_id, flavor_ref,
|
||||
@ -154,11 +154,12 @@ class Instance(object):
|
||||
status=ServiceStatuses.NEW)
|
||||
# Now wait for the response from the create to do additional work
|
||||
guest = create_guest_client(context, db_info.id)
|
||||
# populate the databases
|
||||
model_schemas = populate_databases(databases)
|
||||
guest.prepare(512, model_schemas)
|
||||
guest.prepare(databases=[], memory_mb=512, users=[])
|
||||
return Instance(context, db_info, server, service_status)
|
||||
|
||||
def get_guest(self):
|
||||
return create_guest_client(self.context, self.db_info.id)
|
||||
|
||||
@property
|
||||
def id(self):
|
||||
return self.db_info.id
|
||||
@ -181,6 +182,8 @@ class Instance(object):
|
||||
#TODO(tim.simpson): As we enter more advanced cases dealing with
|
||||
# timeouts determine if the task_status should be integrated here
|
||||
# or removed entirely.
|
||||
if InstanceTasks.REBOOTING == self.db_info.task_status:
|
||||
return "REBOOT"
|
||||
# If the server is in any of these states they take precedence.
|
||||
if self.server.status in ["BUILD", "ERROR", "REBOOT", "RESIZE"]:
|
||||
return self.server.status
|
||||
@ -242,6 +245,68 @@ class Instance(object):
|
||||
LOG.debug(_(msg) % self.status)
|
||||
raise rd_exceptions.UnprocessableEntity(_(msg) % self.status)
|
||||
|
||||
def resize_flavor(self, new_flavor_id):
|
||||
LOG.info("Resizing flavor of instance %s..." % self.id)
|
||||
# TODO(tim.simpson): Validate the new flavor ID can be found or
|
||||
# raise FlavorNotFound exception.
|
||||
# TODO(tim.simpson): Actually perform flavor resize.
|
||||
raise RuntimeError("Not implemented (yet).")
|
||||
|
||||
def resize_volume(self, new_size):
|
||||
LOG.info("Resizing volume of instance %s..." % self.id)
|
||||
# TODO(tim.simpson): Validate old_size < new_size, or raise
|
||||
# rd_exceptions.BadRequest.
|
||||
# TODO(tim.simpson): resize volume.
|
||||
raise RuntimeError("Not implemented (yet).")
|
||||
|
||||
def restart(self):
|
||||
if self.server.status in SERVER_INVALID_ACTION_STATUSES:
|
||||
msg = _("Restart instance not allowed while instance %s is in %s "
|
||||
"status.") % (self.id, instance_state)
|
||||
LOG.debug(msg)
|
||||
# If the state is building then we throw an exception back
|
||||
raise rd_exceptions.UnprocessableEntity(msg)
|
||||
else:
|
||||
LOG.info("Restarting instance %s..." % self.id)
|
||||
# Set our local status since Nova might not change it quick enough.
|
||||
#TODO(tim.simpson): Possible bad stuff can happen if this service
|
||||
# shuts down before it can set status to NONE.
|
||||
# We need a last updated time to mitigate this;
|
||||
# after some period of tolerance, we'll assume the
|
||||
# status is no longer in effect.
|
||||
self.db_info.task_status = InstanceTasks.REBOOTING
|
||||
self.db_info.save()
|
||||
try:
|
||||
self.get_guest().restart()
|
||||
except rd_exceptions.GuestError:
|
||||
LOG.error("Failure to restart MySQL.")
|
||||
finally:
|
||||
self.db_info.task_status = InstanceTasks.NONE
|
||||
self.db_info.save()
|
||||
|
||||
def validate_can_perform_restart_or_reboot(self):
|
||||
"""
|
||||
Raises exception if an instance action cannot currently be performed.
|
||||
"""
|
||||
if self.db_info.task_status != InstanceTasks.NONE or \
|
||||
not self.service_status.status.restart_is_allowed:
|
||||
msg = "Instance is not currently available for an action to be " \
|
||||
"performed (task status was %s, service status was %s)." \
|
||||
% (self.db_info.task_status, self.service_status.status)
|
||||
LOG.error(msg)
|
||||
raise rd_exceptions.UnprocessableEntity(msg)
|
||||
|
||||
def validate_can_perform_resize(self):
|
||||
"""
|
||||
Raises exception if an instance action cannot currently be performed.
|
||||
"""
|
||||
if self.status != InstanceStatus.ACTIVE:
|
||||
msg = "Instance is not currently available for an action to be " \
|
||||
"performed (status was %s)." % self.status
|
||||
LOG.error(msg)
|
||||
raise rd_exceptions.UnprocessableEntity(msg)
|
||||
|
||||
|
||||
|
||||
def create_server_list_matcher(server_list):
|
||||
# Returns a method which finds a server from the given list.
|
||||
@ -472,6 +537,15 @@ class ServiceStatus(object):
|
||||
def is_valid_code(code):
|
||||
return code in ServiceStatus._lookup
|
||||
|
||||
@property
|
||||
def restart_is_allowed(self):
|
||||
return self._code in [ServiceStatuses.RUNNING._code,
|
||||
ServiceStatuses.SHUTDOWN._code, ServiceStatuses.CRASHED._code,
|
||||
ServiceStatuses.BLOCKED._code]
|
||||
|
||||
def __str__(self):
|
||||
return self._description
|
||||
|
||||
|
||||
class ServiceStatuses(object):
|
||||
RUNNING = ServiceStatus(0x01, 'running', 'ACTIVE')
|
||||
|
@ -86,6 +86,83 @@ class api_validation:
|
||||
class InstanceController(BaseController):
|
||||
"""Controller for instance functionality"""
|
||||
|
||||
def action(self, req, body, tenant_id, id):
|
||||
LOG.info("req : '%s'\n\n" % req)
|
||||
LOG.info("Comitting an ACTION again instance %s for tenant '%s'"
|
||||
% (id, tenant_id))
|
||||
context = req.environ[wsgi.CONTEXT_KEY]
|
||||
instance = models.Instance.load(context, id)
|
||||
_actions = {
|
||||
'restart': self._action_restart,
|
||||
'resize': self._action_resize
|
||||
}
|
||||
selected_action = None
|
||||
for key in body:
|
||||
if key in _actions:
|
||||
if selected_action is not None:
|
||||
msg = _("Only one action can be specified per request.")
|
||||
raise rd_exceptions.BadRequest(msg)
|
||||
selected_action = _actions[key]
|
||||
else:
|
||||
msg = _("Invalid instance action: %s") % key
|
||||
raise rd_exceptions.BadRequest(msg)
|
||||
|
||||
if selected_action:
|
||||
return selected_action(instance, body)
|
||||
else:
|
||||
raise rd_exceptions.BadRequest(_("Invalid request body."))
|
||||
|
||||
def _action_restart(self, instance, body):
|
||||
instance.validate_can_perform_restart_or_reboot()
|
||||
instance.restart()
|
||||
return webob.exc.HTTPAccepted()
|
||||
|
||||
def _action_resize(self, instance, body):
|
||||
"""
|
||||
Handles 2 cases
|
||||
1. resize volume
|
||||
body only contains {volume: {size: x}}
|
||||
2. resize instance
|
||||
body only contains {flavorRef: http.../2}
|
||||
|
||||
If the body has both we will throw back an error.
|
||||
"""
|
||||
instance.validate_can_perform_resize()
|
||||
options = {
|
||||
'volume': self._action_resize_volume,
|
||||
'flavorRef': self._action_resize_flavor
|
||||
}
|
||||
selected_option = None
|
||||
args = None
|
||||
for key in body['resize']:
|
||||
if key in options:
|
||||
if selected_option is not None:
|
||||
msg = _("Not allowed to resize volume and flavor at the "
|
||||
"same time.")
|
||||
raise rd_exceptions.BadRequest(msg)
|
||||
selected_option = options[key]
|
||||
args = body['resize'][key]
|
||||
else:
|
||||
raise rd_exceptions.BadRequest("Invalid resize argument %s"
|
||||
% key)
|
||||
if selected_option:
|
||||
return selected_option(self, instance, args)
|
||||
else:
|
||||
raise rd_exceptions.BadRequest(_("Missing resize arguments."))
|
||||
|
||||
def _action_resize_volume(self, instance, volume):
|
||||
if 'size' not in volume:
|
||||
raise rd_exceptions.BadRequest(
|
||||
"Missing 'size' property of 'volume' in request body.")
|
||||
new_size = volume['size']
|
||||
instance.resize_volume(new_size)
|
||||
return webob.exc.HTTPAccepted()
|
||||
|
||||
def _action_resize_flavor(self, instance, flavorRef):
|
||||
new_flavor_id = utils.get_id_from_href(flavorRef)
|
||||
instance.resize_flavor(new_flavor_id)
|
||||
return webob.exc.HTTPAccepted()
|
||||
|
||||
def detail(self, req, tenant_id):
|
||||
"""Return all instances."""
|
||||
LOG.info(_("req : '%s'\n\n") % req)
|
||||
@ -179,7 +256,7 @@ class InstanceController(BaseController):
|
||||
return wsgi.Result(views.InstanceDetailView(instance).data(), 200)
|
||||
|
||||
@staticmethod
|
||||
def _validate_empty_body(body):
|
||||
def _validate_body_not_empty(body):
|
||||
"""Check that the body is not empty"""
|
||||
if not body:
|
||||
msg = "The request contains an empty body"
|
||||
@ -211,7 +288,7 @@ class InstanceController(BaseController):
|
||||
@staticmethod
|
||||
def _validate(body):
|
||||
"""Validate that the request has all the required parameters"""
|
||||
InstanceController._validate_empty_body(body)
|
||||
InstanceController._validate_body_not_empty(body)
|
||||
try:
|
||||
body['instance']
|
||||
body['instance']['flavorRef']
|
||||
@ -234,41 +311,6 @@ class InstanceController(BaseController):
|
||||
raise rd_exceptions.ReddwarfError("Required element/key - %s "
|
||||
"was not specified" % e)
|
||||
|
||||
@staticmethod
|
||||
def _validate_single_resize_in_body(body):
|
||||
# Validate body resize does not have both volume and flavorRef
|
||||
try:
|
||||
resize = body['resize']
|
||||
if 'volume' in resize and 'flavorRef' in resize:
|
||||
msg = ("Not allowed to resize volume "
|
||||
"and flavor at the same time")
|
||||
LOG.error(_(msg))
|
||||
raise rd_exceptions.ReddwarfError(msg)
|
||||
except KeyError as e:
|
||||
LOG.error(_("Resize Instance Required field(s) - %s") % e)
|
||||
raise rd_exceptions.ReddwarfError("Required element/key - %s "
|
||||
"was not specified" % e)
|
||||
|
||||
@staticmethod
|
||||
def _validate_resize(body, old_volume_size):
|
||||
"""
|
||||
We are going to check that volume resizing data is present.
|
||||
"""
|
||||
InstanceController._validate_empty_body(body)
|
||||
try:
|
||||
body['resize']
|
||||
body['resize']['volume']
|
||||
new_volume_size = body['resize']['volume']['size']
|
||||
except KeyError as e:
|
||||
LOG.error(_("Resize Instance Required field(s) - %s") % e)
|
||||
raise rd_exceptions.ReddwarfError("Required element/key - %s "
|
||||
"was not specified" % e)
|
||||
Instance._validate_volume_size(new_volume_size)
|
||||
if int(new_volume_size) <= old_volume_size:
|
||||
raise rd_exceptions.ReddwarfError("The new volume 'size' cannot "
|
||||
"be less than the current volume size "
|
||||
"of '%s'" % old_volume_size)
|
||||
|
||||
|
||||
class API(wsgi.Router):
|
||||
"""API"""
|
||||
@ -283,7 +325,8 @@ class API(wsgi.Router):
|
||||
instance_resource = InstanceController().create_resource()
|
||||
path = "/{tenant_id}/instances"
|
||||
mapper.resource("instance", path, controller=instance_resource,
|
||||
collection={'detail': 'GET'})
|
||||
collection={'detail': 'GET'},
|
||||
member={'action': 'POST'})
|
||||
|
||||
# TODO(ed-): remove this when all mention of flavorservice
|
||||
# et cetera are moved away
|
||||
|
@ -55,8 +55,9 @@ class InstanceTask(object):
|
||||
|
||||
|
||||
class InstanceTasks(object):
|
||||
BUILDING = InstanceTask(0x01, 'BUILDING')
|
||||
NONE = InstanceTask(0x01, 'NONE')
|
||||
DELETING = InstanceTask(0x02, 'DELETING')
|
||||
REBOOTING = InstanceTask(0x03, 'REBOOTING')
|
||||
|
||||
|
||||
# Dissuade further additions at run-time.
|
||||
|
@ -310,6 +310,8 @@ def load_paste_app(app_name, options, args, config_dir=None):
|
||||
logger.debug("*" * 80)
|
||||
app = deploy.loadapp("config:%s" % conf_file, name=app_name)
|
||||
except (LookupError, ImportError), e:
|
||||
import traceback
|
||||
print traceback.format_exc()
|
||||
raise RuntimeError("Unable to load %(app_name)s from "
|
||||
"configuration file %(conf_file)s."
|
||||
"\nGot: %(e)r" % locals())
|
||||
|
@ -65,7 +65,7 @@ class FakeGuest(object):
|
||||
def list_users(self):
|
||||
return [self.users[name] for name in self.users]
|
||||
|
||||
def prepare(self, memory_mb, databases):
|
||||
def prepare(self, memory_mb, databases, users):
|
||||
from reddwarf.instance.models import InstanceServiceStatus
|
||||
from reddwarf.instance.models import ServiceStatuses
|
||||
def update_db():
|
||||
|
Loading…
x
Reference in New Issue
Block a user