V2 Bindings

This provides bindings for:

- zones
- recordsets
- tlds
- blacklists
- limits
- nameservers

With associated unit tests.

Change-Id: Ie9b79340bd327b78916fd038633842da3ace881b
This commit is contained in:
Endre Karlson 2014-11-13 09:56:24 +01:00
parent 39d8b54d78
commit 2a2c85ce21
30 changed files with 1805 additions and 5 deletions

View File

@ -14,9 +14,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import json
from urllib import urlencode
import six
from stevedore import extension
from designateclient import exceptions
@six.add_metaclass(abc.ABCMeta)
@ -25,6 +29,57 @@ class Controller(object):
def __init__(self, client):
self.client = client
def build_url(self, url, criterion=None, marker=None, limit=None):
params = criterion or {}
if marker is not None:
params['marker'] = marker
if limit is not None:
params['limit'] = limit
q = urlencode(params) if params else ''
return '%(url)s%(params)s' % {
'url': url,
'params': '?%s' % q
}
def _serialize(self, kwargs):
if 'data' in kwargs:
kwargs['data'] = json.dumps(kwargs['data'])
def _post(self, url, response_key=None, **kwargs):
self._serialize(kwargs)
resp, body = self.client.session.post(url, **kwargs)
if response_key is not None:
return body[response_key]
return body
def _get(self, url, response_key=None):
resp, body = self.client.session.get(url)
if response_key is not None:
return body[response_key]
return body
def _patch(self, url, response_key=None, **kwargs):
self._serialize(kwargs)
resp, body = self.client.session.patch(url, **kwargs)
if response_key is not None:
return body[response_key]
return body
def _put(self, url, response_key=None, **kwargs):
self._serialize(kwargs)
resp, body = self.client.session.put(url, **kwargs)
if response_key is not None:
return body[response_key]
return body
def _delete(self, url):
resp, body = self.client.session.delete(url)
@six.add_metaclass(abc.ABCMeta)
class CrudController(Controller):
@ -58,3 +113,17 @@ class CrudController(Controller):
"""
Delete a resource
"""
def get_versions():
mgr = extension.ExtensionManager('designateclient.versions')
return dict([(ep.name, ep.plugin) for ep in mgr.extensions])
def Client(version, *args, **kwargs): # noqa
versions = get_versions()
if version not in versions:
msg = 'Version %s is not supported, use one of (%s)' % (
version, versions.keys())
raise exceptions.UnsupportedVersion(msg)
return versions[version](*args, **kwargs)

View File

@ -19,10 +19,18 @@ class Base(Exception):
pass
class UnsupportedVersion(Base):
pass
class ResourceNotFound(Base):
pass
class NoUniqueMatch(Base):
pass
class RemoteError(Base):
def __init__(self, message=None, code=None, type=None, errors=None,
request_id=None):

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2010-2011 OpenStack Foundation
# Copyright (c) 2013 Hewlett-Packard Development Company, L.P.
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -14,16 +14,23 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json as json_
import os
import fixtures
import testtools
from keystoneclient import adapter
from keystoneclient import session as keystone_session
from oslotest import base as test
from requests_mock.contrib import fixture as req_fixture
import six
from six.moves.urllib import parse as urlparse
from designateclient import client
_TRUE_VALUES = ('True', 'true', '1', 'yes')
class TestCase(testtools.TestCase):
class TestCase(test.BaseTestCase):
"""Test case base class for all unit tests."""
@ -51,3 +58,89 @@ class TestCase(testtools.TestCase):
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
self.log_fixture = self.useFixture(fixtures.FakeLogger())
class APITestCase(TestCase):
"""Test case base class for all unit tests."""
TEST_URL = "http://127.0.0.1:9001/"
VERSION = None
def setUp(self):
"""Run before each test method to initialize test environment."""
super(TestCase, self).setUp()
self.log_fixture = self.useFixture(fixtures.FakeLogger())
self.requests = self.useFixture(req_fixture.Fixture())
self.client = self.get_client()
def get_base(self, base_url=None):
if not base_url:
base_url = '%sv%s' % (self.TEST_URL, self.VERSION)
return base_url
def stub_url(self, method, parts=None, base_url=None, json=None, **kwargs):
base_url = self.get_base(base_url)
if json:
kwargs['text'] = json_.dumps(json)
headers = kwargs.setdefault('headers', {})
headers['Content-Type'] = 'application/json'
if parts:
url = '/'.join([p.strip('/') for p in [base_url] + parts])
else:
url = base_url
url = url.replace("/?", "?")
self.requests.register_uri(method, url, **kwargs)
def get_client(self, version=None, session=None):
version = version or self.VERSION
session = session or keystone_session.Session()
adapted = adapter.Adapter(
session=session, endpoint_override=self.get_base())
return client.Client(version, session=adapted)
def assertRequestBodyIs(self, body=None, json=None):
last_request_body = self.requests.last_request.body
if json:
val = json_.loads(last_request_body)
self.assertEqual(json, val)
elif body:
self.assertEqual(body, last_request_body)
def assertQueryStringIs(self, qs=''):
"""Verify the QueryString matches what is expected.
The qs parameter should be of the format \'foo=bar&abc=xyz\'
"""
expected = urlparse.parse_qs(qs, keep_blank_values=True)
parts = urlparse.urlparse(self.requests.last_request.url)
querystring = urlparse.parse_qs(parts.query, keep_blank_values=True)
self.assertEqual(expected, querystring)
def assertQueryStringContains(self, **kwargs):
"""Verify the query string contains the expected parameters.
This method is used to verify that the query string for the most recent
request made contains all the parameters provided as ``kwargs``, and
that the value of each parameter contains the value for the kwarg. If
the value for the kwarg is an empty string (''), then all that's
verified is that the parameter is present.
"""
parts = urlparse.urlparse(self.requests.last_request.url)
qs = urlparse.parse_qs(parts.query, keep_blank_values=True)
for k, v in six.iteritems(kwargs):
self.assertIn(k, qs)
self.assertIn(v, qs[k])
def assertRequestHeaderEqual(self, name, val):
"""Verify that the last request made contains a header and its value
The request must have already been made.
"""
headers = self.requests.last_request.headers
self.assertEqual(headers.get(name), val)

View File

@ -0,0 +1,53 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from designateclient.tests import base
class CrudMixin(object):
path_prefix = None
def new_ref(self, **kwargs):
kwargs.setdefault('id', uuid.uuid4().hex)
return kwargs
def stub_entity(self, method, parts=None, entity=None, id=None, **kwargs):
if entity:
kwargs['json'] = entity
if not parts:
parts = [self.RESOURCE]
if self.path_prefix:
parts.insert(0, self.path_prefix)
if id:
if not parts:
parts = []
parts.append(id)
self.stub_url(method, parts=parts, **kwargs)
def assertList(self, expected, actual):
self.assertEqual(len(expected), len(actual))
for i in expected:
self.assertTrue(i in actual)
class APIV2TestCase(base.APITestCase):
VERSION = "2"

View File

@ -0,0 +1,88 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from designateclient.tests import v2
class TestBlacklists(v2.APIV2TestCase, v2.CrudMixin):
RESOURCE = 'blacklists'
def new_ref(self, **kwargs):
ref = super(TestBlacklists, self).new_ref(**kwargs)
ref.setdefault("pattern", uuid.uuid4().hex)
return ref
def test_create(self):
ref = self.new_ref()
self.stub_url("POST", parts=[self.RESOURCE], json=ref)
values = ref.copy()
del values["id"]
self.client.blacklists.create(**values)
self.assertRequestBodyIs(json=values)
def test_create_with_description(self):
ref = self.new_ref(description="My Blacklist")
self.stub_url("POST", parts=[self.RESOURCE], json=ref)
values = ref.copy()
del values["id"]
self.client.blacklists.create(**values)
self.assertRequestBodyIs(json=values)
def test_get(self):
ref = self.new_ref()
self.stub_entity("GET", entity=ref, id=ref["id"])
response = self.client.blacklists.get(ref["id"])
self.assertEqual(ref, response)
def test_list(self):
items = [
self.new_ref(),
self.new_ref()
]
self.stub_url("GET", parts=[self.RESOURCE], json={"blacklists": items})
listed = self.client.blacklists.list()
self.assertList(items, listed)
self.assertQueryStringIs("")
def test_update(self):
ref = self.new_ref()
self.stub_entity("PATCH", entity=ref, id=ref["id"])
values = ref.copy()
del values["id"]
self.client.blacklists.update(ref["id"], values)
self.assertRequestBodyIs(json=values)
def test_delete(self):
ref = self.new_ref()
self.stub_entity("DELETE", id=ref["id"])
self.client.blacklists.delete(ref["id"])
self.assertRequestBodyIs(None)

View File

@ -0,0 +1,25 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designateclient.tests import v2
class TestLimits(v2.APIV2TestCase, v2.CrudMixin):
def test_get(self):
ref = {"max_zones": "foo"}
self.stub_url("GET", parts=["limits"], json=ref)
limits = self.client.limits.get()
self.assertEqual(ref, limits)

View File

@ -0,0 +1,35 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mock import patch
from designateclient.tests import v2
from designateclient.v2 import zones
class TestLimits(v2.APIV2TestCase, v2.CrudMixin):
@patch.object(zones.ZoneController, "list")
def test_get(self, zones_get):
zones_get.return_value = [{"id": "foo"}]
ref = [{
"hostname": "ns1.example.com.",
"priority": 1
}]
parts = ["zones", "foo", "nameservers"]
self.stub_url("GET", parts=parts, json={"nameservers": ref})
response = self.client.nameservers.list("foo")
self.assertEqual(ref, response)

View File

@ -0,0 +1,237 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from mock import patch
import testtools
from designateclient import exceptions
from designateclient.tests import v2
from designateclient.v2 import zones
ZONE = {
"id": str(uuid.uuid4()),
"name": "example.com."
}
class TestRecordSets(v2.APIV2TestCase, v2.CrudMixin):
RESOURCE = 'recordsets'
def new_ref(self, **kwargs):
ref = super(TestRecordSets, self).new_ref(**kwargs)
ref.setdefault("name", uuid.uuid4().hex)
ref.setdefault("type", "A")
ref.setdefault("records", ["10.0.0.1"])
return ref
def test_create_absolute_with_zone_dict(self):
ref = self.new_ref()
parts = ["zones", ZONE["id"], self.RESOURCE]
self.stub_url("POST", parts=parts, json=ref)
values = ref.copy()
del values["id"]
self.client.recordsets.create(
ZONE,
"%s.%s" % (values["name"], ZONE["name"]),
values["type"],
values["records"])
values["name"] = "%s.%s" % (ref["name"], ZONE["name"])
self.assertRequestBodyIs(json=values)
@patch.object(zones.ZoneController, "get")
def test_create_absolute_with_zone_name(self, zone_get):
ref = self.new_ref()
zone_get.return_value = ZONE
parts = ["zones", ZONE["id"], self.RESOURCE]
self.stub_url("POST", parts=parts, json=ref)
values = ref.copy()
del values["id"]
self.client.recordsets.create(
ZONE["name"],
"%s.%s" % (values["name"], ZONE["name"]),
values["type"],
values["records"])
values["name"] = "%s.%s" % (ref["name"], ZONE["name"])
self.assertRequestBodyIs(json=values)
@patch.object(zones.ZoneController, "get")
def test_create_non_absolute_with_zone_name(self, zone_get):
ref = self.new_ref()
zone_get.return_value = ZONE
parts = ["zones", ZONE["id"], self.RESOURCE]
self.stub_url("POST", parts=parts, json=ref)
values = ref.copy()
del values["id"]
self.client.recordsets.create(
ZONE["name"],
values["name"],
values["type"],
values["records"])
values["name"] = "%s.%s" % (ref["name"], ZONE["name"])
self.assertRequestBodyIs(json=values)
@patch.object(zones.ZoneController, "list")
def test_create_non_absolute_with_zone_name_non_unique(self, zone_list):
zone_list.return_value = [
1,
2
]
ref = self.new_ref()
values = ref.copy()
del values["id"]
with testtools.ExpectedException(exceptions.NoUniqueMatch):
self.client.recordsets.create(
ZONE["name"],
"%s.%s" % (values["name"], ZONE["name"]),
values["type"],
values["records"])
def test_create_absolute_with_zone_id(self):
ref = self.new_ref()
parts = ["zones", ZONE["id"], self.RESOURCE]
self.stub_url("POST", parts=parts, json=ref)
values = ref.copy()
del values["id"]
self.client.recordsets.create(
ZONE["id"],
"%s.%s" % (values["name"], ZONE["name"]),
values["type"],
values["records"])
values["name"] = "%s.%s" % (ref["name"], ZONE["name"])
self.assertRequestBodyIs(json=values)
@patch.object(zones.ZoneController, "get")
def test_create_non_absolute_with_zone_id(self, zone_get):
ref = self.new_ref()
zone_get.return_value = ZONE
parts = ["zones", ZONE["id"], self.RESOURCE]
self.stub_url("POST", parts=parts, json=ref)
values = ref.copy()
del values["id"]
self.client.recordsets.create(
ZONE["id"],
values["name"],
values["type"],
values["records"])
values["name"] = "%s.%s" % (ref["name"], ZONE["name"])
self.assertRequestBodyIs(json=values)
def test_create_with_description(self):
ref = self.new_ref(description="Foo")
parts = ["zones", ZONE["id"], self.RESOURCE]
self.stub_url("POST", parts=parts, json=ref)
values = ref.copy()
del values["id"]
self.client.recordsets.create(
ZONE["id"],
"%s.%s" % (values["name"], ZONE["name"]),
values["type"],
values["records"],
description=values["description"])
values["name"] = "%s.%s" % (ref["name"], ZONE["name"])
self.assertRequestBodyIs(json=values)
def test_create_with_ttl(self):
ref = self.new_ref(ttl=60)
parts = ["zones", ZONE["id"], self.RESOURCE]
self.stub_url("POST", parts=parts, json=ref)
values = ref.copy()
del values["id"]
self.client.recordsets.create(
ZONE["id"],
"%s.%s" % (values["name"], ZONE["name"]),
values["type"],
values["records"],
ttl=values["ttl"])
values["name"] = "%s.%s" % (ref["name"], ZONE["name"])
self.assertRequestBodyIs(json=values)
def test_get(self):
ref = self.new_ref()
parts = ["zones", ZONE["id"], self.RESOURCE]
self.stub_entity("GET", entity=ref, id=ref["id"], parts=parts)
response = self.client.recordsets.get(ZONE["id"], ref["id"])
self.assertEqual(ref, response)
def test_list(self):
items = [
self.new_ref(),
self.new_ref()
]
parts = ["zones", ZONE["id"], self.RESOURCE]
self.stub_url("GET", parts=parts, json={"recordsets": items})
listed = self.client.recordsets.list(ZONE["id"])
self.assertList(items, listed)
self.assertQueryStringIs("")
def test_update(self):
ref = self.new_ref()
parts = ["zones", ZONE["id"], self.RESOURCE]
self.stub_entity("PUT", entity=ref, id=ref["id"], parts=parts)
values = ref.copy()
del values["id"]
self.client.recordsets.update(ZONE["id"], ref["id"], values)
self.assertRequestBodyIs(json=values)
def test_delete(self):
ref = self.new_ref()
parts = ["zones", ZONE["id"], self.RESOURCE]
self.stub_entity("DELETE", id=ref["id"], parts=parts)
self.client.recordsets.delete(ZONE["id"], ref["id"])
self.assertRequestBodyIs(None)

View File

@ -0,0 +1,62 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from designateclient.tests import v2
FIP_ID = '%s:%s' % (str(uuid.uuid4()), "RegionOne")
class TestFloatingIP(v2.APIV2TestCase, v2.CrudMixin):
def test_set(self):
name = "foo.com."
ref = {
"ptrdname": name,
"description": "foo"
}
parts = ["reverse", "floatingips", FIP_ID]
self.stub_url("PATCH", parts=parts, json=ref)
self.client.floatingips.set(FIP_ID, name, "foo")
def test_list(self):
ref = [
{"ptrdname": "foo.com."}
]
self.stub_url("GET", parts=["reverse", "floatingips"],
json={"floatingips": ref})
self.client.floatingips.list()
def test_get(self):
ref = {
"ptrdname": "foo.com."
}
parts = ["reverse", "floatingips", FIP_ID]
self.stub_url("GET", parts=parts, json=ref)
self.client.floatingips.get(FIP_ID)
def test_unset(self):
parts = ["reverse", "floatingips", FIP_ID]
self.stub_url("PATCH", parts=parts, json={"ptdrname": None})
self.client.floatingips.unset(FIP_ID)
self.assertRequestBodyIs(None)

View File

@ -0,0 +1,88 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from designateclient.tests import v2
class TestTlds(v2.APIV2TestCase, v2.CrudMixin):
RESOURCE = 'tlds'
def new_ref(self, **kwargs):
ref = super(TestTlds, self).new_ref(**kwargs)
ref.setdefault("name", uuid.uuid4().hex)
return ref
def test_create(self):
ref = self.new_ref()
self.stub_url("POST", parts=[self.RESOURCE], json=ref)
values = ref.copy()
del values["id"]
self.client.tlds.create(**values)
self.assertRequestBodyIs(json=values)
def test_create_with_description(self):
ref = self.new_ref(description="My TLD")
self.stub_url("POST", parts=[self.RESOURCE], json=ref)
values = ref.copy()
del values["id"]
self.client.tlds.create(**values)
self.assertRequestBodyIs(json=values)
def test_get(self):
ref = self.new_ref()
self.stub_entity("GET", entity=ref, id=ref["id"])
response = self.client.tlds.get(ref["id"])
self.assertEqual(ref, response)
def test_list(self):
items = [
self.new_ref(),
self.new_ref()
]
self.stub_url("GET", parts=[self.RESOURCE], json={"tlds": items})
listed = self.client.tlds.list()
self.assertList(items, listed)
self.assertQueryStringIs("")
def test_update(self):
ref = self.new_ref()
self.stub_entity("PATCH", entity=ref, id=ref["id"])
values = ref.copy()
del values["id"]
self.client.tlds.update(ref["id"], values)
self.assertRequestBodyIs(json=values)
def test_delete(self):
ref = self.new_ref()
self.stub_entity("DELETE", id=ref["id"])
self.client.tlds.delete(ref["id"])
self.assertRequestBodyIs(None)

View File

@ -0,0 +1,249 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from designateclient.tests import v2
class TestZones(v2.APIV2TestCase, v2.CrudMixin):
RESOURCE = 'zones'
def new_ref(self, **kwargs):
ref = super(TestZones, self).new_ref(**kwargs)
ref.setdefault("name", uuid.uuid4().hex)
ref.setdefault("type", "PRIMARY")
return ref
def test_create_with_description(self):
ref = self.new_ref(email="root@example.com", description="Foo")
self.stub_url("POST", parts=[self.RESOURCE], json=ref)
values = ref.copy()
del values["id"]
self.client.zones.create(
values["name"],
email=values["email"],
description=values["description"])
self.assertRequestBodyIs(json=values)
def test_create_primary(self):
ref = self.new_ref(email="root@example.com")
self.stub_url("POST", parts=[self.RESOURCE], json=ref)
values = ref.copy()
del values["id"]
self.client.zones.create(
values["name"],
email=values["email"])
self.assertRequestBodyIs(json=values)
def test_create_primary_with_ttl(self):
ref = self.new_ref(email="root@example.com", ttl=60)
self.stub_url("POST", parts=[self.RESOURCE], json=ref)
values = ref.copy()
del values["id"]
self.client.zones.create(
values["name"],
email=values["email"],
ttl=values["ttl"])
self.assertRequestBodyIs(json=values)
def test_create_secondary(self):
ref = self.new_ref(type="SECONDARY", masters=["10.0.0.1"])
self.stub_url("POST", parts=[self.RESOURCE], json=ref)
values = ref.copy()
del values["id"]
self.client.zones.create(
values["name"],
type_=values["type"],
masters=values["masters"])
self.assertRequestBodyIs(json=values)
def test_get(self):
ref = self.new_ref()
self.stub_entity("GET", entity=ref, id=ref["id"])
response = self.client.zones.get(ref["id"])
self.assertEqual(ref, response)
def test_list(self):
items = [
self.new_ref(),
self.new_ref()
]
self.stub_url("GET", parts=[self.RESOURCE], json={"zones": items})
listed = self.client.zones.list()
self.assertList(items, listed)
self.assertQueryStringIs("")
def test_update(self):
ref = self.new_ref()
self.stub_entity("PATCH", entity=ref, id=ref["id"])
values = ref.copy()
del values["id"]
self.client.zones.update(ref["id"], values)
self.assertRequestBodyIs(json=values)
def test_delete(self):
ref = self.new_ref()
self.stub_entity("DELETE", id=ref["id"])
self.client.zones.delete(ref["id"])
self.assertRequestBodyIs(None)
def test_task_abandon(self):
ref = self.new_ref()
parts = [self.RESOURCE, ref["id"], "tasks", "abandon"]
self.stub_url("POST", parts=parts)
self.client.zones.abandon(ref["id"])
self.assertRequestBodyIs(None)
def test_task_axfr(self):
ref = self.new_ref()
parts = [self.RESOURCE, ref["id"], "tasks", "axfr"]
self.stub_url("POST", parts=parts)
self.client.zones.axfr(ref["id"])
self.assertRequestBodyIs(None)
class TestZoneTransfers(v2.APIV2TestCase, v2.CrudMixin):
def test_create_request(self):
zone = "098bee04-fe30-4a83-8ccd-e0c496755816"
project = "123"
ref = {
"target_project_id": project
}
parts = ["zones", zone, "tasks", "transfer_requests"]
self.stub_url('POST', parts=parts, json=ref)
self.client.zone_transfers.create_request(zone, project)
self.assertRequestBodyIs(json=ref)
def test_create_request_with_description(self):
zone = "098bee04-fe30-4a83-8ccd-e0c496755816"
project = "123"
ref = {
"target_project_id": project,
"description": "My Foo"
}
parts = ["zones", zone, "tasks", "transfer_requests"]
self.stub_url('POST', parts=parts, json=ref)
self.client.zone_transfers.create_request(
zone, project, ref["description"])
self.assertRequestBodyIs(json=ref)
def test_get_request(self):
transfer = "098bee04-fe30-4a83-8ccd-e0c496755816"
project = "098bee04-fe30-4a83-8ccd-e0c496755817"
ref = {
"target_project_id": project
}
parts = ["zones", "tasks", "transfer_requests", transfer]
self.stub_url('GET', parts=parts, json=ref)
self.client.zone_transfers.get_request(transfer)
self.assertRequestBodyIs("")
def test_list_request(self):
project = "098bee04-fe30-4a83-8ccd-e0c496755817"
ref = [{
"target_project_id": project
}]
parts = ["zones", "tasks", "transfer_requests"]
self.stub_url('GET', parts=parts, json={"transfer_requests": ref})
self.client.zone_transfers.list_requests()
self.assertRequestBodyIs("")
def test_update_request(self):
transfer = "098bee04-fe30-4a83-8ccd-e0c496755816"
project = "098bee04-fe30-4a83-8ccd-e0c496755817"
ref = {
"target_project_id": project
}
parts = ["zones", "tasks", "transfer_requests", transfer]
self.stub_url('PATCH', parts=parts, json=ref)
self.client.zone_transfers.update_request(transfer, ref)
self.assertRequestBodyIs(json=ref)
def test_delete_request(self):
transfer = "098bee04-fe30-4a83-8ccd-e0c496755816"
parts = ["zones", "tasks", "transfer_requests", transfer]
self.stub_url('DELETE', parts=parts)
self.client.zone_transfers.delete_request(transfer)
self.assertRequestBodyIs("")
def test_accept_request(self):
transfer = "098bee04-fe30-4a83-8ccd-e0c496755816"
key = "foo123"
ref = {
"status": "COMPLETE"
}
parts = ["zones", "tasks", "transfer_accepts"]
self.stub_url('POST', parts=parts, json=ref)
request = {
"key": key,
"zone_transfer_request_id": transfer
}
self.client.zone_transfers.accept_request(transfer, key)
self.assertRequestBodyIs(json=request)
def test_get_accept(self):
accept_id = "098bee04-fe30-4a83-8ccd-e0c496755816"
ref = {
"status": "COMPLETE"
}
parts = ["zones", "tasks", "transfer_accepts", accept_id]
self.stub_url('GET', parts=parts, json=ref)
response = self.client.zone_transfers.get_accept(accept_id)
self.assertEqual(ref, response)

View File

View File

@ -0,0 +1,48 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designateclient import client
class BlacklistController(client.Controller):
def create(self, pattern, description=None):
data = {
'pattern': pattern,
}
if description is not None:
data['description'] = description
return self._post('/blacklists', data=data)
def list(self, criterion=None, marker=None, limit=None):
url = self.build_url('/blacklists', criterion, marker, limit)
return self._get(url, response_key="blacklists")
def get(self, blacklist_id):
url = '/blacklists/%s' % blacklist_id
return self._get(url)
def update(self, blacklist_id, values):
url = '/blacklists/%s' % blacklist_id
return self._patch(url, data=values)
def delete(self, blacklist_id):
url = '/blacklists/%s' % blacklist_id
return self._delete(url)

View File

@ -0,0 +1,80 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystoneclient import adapter
from designateclient import exceptions
from designateclient.v2.blacklists import BlacklistController
from designateclient.v2.limits import LimitController
from designateclient.v2.nameservers import NameServerController
from designateclient.v2.recordsets import RecordSetController
from designateclient.v2.reverse import FloatingIPController
from designateclient.v2.tlds import TLDController
from designateclient.v2.zones import ZoneController
from designateclient.v2.zones import ZoneTransfersController
from designateclient import version
class DesignateAdapter(adapter.LegacyJsonAdapter):
"""
Adapter around LegacyJsonAdapter.
"""
def request(self, *args, **kwargs):
kwargs.setdefault('raise_exc', False)
kwargs.setdefault('headers', {}).setdefault(
'Content-Type', 'application/json')
response, body = super(DesignateAdapter, self).request(*args, **kwargs)
# Decode is response, if possible
try:
response_payload = response.json()
except ValueError:
response_payload = {}
if response.status_code == 400:
raise exceptions.BadRequest(**response_payload)
elif response.status_code in (401, 403):
raise exceptions.Forbidden(**response_payload)
elif response.status_code == 404:
raise exceptions.NotFound(**response_payload)
elif response.status_code == 409:
raise exceptions.Conflict(**response_payload)
elif response.status_code >= 500:
raise exceptions.Unknown(**response_payload)
return response, body
class Client(object):
def __init__(self, region_name=None, endpoint_type='publicURL',
extensions=None, service_type='dns', service_name=None,
http_log_debug=False, session=None, auth=None):
self.session = DesignateAdapter(
session,
auth=auth,
region_name=region_name,
service_type=service_type,
interface=endpoint_type.rstrip('URL'),
user_agent='python-designateclient-%s' % version.version_info,
version=('2'))
self.blacklists = BlacklistController(self)
self.floatingips = FloatingIPController(self)
self.limits = LimitController(self)
self.nameservers = NameServerController(self)
self.recordsets = RecordSetController(self)
self.tlds = TLDController(self)
self.zones = ZoneController(self)
self.zone_transfers = ZoneTransfersController(self)

View File

@ -0,0 +1,21 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designateclient import client
class LimitController(client.Controller):
def get(self):
return self._get('/limits')

View File

@ -0,0 +1,26 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designateclient import client
from designateclient.v2 import utils as v2_utils
class NameServerController(client.Controller):
def list(self, zone):
zone = v2_utils.resolve_by_name(self.client.zones.list, zone)
url = '/zones/%s/nameservers' % zone
return self._get(url, response_key='nameservers')

View File

@ -0,0 +1,98 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import uuidutils
from designateclient import client
from designateclient.v2 import utils as v2_utils
class RecordSetController(client.Controller):
def _canonicalize_record_name(self, zone, name):
zone_info = None
# If we get a zone name we'll need to get the ID of it before POST.
if isinstance(zone, basestring) and not uuidutils.is_uuid_like(zone):
zone_info = self.client.zones.get(zone)
elif isinstance(zone, dict):
zone_info = zone
# We where given a name like "www" vs www.i.io., attempt to fix it on
# the behalf of the actor.
if not name.endswith("."):
if not isinstance(zone_info, dict):
zone_info = self.client.zones.get(zone)
name = "%s.%s" % (name, zone_info["name"])
return name, zone_info
def create(self, zone, name, type_, records, description=None,
ttl=None):
name, zone_info = self._canonicalize_record_name(zone, name)
data = {
'name': name,
'type': type_,
'records': records
}
if ttl is not None:
data['ttl'] = ttl
if description is not None:
data['description'] = description
if zone_info is not None:
zone_id = zone_info["id"]
else:
zone_id = zone
url = '/zones/%s/recordsets' % zone_id
return self._post(url, data=data)
def list(self, zone, criterion=None, marker=None, limit=None):
zone = v2_utils.resolve_by_name(self.client.zones.list, zone)
url = self.build_url(
'/zones/%s/recordsets' % zone,
criterion, marker, limit)
return self._get(url, response_key='recordsets')
def get(self, zone, recordset):
zone = v2_utils.resolve_by_name(self.client.zones.list, zone)
recordset = v2_utils.resolve_by_name(self.list, recordset, zone)
url = self.build_url('/zones/%s/recordsets/%s' % (
zone, recordset))
return self._get(url)
def update(self, zone, recordset, values):
zone = v2_utils.resolve_by_name(self.client.zones.list, zone)
recordset = v2_utils.resolve_by_name(self.list, recordset, zone)
url = '/zones/%s/recordsets/%s' % (zone, recordset)
return self._put(url, data=values)
def delete(self, zone, recordset):
zone = v2_utils.resolve_by_name(self.client.zones.list, zone)
recordset = v2_utils.resolve_by_name(self.list, recordset, zone)
url = '/zones/%s/recordsets/%s' % (zone, recordset)
return self._delete(url)

View File

@ -0,0 +1,49 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designateclient import client
class FloatingIPController(client.Controller):
def set(self, floatingip_id, ptrdname, description=None, ttl=None):
data = {
'ptrdname': ptrdname
}
if description is not None:
data["description"] = description
if ttl is not None:
data["ttl"] = ttl
url = '/reverse/floatingips/%s' % floatingip_id
return self._patch(url, data=data)
def list(self):
url = '/reverse/floatingips'
return self._get(url, response_key='floatingips')
def get(self, floatingip_id):
url = '/reverse/floatingips/%s' % floatingip_id
return self._get(url)
def unset(self, floatingip_id):
data = {"ptrdname": None}
url = '/reverse/floatingips/%s' % floatingip_id
return self._patch(url, data=data)

View File

@ -0,0 +1,48 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designateclient import client
class TLDController(client.Controller):
def create(self, name, description=None):
data = {
'name': name,
}
if description is not None:
data["description"] = description
return self._post('/tlds', data=data)
def list(self, criterion=None, marker=None, limit=None):
url = self.build_url('/tlds', criterion, marker, limit)
return self._get(url, response_key='tlds')
def get(self, tld_id):
url = '/tlds/%s' % tld_id
return self._get(url)
def update(self, tld_id, values):
url = '/tlds/%s' % tld_id
return self._patch(url, data=values)
def delete(self, tld_id):
url = '/tlds/%s' % tld_id
return self._delete(url)

View File

@ -0,0 +1,38 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import uuidutils
from designateclient import exceptions
def resolve_by_name(func, name, *args):
"""
Helper to resolve a "name" a'la foo.com to it's ID by using REST api's
query support and filtering on name.
"""
if uuidutils.is_uuid_like(name):
return name
results = func(criterion={"name": "*%s*" % name}, *args)
length = len(results)
if length == 1:
return results[0]["id"]
elif length == 0:
raise exceptions.NotFound("Name %s didn't resolve" % name)
else:
msg = "Multiple matches found for %s, please use ID instead." % name
raise exceptions.NoUniqueMatch(msg)

125
designateclient/v2/zones.py Normal file
View File

@ -0,0 +1,125 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designateclient import client
from designateclient.v2 import utils as v2_utils
class ZoneController(client.Controller):
def create(self, name, type_=None, email=None, description=None, ttl=None,
masters=None):
type_ = type_ or "PRIMARY"
data = {
"name": name,
"type": type_
}
if type_ == "PRIMARY":
data["email"] = email
if ttl is not None:
data["ttl"] = ttl
elif type_ == "SECONDARY":
data["masters"] = masters
if description is not None:
data["description"] = description
return self._post('/zones', data=data)
def list(self, criterion=None, marker=None, limit=None):
url = self.build_url('/zones', criterion, marker, limit)
return self._get(url, response_key="zones")
def get(self, zone):
zone = v2_utils.resolve_by_name(self.list, zone)
return self._get('/zones/%s' % zone)
def update(self, zone, values):
zone = v2_utils.resolve_by_name(self.list, zone)
url = self.build_url('/zones/%s' % zone)
return self._patch(url, data=values)
def delete(self, zone):
zone = v2_utils.resolve_by_name(self.list, zone)
url = self.build_url('/zones/%s' % zone)
return self._delete(url)
def abandon(self, zone):
zone = v2_utils.resolve_by_name(self.list, zone)
url = '/zones/%s/tasks/abandon' % zone
self.client.session.post(url)
def axfr(self, zone):
zone = v2_utils.resolve_by_name(self.list, zone)
url = '/zones/%s/tasks/axfr' % zone
self.client.session.post(url)
class ZoneTransfersController(client.Controller):
def create_request(self, zone, target_project_id, description=None):
zone = v2_utils.resolve_by_name(self.client.zones.list, zone)
data = {
"target_project_id": target_project_id
}
if description is not None:
data["description"] = description
url = '/zones/%s/tasks/transfer_requests' % zone
return self._post(url, data=data)
def get_request(self, transfer_id):
url = '/zones/tasks/transfer_requests/%s' % transfer_id
return self._get(url)
def list_requests(self):
url = '/zones/tasks/transfer_requests'
return self._get(url, response_key="transfer_requests")
def update_request(self, transfer_id, values):
url = '/zones/tasks/transfer_requests/%s' % transfer_id
return self._patch(url, data=values)
def delete_request(self, transfer_id):
url = '/zones/tasks/transfer_requests/%s' % transfer_id
self._delete(url)
def accept_request(self, transfer_id, key):
url = '/zones/tasks/transfer_accepts'
data = {
"key": key,
"zone_transfer_request_id": transfer_id
}
return self._post(url, data=data)
def get_accept(self, accept_id):
url = '/zones/tasks/transfer_accepts/%s' % accept_id
return self._get(url)

View File

@ -0,0 +1,44 @@
import logging
from designateclient.v2 import client
from designateclient import exceptions
from designateclient import shell
from keystoneclient.auth.identity import generic
from keystoneclient import session as keystone_session
logging.basicConfig(level='DEBUG')
"""
Example script to create or get a domain and add some records to it.
"""
auth = generic.Password(
auth_url=shell.env('OS_AUTH_URL'),
username=shell.env('OS_USERNAME'),
password=shell.env('OS_PASSWORD'),
tenant_name=shell.env('OS_TENANT_NAME'))
session = keystone_session.Session(auth=auth)
client = client.Client(session=session)
try:
zone = client.zones.create('i.io.', email='i@i.io')
except exceptions.RemoteError:
zone = dict([(z['name'], z) for z in client.zones.list()])['i.io.']
print("Recordset list...")
for rs in client.recordsets.list(zone['id']):
print rs
# Here's an example of just passing "www" as the record name vs "www.i.io."
records = ["10.0.0.1"]
rs = client.recordsets.create(zone['id'], 'www', 'A', records)
# Here we're replacing the records with new ones
records = ["10.0.0.1", "10.0.0.5"]
client.recordsets.update(zone['id'], rs['id'], {'records': records})

View File

@ -0,0 +1,72 @@
import logging
from designateclient.v2 import client
from designateclient import exceptions
from designateclient import shell
from keystoneclient.auth.identity import generic
from keystoneclient import session as keystone_session
logging.basicConfig(level='DEBUG')
auth = generic.Password(
auth_url=shell.env('OS_AUTH_URL'),
username=shell.env('OS_USERNAME'),
password=shell.env('OS_PASSWORD'),
tenant_name=shell.env('OS_TENANT_NAME'))
session = keystone_session.Session(auth=auth)
client = client.Client(session=session)
try:
zone = dict([(z['name'], z) for z in client.zones.list()])['i.io.']
client.zones.delete(zone['id'])
except exceptions.NotFound:
pass
zone = client.zones.create(name='i.io.', email='i@i.io')
# Clean all recordsets first in this zone (for sanity sake)
for rrset in client.recordsets.list(zone['id']):
if rrset['type'] in ('NS', 'SOA'):
continue
client.recordsets.delete(zone['id'], rrset['id'])
# Make some A records
www = client.recordsets.create(
zone['id'],
'www.%s' % zone['name'],
'A',
['10.0.0.1'])
values = {
'records': ['10.0.1.1', '10.0.0.2']
}
client.recordsets.update(zone['id'], www['id'], values)
cname = client.recordsets.create(
zone['id'],
'my-site.%s' % zone['name'],
'CNAME',
[www['name']])
# Now let's do some Mailserver examples
# First create the A record
mail1 = client.recordsets.create(
zone['id'], 'mail1.' + zone['name'], 'A', ["10.0.0.11"])
mail2 = client.recordsets.create(
zone['id'], 'mail2.' + zone['name'], 'A', ["10.0.0.12"])
# Create the MX records - it will be 1 recordset with multiple records pointing
# to the A records we created above
mx_rrset = client.recordsets.create(
zone['id'], zone['name'], 'MX',
['0 ' + mail1['name'], '5 ' + mail2['name']])
print(zone['id'])

View File

@ -0,0 +1,29 @@
import logging
from designateclient import exceptions
from designateclient import shell
from designateclient.v2 import client
from keystoneclient.auth.identity import generic
from keystoneclient import session as keystone_session
logging.basicConfig(level='DEBUG')
auth = generic.Password(
auth_url=shell.env('OS_AUTH_URL'),
username=shell.env('OS_USERNAME'),
password=shell.env('OS_PASSWORD'),
tenant_name=shell.env('OS_TENANT_NAME'))
session = keystone_session.Session(auth=auth)
client = client.Client(session=session)
try:
zone = client.zones.create('i.io.', email='i@i.io')
except exceptions.RemoteError:
zone = dict([(z['name'], z) for z in client.zones.list()])['i.io.']
print client.recordsets.list(zone['id'])

View File

@ -0,0 +1,43 @@
import logging
import uuid
from keystoneclient.auth.identity import generic
from keystoneclient import session as keystone_session
from designateclient import exceptions
from designateclient import shell
from designateclient.v2 import client
logging.basicConfig(level='DEBUG')
auth = generic.Password(
auth_url=shell.env('OS_AUTH_URL'),
username=shell.env('OS_USERNAME'),
password=shell.env('OS_PASSWORD'),
tenant_name=shell.env('OS_TENANT_NAME'))
session = keystone_session.Session(auth=auth)
client = client.Client(session=session)
# Primary Zone
primary = client.zones.create(
'primary-%s.io.' % str(uuid.uuid4()),
'PRIMARY',
'root@x.com')
# Secondary Zone
slave = client.zones.create(
'secondary-%s.io.' % str(uuid.uuid4()),
'SECONDARY',
masters=["127.0.1.1"])
# Try updating Masters for the Secondary
new_slave = client.zones.update(
slave['id'],
{"masters": ["10.0.0.1", "10.0.0.10"]}
)
# List all Zones
zones = client.zones.list()

View File

@ -0,0 +1,29 @@
import logging
import uuid
from designateclient.v2 import client
from designateclient import shell
from designateclient import utils
from keystoneclient.auth.identity import generic
from keystoneclient import session as keystone_session
logging.basicConfig(level='DEBUG')
auth = generic.Password(
auth_url=shell.env('OS_AUTH_URL'),
username=shell.env('OS_USERNAME'),
password=shell.env('OS_PASSWORD'),
tenant_name=shell.env('OS_TENANT_NAME'))
session = keystone_session.Session(auth=auth)
client = client.Client(session=session)
zone = client.zones.create(
'primary-%s.io.' % str(uuid.uuid4()),
'PRIMARY',
'root@x.com')
client.nameservers.list(zone['id'])

View File

@ -0,0 +1,36 @@
import logging
from keystoneclient.auth.identity import generic
from keystoneclient import session as keystone_session
from designateclient import shell
from designateclient.v2 import client
logging.basicConfig(level='DEBUG')
auth = generic.Password(
auth_url=shell.env('OS_AUTH_URL'),
username=shell.env('OS_USERNAME'),
password=shell.env('OS_PASSWORD'),
tenant_name=shell.env('OS_TENANT_NAME'))
session = keystone_session.Session(auth=auth)
client = client.Client(session=session)
pages = []
fetch = 1
while fetch:
kw = {'limit': 3}
if pages:
# marker is the latest page with the last item.
kw['marker'] = pages[-1][-1]['id']
page = client.zones.list(**kw)
if not page:
break
pages.append(page)
for page in pages:
print page

View File

@ -3,6 +3,7 @@
# process, which may cause wedges in the gate later.
cliff>=1.10.0 # Apache-2.0
jsonschema>=2.0.0,<3.0.0
oslo.utils>=1.4.0 # Apache-2.0
pbr>=0.6,!=0.7,<1.0
python-keystoneclient>=1.1.0
requests>=2.2.0,!=2.4.0

View File

@ -79,6 +79,10 @@ designateclient.cli =
quota-update = designateclient.cli.quotas:UpdateQuotaCommand
quota-reset = designateclient.cli.quotas:ResetQuotaCommand
designateclient.versions =
1 = designateclient.v1:Client
2 = designateclient.v2.client:Client
[build_sphinx]
all_files = 1
build-dir = doc/build

View File

@ -5,7 +5,9 @@
hacking>=0.9.2,<0.10
coverage>=3.6
discover
oslotest>=1.5.1
python-subunit>=0.0.18
requests-mock>=0.6.0 # Apache-2.0
sphinx>=1.1.2,!=1.2.0,!=1.3b1,<1.3
testrepository>=0.0.18
# Needed for the incubation theme on oslosphinx