diff --git a/bin/ceilometer-agent-central b/bin/ceilometer-agent-central index 67cb63048..a4cfcba41 100755 --- a/bin/ceilometer-agent-central +++ b/bin/ceilometer-agent-central @@ -22,18 +22,17 @@ eventlet.monkey_patch() import sys from oslo.config import cfg - -from ceilometer import service as ceilo_service from ceilometer.central import manager from ceilometer.service import prepare_service from ceilometer.openstack.common import service +from ceilometer.openstack.common.rpc import service as rpc_service if __name__ == '__main__': prepare_service(sys.argv) mgr = manager.AgentManager() topic = 'ceilometer.agent.central' - ceilo = ceilo_service.PeriodicService(cfg.CONF.host, - topic, mgr) + ceilo = rpc_service.Service(cfg.CONF.host, + topic, mgr) launcher = service.launch(ceilo) launcher.wait() diff --git a/bin/ceilometer-agent-compute b/bin/ceilometer-agent-compute index 4ebce5107..95770740a 100755 --- a/bin/ceilometer-agent-compute +++ b/bin/ceilometer-agent-compute @@ -22,11 +22,10 @@ eventlet.monkey_patch() import sys from oslo.config import cfg - -from ceilometer import service as ceilo_service from ceilometer.compute import manager from ceilometer.service import prepare_service from ceilometer.openstack.common import service +from ceilometer.openstack.common.rpc import service as rpc_service if __name__ == '__main__': @@ -34,7 +33,7 @@ if __name__ == '__main__': prepare_service(sys.argv) mgr = manager.AgentManager() topic = 'ceilometer.agent.compute' - ceilo = ceilo_service.PeriodicService(cfg.CONF.host, - topic, mgr) + ceilo = rpc_service.Service(cfg.CONF.host, + topic, mgr) launcher = service.launch(ceilo) launcher.wait() diff --git a/ceilometer/agent.py b/ceilometer/agent.py index b631b3e9f..faf3c3724 100644 --- a/ceilometer/agent.py +++ b/ceilometer/agent.py @@ -16,15 +16,38 @@ # License for the specific language governing permissions and limitations # under the License. +import abc +import itertools + from oslo.config import cfg from stevedore import dispatch - +from ceilometer.openstack.common import context from ceilometer.openstack.common import log from ceilometer import pipeline LOG = log.getLogger(__name__) +class PollingTask(object): + """Polling task for polling counters and inject into pipeline + A polling task can be invoked periodically or only once""" + + def __init__(self, agent_manager): + self.manager = agent_manager + self.pollsters = set() + self.publish_context = pipeline.PublishContext( + agent_manager.context, + cfg.CONF.counter_source) + + def add(self, pollster, pipelines): + self.publish_context.add_pipelines(pipelines) + self.pollsters.update([pollster]) + + @abc.abstractmethod + def poll_and_publish(self): + """Polling counter and publish into pipeline.""" + + class AgentManager(object): def __init__(self, extension_manager): @@ -38,19 +61,34 @@ class AgentManager(object): self.pollster_manager = extension_manager - def publish_counters_from_one_pollster(self, ext, manager, context, - *args, **kwargs): - """Used to invoke the plugins loaded by the ExtensionManager. - """ - try: - publisher = manager.pipeline_manager.publisher( - context, - cfg.CONF.counter_source, - ) - with publisher as p: - LOG.debug('Polling and publishing %s', ext.name) - p(ext.obj.get_counters(manager, *args, **kwargs)) - except Exception as err: - LOG.warning('Continuing after error from %s: %s', - ext.name, err) - LOG.exception(err) + self.context = context.RequestContext('admin', 'admin', is_admin=True) + + @abc.abstractmethod + def create_polling_task(self): + """Create an empty polling task""" + + def setup_polling_tasks(self): + polling_tasks = {} + for pipeline, pollster in itertools.product( + self.pipeline_manager.pipelines, + self.pollster_manager.extensions): + for counter in pollster.obj.get_counter_names(): + if pipeline.support_counter(counter): + polling_task = polling_tasks.get(pipeline.interval, None) + if not polling_task: + polling_task = self.create_polling_task() + polling_tasks[pipeline.interval] = polling_task + polling_task.add(pollster, [pipeline]) + break + + return polling_tasks + + def initialize_service_hook(self, service): + self.service = service + for interval, task in self.setup_polling_tasks().iteritems(): + self.service.tg.add_timer(interval, + self.interval_task, + task=task) + + def interval_task(self, task): + task.poll_and_publish() diff --git a/ceilometer/central/manager.py b/ceilometer/central/manager.py index 78bed4a63..ef6281a61 100644 --- a/ceilometer/central/manager.py +++ b/ceilometer/central/manager.py @@ -21,6 +21,7 @@ from oslo.config import cfg from ceilometer import agent from ceilometer import extension_manager +from ceilometer.openstack.common import log from ceilometer import service # For cfg.CONF.os_* OPTS = [ @@ -33,6 +34,26 @@ OPTS = [ cfg.CONF.register_opts(OPTS) +LOG = log.getLogger(__name__) + + +class PollingTask(agent.PollingTask): + def poll_and_publish(self): + """Tasks to be run at a periodic interval.""" + with self.publish_context as publisher: + # TODO(yjiang5) passing counters into get_counters to avoid + # polling all counters one by one + for pollster in self.pollsters: + try: + LOG.info("Polling pollster %s", pollster.name) + publisher(list(pollster.obj.get_counters( + self.manager))) + except Exception as err: + LOG.warning('Continue after error from %s: %s', + pollster.name, err) + LOG.exception(err) + + class AgentManager(agent.AgentManager): def __init__(self): @@ -43,15 +64,15 @@ class AgentManager(agent.AgentManager): ), ) - def periodic_tasks(self, context, raise_on_error=False): - """Tasks to be run at a periodic interval.""" - self.keystone = ksclient.Client(username=cfg.CONF.os_username, - password=cfg.CONF.os_password, - tenant_id=cfg.CONF.os_tenant_id, - tenant_name=cfg.CONF.os_tenant_name, - auth_url=cfg.CONF.os_auth_url) + def create_polling_task(self): + return PollingTask(self) - self.pollster_manager.map(self.publish_counters_from_one_pollster, - manager=self, - context=context, - ) + def interval_task(self, task): + self.keystone = ksclient.Client( + username=cfg.CONF.os_username, + password=cfg.CONF.os_password, + tenant_id=cfg.CONF.os_tenant_id, + tenant_name=cfg.CONF.os_tenant_name, + auth_url=cfg.CONF.os_auth_url) + + super(AgentManager, self).interval_task(task) diff --git a/ceilometer/compute/manager.py b/ceilometer/compute/manager.py index b1d78b8fe..f741ff400 100644 --- a/ceilometer/compute/manager.py +++ b/ceilometer/compute/manager.py @@ -41,6 +41,29 @@ cfg.CONF.register_opts(OPTS) LOG = log.getLogger(__name__) +class PollingTask(agent.PollingTask): + def poll_and_publish_instances(self, instances): + with self.publish_context as publisher: + for instance in instances: + if getattr(instance, 'OS-EXT-STS:vm_state', None) != 'error': + # TODO(yjiang5) passing counters to get_counters to avoid + # polling all counters one by one + for pollster in self.pollsters: + try: + LOG.info("Polling pollster %s", pollster.name) + publisher(list(pollster.obj.get_counters( + self.manager, + instance))) + except Exception as err: + LOG.warning('Continue after error from %s: %s', + pollster.name, err) + LOG.exception(err) + + def poll_and_publish(self): + self.poll_and_publish_instances( + self.manager.nv.instance_get_all_by_host(cfg.CONF.host)) + + def get_hypervisor_inspector(): try: namespace = 'ceilometer.compute.virt' @@ -63,20 +86,23 @@ class AgentManager(agent.AgentManager): ), ) self._inspector = get_hypervisor_inspector() + self.nv = nova_client.Client() + + def create_polling_task(self): + return PollingTask(self) + + def setup_notifier_task(self): + """For nova notifier usage""" + task = PollingTask(self) + for pollster in self.pollster_manager.extensions: + task.add( + pollster, + self.pipeline_manager.pipelines) + self.notifier_task = task def poll_instance(self, context, instance): """Poll one instance.""" - self.pollster_manager.map(self.publish_counters_from_one_pollster, - manager=self, - context=context, - instance=instance) - - def periodic_tasks(self, context, raise_on_error=False): - """Tasks to be run at a periodic interval.""" - nv = nova_client.Client() - for instance in nv.instance_get_all_by_host(cfg.CONF.host): - if getattr(instance, 'OS-EXT-STS:vm_state', None) != 'error': - self.poll_instance(context, instance) + self.notifier_task.poll_and_publish_instances([instance]) @property def inspector(self): diff --git a/ceilometer/compute/nova_notifier.py b/ceilometer/compute/nova_notifier.py index ad4aac7a4..eebdf6a14 100644 --- a/ceilometer/compute/nova_notifier.py +++ b/ceilometer/compute/nova_notifier.py @@ -46,6 +46,7 @@ def initialize_manager(agent_manager=None): _agent_manager = AgentManager() else: _agent_manager = agent_manager + _agent_manager.setup_notifier_task() def notify(context, message): diff --git a/ceilometer/objectstore/swift_middleware.py b/ceilometer/objectstore/swift_middleware.py index afc7c22aa..b76468122 100644 --- a/ceilometer/objectstore/swift_middleware.py +++ b/ceilometer/objectstore/swift_middleware.py @@ -95,10 +95,10 @@ class CeilometerMiddleware(object): req = Request(env) version, account, container, obj = split_path(req.path, 1, 4, True) now = timeutils.utcnow().isoformat() - - with self.pipeline_manager.publisher( + with pipeline.PublishContext( context.get_admin_context(), - cfg.CONF.counter_source + cfg.CONF.counter_source, + self.pipeline_manager.pipelines, ) as publisher: if bytes_received: publisher([counter.Counter( diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index 5fcf45418..6598545eb 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -64,13 +64,16 @@ class TransformerExtensionManager(extension.ExtensionManager): return self.by_name[name] -class Publisher(object): +class PublishContext(object): - def __init__(self, pipelines, context, source): - self.pipelines = pipelines + def __init__(self, context, source, pipelines=[]): + self.pipelines = set(pipelines) self.context = context self.source = source + def add_pipelines(self, pipelines): + self.pipelines.update(pipelines) + def __enter__(self): def p(counters): for p in self.pipelines: @@ -360,7 +363,7 @@ class PipelineManager(object): :param context: The context. :param source: Counter source. """ - return Publisher(self.pipelines, context, source) + return PublishContext(context, source, self.pipelines) def setup_pipeline(publisher_manager): diff --git a/tests/agentbase.py b/tests/agentbase.py new file mode 100644 index 000000000..dab59c5d9 --- /dev/null +++ b/tests/agentbase.py @@ -0,0 +1,242 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2012 New Dream Network, LLC (DreamHost) +# Copyright © 2013 Intel corp. +# +# Author: Yunhong Jiang +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import datetime +import mock + +from stevedore import extension +from stevedore import dispatch +from stevedore.tests import manager as extension_tests + +from ceilometer import counter +from ceilometer import pipeline +from ceilometer.tests import base + + +default_test_data = counter.Counter( + name='test', + type=counter.TYPE_CUMULATIVE, + unit='', + volume=1, + user_id='test', + project_id='test', + resource_id='test_run_tasks', + timestamp=datetime.datetime.utcnow().isoformat(), + resource_metadata={'name': 'Pollster'}, +) + + +class TestPollster: + test_data = default_test_data + + @classmethod + def get_counter_names(self): + return [self.test_data.name] + + def get_counters(self, manager, instance=None): + self.counters.append((manager, instance)) + return [self.test_data] + + +class TestPollsterException(TestPollster): + def get_counters(self, manager, instance=None): + # Put an instance parameter here so that it can be used + # by both central manager and compute manager + # In future, we possibly don't need such hack if we + # combin the get_counters() function again + self.counters.append((manager, instance)) + raise Exception() + + +class BaseAgentManagerTestCase(base.TestCase): + + class PublisherClass(): + def __init__(self): + self.counters = [] + + def publish_counters(self, ctxt, counter, source): + self.counters.extend(counter) + + class Pollster(TestPollster): + counters = [] + test_data = default_test_data + + class PollsterAnother(TestPollster): + counters = [] + test_data = default_test_data._replace(name='testanother') + + class PollsterException(TestPollsterException): + counters = [] + test_data = default_test_data._replace(name='testexception') + + class PollsterExceptionAnother(TestPollsterException): + counters = [] + test_data = default_test_data._replace(name='testexceptionanother') + + def setup_pipeline(self): + self.publisher = self.PublisherClass() + self.publisher_manager = dispatch.NameDispatchExtensionManager( + 'fake', + check_func=lambda x: True, + invoke_on_load=False, + ) + self.publisher_manager.extensions = [ + extension.Extension( + 'test_pub', + None, + None, + self.publisher, + ), ] + self.publisher_manager.by_name = dict( + (e.name, e) + for e + in self.publisher_manager.extensions) + + self.mgr.pipeline_manager = pipeline.PipelineManager( + self.pipeline_cfg, + self.publisher_manager) + + def create_extension_manager(self): + return extension_tests.TestExtensionManager( + [ + extension.Extension( + 'test', + None, + None, + self.Pollster(), ), + extension.Extension( + 'testanother', + None, + None, + self.PollsterAnother(), ), + extension.Extension( + 'testexception', + None, + None, + self.PollsterException(), ), + extension.Extension( + 'testexceptionanother', + None, + None, + self.PollsterExceptionAnother(), ), + ], + 'fake', + invoke_on_load=False, + ) + + @abc.abstractmethod + def setup_manager(self): + """Setup subclass specific managers""" + + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) + def setUp(self): + super(BaseAgentManagerTestCase, self).setUp() + self.setup_manager() + self.mgr.pollster_manager = self.create_extension_manager() + self.pipeline_cfg = [{ + 'name': "test_pipeline", + 'interval': 60, + 'counters': ['test'], + 'transformers': [], + 'publishers': ["test_pub"], + }, ] + self.setup_pipeline() + + def tearDown(self): + self.Pollster.counters = [] + self.PollsterAnother.counters = [] + self.PollsterException.counters = [] + self.PollsterExceptionAnother.counters = [] + super(BaseAgentManagerTestCase, self).tearDown() + + def test_setup_polling_tasks(self): + polling_tasks = self.mgr.setup_polling_tasks() + self.assertEqual(len(polling_tasks), 1) + self.assertTrue(60 in polling_tasks.keys()) + self.mgr.interval_task(polling_tasks.values()[0]) + self.assertEqual(self.publisher.counters[0], self.Pollster.test_data) + + def test_setup_polling_tasks_multiple_interval(self): + self.pipeline_cfg.append({ + 'name': "test_pipeline", + 'interval': 10, + 'counters': ['test'], + 'transformers': [], + 'publishers': ["test_pub"], + }) + self.setup_pipeline() + polling_tasks = self.mgr.setup_polling_tasks() + self.assertEqual(len(polling_tasks), 2) + self.assertTrue(60 in polling_tasks.keys()) + self.assertTrue(10 in polling_tasks.keys()) + + def test_setup_polling_tasks_mismatch_counter(self): + self.pipeline_cfg.append( + { + 'name': "test_pipeline_1", + 'interval': 10, + 'counters': ['test_invalid'], + 'transformers': [], + 'publishers': ["test_pub"], + }) + polling_tasks = self.mgr.setup_polling_tasks() + self.assertEqual(len(polling_tasks), 1) + self.assertTrue(60 in polling_tasks.keys()) + + def test_setup_polling_task_same_interval(self): + self.pipeline_cfg.append({ + 'name': "test_pipeline", + 'interval': 60, + 'counters': ['testanother'], + 'transformers': [], + 'publishers': ["test_pub"], + }) + self.setup_pipeline() + polling_tasks = self.mgr.setup_polling_tasks() + self.assertEqual(len(polling_tasks), 1) + pollsters = polling_tasks.get(60).pollsters + self.assertEqual(len(pollsters), 2) + + def test_interval_exception_isolation(self): + self.pipeline_cfg = [ + { + 'name': "test_pipeline_1", + 'interval': 10, + 'counters': ['testexceptionanother'], + 'transformers': [], + 'publishers': ["test_pub"], + }, + { + 'name': "test_pipeline_2", + 'interval': 10, + 'counters': ['testexception'], + 'transformers': [], + 'publishers': ["test_pub"], + }, + ] + self.mgr.pipeline_manager = pipeline.PipelineManager( + self.pipeline_cfg, + self.publisher_manager) + + polling_tasks = self.mgr.setup_polling_tasks() + self.assertEqual(len(polling_tasks.keys()), 1) + task = polling_tasks.get(10) + self.mgr.interval_task(polling_tasks.get(10)) + self.assertEqual(len(self.publisher.counters), 0) diff --git a/tests/central/test_manager.py b/tests/central/test_manager.py index 48ea9857e..068113f32 100644 --- a/tests/central/test_manager.py +++ b/tests/central/test_manager.py @@ -22,12 +22,14 @@ import datetime import mock from oslo.config import cfg -from keystoneclient.v2_0 import client as ksclient from stevedore import extension from ceilometer.central import manager from ceilometer import counter from ceilometer.tests import base +from keystoneclient.v2_0 import client as ksclient + +from tests import agentbase @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) @@ -37,55 +39,14 @@ def test_load_plugins(): return -class TestRunTasks(base.TestCase): +class TestRunTasks(agentbase.BaseAgentManagerTestCase): - class Pollster: - counters = [] - test_data = counter.Counter( - name='test', - type=counter.TYPE_CUMULATIVE, - unit='', - volume=1, - user_id='test', - project_id='test', - resource_id='test_run_tasks', - timestamp=datetime.datetime.utcnow().isoformat(), - resource_metadata={'name': 'Pollster'}, - ) + def setup_manager(self): + self.mgr = manager.AgentManager() - def get_counters(self, manager): - self.counters.append((manager, self.test_data)) - return [self.test_data] - - @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def setUp(self): super(TestRunTasks, self).setUp() self.stubs.Set(ksclient, 'Client', lambda *args, **kwargs: None) - self.mgr = manager.AgentManager() - - self.mgr.pollster_manager = extension.ExtensionManager( - 'fake', - invoke_on_load=False, - ) - self.mgr.pollster_manager.extensions = [ - extension.Extension('test', - None, - None, - self.Pollster(), ), - ] - # Invoke the periodic tasks to call the pollsters. - self.mgr.periodic_tasks(None) def tearDown(self): - self.Pollster.counters = [] super(TestRunTasks, self).tearDown() - - def test_message(self): - self.assertEqual(len(self.Pollster.counters), 1) - self.assertTrue(self.Pollster.counters[0][1] is - self.Pollster.test_data) - - def test_notifications(self): - self.assertTrue(self.mgr.pipeline_manager.publisher.called) - args, _ = self.mgr.pipeline_manager.publisher.call_args - self.assertEqual(args[1], cfg.CONF.counter_source) diff --git a/tests/compute/test_manager.py b/tests/compute/test_manager.py index 802139f5b..80ceee1aa 100644 --- a/tests/compute/test_manager.py +++ b/tests/compute/test_manager.py @@ -23,12 +23,17 @@ import datetime import mock from oslo.config import cfg from stevedore import extension +from stevedore.tests import manager as extension_tests +from stevedore import dispatch from ceilometer import nova_client from ceilometer.compute import manager from ceilometer import counter +from ceilometer import pipeline from ceilometer.tests import base +from tests import agentbase + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def test_load_plugins(): @@ -37,25 +42,7 @@ def test_load_plugins(): return -class TestRunTasks(base.TestCase): - - class Pollster: - counters = [] - test_data = counter.Counter( - name='test', - type=counter.TYPE_CUMULATIVE, - unit='', - volume=1, - user_id='test', - project_id='test', - resource_id='test_run_tasks', - timestamp=datetime.datetime.utcnow().isoformat(), - resource_metadata={'name': 'Pollster'}, - ) - - def get_counters(self, manager, instance): - self.counters.append((manager, instance)) - return [self.test_data] +class TestRunTasks(agentbase.BaseAgentManagerTestCase): def _fake_instance(self, name, state): instance = mock.MagicMock() @@ -63,20 +50,12 @@ class TestRunTasks(base.TestCase): setattr(instance, 'OS-EXT-STS:vm_state', state) return instance + def setup_manager(self): + self.mgr = manager.AgentManager() + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) def setUp(self): super(TestRunTasks, self).setUp() - self.mgr = manager.AgentManager() - self.mgr.pollster_manager = extension.ExtensionManager( - 'fake', - invoke_on_load=False, - ) - self.mgr.pollster_manager.extensions = [ - extension.Extension('test', - None, - None, - self.Pollster(), ), - ] # Set up a fake instance value to be returned by # instance_get_all_by_host() so when the manager gets the list @@ -85,19 +64,18 @@ class TestRunTasks(base.TestCase): stillborn_instance = self._fake_instance('stillborn', 'error') self.stubs.Set(nova_client.Client, 'instance_get_all_by_host', lambda *x: [self.instance, stillborn_instance]) - self.mox.ReplayAll() - # Invoke the periodic tasks to call the pollsters. - self.mgr.periodic_tasks(None) - def tearDown(self): - self.Pollster.counters = [] - super(TestRunTasks, self).tearDown() - - def test_message(self): + def test_notifier_task(self): + self.mgr.setup_notifier_task() + self.mgr.poll_instance(None, self.instance) self.assertEqual(len(self.Pollster.counters), 1) + assert self.publisher.counters[0] == self.Pollster.test_data + + def test_setup_polling_tasks(self): + super(TestRunTasks, self).test_setup_polling_tasks() self.assertTrue(self.Pollster.counters[0][1] is self.instance) - def test_notifications(self): - self.assertTrue(self.mgr.pipeline_manager.publisher.called) - args, _ = self.mgr.pipeline_manager.publisher.call_args - self.assertEqual(args[1], cfg.CONF.counter_source) + def test_interval_exception_isolation(self): + super(TestRunTasks, self).test_interval_exception_isolation() + self.assertEqual(len(self.PollsterException.counters), 1) + self.assertEqual(len(self.PollsterExceptionAnother.counters), 1) diff --git a/tests/compute/test_nova_notifier.py b/tests/compute/test_nova_notifier.py index a8f72427c..d03371212 100644 --- a/tests/compute/test_nova_notifier.py +++ b/tests/compute/test_nova_notifier.py @@ -78,6 +78,9 @@ class TestNovaNotifier(base.TestCase): self.counters.append((manager, instance)) return [self.test_data] + def get_counter_names(self): + return ['test'] + def fake_db_instance_get(self, context, id_): if self.instance['uuid'] == id_: return mock.MagicMock(name=self.instance['name'], diff --git a/tests/objectstore/test_swift_middleware.py b/tests/objectstore/test_swift_middleware.py index 586b8a704..4b165b7bf 100644 --- a/tests/objectstore/test_swift_middleware.py +++ b/tests/objectstore/test_swift_middleware.py @@ -45,23 +45,21 @@ class TestSwiftMiddleware(base.TestCase): class _faux_pipeline_manager(object): class _faux_pipeline(object): - def __init__(self): + def __init__(self, pipeline_manager): + self.pipeline_manager = pipeline_manager self.counters = [] def publish_counters(self, ctxt, counters, source): self.counters.extend(counters) - def flush(self, ctx, source): + def flush(self, context, source): pass def __init__(self): - self.pipelines = [self._faux_pipeline()] + self.pipelines = [self._faux_pipeline(self)] - def publisher(self, context, source): - return pipeline.Publisher(self.pipelines, context, source) - - def flush(self, context, source): - pass + def flush(self, ctx, source): + pass def _faux_setup_pipeline(self, publisher_manager): return self.pipeline_manager