From 205031efa2405c7160d0a3cab76530959e84426c Mon Sep 17 00:00:00 2001 From: Anton Beloglazov Date: Tue, 14 Aug 2012 17:33:25 +1000 Subject: [PATCH] Starting implementing the collector's functions --- neat/collector.py | 161 +++++++++++++++++++++++++++++++++++++++- neat/config.py | 4 +- tests/test_collector.py | 17 ++++- tests/test_config.py | 14 ++-- 4 files changed, 183 insertions(+), 13 deletions(-) diff --git a/neat/collector.py b/neat/collector.py index ec51227..e429d36 100755 --- a/neat/collector.py +++ b/neat/collector.py @@ -14,12 +14,88 @@ """ The main data collector module. +The data collector is deployed on every compute host and is executed +periodically to collect the CPU utilization data for each VM running +on the host and stores the data in the local file-based data store. +The data is stored as the average number of MHz consumed by a VM +during the last measurement interval. The CPU usage data are stored as +integers. This data format is portable: the stored values can be +converted to the CPU utilization for any host or VM type, supporting +heterogeneous hosts and VMs. + +The actual data is obtained from Libvirt in the form of the CPU time +consumed by a VM to date. Using the CPU time collected at the previous +time frame, the CPU time for the past time interval is calculated. +According to the CPU frequency of the host and the length of the time +interval, the CPU time is converted into the required average MHz +consumed by the VM over the last time interval. The collected data are +stored both locally and submitted to the central database. The number +of the latest data values stored locally and passed to the underload / +overload detection and VM selection algorithms is defined using the +`data_collector_data_length` option in the configuration file. 
+ +At the beginning of every execution, the data collector obtains the +set of VMs currently running on the host using the Nova API and +compares them to the VMs running on the host at the previous time +step. If new VMs have been found, the data collector fetches the +historical data about them from the central database and stores the +data in the local file-based data store. If some VMs have been +removed, the data collector removes the data about these VMs from the +local data store. + +The data collector stores the resource usage information locally in +files in the <local_data_directory>/vm directory, where +<local_data_directory> is defined in the configuration file using +the local_data_directory option. The data for each VM are stored in +a separate file named according to the UUID of the corresponding VM. +The format of the files is a newline-separated list of integers +representing the average CPU consumption by the VMs in MHz during the +last measurement interval. + +The data collector will be implemented as a Linux daemon running in +the background and collecting data on the resource usage by VMs every +data_collector_interval seconds. When the data collection phase is +invoked, the component performs the following steps: + +1. Read the names of the files from the <local_data_directory>/vm + directory to determine the list of VMs running on the host at the + last data collection. + +2. Call the Nova API to obtain the list of VMs that are currently + active on the host. + +3. Compare the old and new lists of VMs and determine the newly added + or removed VMs. + +4. Delete the files from the <local_data_directory>/vm directory + corresponding to the VMs that have been removed from the host. + +5. Fetch the latest data_collector_data_length data values from the + central database for each newly added VM using the database + connection information specified in the sql_connection option and + save the data in the <local_data_directory>/vm directory. + +6. Call the Libvirt API to obtain the CPU time for each VM active on + the host. + +7. 
Transform the data obtained from the Libvirt API into the average + MHz according to the frequency of the host's CPU and time interval + from the previous data collection. + +8. Store the converted data in the <local_data_directory>/vm + directory in separate files for each VM, and submit the data to the + central database. + +9. Schedule the next execution after data_collector_interval + seconds. """ from contracts import contract import sys import libvirt +from neat.config import * + @contract def start(iterations): @@ -31,19 +107,98 @@ def start(iterations): :return: The number of iterations performed. :rtype: int """ + config = read_config([DEFAILT_CONFIG_PATH, CONFIG_PATH]) + if not validate_config(config, REQUIRED_FIELDS): + raise KeyError("The config dictionary does not contain all the required fields") + if iterations == -1: while True: - collect() + collect(config) else: for _ in xrange(iterations): - collect() + collect(config) return iterations -def collect(): +def collect(config): + """ Execute a data collection iteration. + +1. Read the names of the files from the <local_data_directory>/vm + directory to determine the list of VMs running on the host at the + last data collection. + +2. Call the Nova API to obtain the list of VMs that are currently + active on the host. + +3. Compare the old and new lists of VMs and determine the newly added + or removed VMs. + +4. Delete the files from the <local_data_directory>/vm directory + corresponding to the VMs that have been removed from the host. + +5. Fetch the latest data_collector_data_length data values from the + central database for each newly added VM using the database + connection information specified in the sql_connection option and + save the data in the <local_data_directory>/vm directory. + +6. Call the Libvirt API to obtain the CPU time for each VM active on + the host. + +7. Transform the data obtained from the Libvirt API into the average + MHz according to the frequency of the host's CPU and time interval + from the previous data collection. + +8. 
Store the converted data in the <local_data_directory>/vm + directory in separate files for each VM, and submit the data to the + central database. + +9. Schedule the next execution after data_collector_interval + seconds. + + :param config: A config dictionary. + :type config: dict(str: *) + """ + vms_previous = get_previous_vms( + build_local_vm_path(config.get('local_data_directory'))) + vms_current = get_current_vms() + + +@contract +def get_previous_vms(path): + """ Get a list of VM UUIDs from the path. + + :param path: A path to read VM UUIDs from. + :type path: str + + :return: The list of VM UUIDs from the path. + :rtype: list(str) + """ + return os.listdir(path) + + +@contract +def get_current_vms(): + """ Get a list of VM UUIDs from libvirt. + + :return: The list of VM UUIDs from libvirt. + :rtype: list(str) + """ pass +@contract +def build_local_vm_path(local_data_directory): + """ Build the path to the local VM data directory. + + :param local_data_directory: The base local data path. + :type local_data_directory: str + + :return: The path to the local VM data directory. + :rtype: str + """ + return os.path.join(local_data_directory, 'vms') + + def getNumberOfPhysicalCpus(connection): return connection.getInfo()[2] diff --git a/neat/config.py b/neat/config.py index 984e44f..50b49fe 100644 --- a/neat/config.py +++ b/neat/config.py @@ -54,7 +54,7 @@ REQUIRED_FIELDS = [ @contract -def readConfig(paths): +def read_config(paths): """ Read the configuration files and return the options. :param paths: A list of required configuration file paths. @@ -70,7 +70,7 @@ def readConfig(paths): @contract -def validateConfig(config, required_fields): +def validate_config(config, required_fields): """ Check that the config contains all the required fields. :param config: A config dictionary to check. 
diff --git a/tests/test_collector.py b/tests/test_collector.py index 11ae5e6..283e20d 100644 --- a/tests/test_collector.py +++ b/tests/test_collector.py @@ -29,5 +29,20 @@ class Collector(TestCase): @qc(10) def start(iterations=int_(0, 10)): with MockTransaction: - expect(collector).collect().exactly(iterations).times() + expect(collector).collect(any_dict).exactly(iterations).times() assert collector.start(iterations) == iterations + + @qc(1) + def get_previous_vms(): + local_data_directory = os.path.join( + os.path.dirname(__file__), 'resources', 'vms') + assert collector.get_previous_vms(local_data_directory) == \ + ['ec452be0-e5d0-11e1-aff1-0800200c9a66', + 'e615c450-e5d0-11e1-aff1-0800200c9a66', + 'f3e142d0-e5d0-11e1-aff1-0800200c9a66'] + + @qc + def build_local_vm_path( + x=str_(of='abc123_-/') + ): + assert collector.build_local_vm_path(x) == os.path.join(x, 'vms') diff --git a/tests/test_config.py b/tests/test_config.py index 8e587a5..309ad15 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -21,20 +21,20 @@ class Config(TestCase): @qc def read_default_config(): - config = readConfig([DEFAILT_CONFIG_PATH]) - assert validateConfig(config, REQUIRED_FIELDS) + config = read_config([DEFAILT_CONFIG_PATH]) + assert validate_config(config, REQUIRED_FIELDS) @qc def read_config(): - config = readConfig([DEFAILT_CONFIG_PATH, CONFIG_PATH]) - assert validateConfig(config, REQUIRED_FIELDS) + config = read_config([DEFAILT_CONFIG_PATH, CONFIG_PATH]) + assert validate_config(config, REQUIRED_FIELDS) @qc def validate_valid_config( x=list_(of=str_(of='abc123_', max_length=20), min_length=0, max_length=10) ): test_config = dict(zip(x, x)) - assert validateConfig(test_config, x) + assert validate_config(test_config, x) @qc def validate_invalid_config( @@ -43,6 +43,6 @@ class Config(TestCase): ): test_config = dict(zip(x, x)) if not y: - assert validateConfig(test_config, y) + assert validate_config(test_config, y) else: - assert not validateConfig(test_config, y) 
+ assert not validate_config(test_config, y)