Merge "distil-collector concurrency support"

Jenkins 2017-02-26 22:01:08 +00:00 committed by Gerrit Code Review
commit 94107539ff
13 changed files with 257 additions and 23 deletions

View File

@@ -13,6 +13,7 @@
# under the License.
import abc
from datetime import timedelta
import re
import yaml
@@ -64,11 +65,16 @@ class BaseCollector(object):
resources = {}
usage_entries = []
# NOTE(kong): Set 10min as lead-in time when getting samples; this
# helps us get the correct instance uptime in case the instance
# status in the first sample is not in our tracked status list.
actual_start = window_start - timedelta(minutes=10)
try:
for mapping in self.meter_mappings:
# Invoke get_meter function of specific collector.
usage = self.get_meter(project['id'], mapping['meter'],
window_start, window_end)
actual_start, window_end)
usage_by_resource = {}
self._filter_and_group(usage, usage_by_resource)
@@ -125,7 +131,9 @@ class BaseCollector(object):
os_distro = image_meta.get('os_distro', 'unknown')
else:
# Boot from image
image_id = entry['metadata']['image.id']
image_id = entry['metadata'].get('image.id', None)
image_id = image_id or entry['metadata'].get('image_ref', None)
os_distro = getattr(
openstack.get_image(image_id),
'os_distro',
@@ -180,6 +188,11 @@ class BaseCollector(object):
service, entries, window_start, window_end
)
LOG.debug(
'After transformation, usage for resource %s: %s' %
(res_id, transformed)
)
if transformed:
res_id = mapping.get('res_id_template', '%s') % res_id
res_info = self._get_resource_info(
@@ -192,7 +205,6 @@ class BaseCollector(object):
res = resources.setdefault(res_id, res_info)
res.update(res_info)
LOG.debug('resource info: %s', res)
for service, volume in transformed.items():
entry = {
@@ -205,4 +217,3 @@ class BaseCollector(object):
'tenant_id': project_id
}
usage_entries.append(entry)
LOG.debug('new entry: %s', entry)

View File

@@ -17,6 +17,7 @@ from datetime import timedelta
from decimal import Decimal
import functools
import math
import socket
import warnings
import yaml
@@ -101,3 +102,8 @@ def convert_to(value, from_unit, to_unit):
if from_unit == to_unit:
return value
return conversions[from_unit][to_unit](value)
def get_process_identifier():
"""Gets current running process identifier."""
return "%s_%s" % (socket.gethostname(), CONF.collector.partitioning_suffix)

View File

@@ -16,6 +16,7 @@ from keystoneauth1 import loading as ka_loading
from oslo_cache import core as cache
from oslo_config import cfg
from oslo_log import log
from oslo_utils import uuidutils
from distil import version
@@ -65,6 +66,9 @@ COLLECTOR_OPTS = [
help=('The list of resources that are handled by the collector.')),
cfg.StrOpt('dawn_of_time', default='2014-04-01 00:00:00',
help=('The earliest starting time for a new tenant.')),
cfg.StrOpt('partitioning_suffix',
help=('Collector partitioning group suffix. Used when running '
'multiple collectors so that each instance holds its own '
'project locks.'))
]
ODOO_OPTS = [
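As a deployment sketch (values are illustrative, not part of this commit), each collector instance would get a distinct suffix in its distil.conf so that their process identifiers, and therefore their lock owners, differ:

[collector]
# Use a different suffix on every collector instance.
partitioning_suffix = instance-1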

View File

@@ -31,6 +31,7 @@ interface.
`sqlite:///var/lib/distil/distil.sqlite`.
"""
import contextlib
from oslo_config import cfg
from oslo_db import api as db_api
@@ -119,3 +120,27 @@ def project_get(project_id):
def project_get_all():
return IMPL.project_get_all()
# Project Locks.
def create_project_lock(project_id, owner):
return IMPL.create_project_lock(project_id, owner)
def get_project_locks(project_id):
return IMPL.get_project_locks(project_id)
def ensure_project_lock(project_id, owner):
return IMPL.ensure_project_lock(project_id, owner)
def delete_project_lock(project_id):
return IMPL.delete_project_lock(project_id)
@contextlib.contextmanager
def project_lock(project_id, owner):
with IMPL.project_lock(project_id, owner):
yield
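A minimal usage sketch of the new context manager (the project id, owner string, and do_collect() helper are hypothetical; the real caller is the collector service further down in this diff):

from distil.db import api as db_api

with db_api.project_lock('some-project-id', 'collector01_region-a'):
    # Only the lock owner reaches this point; the lock row is removed
    # when the block exits, even if do_collect() raises.
    do_collect()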

View File

@@ -0,0 +1,39 @@
# Copyright 2017 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""add-project-lock-table
Revision ID: 002
Revises: 001
Create Date: 2017-01-11 11:45:05.107888
"""
# revision identifiers, used by Alembic.
revision = '002'
down_revision = '001'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
'project_locks',
sa.Column('created', sa.DateTime(), nullable=False),
sa.Column('project_id', sa.String(length=100), nullable=False),
sa.Column('owner', sa.String(length=100), nullable=False),
sa.PrimaryKeyConstraint('project_id'),
)
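The migration defines no downgrade. If one were wanted, a minimal sketch (assuming the project still accepts downgrade functions, which many OpenStack projects had stopped doing by 2017) would simply drop the table:

def downgrade():
    # Hypothetical counterpart to upgrade() above.
    op.drop_table('project_locks')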

View File

@@ -15,21 +15,23 @@
"""Implementation of SQLAlchemy backend."""
import contextlib
from datetime import datetime
import json
import six
import sys
import sqlalchemy as sa
from sqlalchemy import func
import threading
from oslo_config import cfg
from oslo_db import exception as db_exception
from oslo_db.sqlalchemy import session as db_session
from oslo_log import log as logging
from retrying import retry
import six
import sqlalchemy as sa
from sqlalchemy import func
from distil.db.sqlalchemy import models as m
from distil.db.sqlalchemy.models import ProjectLock
from distil.db.sqlalchemy.models import Resource
from distil.db.sqlalchemy.models import Tenant
from distil.db.sqlalchemy.models import UsageEntry
@@ -76,7 +78,7 @@ def get_backend():
def setup_db():
try:
engine = get_engine()
m.Cluster.metadata.create_all(engine)
m.Tenant.metadata.create_all(engine)
except sa.exc.OperationalError as e:
LOG.exception("Database registration exception: %s", e)
return False
@@ -226,8 +228,9 @@ def usages_add(project_id, resources, usage_entries, last_collect):
for (id, res_info) in six.iteritems(resources):
res_db = _get_resource(session, project_id, id)
if res_db:
orig_info = json.loads(res_db.info)
res_db.info = json.dumps(orig_info.update(res_info))
orig_info = json.loads(res_db.info) or {}
orig_info.update(res_info)
res_db.info = json.dumps(orig_info)
else:
resource_ref = Resource(
id=id,
@@ -306,3 +309,82 @@ def _merge_resource_metadata(md_dict, entry, md_def):
pass
return md_dict
def get_project_locks(project_id):
session = get_session()
query = session.query(ProjectLock)
query = query.filter(ProjectLock.project_id == project_id)
try:
return query.all()
except Exception as e:
raise exceptions.DBException(
"Failed when querying database, error type: %s, "
"error message: %s" % (e.__class__.__name__, str(e))
)
@retry(stop_max_attempt_number=3, wait_fixed=5000)
def create_project_lock(project_id, owner):
"""Creates project lock record.
This method has to work without the usual SQLAlchemy session behaviour,
because the session may not issue an SQL query to the database immediately
and may instead just schedule it, whereas we need the operation to be
issued immediately.
If several transactions try to acquire the same project lock at once
(unlikely, but possible), only the first one to commit succeeds; all the
others fail with an exception. The retry mechanism here is used to cope
with that.
"""
session = get_session()
session.flush()
insert = ProjectLock.__table__.insert()
session.execute(insert.values(project_id=project_id, owner=owner,
created=datetime.utcnow()))
session.flush()
def ensure_project_lock(project_id, owner):
"""Make sure project lock record exists."""
session = get_session()
query = session.query(ProjectLock)
query = query.filter(ProjectLock.project_id == project_id,
ProjectLock.owner == owner)
# If a lock already exists for this process, do not recreate it. This
# lets the process resume the project it was handling before it was
# killed.
if not query.all():
create_project_lock(project_id, owner)
def delete_project_lock(project_id):
"""Deletes project lock record.
This method has to work without the usual SQLAlchemy session behaviour,
because the session may not issue an SQL query to the database immediately
and may instead just schedule it, whereas we need the operation to be
issued immediately.
"""
session = get_session()
session.flush()
table = ProjectLock.__table__
delete = table.delete()
session.execute(delete.where(table.c.project_id == project_id))
session.flush()
@contextlib.contextmanager
def project_lock(project_id, owner):
try:
ensure_project_lock(project_id, owner)
yield
finally:
delete_project_lock(project_id)
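The primary key on project_id is what turns the INSERT in create_project_lock() into a lock: a second concurrent insert for the same project violates the constraint and raises. A compressed sketch of the race the @retry decorator copes with (owner identifiers are hypothetical):

create_project_lock('proj-1', 'collector01_region-a')  # first caller: row inserted
create_project_lock('proj-1', 'collector02_region-b')  # second caller: duplicate-key
# error; retried 3 times, 5 seconds apart, and if the lock is still held the
# exception propagates, at which point the collector service logs a warning
# and moves on (see the service diff below).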

View File

@@ -119,3 +119,11 @@ class SalesOrder(DistilBase):
@hybrid_method
def intersects(self, other):
return (self.start <= other.end and other.start <= self.end)
class ProjectLock(DistilBase):
__tablename__ = 'project_locks'
project_id = Column(String(100), primary_key=True, nullable=False)
owner = Column(String(100), nullable=False)
created = Column(DateTime, nullable=False)

View File

@@ -22,6 +22,7 @@ from stevedore import driver
from distil.db import api as db_api
from distil import exceptions
from distil.common import general
from distil.common import openstack
LOG = logging.getLogger(__name__)
@@ -53,6 +54,8 @@ class CollectorService(service.Service):
self.validate_config()
self.identifier = general.get_process_identifier()
collector_args = {}
self.collector = driver.DriverManager(
'distil.collector',
@@ -93,15 +96,36 @@
logging.setup(CONF, 'distil-collector')
def collect_usage(self):
LOG.info("Begin to collect usage for all projects...")
LOG.info("Starting to collect usage...")
projects = filter_projects(openstack.get_projects())
end = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
count = 0
for project in projects:
# Add a project or get last_collected of existing project.
db_project = db_api.project_add(project)
start = db_project.last_collected
# Check whether the project is being processed by another
# collector instance. If not, acquire a lock and continue
# processing; otherwise just skip it.
locks = db_api.get_project_locks(project['id'])
self.collector.collect_usage(project, start, end)
if locks and locks[0].owner != self.identifier:
LOG.debug(
"Project %s is being processed by collector %s." %
(project['id'], locks[0].owner)
)
continue
try:
with db_api.project_lock(project['id'], self.identifier):
# Add a project or get last_collected of existing project.
db_project = db_api.project_add(project)
start = db_project.last_collected
self.collector.collect_usage(project, start, end)
count = count + 1
except Exception:
LOG.warning('Failed to acquire lock. Process: %s' % self.identifier)
LOG.info("Finished collecting usage for %s projects." % count)

View File

@@ -22,6 +22,7 @@ from oslo_log import log
from distil import context
from distil import config
from distil.db import api as db_api
class DistilTestCase(base.BaseTestCase):
@@ -34,7 +35,7 @@ class DistilTestCase(base.BaseTestCase):
if self.config_file:
self.conf = self.load_conf(self.config_file)
else:
self.conf = cfg.ConfigOpts()
self.conf = cfg.CONF
self.conf.register_opts(config.DEFAULT_OPTIONS)
self.conf.register_opts(config.ODOO_OPTS, group=config.ODOO_GROUP)
@@ -72,12 +73,12 @@ class DistilTestCase(base.BaseTestCase):
:returns: Project's config object.
"""
conf = cfg.ConfigOpts()
conf = cfg.CONF
log.register_options(conf)
conf(args=[], default_config_files=[cls.conf_path(filename)])
return conf
def config(self, group=None, **kw):
def override_config(self, group=None, **kw):
"""Override some configuration values.
The keyword arguments are the names of configuration options to
@@ -100,6 +101,6 @@ class DistilWithDbTestCase(DistilTestCase):
def setUp(self):
super(DistilWithDbTestCase, self).setUp()
self.override_config('connection', "sqlite://", group='database')
self.conf.set_default('connection', 'sqlite://', group='database')
db_api.setup_db()
self.addCleanup(db_api.drop_db)

View File

View File

@@ -0,0 +1,29 @@
# Copyright (C) 2017 Catalyst IT Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from distil.db.sqlalchemy import api as db_api
from distil.tests.unit import base
class ProjectLockTest(base.DistilWithDbTestCase):
def test_with_project_lock(self):
project_id = 'fake_project_id'
owner = 'fake_owner'
with db_api.project_lock(project_id, owner):
# Make sure the lock record exists inside the 'with' block.
self.assertEqual(1, len(db_api.get_project_locks(project_id)))
# Make sure the lock record is gone once the 'with' block exits.
self.assertEqual(0, len(db_api.get_project_locks(project_id)))
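A natural companion check, not part of this commit but sketched against the same fixtures, would assert that the lock row records its owner:

def test_project_lock_owner(self):
    # Hypothetical addition: the lock row should carry the owner identifier.
    with db_api.project_lock('fake_project_id', 'fake_owner'):
        locks = db_api.get_project_locks('fake_project_id')
        self.assertEqual('fake_owner', locks[0].owner)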

View File

@@ -11,6 +11,7 @@ odoorpc==0.4.2
SQLAlchemy<1.1.0,>=1.0.10 # MIT
keystonemiddleware!=4.1.0,>=4.0.0 # Apache-2.0
keystoneauth1>=2.1.0 # Apache-2.0
retrying>=1.2.3,!=1.3.0 # Apache-2.0
python-cinderclient>=1.6.0 # Apache-2.0
python-keystoneclient!=1.8.0,!=2.1.0,>=1.6.0 # Apache-2.0

tox.ini
View File

@@ -11,10 +11,14 @@ setenv =
DISTIL_TESTS_CONFIGS_DIR={toxinidir}/distil/tests/etc/
DISCOVER_DIRECTORY=distil/tests/unit
deps =
-r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = python setup.py testr --slowest --testr-args="{posargs}"
whitelist_externals = bash
commands =
find . -type f -name "*.pyc" -delete
python setup.py testr --slowest --testr-args="{posargs}"
whitelist_externals =
bash
rm
find
[testenv:py33]
deps = -r{toxinidir}/requirements-py3.txt