Starting implementing the collector's functions

This commit is contained in:
Anton Beloglazov 2012-08-14 17:33:25 +10:00
parent 8d89257cc4
commit 205031efa2
4 changed files with 183 additions and 13 deletions

View File

@ -14,12 +14,88 @@
""" The main data collector module.
The data collector is deployed on every compute host and is executed
periodically to collect the CPU utilization data for each VM running
on the host and stores the data in the local file-based data store.
The data is stored as the average number of MHz consumed by a VM
during the last measurement interval. The CPU usage data are stored as
integers. This data format is portable: the stored values can be
converted to the CPU utilization for any host or VM type, supporting
heterogeneous hosts and VMs.
The actual data is obtained from Libvirt in the form of the CPU time
consumed by a VM to date. Using the CPU time collected at the previous
time frame, the CPU time for the past time interval is calculated.
According to the CPU frequency of the host and the length of the time
interval, the CPU time is converted into the required average MHz
consumed by the VM over the last time interval. The collected data are
stored both locally and submitted to the central database. The number
of the latest data values stored locally and passed to the underload /
overload detection and VM selection algorithms is defined using the
`data_collector_data_length` option in the configuration file.
At the beginning of every execution, the data collector obtains the
set of VMs currently running on the host using the Nova API and
compares them to the VMs running on the host at the previous time
step. If new VMs have been found, the data collector fetches the
historical data about them from the central database and stores the
data in the local file-based data store. If some VMs have been
removed, the data collector removes the data about these VMs from the
local data store.
The data collector stores the resource usage information locally in
files in the <local_data_directory>/vm directory, where
<local_data_directory> is defined in the configuration file using
the local_data_directory option. The data for each VM are stored in
a separate file named according to the UUID of the corresponding VM.
The format of the files is a new line separated list of integers
representing the average CPU consumption by the VMs in MHz during the
last measurement interval.
The data collector will be implemented as a Linux daemon running in
the background and collecting data on the resource usage by VMs every
data_collector_interval seconds. When the data collection phase is
invoked, the component performs the following steps:
1. Read the names of the files from the <local_data_directory>/vm
directory to determine the list of VMs running on the host at the
last data collection.
2. Call the Nova API to obtain the list of VMs that are currently
active on the host.
3. Compare the old and new lists of VMs and determine the newly added
or removed VMs.
4. Delete the files from the <local_data_directory>/vm directory
corresponding to the VMs that have been removed from the host.
5. Fetch the latest data_collector_data_length data values from the
central database for each newly added VM using the database
connection information specified in the sql_connection option and
save the data in the <local_data_directory>/vm directory.
6. Call the Libvirt API to obtain the CPU time for each VM active on
the host.
7. Transform the data obtained from the Libvirt API into the average
MHz according to the frequency of the host's CPU and time interval
from the previous data collection.
8. Store the converted data in the <local_data_directory>/vm
directory in separate files for each VM, and submit the data to the
central database.
9. Schedule the next execution after data_collector_interval
seconds.
"""
from contracts import contract
import sys
import libvirt
from neat.config import *
@contract
def start(iterations):
@ -31,19 +107,98 @@ def start(iterations):
:return: The number of iterations performed.
:rtype: int
"""
config = read_config([DEFAILT_CONFIG_PATH, CONFIG_PATH])
if not validate_config(config, REQUIRED_FIELDS):
raise KeyError("The config dictionary does not contain all the required fields")
if iterations == -1:
while True:
collect()
collect(config)
else:
for _ in xrange(iterations):
collect()
collect(config)
return iterations
def collect():
def collect(config):
""" Execute a data collection iteration.
1. Read the names of the files from the <local_data_directory>/vm
directory to determine the list of VMs running on the host at the
last data collection.
2. Call the Nova API to obtain the list of VMs that are currently
active on the host.
3. Compare the old and new lists of VMs and determine the newly added
or removed VMs.
4. Delete the files from the <local_data_directory>/vm directory
corresponding to the VMs that have been removed from the host.
5. Fetch the latest data_collector_data_length data values from the
central database for each newly added VM using the database
connection information specified in the sql_connection option and
save the data in the <local_data_directory>/vm directory.
6. Call the Libvirt API to obtain the CPU time for each VM active on
the host.
7. Transform the data obtained from the Libvirt API into the average
MHz according to the frequency of the host's CPU and time interval
from the previous data collection.
8. Store the converted data in the <local_data_directory>/vm
directory in separate files for each VM, and submit the data to the
central database.
9. Schedule the next execution after data_collector_interval
seconds.
:param config: A config dictionary.
:type config: dict(str: *)
"""
vms_previous = get_previous_vms(
build_local_vm_path(config.get('local_data_directory')))
vms_current = get_current_vms()
@contract
def get_previous_vms(path):
""" Get a list of VM UUIDs from the path.
:param path: A path to read VM UUIDs from.
:type path: str
:return: The list of VM UUIDs from the path.
:rtype: list(str)
"""
return os.listdir(path)
@contract
def get_current_vms():
""" Get a list of VM UUIDs from libvirt.
:return: The list of VM UUIDs from libvirt.
:rtype: list(str)
"""
pass
@contract
def build_local_vm_path(local_data_directory):
""" Build the path to the local VM data directory.
:param local_data_directory: The base local data path.
:type local_data_directory: str
:return: The path to the local VM data directory.
:rtype: str
"""
return os.path.join(local_data_directory, 'vms')
def getNumberOfPhysicalCpus(connection):
return connection.getInfo()[2]

View File

@ -54,7 +54,7 @@ REQUIRED_FIELDS = [
@contract
def readConfig(paths):
def read_config(paths):
""" Read the configuration files and return the options.
:param paths: A list of required configuration file paths.
@ -70,7 +70,7 @@ def readConfig(paths):
@contract
def validateConfig(config, required_fields):
def validate_config(config, required_fields):
""" Check that the config contains all the required fields.
:param config: A config dictionary to check.

View File

@ -29,5 +29,20 @@ class Collector(TestCase):
@qc(10)
def start(iterations=int_(0, 10)):
with MockTransaction:
expect(collector).collect().exactly(iterations).times()
expect(collector).collect(any_dict).exactly(iterations).times()
assert collector.start(iterations) == iterations
@qc(1)
def get_previous_vms():
local_data_directory = os.path.join(
os.path.dirname(__file__), 'resources', 'vms')
assert collector.get_previous_vms(local_data_directory) == \
['ec452be0-e5d0-11e1-aff1-0800200c9a66',
'e615c450-e5d0-11e1-aff1-0800200c9a66',
'f3e142d0-e5d0-11e1-aff1-0800200c9a66']
@qc
def build_local_vm_path(
x=str_(of='abc123_-/')
):
assert collector.build_local_vm_path(x) == os.path.join(x, 'vms')

View File

@ -21,20 +21,20 @@ class Config(TestCase):
@qc
def read_default_config():
config = readConfig([DEFAILT_CONFIG_PATH])
assert validateConfig(config, REQUIRED_FIELDS)
config = read_config([DEFAILT_CONFIG_PATH])
assert validate_config(config, REQUIRED_FIELDS)
@qc
def read_config():
config = readConfig([DEFAILT_CONFIG_PATH, CONFIG_PATH])
assert validateConfig(config, REQUIRED_FIELDS)
config = read_config([DEFAILT_CONFIG_PATH, CONFIG_PATH])
assert validate_config(config, REQUIRED_FIELDS)
@qc
def validate_valid_config(
x=list_(of=str_(of='abc123_', max_length=20), min_length=0, max_length=10)
):
test_config = dict(zip(x, x))
assert validateConfig(test_config, x)
assert validate_config(test_config, x)
@qc
def validate_invalid_config(
@ -43,6 +43,6 @@ class Config(TestCase):
):
test_config = dict(zip(x, x))
if not y:
assert validateConfig(test_config, y)
assert validate_config(test_config, y)
else:
assert not validateConfig(test_config, y)
assert not validate_config(test_config, y)