Emit cpu_util from transformer instead of pollster
The emission of the cpu_util counter from the compute CPUUtilPollster can be superceeded by the RateOfChangeTransformer (the intended target usercase for the new transformer). The caching of the CPUStats is now no longer required. Note that the pipeline.yaml must be updated in deployments, otherwise the cpu_util meter will no longer be collected. Change-Id: I79ff047595b960de7a2acb2ec81c357fe2ee21e5
This commit is contained in:
parent
ac2b21b722
commit
74c1b0d7aa
@ -18,9 +18,6 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import abc
|
|
||||||
import datetime
|
|
||||||
|
|
||||||
from ceilometer import counter
|
from ceilometer import counter
|
||||||
from ceilometer.compute import plugin
|
from ceilometer.compute import plugin
|
||||||
from ceilometer.compute.pollsters import util
|
from ceilometer.compute.pollsters import util
|
||||||
@ -29,91 +26,25 @@ from ceilometer.openstack.common import log
|
|||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class _Base(plugin.ComputePollster):
|
class CPUPollster(plugin.ComputePollster):
|
||||||
|
|
||||||
CACHE_KEY_CPU = 'cpu'
|
|
||||||
|
|
||||||
def _get_cpu_info(self, inspector, instance_name, cache):
|
|
||||||
i_cache = cache.setdefault(self.CACHE_KEY_CPU, {})
|
|
||||||
if instance_name not in i_cache:
|
|
||||||
i_cache[instance_name] = inspector.inspect_cpus(instance_name)
|
|
||||||
return i_cache[instance_name]
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def _get_counter(instance, instance_name, cpu_info):
|
|
||||||
"""Return one Counter."""
|
|
||||||
|
|
||||||
def get_counters(self, manager, cache, instance):
|
def get_counters(self, manager, cache, instance):
|
||||||
LOG.info('checking instance %s', instance.id)
|
LOG.info('checking instance %s', instance.id)
|
||||||
instance_name = util.instance_name(instance)
|
instance_name = util.instance_name(instance)
|
||||||
try:
|
try:
|
||||||
cpu_info = self._get_cpu_info(
|
cpu_info = manager.inspector.inspect_cpus(instance_name)
|
||||||
manager.inspector,
|
LOG.info("CPUTIME USAGE: %s %d",
|
||||||
instance_name,
|
instance.__dict__, cpu_info.time)
|
||||||
cache,
|
cpu_num = {'cpu_number': cpu_info.number}
|
||||||
)
|
yield util.make_counter_from_instance(
|
||||||
yield self._get_counter(
|
|
||||||
instance,
|
instance,
|
||||||
instance_name,
|
name='cpu',
|
||||||
cpu_info,
|
type=counter.TYPE_CUMULATIVE,
|
||||||
|
unit='ns',
|
||||||
|
volume=cpu_info.time,
|
||||||
|
additional_metadata=cpu_num,
|
||||||
)
|
)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
LOG.error('could not get CPU time for %s: %s',
|
LOG.error('could not get CPU time for %s: %s',
|
||||||
instance.id, err)
|
instance.id, err)
|
||||||
LOG.exception(err)
|
LOG.exception(err)
|
||||||
|
|
||||||
|
|
||||||
class CPUPollster(_Base):
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _get_counter(instance, instance_name, cpu_info):
|
|
||||||
LOG.info("CPUTIME USAGE: %s %d",
|
|
||||||
instance.__dict__, cpu_info.time)
|
|
||||||
cpu_num = {'cpu_number': cpu_info.number}
|
|
||||||
return util.make_counter_from_instance(
|
|
||||||
instance,
|
|
||||||
name='cpu',
|
|
||||||
type=counter.TYPE_CUMULATIVE,
|
|
||||||
unit='ns',
|
|
||||||
volume=cpu_info.time,
|
|
||||||
additional_metadata=cpu_num,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class CPUUtilPollster(_Base):
|
|
||||||
# FIXME(eglynn): once we have a way of configuring which measures
|
|
||||||
# are published to each sink, we should by default
|
|
||||||
# disable publishing this derived measure to the
|
|
||||||
# metering store, only publishing to those sinks
|
|
||||||
# that specifically need it
|
|
||||||
|
|
||||||
utilization_map = {}
|
|
||||||
|
|
||||||
def _get_cpu_util(self, instance, cpu_info):
|
|
||||||
prev_times = self.utilization_map.get(instance.id)
|
|
||||||
self.utilization_map[instance.id] = (cpu_info.time,
|
|
||||||
datetime.datetime.now())
|
|
||||||
cpu_util = 0.0
|
|
||||||
if prev_times:
|
|
||||||
prev_cpu = prev_times[0]
|
|
||||||
prev_timestamp = prev_times[1]
|
|
||||||
delta = self.utilization_map[instance.id][1] - prev_timestamp
|
|
||||||
elapsed = (delta.seconds * (10 ** 6) + delta.microseconds) * 1000
|
|
||||||
cores_fraction = 1.0 / cpu_info.number
|
|
||||||
# account for cpu_time being reset when the instance is restarted
|
|
||||||
time_used = (cpu_info.time - prev_cpu
|
|
||||||
if prev_cpu <= cpu_info.time else cpu_info.time)
|
|
||||||
cpu_util = 100 * cores_fraction * time_used / elapsed
|
|
||||||
return cpu_util
|
|
||||||
|
|
||||||
def _get_counter(self, instance, instance_name, cpu_info):
|
|
||||||
cpu_util = self._get_cpu_util(instance, cpu_info)
|
|
||||||
LOG.info("CPU UTILIZATION %%: %s %0.2f",
|
|
||||||
instance.__dict__, cpu_util)
|
|
||||||
return util.make_counter_from_instance(
|
|
||||||
instance,
|
|
||||||
name='cpu_util',
|
|
||||||
type=counter.TYPE_GAUGE,
|
|
||||||
unit='%',
|
|
||||||
volume=cpu_util,
|
|
||||||
)
|
|
||||||
|
@ -7,3 +7,19 @@
|
|||||||
transformers:
|
transformers:
|
||||||
publishers:
|
publishers:
|
||||||
- rpc://
|
- rpc://
|
||||||
|
-
|
||||||
|
name: cpu_pipeline
|
||||||
|
interval: 600
|
||||||
|
counters:
|
||||||
|
- "cpu"
|
||||||
|
transformers:
|
||||||
|
- name: "rate_of_change"
|
||||||
|
parameters:
|
||||||
|
target:
|
||||||
|
name: "cpu_util"
|
||||||
|
unit: "%"
|
||||||
|
type: "gauge"
|
||||||
|
scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
|
||||||
|
replace: False
|
||||||
|
publishers:
|
||||||
|
- rpc://
|
||||||
|
@ -56,7 +56,6 @@ ceilometer.poll.compute =
|
|||||||
disk.read.bytes = ceilometer.compute.pollsters.disk:ReadBytesPollster
|
disk.read.bytes = ceilometer.compute.pollsters.disk:ReadBytesPollster
|
||||||
disk.write.bytes = ceilometer.compute.pollsters.disk:WriteBytesPollster
|
disk.write.bytes = ceilometer.compute.pollsters.disk:WriteBytesPollster
|
||||||
cpu = ceilometer.compute.pollsters.cpu:CPUPollster
|
cpu = ceilometer.compute.pollsters.cpu:CPUPollster
|
||||||
cpu_util = ceilometer.compute.pollsters.cpu:CPUUtilPollster
|
|
||||||
network.incoming.bytes = ceilometer.compute.pollsters.net:IncomingBytesPollster
|
network.incoming.bytes = ceilometer.compute.pollsters.net:IncomingBytesPollster
|
||||||
network.incoming.packets = ceilometer.compute.pollsters.net:IncomingPacketsPollster
|
network.incoming.packets = ceilometer.compute.pollsters.net:IncomingPacketsPollster
|
||||||
network.outgoing.bytes = ceilometer.compute.pollsters.net:OutgoingBytesPollster
|
network.outgoing.bytes = ceilometer.compute.pollsters.net:OutgoingBytesPollster
|
||||||
|
@ -55,8 +55,6 @@ class TestCPUPollster(base.TestPollsterBase):
|
|||||||
self.assertEqual(set([c.name for c in counters]),
|
self.assertEqual(set([c.name for c in counters]),
|
||||||
set(['cpu']))
|
set(['cpu']))
|
||||||
assert counters[0].volume == expected_time
|
assert counters[0].volume == expected_time
|
||||||
assert pollster.CACHE_KEY_CPU in cache
|
|
||||||
assert self.instance.name in cache[pollster.CACHE_KEY_CPU]
|
|
||||||
self.assertEquals(counters[0].resource_metadata.get('cpu_number'),
|
self.assertEquals(counters[0].resource_metadata.get('cpu_number'),
|
||||||
2)
|
2)
|
||||||
# ensure elapsed time between polling cycles is non-zero
|
# ensure elapsed time between polling cycles is non-zero
|
||||||
@ -67,78 +65,16 @@ class TestCPUPollster(base.TestPollsterBase):
|
|||||||
_verify_cpu_metering(2 * (10 ** 6))
|
_verify_cpu_metering(2 * (10 ** 6))
|
||||||
|
|
||||||
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
|
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
|
||||||
def test_get_counters_cache(self):
|
def test_get_counters_no_caching(self):
|
||||||
|
self.inspector.inspect_cpus(self.instance.name).AndReturn(
|
||||||
|
virt_inspector.CPUStats(time=1 * (10 ** 6), number=2))
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
|
|
||||||
mgr = manager.AgentManager()
|
mgr = manager.AgentManager()
|
||||||
pollster = cpu.CPUPollster()
|
pollster = cpu.CPUPollster()
|
||||||
|
|
||||||
cache = {
|
cache = {}
|
||||||
pollster.CACHE_KEY_CPU: {
|
|
||||||
self.instance.name: virt_inspector.CPUStats(
|
|
||||||
time=10 ** 6,
|
|
||||||
number=2,
|
|
||||||
),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
counters = list(pollster.get_counters(mgr, cache, self.instance))
|
counters = list(pollster.get_counters(mgr, cache, self.instance))
|
||||||
self.assertEquals(len(counters), 1)
|
self.assertEquals(len(counters), 1)
|
||||||
self.assertEquals(counters[0].volume, 10 ** 6)
|
self.assertEquals(counters[0].volume, 10 ** 6)
|
||||||
|
self.assertEquals(len(cache), 0)
|
||||||
|
|
||||||
class TestCPUUtilPollster(base.TestPollsterBase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestCPUUtilPollster, self).setUp()
|
|
||||||
|
|
||||||
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
|
|
||||||
def test_get_counters(self):
|
|
||||||
self.inspector.inspect_cpus(self.instance.name).AndReturn(
|
|
||||||
virt_inspector.CPUStats(time=1 * (10 ** 6), number=2))
|
|
||||||
self.inspector.inspect_cpus(self.instance.name).AndReturn(
|
|
||||||
virt_inspector.CPUStats(time=3 * (10 ** 6), number=2))
|
|
||||||
# cpu_time resets on instance restart
|
|
||||||
self.inspector.inspect_cpus(self.instance.name).AndReturn(
|
|
||||||
virt_inspector.CPUStats(time=2 * (10 ** 6), number=2))
|
|
||||||
self.mox.ReplayAll()
|
|
||||||
|
|
||||||
mgr = manager.AgentManager()
|
|
||||||
pollster = cpu.CPUUtilPollster()
|
|
||||||
pollster.utilization_map = {} # clear the internal cache
|
|
||||||
|
|
||||||
def _verify_cpu_metering(zero):
|
|
||||||
cache = {}
|
|
||||||
counters = list(pollster.get_counters(mgr, cache, self.instance))
|
|
||||||
self.assertEquals(len(counters), 1)
|
|
||||||
self.assertEqual(set([c.name for c in counters]),
|
|
||||||
set(['cpu_util']))
|
|
||||||
assert (counters[0].volume == 0.0 if zero else
|
|
||||||
counters[0].volume > 0.0)
|
|
||||||
assert pollster.CACHE_KEY_CPU in cache
|
|
||||||
assert self.instance.name in cache[pollster.CACHE_KEY_CPU]
|
|
||||||
# ensure elapsed time between polling cycles is non-zero
|
|
||||||
time.sleep(0.001)
|
|
||||||
|
|
||||||
_verify_cpu_metering(True)
|
|
||||||
_verify_cpu_metering(False)
|
|
||||||
_verify_cpu_metering(False)
|
|
||||||
|
|
||||||
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())
|
|
||||||
def test_get_counters_cache(self):
|
|
||||||
self.mox.ReplayAll()
|
|
||||||
|
|
||||||
mgr = manager.AgentManager()
|
|
||||||
pollster = cpu.CPUUtilPollster()
|
|
||||||
pollster.utilization_map = {} # clear the internal cache
|
|
||||||
|
|
||||||
cache = {
|
|
||||||
pollster.CACHE_KEY_CPU: {
|
|
||||||
self.instance.name: virt_inspector.CPUStats(
|
|
||||||
time=10 ** 6,
|
|
||||||
number=2,
|
|
||||||
),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
counters = list(pollster.get_counters(mgr, cache, self.instance))
|
|
||||||
self.assertEquals(len(counters), 1)
|
|
||||||
self.assertEquals(counters[0].volume, 0)
|
|
||||||
|
Loading…
Reference in New Issue
Block a user