[Sahara] Add Swift to output data sources
Added support of Swift in Sahara Data Sources. The context does not verify the output of the job. The cleanup deletes the output container. Change-Id: I7b4cdc14f6c8a6af227a953be298f266e5851c92
This commit is contained in:
parent
e7c9d436df
commit
90dcd43282
@ -17,6 +17,10 @@ from rally.common.i18n import _
|
||||
from rally.common import log as logging
|
||||
from rally.common import utils as rutils
|
||||
from rally import consts
|
||||
from rally import osclients
|
||||
from rally.plugins.openstack.context.cleanup import manager as resource_manager
|
||||
from rally.plugins.openstack.context.cleanup import resources as res_cleanup
|
||||
from rally.plugins.openstack.scenarios.swift import utils as swift_utils
|
||||
from rally.task import context
|
||||
|
||||
|
||||
@ -47,13 +51,67 @@ class SaharaOutputDataSources(context.Context):
|
||||
def setup(self):
|
||||
for user, tenant_id in rutils.iterate_per_tenants(
|
||||
self.context["users"]):
|
||||
self.context["tenants"][tenant_id]["sahara_output_conf"] = {
|
||||
"output_type": self.config["output_type"],
|
||||
"output_url_prefix": self.config["output_url_prefix"]}
|
||||
|
||||
@logging.log_task_wrapper(LOG.info, _("Exit context: `Sahara Output Data"
|
||||
"Sources`"))
|
||||
clients = osclients.Clients(user["endpoint"])
|
||||
sahara = clients.sahara()
|
||||
|
||||
if self.config["output_type"] == "swift":
|
||||
swift = swift_utils.SwiftScenario(clients=clients,
|
||||
context=self.context)
|
||||
container_name = rutils.generate_random_name(
|
||||
prefix=self.config["output_url_prefix"])
|
||||
self.context["tenants"][tenant_id]["sahara_container"] = {
|
||||
"name": swift._create_container(
|
||||
container_name=container_name),
|
||||
"output_swift_objects": []
|
||||
}
|
||||
self.setup_outputs_swift(swift, sahara, tenant_id,
|
||||
container_name,
|
||||
user["endpoint"].username,
|
||||
user["endpoint"].password)
|
||||
else:
|
||||
self.setup_outputs_hdfs(sahara, tenant_id,
|
||||
self.config["output_url_prefix"])
|
||||
|
||||
def setup_outputs_hdfs(self, sahara, tenant_id, output_url):
|
||||
output_ds = sahara.data_sources.create(
|
||||
name=self.generate_random_name(),
|
||||
description="",
|
||||
data_source_type="hdfs",
|
||||
url=output_url)
|
||||
|
||||
self.context["tenants"][tenant_id]["sahara_output"] = output_ds.id
|
||||
|
||||
def setup_outputs_swift(self, swift, sahara, tenant_id, container_name,
|
||||
username, password):
|
||||
output_ds_swift = sahara.data_sources.create(
|
||||
name=self.generate_random_name(),
|
||||
description="",
|
||||
data_source_type="swift",
|
||||
url="swift://" + container_name + ".sahara/",
|
||||
credential_user=username,
|
||||
credential_pass=password)
|
||||
|
||||
self.context["tenants"][tenant_id]["sahara_output"] = (
|
||||
output_ds_swift.id
|
||||
)
|
||||
|
||||
@logging.log_task_wrapper(LOG.info,
|
||||
_("Exit context: `Sahara Output Data Sources`"))
|
||||
def cleanup(self):
|
||||
# TODO(esikachev): Cleanup must iterate by output_url_prefix of
|
||||
# resources from setup() and delete them
|
||||
pass
|
||||
for user, tenant_id in rutils.iterate_per_tenants(
|
||||
self.context["users"]):
|
||||
if self.context["tenants"][tenant_id].get("sahara_container",
|
||||
{}).get(
|
||||
"name") is not None:
|
||||
for swift_object in (
|
||||
self.context["tenants"][tenant_id]["sahara_container"][
|
||||
"output_swift_objects"]):
|
||||
res_cleanup.SwiftObject(swift_object[1])
|
||||
res_cleanup.SwiftContainer(
|
||||
self.context["tenants"][tenant_id].get("sahara_container",
|
||||
{}).get("name"))
|
||||
resources = ["data_sources"]
|
||||
resource_manager.cleanup(
|
||||
names=["sahara.%s" % res for res in resources],
|
||||
users=self.context.get("users", []))
|
||||
|
@ -12,14 +12,61 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from rally.plugins.openstack.context.sahara import sahara_output_data_sources
|
||||
from tests.unit import test
|
||||
|
||||
CTX = "rally.plugins.openstack.context.sahara"
|
||||
|
||||
class SaharaOutputDataSourcesTestCase(test.TestCase):
|
||||
|
||||
class FakeDict(object):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
|
||||
class SaharaOutputDataSourcesTestCase(test.ScenarioTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(SaharaOutputDataSourcesTestCase, self).setUp()
|
||||
fake_dict = FakeDict()
|
||||
fake_dict.__dict__["username"] = "user"
|
||||
fake_dict.__dict__["password"] = "passwd"
|
||||
self.tenants_num = 2
|
||||
self.users_per_tenant = 2
|
||||
self.users = self.tenants_num * self.users_per_tenant
|
||||
self.task = mock.MagicMock()
|
||||
|
||||
self.tenants = {}
|
||||
self.users_key = []
|
||||
|
||||
for i in range(self.tenants_num):
|
||||
self.tenants[str(i)] = {"id": str(i), "name": str(i),
|
||||
"sahara_image": "42"}
|
||||
for j in range(self.users_per_tenant):
|
||||
self.users_key.append({"id": "%s_%s" % (str(i), str(j)),
|
||||
"tenant_id": str(i),
|
||||
"endpoint": fake_dict})
|
||||
|
||||
self.user_key = [{"id": i, "tenant_id": j, "endpoint": "endpoint"}
|
||||
for j in range(self.tenants_num)
|
||||
for i in range(self.users_per_tenant)]
|
||||
self.context.update({
|
||||
"config": {
|
||||
"users": {
|
||||
"tenants": self.tenants_num,
|
||||
"users_per_tenant": self.users_per_tenant,
|
||||
},
|
||||
"sahara_output_data_sources": {
|
||||
"output_type": "hdfs",
|
||||
"output_url_prefix": "hdfs://test_host/",
|
||||
},
|
||||
},
|
||||
"admin": {"endpoint": mock.MagicMock()},
|
||||
"task": mock.MagicMock(),
|
||||
"users": self.users_key,
|
||||
"tenants": self.tenants,
|
||||
})
|
||||
|
||||
def check_setup(self):
|
||||
context = sahara_output_data_sources.SaharaOutputDataSources.context[
|
||||
@ -27,6 +74,80 @@ class SaharaOutputDataSourcesTestCase(test.TestCase):
|
||||
self.assertIsNotNone(context.get("output_type"))
|
||||
self.assertIsNotNone(context.get("output_url_prefix"))
|
||||
|
||||
def check_cleanup(self):
|
||||
self.assertIsNone(
|
||||
sahara_output_data_sources.SaharaOutputDataSources.cleanup())
|
||||
@mock.patch("%s.sahara_output_data_sources.resource_manager.cleanup" % CTX)
|
||||
@mock.patch("%s.sahara_output_data_sources.osclients" % CTX)
|
||||
def test_setup_and_cleanup_hdfs(self, mock_osclients, mock_cleanup):
|
||||
|
||||
mock_sahara = mock_osclients.Clients(mock.MagicMock()).sahara()
|
||||
mock_sahara.data_sources.create.return_value = mock.MagicMock(
|
||||
id=42)
|
||||
|
||||
sahara_ctx = sahara_output_data_sources.SaharaOutputDataSources(
|
||||
self.context)
|
||||
sahara_ctx.generate_random_name = mock.Mock()
|
||||
|
||||
output_ds_crete_calls = []
|
||||
|
||||
for i in range(self.tenants_num):
|
||||
output_ds_crete_calls.append(mock.call(
|
||||
name=sahara_ctx.generate_random_name.return_value,
|
||||
description="",
|
||||
data_source_type="hdfs",
|
||||
url="hdfs://test_host/"))
|
||||
|
||||
sahara_ctx.setup()
|
||||
|
||||
mock_sahara.data_sources.create.assert_has_calls(
|
||||
output_ds_crete_calls)
|
||||
|
||||
sahara_ctx.cleanup()
|
||||
|
||||
mock_cleanup.assert_called_once_with(
|
||||
names=["sahara.data_sources"],
|
||||
users=self.context["users"])
|
||||
|
||||
@mock.patch("rally.common.utils.generate_random_name",
|
||||
return_value="rally")
|
||||
@mock.patch("%s.sahara_output_data_sources.osclients" % CTX)
|
||||
def test_setup_inputs_swift(self, mock_osclients,
|
||||
mock_generate_random_name):
|
||||
mock_sahara = mock_osclients.Clients(mock.MagicMock()).sahara()
|
||||
|
||||
self.context.update({
|
||||
"config": {
|
||||
"users": {
|
||||
"tenants": self.tenants_num,
|
||||
"users_per_tenant": self.users_per_tenant,
|
||||
},
|
||||
"sahara_output_data_sources": {
|
||||
"output_type": "swift",
|
||||
"output_url_prefix": "rally",
|
||||
},
|
||||
},
|
||||
"admin": {"endpoint": mock.MagicMock()},
|
||||
"task": mock.MagicMock(),
|
||||
"users": self.users_key,
|
||||
"tenants": self.tenants
|
||||
})
|
||||
|
||||
sahara_ctx = sahara_output_data_sources.SaharaOutputDataSources(
|
||||
self.context)
|
||||
sahara_ctx.generate_random_name = mock.Mock()
|
||||
|
||||
output_ds_crete_calls = []
|
||||
for i in range(self.tenants_num):
|
||||
output_ds_crete_calls.append(mock.call(
|
||||
name=sahara_ctx.generate_random_name.return_value,
|
||||
description="",
|
||||
data_source_type="swift",
|
||||
url="swift://rally.sahara/",
|
||||
credential_user="user",
|
||||
credential_pass="passwd"
|
||||
))
|
||||
|
||||
sahara_ctx.setup()
|
||||
|
||||
mock_sahara.data_sources.create.assert_has_calls(
|
||||
output_ds_crete_calls)
|
||||
|
||||
sahara_ctx.cleanup()
|
||||
|
Loading…
Reference in New Issue
Block a user