diff --git a/playbooks/base-test/post-logs.yaml b/playbooks/base-test/post-logs.yaml index b2d2064..aa9e91b 100644 --- a/playbooks/base-test/post-logs.yaml +++ b/playbooks/base-test/post-logs.yaml @@ -37,11 +37,3 @@ url: 'download-logs.sh' metadata: command: 'curl "{{ upload_results.url }}/download-logs.sh" | bash' - -- hosts: localhost - # NOTE(pabelanger): We ignore_errors for the following tasks as not to fail - # successful jobs. - ignore_errors: yes - roles: - - submit-logstash-jobs - - submit-subunit-jobs diff --git a/playbooks/base/post-logs.yaml b/playbooks/base/post-logs.yaml index 925690e..0beff84 100644 --- a/playbooks/base/post-logs.yaml +++ b/playbooks/base/post-logs.yaml @@ -37,11 +37,3 @@ url: 'download-logs.sh' metadata: command: 'curl "{{ upload_results.url }}/download-logs.sh" | bash' - -- hosts: localhost - # NOTE(pabelanger): We ignore_errors for the following tasks as not to fail - # successful jobs. - ignore_errors: yes - roles: - - submit-logstash-jobs - - submit-subunit-jobs diff --git a/roles/submit-log-processor-jobs/README.rst b/roles/submit-log-processor-jobs/README.rst deleted file mode 100644 index ccb2604..0000000 --- a/roles/submit-log-processor-jobs/README.rst +++ /dev/null @@ -1,6 +0,0 @@ -A module to submit a log processing job. - -This role is a container for an Ansible module which processes a log -directory and submits jobs to a log processing gearman queue. The -role itself performs no actions, and is intended only to be used by -other roles as a dependency to supply the module. diff --git a/roles/submit-log-processor-jobs/library/submit_log_processor_jobs.py b/roles/submit-log-processor-jobs/library/submit_log_processor_jobs.py deleted file mode 100644 index e0f2cd1..0000000 --- a/roles/submit-log-processor-jobs/library/submit_log_processor_jobs.py +++ /dev/null @@ -1,212 +0,0 @@ -# Copyright 2013 Hewlett-Packard Development Company, L.P. -# Copyright (C) 2017 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import json -import re -import traceback - -from ansible.module_utils.six.moves import urllib -from ansible.module_utils.basic import AnsibleModule - -import ansible.module_utils.gear as gear - - -class FileMatcher(object): - def __init__(self, name, tags): - self._name = name - self.name = re.compile(name) - self.tags = tags - - def matches(self, s): - if self.name.search(s): - return True - - -class File(object): - def __init__(self, name, tags): - self._name = name - self._tags = tags - - @property - def name(self): - return self._name - - @name.setter - def name(self, value): - raise Exception("Cannot update File() objects they must be hashable") - - @property - def tags(self): - return self._tags - - @tags.setter - def tags(self, value): - raise Exception("Cannot update File() objects they must be hashable") - - def toDict(self): - return dict(name=self.name, - tags=self.tags) - - # We need these objects to be hashable so that we can use sets - # below. - def __eq__(self, other): - return self.name == other.name - - def __ne__(self, other): - return not self.__eq__(other) - - def __hash__(self): - return hash(self.name) - - -class LogMatcher(object): - def __init__(self, server, port, config, success, log_url, host_vars): - self.client = gear.Client() - self.client.addServer(server, port) - self.hosts = host_vars - self.zuul = list(host_vars.values())[0]['zuul'] - self.success = success - self.log_url = log_url - self.matchers = [] - for f in config['files']: - self.matchers.append(FileMatcher(f['name'], f.get('tags', []))) - - def findFiles(self, path): - results = set() - for (dirpath, dirnames, filenames) in os.walk(path): - for filename in filenames: - fn = os.path.join(dirpath, filename) - partial_name = fn[len(path) + 1:] - for matcher in self.matchers: - if matcher.matches(partial_name): - results.add(File(partial_name, matcher.tags)) - break - return results - - def submitJobs(self, jobname, files): - self.client.waitForServer(90) - ret = [] - for f in files: - output = self.makeOutput(f) - output = json.dumps(output).encode('utf8') - job = gear.TextJob(jobname, output) - self.client.submitJob(job, background=True) - ret.append(dict(handle=job.handle, - arguments=output)) - return ret - - def makeOutput(self, file_object): - output = {} - output['retry'] = False - output['event'] = self.makeEvent(file_object) - output['source_url'] = output['event']['fields']['log_url'] - return output - - def makeEvent(self, file_object): - out_event = {} - out_event["fields"] = self.makeFields(file_object.name) - basename = os.path.basename(file_object.name) - out_event["tags"] = [basename] + file_object.tags - if basename.endswith(".gz"): - # Backward compat for e-r which relies on tag values - # without the .gx suffix - out_event["tags"].append(basename[:-3]) - return out_event - - def makeFields(self, filename): - hosts = [h for h in self.hosts.values() if 'nodepool' in h] - zuul = self.zuul - fields = {} - fields["filename"] = filename - fields["build_name"] = zuul['job'] - fields["build_status"] = self.success and 'SUCCESS' or 'FAILURE' - # TODO: this is too simplistic for zuul v3 multinode jobs - node = hosts[0] - fields["build_node"] = node['nodepool']['label'] - fields["build_hostids"] = [h['nodepool']['host_id'] for h in hosts - if 'host_id' in h['nodepool']] - # TODO: should be build_executor, or removed completely - fields["build_master"] = zuul['executor']['hostname'] - - fields["project"] = zuul['project']['name'] - # The voting value is "1" for voting, "0" for non-voting - fields["voting"] = int(zuul['voting']) - # TODO(clarkb) can we do better without duplicated data here? - fields["build_uuid"] = zuul['build'] - fields["build_short_uuid"] = fields["build_uuid"][:7] - # TODO: this should be build_pipeline - fields["build_queue"] = zuul['pipeline'] - # TODO: this is not interesteding anymore - fields["build_ref"] = zuul['ref'] - fields["build_branch"] = zuul.get('branch', 'UNKNOWN') - # TODO: remove - fields["build_zuul_url"] = "N/A" - if 'change' in zuul: - fields["build_change"] = zuul['change'] - fields["build_patchset"] = zuul['patchset'] - elif 'newrev' in zuul: - fields["build_newrev"] = zuul.get('newrev', 'UNKNOWN') - fields["node_provider"] = node['nodepool']['provider'] - log_url = urllib.parse.urljoin(self.log_url, filename) - fields["log_url"] = log_url - if 'executor' in zuul and 'hostname' in zuul['executor']: - fields["zuul_executor"] = zuul['executor']['hostname'] - if 'attempts' in zuul: - fields["zuul_attempts"] = zuul['attempts'] - return fields - - -def main(): - module = AnsibleModule( - argument_spec=dict( - gearman_server=dict(type='str'), - gearman_port=dict(type='int', default=4730), - # TODO: add ssl support - host_vars=dict(type='dict'), - path=dict(type='path'), - config=dict(type='dict'), - success=dict(type='bool'), - log_url=dict(type='str'), - job=dict(type='str'), - ), - ) - - p = module.params - results = dict(files=[], jobs=[], invocation={}) - try: - l = LogMatcher(p.get('gearman_server'), - p.get('gearman_port'), - p.get('config'), - p.get('success'), - p.get('log_url'), - p.get('host_vars')) - files = l.findFiles(p['path']) - for f in files: - results['files'].append(f.toDict()) - for handle in l.submitJobs(p['job'], files): - results['jobs'].append(handle) - module.exit_json(**results) - except Exception: - tb = traceback.format_exc() - module.fail_json(msg='Unknown error', - details=tb, - **results) - - -if __name__ == '__main__': - main() diff --git a/roles/submit-log-processor-jobs/module_utils/gear.py b/roles/submit-log-processor-jobs/module_utils/gear.py deleted file mode 100644 index 44daf72..0000000 --- a/roles/submit-log-processor-jobs/module_utils/gear.py +++ /dev/null @@ -1,3526 +0,0 @@ -# Copyright 2013-2014 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import errno -import logging -import os -import select -import six -import socket -import ssl -import struct -import threading -import time -import uuid as uuid_module - -import ansible.module_utils.gear_constants as constants -from ansible.module_utils.gear_acl import ACLError, ACLEntry, ACL # noqa - -try: - import Queue as queue_mod -except ImportError: - import queue as queue_mod - -try: - import statsd -except ImportError: - statsd = None - -PRECEDENCE_NORMAL = 0 -PRECEDENCE_LOW = 1 -PRECEDENCE_HIGH = 2 - - -class ConnectionError(Exception): - pass - - -class InvalidDataError(Exception): - pass - - -class ConfigurationError(Exception): - pass - - -class NoConnectedServersError(Exception): - pass - - -class UnknownJobError(Exception): - pass - - -class InterruptedError(Exception): - pass - - -class TimeoutError(Exception): - pass - - -class GearmanError(Exception): - pass - - -class DisconnectError(Exception): - pass - - -class RetryIOError(Exception): - pass - - -def convert_to_bytes(data): - try: - data = data.encode('utf8') - except AttributeError: - pass - return data - - -class Task(object): - def __init__(self): - self._wait_event = threading.Event() - - def setComplete(self): - self._wait_event.set() - - def wait(self, timeout=None): - """Wait for a response from Gearman. - - :arg int timeout: If not None, return after this many seconds if no - response has been received (default: None). - """ - - self._wait_event.wait(timeout) - return self._wait_event.is_set() - - -class SubmitJobTask(Task): - def __init__(self, job): - super(SubmitJobTask, self).__init__() - self.job = job - - -class OptionReqTask(Task): - pass - - -class Connection(object): - """A Connection to a Gearman Server. - - :arg str client_id: The client ID associated with this connection. - It will be appending to the name of the logger (e.g., - gear.Connection.client_id). Defaults to 'unknown'. - :arg bool keepalive: Whether to use TCP keepalives - :arg int tcp_keepidle: Idle time after which to start keepalives sending - :arg int tcp_keepintvl: Interval in seconds between TCP keepalives - :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect - """ - - def __init__(self, host, port, ssl_key=None, ssl_cert=None, ssl_ca=None, - client_id='unknown', keepalive=False, tcp_keepidle=7200, - tcp_keepintvl=75, tcp_keepcnt=9): - self.log = logging.getLogger("gear.Connection.%s" % (client_id,)) - self.host = host - self.port = port - self.ssl_key = ssl_key - self.ssl_cert = ssl_cert - self.ssl_ca = ssl_ca - self.keepalive = keepalive - self.tcp_keepcnt = tcp_keepcnt - self.tcp_keepintvl = tcp_keepintvl - self.tcp_keepidle = tcp_keepidle - - self.use_ssl = False - if all([self.ssl_key, self.ssl_cert, self.ssl_ca]): - self.use_ssl = True - - self.input_buffer = b'' - self.need_bytes = False - self.echo_lock = threading.Lock() - self.send_lock = threading.Lock() - self._init() - - def _init(self): - self.conn = None - self.connected = False - self.connect_time = None - self.related_jobs = {} - self.pending_tasks = [] - self.admin_requests = [] - self.echo_conditions = {} - self.options = set() - self.changeState("INIT") - - def changeState(self, state): - # The state variables are provided as a convenience (and used by - # the Worker implementation). They aren't used or modified within - # the connection object itself except to reset to "INIT" immediately - # after reconnection. - self.log.debug("Setting state to: %s" % state) - self.state = state - self.state_time = time.time() - - def __repr__(self): - return '' % ( - id(self), self.host, self.port) - - def connect(self): - """Open a connection to the server. - - :raises ConnectionError: If unable to open the socket. - """ - - self.log.debug("Connecting to %s port %s" % (self.host, self.port)) - s = None - for res in socket.getaddrinfo(self.host, self.port, - socket.AF_UNSPEC, socket.SOCK_STREAM): - af, socktype, proto, canonname, sa = res - try: - s = socket.socket(af, socktype, proto) - if self.keepalive and hasattr(socket, 'TCP_KEEPIDLE'): - s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, - self.tcp_keepidle) - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, - self.tcp_keepintvl) - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, - self.tcp_keepcnt) - elif self.keepalive: - self.log.warning('Keepalive requested but not available ' - 'on this platform') - except socket.error: - s = None - continue - - if self.use_ssl: - self.log.debug("Using SSL") - context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) - context.verify_mode = ssl.CERT_REQUIRED - context.check_hostname = False - context.load_cert_chain(self.ssl_cert, self.ssl_key) - context.load_verify_locations(self.ssl_ca) - s = context.wrap_socket(s, server_hostname=self.host) - - try: - s.connect(sa) - except socket.error: - s.close() - s = None - continue - break - if s is None: - self.log.debug("Error connecting to %s port %s" % ( - self.host, self.port)) - raise ConnectionError("Unable to open socket") - self.log.info("Connected to %s port %s" % (self.host, self.port)) - self.conn = s - self.connected = True - self.connect_time = time.time() - self.input_buffer = b'' - self.need_bytes = False - - def disconnect(self): - """Disconnect from the server and remove all associated state - data. - """ - - if self.conn: - try: - self.conn.close() - except Exception: - pass - - self.log.info("Disconnected from %s port %s" % (self.host, self.port)) - self._init() - - def reconnect(self): - """Disconnect from and reconnect to the server, removing all - associated state data. - """ - self.disconnect() - self.connect() - - def sendRaw(self, data): - """Send raw data over the socket. - - :arg bytes data The raw data to send - """ - with self.send_lock: - sent = 0 - while sent < len(data): - try: - sent += self.conn.send(data) - except ssl.SSLError as e: - if e.errno == ssl.SSL_ERROR_WANT_READ: - continue - elif e.errno == ssl.SSL_ERROR_WANT_WRITE: - continue - else: - raise - - def sendPacket(self, packet): - """Send a packet to the server. - - :arg Packet packet: The :py:class:`Packet` to send. - """ - self.log.info("Sending packet to %s: %s" % (self, packet)) - self.sendRaw(packet.toBinary()) - - def _getAdminRequest(self): - return self.admin_requests.pop(0) - - def _readRawBytes(self, bytes_to_read): - while True: - try: - buff = self.conn.recv(bytes_to_read) - except ssl.SSLError as e: - if e.errno == ssl.SSL_ERROR_WANT_READ: - continue - elif e.errno == ssl.SSL_ERROR_WANT_WRITE: - continue - else: - raise - break - return buff - - def _putAdminRequest(self, req): - self.admin_requests.insert(0, req) - - def readPacket(self): - """Read one packet or administrative response from the server. - - :returns: The :py:class:`Packet` or :py:class:`AdminRequest` read. - :rtype: :py:class:`Packet` or :py:class:`AdminRequest` - """ - # This handles non-blocking or blocking IO. - datalen = 0 - code = None - ptype = None - admin = None - admin_request = None - need_bytes = self.need_bytes - raw_bytes = self.input_buffer - try: - while True: - try: - if not raw_bytes or need_bytes: - segment = self._readRawBytes(4096) - if not segment: - # This occurs when the connection is closed. The - # the connect method will reset input_buffer and - # need_bytes for us. - return None - raw_bytes += segment - need_bytes = False - except RetryIOError: - if admin_request: - self._putAdminRequest(admin_request) - raise - if admin is None: - if raw_bytes[0:1] == b'\x00': - admin = False - else: - admin = True - admin_request = self._getAdminRequest() - if admin: - complete, remainder = admin_request.isComplete(raw_bytes) - if remainder is not None: - raw_bytes = remainder - if complete: - return admin_request - else: - length = len(raw_bytes) - if code is None and length >= 12: - code, ptype, datalen = struct.unpack('!4sii', - raw_bytes[:12]) - if length >= datalen + 12: - end = 12 + datalen - p = Packet(code, ptype, raw_bytes[12:end], - connection=self) - raw_bytes = raw_bytes[end:] - return p - # If we don't return a packet above then we need more data - need_bytes = True - finally: - self.input_buffer = raw_bytes - self.need_bytes = need_bytes - - def hasPendingData(self): - return self.input_buffer != b'' - - def sendAdminRequest(self, request, timeout=90): - """Send an administrative request to the server. - - :arg AdminRequest request: The :py:class:`AdminRequest` to send. - :arg numeric timeout: Number of seconds to wait until the response - is received. If None, wait forever (default: 90 seconds). - :raises TimeoutError: If the timeout is reached before the response - is received. - """ - self.admin_requests.append(request) - self.sendRaw(request.getCommand()) - complete = request.waitForResponse(timeout) - if not complete: - raise TimeoutError() - - def echo(self, data=None, timeout=30): - """Perform an echo test on the server. - - This method waits until the echo response has been received or the - timeout has been reached. - - :arg bytes data: The data to request be echoed. If None, a random - unique byte string will be generated. - :arg numeric timeout: Number of seconds to wait until the response - is received. If None, wait forever (default: 30 seconds). - :raises TimeoutError: If the timeout is reached before the response - is received. - """ - if data is None: - data = uuid_module.uuid4().hex.encode('utf8') - self.echo_lock.acquire() - try: - if data in self.echo_conditions: - raise InvalidDataError("This client is already waiting on an " - "echo response of: %s" % data) - condition = threading.Condition() - self.echo_conditions[data] = condition - finally: - self.echo_lock.release() - - self.sendEchoReq(data) - - condition.acquire() - condition.wait(timeout) - condition.release() - - if data in self.echo_conditions: - return data - raise TimeoutError() - - def sendEchoReq(self, data): - p = Packet(constants.REQ, constants.ECHO_REQ, data) - self.sendPacket(p) - - def handleEchoRes(self, data): - condition = None - self.echo_lock.acquire() - try: - condition = self.echo_conditions.get(data) - if condition: - del self.echo_conditions[data] - finally: - self.echo_lock.release() - - if not condition: - return False - condition.notifyAll() - return True - - def handleOptionRes(self, option): - self.options.add(option) - - -class AdminRequest(object): - """Encapsulates a request (and response) sent over the - administrative protocol. This is a base class that may not be - instantiated dircectly; a subclass implementing a specific command - must be used instead. - - :arg list arguments: A list of byte string arguments for the command. - - The following instance attributes are available: - - **response** (bytes) - The response from the server. - **arguments** (bytes) - The argument supplied with the constructor. - **command** (bytes) - The administrative command. - """ - - command = None - arguments = [] - response = None - _complete_position = 0 - - def __init__(self, *arguments): - self.wait_event = threading.Event() - self.arguments = arguments - if type(self) == AdminRequest: - raise NotImplementedError("AdminRequest must be subclassed") - - def __repr__(self): - return '' % ( - id(self), self.command) - - def getCommand(self): - cmd = self.command - if self.arguments: - cmd += b' ' + b' '.join(self.arguments) - cmd += b'\n' - return cmd - - def isComplete(self, data): - x = -1 - start = self._complete_position - start = max(self._complete_position - 4, 0) - end_index_newline = data.find(b'\n.\n', start) - end_index_return = data.find(b'\r\n.\r\n', start) - if end_index_newline != -1: - x = end_index_newline + 3 - elif end_index_return != -1: - x = end_index_return + 5 - elif data.startswith(b'.\n'): - x = 2 - elif data.startswith(b'.\r\n'): - x = 3 - self._complete_position = len(data) - if x != -1: - self.response = data[:x] - return (True, data[x:]) - else: - return (False, None) - - def setComplete(self): - self.wait_event.set() - - def waitForResponse(self, timeout=None): - self.wait_event.wait(timeout) - return self.wait_event.is_set() - - -class StatusAdminRequest(AdminRequest): - """A "status" administrative request. - - The response from gearman may be found in the **response** attribute. - """ - command = b'status' - - def __init__(self): - super(StatusAdminRequest, self).__init__() - - -class ShowJobsAdminRequest(AdminRequest): - """A "show jobs" administrative request. - - The response from gearman may be found in the **response** attribute. - """ - command = b'show jobs' - - def __init__(self): - super(ShowJobsAdminRequest, self).__init__() - - -class ShowUniqueJobsAdminRequest(AdminRequest): - """A "show unique jobs" administrative request. - - The response from gearman may be found in the **response** attribute. - """ - - command = b'show unique jobs' - - def __init__(self): - super(ShowUniqueJobsAdminRequest, self).__init__() - - -class CancelJobAdminRequest(AdminRequest): - """A "cancel job" administrative request. - - :arg str handle: The job handle to be canceled. - - The response from gearman may be found in the **response** attribute. - """ - - command = b'cancel job' - - def __init__(self, handle): - handle = convert_to_bytes(handle) - super(CancelJobAdminRequest, self).__init__(handle) - - def isComplete(self, data): - end_index_newline = data.find(b'\n') - if end_index_newline != -1: - x = end_index_newline + 1 - self.response = data[:x] - return (True, data[x:]) - else: - return (False, None) - - -class VersionAdminRequest(AdminRequest): - """A "version" administrative request. - - The response from gearman may be found in the **response** attribute. - """ - - command = b'version' - - def __init__(self): - super(VersionAdminRequest, self).__init__() - - def isComplete(self, data): - end_index_newline = data.find(b'\n') - if end_index_newline != -1: - x = end_index_newline + 1 - self.response = data[:x] - return (True, data[x:]) - else: - return (False, None) - - -class WorkersAdminRequest(AdminRequest): - """A "workers" administrative request. - - The response from gearman may be found in the **response** attribute. - """ - command = b'workers' - - def __init__(self): - super(WorkersAdminRequest, self).__init__() - - -class Packet(object): - """A data packet received from or to be sent over a - :py:class:`Connection`. - - :arg bytes code: The Gearman magic code (:py:data:`constants.REQ` or - :py:data:`constants.RES`) - :arg bytes ptype: The packet type (one of the packet types in - constants). - :arg bytes data: The data portion of the packet. - :arg Connection connection: The connection on which the packet - was received (optional). - :raises InvalidDataError: If the magic code is unknown. - """ - - def __init__(self, code, ptype, data, connection=None): - if not isinstance(code, bytes) and not isinstance(code, bytearray): - raise TypeError("code must be of type bytes or bytearray") - if code[0:1] != b'\x00': - raise InvalidDataError("First byte of packet must be 0") - self.code = code - self.ptype = ptype - if not isinstance(data, bytes) and not isinstance(data, bytearray): - raise TypeError("data must be of type bytes or bytearray") - self.data = data - self.connection = connection - - def __repr__(self): - ptype = constants.types.get(self.ptype, 'UNKNOWN') - try: - extra = self._formatExtraData() - except Exception: - extra = '' - return '' % (id(self), ptype, extra) - - def __eq__(self, other): - if not isinstance(other, Packet): - return False - if (self.code == other.code and - self.ptype == other.ptype and - self.data == other.data): - return True - return False - - def __ne__(self, other): - return not self.__eq__(other) - - def _formatExtraData(self): - if self.ptype in [constants.JOB_CREATED, - constants.JOB_ASSIGN, - constants.GET_STATUS, - constants.STATUS_RES, - constants.WORK_STATUS, - constants.WORK_COMPLETE, - constants.WORK_FAIL, - constants.WORK_EXCEPTION, - constants.WORK_DATA, - constants.WORK_WARNING]: - return ' handle: %s' % self.getArgument(0) - - if self.ptype == constants.JOB_ASSIGN_UNIQ: - return (' handle: %s function: %s unique: %s' % - (self.getArgument(0), - self.getArgument(1), - self.getArgument(2))) - - if self.ptype in [constants.SUBMIT_JOB, - constants.SUBMIT_JOB_BG, - constants.SUBMIT_JOB_HIGH, - constants.SUBMIT_JOB_HIGH_BG, - constants.SUBMIT_JOB_LOW, - constants.SUBMIT_JOB_LOW_BG, - constants.SUBMIT_JOB_SCHED, - constants.SUBMIT_JOB_EPOCH]: - return ' function: %s unique: %s' % (self.getArgument(0), - self.getArgument(1)) - - if self.ptype in [constants.CAN_DO, - constants.CANT_DO, - constants.CAN_DO_TIMEOUT]: - return ' function: %s' % (self.getArgument(0),) - - if self.ptype == constants.SET_CLIENT_ID: - return ' id: %s' % (self.getArgument(0),) - - if self.ptype in [constants.OPTION_REQ, - constants.OPTION_RES]: - return ' option: %s' % (self.getArgument(0),) - - if self.ptype == constants.ERROR: - return ' code: %s message: %s' % (self.getArgument(0), - self.getArgument(1)) - return '' - - def toBinary(self): - """Return a Gearman wire protocol binary representation of the packet. - - :returns: The packet in binary form. - :rtype: bytes - """ - b = struct.pack('!4sii', self.code, self.ptype, len(self.data)) - b = bytearray(b) - b += self.data - return b - - def getArgument(self, index, last=False): - """Get the nth argument from the packet data. - - :arg int index: The argument index to look up. - :arg bool last: Whether this is the last argument (and thus - nulls should be ignored) - :returns: The argument value. - :rtype: bytes - """ - - parts = self.data.split(b'\x00') - if not last: - return parts[index] - return b'\x00'.join(parts[index:]) - - def getJob(self): - """Get the :py:class:`Job` associated with the job handle in - this packet. - - :returns: The :py:class:`Job` for this packet. - :rtype: Job - :raises UnknownJobError: If the job is not known. - """ - handle = self.getArgument(0) - job = self.connection.related_jobs.get(handle) - if not job: - raise UnknownJobError() - return job - - -class BaseClientServer(object): - def __init__(self, client_id=None): - if client_id: - self.client_id = convert_to_bytes(client_id) - self.log = logging.getLogger("gear.BaseClientServer.%s" % - (self.client_id,)) - else: - self.client_id = None - self.log = logging.getLogger("gear.BaseClientServer") - self.running = True - self.active_connections = [] - self.inactive_connections = [] - - self.connection_index = -1 - # A lock and notification mechanism to handle not having any - # current connections - self.connections_condition = threading.Condition() - - # A pipe to wake up the poll loop in case it needs to restart - self.wake_read, self.wake_write = os.pipe() - - self.poll_thread = threading.Thread(name="Gearman client poll", - target=self._doPollLoop) - self.poll_thread.daemon = True - self.poll_thread.start() - self.connect_thread = threading.Thread(name="Gearman client connect", - target=self._doConnectLoop) - self.connect_thread.daemon = True - self.connect_thread.start() - - def _doConnectLoop(self): - # Outer run method of the reconnection thread - while self.running: - self.connections_condition.acquire() - while self.running and not self.inactive_connections: - self.log.debug("Waiting for change in available servers " - "to reconnect") - self.connections_condition.wait() - self.connections_condition.release() - self.log.debug("Checking if servers need to be reconnected") - try: - if self.running and not self._connectLoop(): - # Nothing happened - time.sleep(2) - except Exception: - self.log.exception("Exception in connect loop:") - - def _connectLoop(self): - # Inner method of the reconnection loop, triggered by - # a connection change - success = False - for conn in self.inactive_connections[:]: - self.log.debug("Trying to reconnect %s" % conn) - try: - conn.reconnect() - except ConnectionError: - self.log.debug("Unable to connect to %s" % conn) - continue - except Exception: - self.log.exception("Exception while connecting to %s" % conn) - continue - - try: - self._onConnect(conn) - except Exception: - self.log.exception("Exception while performing on-connect " - "tasks for %s" % conn) - continue - self.connections_condition.acquire() - self.inactive_connections.remove(conn) - self.active_connections.append(conn) - self.connections_condition.notifyAll() - os.write(self.wake_write, b'1\n') - self.connections_condition.release() - - try: - self._onActiveConnection(conn) - except Exception: - self.log.exception("Exception while performing active conn " - "tasks for %s" % conn) - - success = True - return success - - def _onConnect(self, conn): - # Called immediately after a successful (re-)connection - pass - - def _onActiveConnection(self, conn): - # Called immediately after a connection is activated - pass - - def _lostConnection(self, conn): - # Called as soon as a connection is detected as faulty. Remove - # it and return ASAP and let the connection thread deal with it. - self.log.debug("Marking %s as disconnected" % conn) - self.connections_condition.acquire() - try: - # NOTE(notmorgan): In the loop below it is possible to change the - # jobs list on the connection. In python 3 .values() is an iter not - # a static list, meaning that a change will break the for loop - # as the object being iterated on will have changed in size. - jobs = list(conn.related_jobs.values()) - if conn in self.active_connections: - self.active_connections.remove(conn) - if conn not in self.inactive_connections: - self.inactive_connections.append(conn) - finally: - self.connections_condition.notifyAll() - self.connections_condition.release() - for job in jobs: - self.handleDisconnect(job) - - def _doPollLoop(self): - # Outer run method of poll thread. - while self.running: - self.connections_condition.acquire() - while self.running and not self.active_connections: - self.log.debug("Waiting for change in available connections " - "to poll") - self.connections_condition.wait() - self.connections_condition.release() - try: - self._pollLoop() - except socket.error as e: - if e.errno == errno.ECONNRESET: - self.log.debug("Connection reset by peer") - # This will get logged later at info level as - # "Marking ... as disconnected" - except Exception: - self.log.exception("Exception in poll loop:") - - def _pollLoop(self): - # Inner method of poll loop - self.log.debug("Preparing to poll") - poll = select.poll() - bitmask = (select.POLLIN | select.POLLERR | - select.POLLHUP | select.POLLNVAL) - # Reverse mapping of fd -> connection - conn_dict = {} - for conn in self.active_connections: - poll.register(conn.conn.fileno(), bitmask) - conn_dict[conn.conn.fileno()] = conn - # Register the wake pipe so that we can break if we need to - # reconfigure connections - poll.register(self.wake_read, bitmask) - while self.running: - self.log.debug("Polling %s connections" % - len(self.active_connections)) - ret = poll.poll() - for fd, event in ret: - if fd == self.wake_read: - self.log.debug("Woken by pipe") - while True: - if os.read(self.wake_read, 1) == b'\n': - break - return - conn = conn_dict[fd] - if event & select.POLLIN: - # Process all packets that may have been read in this - # round of recv's by readPacket. - while True: - self.log.debug("Processing input on %s" % conn) - p = conn.readPacket() - if p: - if isinstance(p, Packet): - self.handlePacket(p) - else: - self.handleAdminRequest(p) - else: - self.log.debug("Received no data on %s" % conn) - self._lostConnection(conn) - return - if not conn.hasPendingData(): - break - else: - self.log.debug("Received error event on %s" % conn) - self._lostConnection(conn) - return - - def handlePacket(self, packet): - """Handle a received packet. - - This method is called whenever a packet is received from any - connection. It normally calls the handle method appropriate - for the specific packet. - - :arg Packet packet: The :py:class:`Packet` that was received. - """ - - self.log.info("Received packet from %s: %s" % (packet.connection, - packet)) - start = time.time() - if packet.ptype == constants.JOB_CREATED: - self.handleJobCreated(packet) - elif packet.ptype == constants.WORK_COMPLETE: - self.handleWorkComplete(packet) - elif packet.ptype == constants.WORK_FAIL: - self.handleWorkFail(packet) - elif packet.ptype == constants.WORK_EXCEPTION: - self.handleWorkException(packet) - elif packet.ptype == constants.WORK_DATA: - self.handleWorkData(packet) - elif packet.ptype == constants.WORK_WARNING: - self.handleWorkWarning(packet) - elif packet.ptype == constants.WORK_STATUS: - self.handleWorkStatus(packet) - elif packet.ptype == constants.STATUS_RES: - self.handleStatusRes(packet) - elif packet.ptype == constants.GET_STATUS: - self.handleGetStatus(packet) - elif packet.ptype == constants.JOB_ASSIGN_UNIQ: - self.handleJobAssignUnique(packet) - elif packet.ptype == constants.JOB_ASSIGN: - self.handleJobAssign(packet) - elif packet.ptype == constants.NO_JOB: - self.handleNoJob(packet) - elif packet.ptype == constants.NOOP: - self.handleNoop(packet) - elif packet.ptype == constants.SUBMIT_JOB: - self.handleSubmitJob(packet) - elif packet.ptype == constants.SUBMIT_JOB_BG: - self.handleSubmitJobBg(packet) - elif packet.ptype == constants.SUBMIT_JOB_HIGH: - self.handleSubmitJobHigh(packet) - elif packet.ptype == constants.SUBMIT_JOB_HIGH_BG: - self.handleSubmitJobHighBg(packet) - elif packet.ptype == constants.SUBMIT_JOB_LOW: - self.handleSubmitJobLow(packet) - elif packet.ptype == constants.SUBMIT_JOB_LOW_BG: - self.handleSubmitJobLowBg(packet) - elif packet.ptype == constants.SUBMIT_JOB_SCHED: - self.handleSubmitJobSched(packet) - elif packet.ptype == constants.SUBMIT_JOB_EPOCH: - self.handleSubmitJobEpoch(packet) - elif packet.ptype == constants.GRAB_JOB_UNIQ: - self.handleGrabJobUniq(packet) - elif packet.ptype == constants.GRAB_JOB: - self.handleGrabJob(packet) - elif packet.ptype == constants.PRE_SLEEP: - self.handlePreSleep(packet) - elif packet.ptype == constants.SET_CLIENT_ID: - self.handleSetClientID(packet) - elif packet.ptype == constants.CAN_DO: - self.handleCanDo(packet) - elif packet.ptype == constants.CAN_DO_TIMEOUT: - self.handleCanDoTimeout(packet) - elif packet.ptype == constants.CANT_DO: - self.handleCantDo(packet) - elif packet.ptype == constants.RESET_ABILITIES: - self.handleResetAbilities(packet) - elif packet.ptype == constants.ECHO_REQ: - self.handleEchoReq(packet) - elif packet.ptype == constants.ECHO_RES: - self.handleEchoRes(packet) - elif packet.ptype == constants.ERROR: - self.handleError(packet) - elif packet.ptype == constants.ALL_YOURS: - self.handleAllYours(packet) - elif packet.ptype == constants.OPTION_REQ: - self.handleOptionReq(packet) - elif packet.ptype == constants.OPTION_RES: - self.handleOptionRes(packet) - else: - self.log.error("Received unknown packet: %s" % packet) - end = time.time() - self.reportTimingStats(packet.ptype, end - start) - - def reportTimingStats(self, ptype, duration): - """Report processing times by packet type - - This method is called by handlePacket to report how long - processing took for each packet. The default implementation - does nothing. - - :arg bytes ptype: The packet type (one of the packet types in - constants). - :arg float duration: The time (in seconds) it took to process - the packet. - """ - pass - - def _defaultPacketHandler(self, packet): - self.log.error("Received unhandled packet: %s" % packet) - - def handleJobCreated(self, packet): - return self._defaultPacketHandler(packet) - - def handleWorkComplete(self, packet): - return self._defaultPacketHandler(packet) - - def handleWorkFail(self, packet): - return self._defaultPacketHandler(packet) - - def handleWorkException(self, packet): - return self._defaultPacketHandler(packet) - - def handleWorkData(self, packet): - return self._defaultPacketHandler(packet) - - def handleWorkWarning(self, packet): - return self._defaultPacketHandler(packet) - - def handleWorkStatus(self, packet): - return self._defaultPacketHandler(packet) - - def handleStatusRes(self, packet): - return self._defaultPacketHandler(packet) - - def handleGetStatus(self, packet): - return self._defaultPacketHandler(packet) - - def handleJobAssignUnique(self, packet): - return self._defaultPacketHandler(packet) - - def handleJobAssign(self, packet): - return self._defaultPacketHandler(packet) - - def handleNoJob(self, packet): - return self._defaultPacketHandler(packet) - - def handleNoop(self, packet): - return self._defaultPacketHandler(packet) - - def handleSubmitJob(self, packet): - return self._defaultPacketHandler(packet) - - def handleSubmitJobBg(self, packet): - return self._defaultPacketHandler(packet) - - def handleSubmitJobHigh(self, packet): - return self._defaultPacketHandler(packet) - - def handleSubmitJobHighBg(self, packet): - return self._defaultPacketHandler(packet) - - def handleSubmitJobLow(self, packet): - return self._defaultPacketHandler(packet) - - def handleSubmitJobLowBg(self, packet): - return self._defaultPacketHandler(packet) - - def handleSubmitJobSched(self, packet): - return self._defaultPacketHandler(packet) - - def handleSubmitJobEpoch(self, packet): - return self._defaultPacketHandler(packet) - - def handleGrabJobUniq(self, packet): - return self._defaultPacketHandler(packet) - - def handleGrabJob(self, packet): - return self._defaultPacketHandler(packet) - - def handlePreSleep(self, packet): - return self._defaultPacketHandler(packet) - - def handleSetClientID(self, packet): - return self._defaultPacketHandler(packet) - - def handleCanDo(self, packet): - return self._defaultPacketHandler(packet) - - def handleCanDoTimeout(self, packet): - return self._defaultPacketHandler(packet) - - def handleCantDo(self, packet): - return self._defaultPacketHandler(packet) - - def handleResetAbilities(self, packet): - return self._defaultPacketHandler(packet) - - def handleEchoReq(self, packet): - return self._defaultPacketHandler(packet) - - def handleEchoRes(self, packet): - return self._defaultPacketHandler(packet) - - def handleError(self, packet): - return self._defaultPacketHandler(packet) - - def handleAllYours(self, packet): - return self._defaultPacketHandler(packet) - - def handleOptionReq(self, packet): - return self._defaultPacketHandler(packet) - - def handleOptionRes(self, packet): - return self._defaultPacketHandler(packet) - - def handleAdminRequest(self, request): - """Handle an administrative command response from Gearman. - - This method is called whenever a response to a previously - issued administrative command is received from one of this - client's connections. It normally releases the wait lock on - the initiating AdminRequest object. - - :arg AdminRequest request: The :py:class:`AdminRequest` that - initiated the received response. - """ - - self.log.info("Received admin data %s" % request) - request.setComplete() - - def shutdown(self): - """Close all connections and stop all running threads. - - The object may no longer be used after shutdown is called. - """ - if self.running: - self.log.debug("Beginning shutdown") - self._shutdown() - self.log.debug("Beginning cleanup") - self._cleanup() - self.log.debug("Finished shutdown") - else: - self.log.warning("Shutdown called when not currently running. " - "Ignoring.") - - def _shutdown(self): - # The first part of the shutdown process where all threads - # are told to exit. - self.running = False - self.connections_condition.acquire() - try: - self.connections_condition.notifyAll() - os.write(self.wake_write, b'1\n') - finally: - self.connections_condition.release() - - def _cleanup(self): - # The second part of the shutdown process where we wait for all - # threads to exit and then clean up. - self.poll_thread.join() - self.connect_thread.join() - for connection in self.active_connections: - connection.disconnect() - self.active_connections = [] - self.inactive_connections = [] - os.close(self.wake_read) - os.close(self.wake_write) - - -class BaseClient(BaseClientServer): - def __init__(self, client_id='unknown'): - super(BaseClient, self).__init__(client_id) - self.log = logging.getLogger("gear.BaseClient.%s" % (self.client_id,)) - # A lock to use when sending packets that set the state across - # all known connections. Note that it doesn't necessarily need - # to be used for all broadcasts, only those that affect multi- - # connection state, such as setting options or functions. - self.broadcast_lock = threading.RLock() - - def addServer(self, host, port=4730, - ssl_key=None, ssl_cert=None, ssl_ca=None, - keepalive=False, tcp_keepidle=7200, tcp_keepintvl=75, - tcp_keepcnt=9): - """Add a server to the client's connection pool. - - Any number of Gearman servers may be added to a client. The - client will connect to all of them and send jobs to them in a - round-robin fashion. When servers are disconnected, the - client will automatically remove them from the pool, - continuously try to reconnect to them, and return them to the - pool when reconnected. New servers may be added at any time. - - This is a non-blocking call that will return regardless of - whether the initial connection succeeded. If you need to - ensure that a connection is ready before proceeding, see - :py:meth:`waitForServer`. - - When using SSL connections, all SSL files must be specified. - - :arg str host: The hostname or IP address of the server. - :arg int port: The port on which the gearman server is listening. - :arg str ssl_key: Path to the SSL private key. - :arg str ssl_cert: Path to the SSL certificate. - :arg str ssl_ca: Path to the CA certificate. - :arg bool keepalive: Whether to use TCP keepalives - :arg int tcp_keepidle: Idle time after which to start keepalives - sending - :arg int tcp_keepintvl: Interval in seconds between TCP keepalives - :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect - :raises ConfigurationError: If the host/port combination has - already been added to the client. - """ - - self.log.debug("Adding server %s port %s" % (host, port)) - - self.connections_condition.acquire() - try: - for conn in self.active_connections + self.inactive_connections: - if conn.host == host and conn.port == port: - raise ConfigurationError("Host/port already specified") - conn = Connection(host, port, ssl_key, ssl_cert, ssl_ca, - self.client_id, keepalive, tcp_keepidle, - tcp_keepintvl, tcp_keepcnt) - self.inactive_connections.append(conn) - self.connections_condition.notifyAll() - finally: - self.connections_condition.release() - - def _checkTimeout(self, start_time, timeout): - if time.time() - start_time > timeout: - raise TimeoutError() - - def waitForServer(self, timeout=None): - """Wait for at least one server to be connected. - - Block until at least one gearman server is connected. - - :arg numeric timeout: Number of seconds to wait for a connection. - If None, wait forever (default: no timeout). - :raises TimeoutError: If the timeout is reached before any server - connects. - """ - - connected = False - start_time = time.time() - while self.running: - self.connections_condition.acquire() - while self.running and not self.active_connections: - if timeout is not None: - self._checkTimeout(start_time, timeout) - self.log.debug("Waiting for at least one active connection") - self.connections_condition.wait(timeout=1) - if self.active_connections: - self.log.debug("Active connection found") - connected = True - self.connections_condition.release() - if connected: - return - - def getConnection(self): - """Return a connected server. - - Finds the next scheduled connected server in the round-robin - rotation and returns it. It is not usually necessary to use - this method external to the library, as more consumer-oriented - methods such as submitJob already use it internally, but is - available nonetheless if necessary. - - :returns: The next scheduled :py:class:`Connection` object. - :rtype: :py:class:`Connection` - :raises NoConnectedServersError: If there are not currently - connected servers. - """ - - conn = None - try: - self.connections_condition.acquire() - if not self.active_connections: - raise NoConnectedServersError("No connected Gearman servers") - - self.connection_index += 1 - if self.connection_index >= len(self.active_connections): - self.connection_index = 0 - conn = self.active_connections[self.connection_index] - finally: - self.connections_condition.release() - return conn - - def broadcast(self, packet): - """Send a packet to all currently connected servers. - - :arg Packet packet: The :py:class:`Packet` to send. - """ - connections = self.active_connections[:] - for connection in connections: - try: - self.sendPacket(packet, connection) - except Exception: - # Error handling is all done by sendPacket - pass - - def sendPacket(self, packet, connection): - """Send a packet to a single connection, removing it from the - list of active connections if that fails. - - :arg Packet packet: The :py:class:`Packet` to send. - :arg Connection connection: The :py:class:`Connection` on - which to send the packet. - """ - try: - connection.sendPacket(packet) - return - except Exception: - self.log.exception("Exception while sending packet %s to %s" % - (packet, connection)) - # If we can't send the packet, discard the connection - self._lostConnection(connection) - raise - - def handleEchoRes(self, packet): - """Handle an ECHO_RES packet. - - Causes the blocking :py:meth:`Connection.echo` invocation to - return. - - :arg Packet packet: The :py:class:`Packet` that was received. - :returns: None - """ - packet.connection.handleEchoRes(packet.getArgument(0, True)) - - def handleError(self, packet): - """Handle an ERROR packet. - - Logs the error. - - :arg Packet packet: The :py:class:`Packet` that was received. - :returns: None - """ - self.log.error("Received ERROR packet: %s: %s" % - (packet.getArgument(0), - packet.getArgument(1))) - try: - task = packet.connection.pending_tasks.pop(0) - task.setComplete() - except Exception: - self.log.exception("Exception while handling error packet:") - self._lostConnection(packet.connection) - - -class Client(BaseClient): - """A Gearman client. - - You may wish to subclass this class in order to override the - default event handlers to react to Gearman events. Be sure to - call the superclass event handlers so that they may perform - job-related housekeeping. - - :arg str client_id: The client ID to provide to Gearman. It will - appear in administrative output and be appended to the name of - the logger (e.g., gear.Client.client_id). Defaults to - 'unknown'. - """ - - def __init__(self, client_id='unknown'): - super(Client, self).__init__(client_id) - self.log = logging.getLogger("gear.Client.%s" % (self.client_id,)) - self.options = set() - - def __repr__(self): - return '' % id(self) - - def _onConnect(self, conn): - # Called immediately after a successful (re-)connection - self.broadcast_lock.acquire() - try: - super(Client, self)._onConnect(conn) - for name in self.options: - self._setOptionConnection(name, conn) - finally: - self.broadcast_lock.release() - - def _setOptionConnection(self, name, conn): - # Set an option on a connection - packet = Packet(constants.REQ, constants.OPTION_REQ, name) - task = OptionReqTask() - try: - conn.pending_tasks.append(task) - self.sendPacket(packet, conn) - except Exception: - # Error handling is all done by sendPacket - task = None - return task - - def setOption(self, name, timeout=30): - """Set an option for all connections. - - :arg str name: The option name to set. - :arg int timeout: How long to wait (in seconds) for a response - from the server before giving up (default: 30 seconds). - :returns: True if the option was set on all connections, - otherwise False - :rtype: bool - """ - tasks = {} - name = convert_to_bytes(name) - self.broadcast_lock.acquire() - - try: - self.options.add(name) - connections = self.active_connections[:] - for connection in connections: - task = self._setOptionConnection(name, connection) - if task: - tasks[task] = connection - finally: - self.broadcast_lock.release() - - success = True - for task in tasks.keys(): - complete = task.wait(timeout) - conn = tasks[task] - if not complete: - self.log.error("Connection %s timed out waiting for a " - "response to an option request: %s" % - (conn, name)) - self._lostConnection(conn) - continue - if name not in conn.options: - success = False - return success - - def submitJob(self, job, background=False, precedence=PRECEDENCE_NORMAL, - timeout=30): - """Submit a job to a Gearman server. - - Submits the provided job to the next server in this client's - round-robin connection pool. - - If the job is a foreground job, updates will be made to the - supplied :py:class:`Job` object as they are received. - - :arg Job job: The :py:class:`Job` to submit. - :arg bool background: Whether the job should be backgrounded. - :arg int precedence: Whether the job should have normal, low, or - high precedence. One of :py:data:`PRECEDENCE_NORMAL`, - :py:data:`PRECEDENCE_LOW`, or :py:data:`PRECEDENCE_HIGH` - :arg int timeout: How long to wait (in seconds) for a response - from the server before giving up (default: 30 seconds). - :raises ConfigurationError: If an invalid precendence value - is supplied. - """ - if job.unique is None: - unique = b'' - else: - unique = job.binary_unique - data = b'\x00'.join((job.binary_name, unique, job.binary_arguments)) - if background: - if precedence == PRECEDENCE_NORMAL: - cmd = constants.SUBMIT_JOB_BG - elif precedence == PRECEDENCE_LOW: - cmd = constants.SUBMIT_JOB_LOW_BG - elif precedence == PRECEDENCE_HIGH: - cmd = constants.SUBMIT_JOB_HIGH_BG - else: - raise ConfigurationError("Invalid precedence value") - else: - if precedence == PRECEDENCE_NORMAL: - cmd = constants.SUBMIT_JOB - elif precedence == PRECEDENCE_LOW: - cmd = constants.SUBMIT_JOB_LOW - elif precedence == PRECEDENCE_HIGH: - cmd = constants.SUBMIT_JOB_HIGH - else: - raise ConfigurationError("Invalid precedence value") - packet = Packet(constants.REQ, cmd, data) - attempted_connections = set() - while True: - if attempted_connections == set(self.active_connections): - break - conn = self.getConnection() - task = SubmitJobTask(job) - conn.pending_tasks.append(task) - attempted_connections.add(conn) - try: - self.sendPacket(packet, conn) - except Exception: - # Error handling is all done by sendPacket - continue - complete = task.wait(timeout) - if not complete: - self.log.error("Connection %s timed out waiting for a " - "response to a submit job request: %s" % - (conn, job)) - self._lostConnection(conn) - continue - if not job.handle: - self.log.error("Connection %s sent an error in " - "response to a submit job request: %s" % - (conn, job)) - continue - job.connection = conn - return - raise GearmanError("Unable to submit job to any connected servers") - - def handleJobCreated(self, packet): - """Handle a JOB_CREATED packet. - - Updates the appropriate :py:class:`Job` with the newly - returned job handle. - - :arg Packet packet: The :py:class:`Packet` that was received. - :returns: The :py:class:`Job` object associated with the job request. - :rtype: :py:class:`Job` - """ - task = packet.connection.pending_tasks.pop(0) - if not isinstance(task, SubmitJobTask): - msg = ("Unexpected response received to submit job " - "request: %s" % packet) - self.log.error(msg) - self._lostConnection(packet.connection) - raise GearmanError(msg) - - job = task.job - job.handle = packet.data - packet.connection.related_jobs[job.handle] = job - task.setComplete() - self.log.debug("Job created; %s" % job) - return job - - def handleWorkComplete(self, packet): - """Handle a WORK_COMPLETE packet. - - Updates the referenced :py:class:`Job` with the returned data - and removes it from the list of jobs associated with the - connection. - - :arg Packet packet: The :py:class:`Packet` that was received. - :returns: The :py:class:`Job` object associated with the job request. - :rtype: :py:class:`Job` - """ - - job = packet.getJob() - data = packet.getArgument(1, True) - if data: - job.data.append(data) - job.complete = True - job.failure = False - del packet.connection.related_jobs[job.handle] - self.log.debug("Job complete; %s data: %s" % - (job, job.data)) - return job - - def handleWorkFail(self, packet): - """Handle a WORK_FAIL packet. - - Updates the referenced :py:class:`Job` with the returned data - and removes it from the list of jobs associated with the - connection. - - :arg Packet packet: The :py:class:`Packet` that was received. - :returns: The :py:class:`Job` object associated with the job request. - :rtype: :py:class:`Job` - """ - - job = packet.getJob() - job.complete = True - job.failure = True - del packet.connection.related_jobs[job.handle] - self.log.debug("Job failed; %s" % job) - return job - - def handleWorkException(self, packet): - """Handle a WORK_Exception packet. - - Updates the referenced :py:class:`Job` with the returned data - and removes it from the list of jobs associated with the - connection. - - :arg Packet packet: The :py:class:`Packet` that was received. - :returns: The :py:class:`Job` object associated with the job request. - :rtype: :py:class:`Job` - """ - - job = packet.getJob() - job.exception = packet.getArgument(1, True) - job.complete = True - job.failure = True - del packet.connection.related_jobs[job.handle] - self.log.debug("Job exception; %s exception: %s" % - (job, job.exception)) - return job - - def handleWorkData(self, packet): - """Handle a WORK_DATA packet. - - Updates the referenced :py:class:`Job` with the returned data. - - :arg Packet packet: The :py:class:`Packet` that was received. - :returns: The :py:class:`Job` object associated with the job request. - :rtype: :py:class:`Job` - """ - - job = packet.getJob() - data = packet.getArgument(1, True) - if data: - job.data.append(data) - self.log.debug("Job data; job: %s data: %s" % - (job, job.data)) - return job - - def handleWorkWarning(self, packet): - """Handle a WORK_WARNING packet. - - Updates the referenced :py:class:`Job` with the returned data. - - :arg Packet packet: The :py:class:`Packet` that was received. - :returns: The :py:class:`Job` object associated with the job request. - :rtype: :py:class:`Job` - """ - - job = packet.getJob() - data = packet.getArgument(1, True) - if data: - job.data.append(data) - job.warning = True - self.log.debug("Job warning; %s data: %s" % - (job, job.data)) - return job - - def handleWorkStatus(self, packet): - """Handle a WORK_STATUS packet. - - Updates the referenced :py:class:`Job` with the returned data. - - :arg Packet packet: The :py:class:`Packet` that was received. - :returns: The :py:class:`Job` object associated with the job request. - :rtype: :py:class:`Job` - """ - - job = packet.getJob() - job.numerator = packet.getArgument(1) - job.denominator = packet.getArgument(2) - try: - job.fraction_complete = (float(job.numerator) / - float(job.denominator)) - except Exception: - job.fraction_complete = None - self.log.debug("Job status; %s complete: %s/%s" % - (job, job.numerator, job.denominator)) - return job - - def handleStatusRes(self, packet): - """Handle a STATUS_RES packet. - - Updates the referenced :py:class:`Job` with the returned data. - - :arg Packet packet: The :py:class:`Packet` that was received. - :returns: The :py:class:`Job` object associated with the job request. - :rtype: :py:class:`Job` - """ - - job = packet.getJob() - job.known = (packet.getArgument(1) == b'1') - job.running = (packet.getArgument(2) == b'1') - job.numerator = packet.getArgument(3) - job.denominator = packet.getArgument(4) - - try: - job.fraction_complete = (float(job.numerator) / - float(job.denominator)) - except Exception: - job.fraction_complete = None - return job - - def handleOptionRes(self, packet): - """Handle an OPTION_RES packet. - - Updates the set of options for the connection. - - :arg Packet packet: The :py:class:`Packet` that was received. - :returns: None. - """ - task = packet.connection.pending_tasks.pop(0) - if not isinstance(task, OptionReqTask): - msg = ("Unexpected response received to option " - "request: %s" % packet) - self.log.error(msg) - self._lostConnection(packet.connection) - raise GearmanError(msg) - - packet.connection.handleOptionRes(packet.getArgument(0)) - task.setComplete() - - def handleDisconnect(self, job): - """Handle a Gearman server disconnection. - - If the Gearman server is disconnected, this will be called for any - jobs currently associated with the server. - - :arg Job packet: The :py:class:`Job` that was running when the server - disconnected. - """ - return job - - -class FunctionRecord(object): - """Represents a function that should be registered with Gearman. - - This class only directly needs to be instatiated for use with - :py:meth:`Worker.setFunctions`. If a timeout value is supplied, - the function will be registered with CAN_DO_TIMEOUT. - - :arg str name: The name of the function to register. - :arg numeric timeout: The timeout value (optional). - """ - def __init__(self, name, timeout=None): - self.name = name - self.timeout = timeout - - def __repr__(self): - return '' % ( - id(self), self.name, self.timeout) - - -class BaseJob(object): - def __init__(self, name, arguments, unique=None, handle=None): - self._name = convert_to_bytes(name) - self._validate_arguments(arguments) - self._arguments = convert_to_bytes(arguments) - self._unique = convert_to_bytes(unique) - self.handle = handle - self.connection = None - - def _validate_arguments(self, arguments): - if (not isinstance(arguments, bytes) and - not isinstance(arguments, bytearray)): - raise TypeError("arguments must be of type bytes or bytearray") - - @property - def arguments(self): - return self._arguments - - @arguments.setter - def arguments(self, value): - self._arguments = value - - @property - def unique(self): - return self._unique - - @unique.setter - def unique(self, value): - self._unique = value - - @property - def name(self): - if isinstance(self._name, six.binary_type): - return self._name.decode('utf-8') - return self._name - - @name.setter - def name(self, value): - if isinstance(value, six.text_type): - value = value.encode('utf-8') - self._name = value - - @property - def binary_name(self): - return self._name - - @property - def binary_arguments(self): - return self._arguments - - @property - def binary_unique(self): - return self._unique - - def __repr__(self): - return '' % ( - id(self), self.handle, self.name, self.unique) - - -class WorkerJob(BaseJob): - """A job that Gearman has assigned to a Worker. Not intended to - be instantiated directly, but rather returned by - :py:meth:`Worker.getJob`. - - :arg str handle: The job handle assigned by gearman. - :arg str name: The name of the job. - :arg bytes arguments: The opaque data blob passed to the worker - as arguments. - :arg str unique: A byte string to uniquely identify the job to Gearman - (optional). - - The following instance attributes are available: - - **name** (str) - The name of the job. Assumed to be utf-8. - **arguments** (bytes) - The opaque data blob passed to the worker as arguments. - **unique** (str or None) - The unique ID of the job (if supplied). - **handle** (bytes) - The Gearman job handle. - **connection** (:py:class:`Connection` or None) - The connection associated with the job. Only set after the job - has been submitted to a Gearman server. - """ - - def __init__(self, handle, name, arguments, unique=None): - super(WorkerJob, self).__init__(name, arguments, unique, handle) - - def sendWorkData(self, data=b''): - """Send a WORK_DATA packet to the client. - - :arg bytes data: The data to be sent to the client (optional). - """ - - data = self.handle + b'\x00' + data - p = Packet(constants.REQ, constants.WORK_DATA, data) - self.connection.sendPacket(p) - - def sendWorkWarning(self, data=b''): - """Send a WORK_WARNING packet to the client. - - :arg bytes data: The data to be sent to the client (optional). - """ - - data = self.handle + b'\x00' + data - p = Packet(constants.REQ, constants.WORK_WARNING, data) - self.connection.sendPacket(p) - - def sendWorkStatus(self, numerator, denominator): - """Send a WORK_STATUS packet to the client. - - Sends a numerator and denominator that together represent the - fraction complete of the job. - - :arg numeric numerator: The numerator of the fraction complete. - :arg numeric denominator: The denominator of the fraction complete. - """ - - data = (self.handle + b'\x00' + - str(numerator).encode('utf8') + b'\x00' + - str(denominator).encode('utf8')) - p = Packet(constants.REQ, constants.WORK_STATUS, data) - self.connection.sendPacket(p) - - def sendWorkComplete(self, data=b''): - """Send a WORK_COMPLETE packet to the client. - - :arg bytes data: The data to be sent to the client (optional). - """ - - data = self.handle + b'\x00' + data - p = Packet(constants.REQ, constants.WORK_COMPLETE, data) - self.connection.sendPacket(p) - - def sendWorkFail(self): - "Send a WORK_FAIL packet to the client." - - p = Packet(constants.REQ, constants.WORK_FAIL, self.handle) - self.connection.sendPacket(p) - - def sendWorkException(self, data=b''): - """Send a WORK_EXCEPTION packet to the client. - - :arg bytes data: The exception data to be sent to the client - (optional). - """ - - data = self.handle + b'\x00' + data - p = Packet(constants.REQ, constants.WORK_EXCEPTION, data) - self.connection.sendPacket(p) - - -class Worker(BaseClient): - """A Gearman worker. - - :arg str client_id: The client ID to provide to Gearman. It will - appear in administrative output and be appended to the name of - the logger (e.g., gear.Worker.client_id). - :arg str worker_id: The client ID to provide to Gearman. It will - appear in administrative output and be appended to the name of - the logger (e.g., gear.Worker.client_id). This parameter name - is deprecated, use client_id instead. - """ - - job_class = WorkerJob - - def __init__(self, client_id=None, worker_id=None): - if not client_id or worker_id: - raise Exception("A client_id must be provided") - if worker_id: - client_id = worker_id - super(Worker, self).__init__(client_id) - self.log = logging.getLogger("gear.Worker.%s" % (self.client_id,)) - self.worker_id = client_id - self.functions = {} - self.job_lock = threading.Lock() - self.waiting_for_jobs = 0 - self.job_queue = queue_mod.Queue() - - def __repr__(self): - return '' % id(self) - - def registerFunction(self, name, timeout=None): - """Register a function with Gearman. - - If a timeout value is supplied, the function will be - registered with CAN_DO_TIMEOUT. - - :arg str name: The name of the function to register. - :arg numeric timeout: The timeout value (optional). - """ - name = convert_to_bytes(name) - self.functions[name] = FunctionRecord(name, timeout) - if timeout: - self._sendCanDoTimeout(name, timeout) - else: - self._sendCanDo(name) - - connections = self.active_connections[:] - for connection in connections: - if connection.state == "SLEEP": - connection.changeState("IDLE") - self._updateStateMachines() - - def unRegisterFunction(self, name): - """Remove a function from Gearman's registry. - - :arg str name: The name of the function to remove. - """ - name = convert_to_bytes(name) - del self.functions[name] - self._sendCantDo(name) - - def setFunctions(self, functions): - """Replace the set of functions registered with Gearman. - - Accepts a list of :py:class:`FunctionRecord` objects which - represents the complete set of functions that should be - registered with Gearman. Any existing functions will be - unregistered and these registered in their place. If the - empty list is supplied, then the Gearman registered function - set will be cleared. - - :arg list functions: A list of :py:class:`FunctionRecord` objects. - """ - - self._sendResetAbilities() - self.functions = {} - for f in functions: - if not isinstance(f, FunctionRecord): - raise InvalidDataError( - "An iterable of FunctionRecords is required.") - self.functions[f.name] = f - for f in self.functions.values(): - if f.timeout: - self._sendCanDoTimeout(f.name, f.timeout) - else: - self._sendCanDo(f.name) - - def _sendCanDo(self, name): - self.broadcast_lock.acquire() - try: - p = Packet(constants.REQ, constants.CAN_DO, name) - self.broadcast(p) - finally: - self.broadcast_lock.release() - - def _sendCanDoTimeout(self, name, timeout): - self.broadcast_lock.acquire() - try: - data = name + b'\x00' + timeout - p = Packet(constants.REQ, constants.CAN_DO_TIMEOUT, data) - self.broadcast(p) - finally: - self.broadcast_lock.release() - - def _sendCantDo(self, name): - self.broadcast_lock.acquire() - try: - p = Packet(constants.REQ, constants.CANT_DO, name) - self.broadcast(p) - finally: - self.broadcast_lock.release() - - def _sendResetAbilities(self): - self.broadcast_lock.acquire() - try: - p = Packet(constants.REQ, constants.RESET_ABILITIES, b'') - self.broadcast(p) - finally: - self.broadcast_lock.release() - - def _sendPreSleep(self, connection): - p = Packet(constants.REQ, constants.PRE_SLEEP, b'') - self.sendPacket(p, connection) - - def _sendGrabJobUniq(self, connection=None): - p = Packet(constants.REQ, constants.GRAB_JOB_UNIQ, b'') - if connection: - self.sendPacket(p, connection) - else: - self.broadcast(p) - - def _onConnect(self, conn): - self.broadcast_lock.acquire() - try: - # Called immediately after a successful (re-)connection - p = Packet(constants.REQ, constants.SET_CLIENT_ID, self.client_id) - conn.sendPacket(p) - super(Worker, self)._onConnect(conn) - for f in self.functions.values(): - if f.timeout: - data = f.name + b'\x00' + f.timeout - p = Packet(constants.REQ, constants.CAN_DO_TIMEOUT, data) - else: - p = Packet(constants.REQ, constants.CAN_DO, f.name) - conn.sendPacket(p) - conn.changeState("IDLE") - finally: - self.broadcast_lock.release() - # Any exceptions will be handled by the calling function, and the - # connection will not be put into the pool. - - def _onActiveConnection(self, conn): - self.job_lock.acquire() - try: - if self.waiting_for_jobs > 0: - self._updateStateMachines() - finally: - self.job_lock.release() - - def _updateStateMachines(self): - connections = self.active_connections[:] - - for connection in connections: - if (connection.state == "IDLE" and self.waiting_for_jobs > 0): - self._sendGrabJobUniq(connection) - connection.changeState("GRAB_WAIT") - if (connection.state != "IDLE" and self.waiting_for_jobs < 1): - connection.changeState("IDLE") - - def getJob(self): - """Get a job from Gearman. - - Blocks until a job is received. This method is re-entrant, so - it is safe to call this method on a single worker from - multiple threads. In that case, one of them at random will - receive the job assignment. - - :returns: The :py:class:`WorkerJob` assigned. - :rtype: :py:class:`WorkerJob`. - :raises InterruptedError: If interrupted (by - :py:meth:`stopWaitingForJobs`) before a job is received. - """ - self.job_lock.acquire() - try: - # self.running gets cleared during _shutdown(), before the - # stopWaitingForJobs() is called. This check has to - # happen with the job_lock held, otherwise there would be - # a window for race conditions between manipulation of - # "running" and "waiting_for_jobs". - if not self.running: - raise InterruptedError() - - self.waiting_for_jobs += 1 - self.log.debug("Get job; number of threads waiting for jobs: %s" % - self.waiting_for_jobs) - - try: - job = self.job_queue.get(False) - except queue_mod.Empty: - job = None - - if not job: - self._updateStateMachines() - - finally: - self.job_lock.release() - - if not job: - job = self.job_queue.get() - - self.log.debug("Received job: %s" % job) - if job is None: - raise InterruptedError() - return job - - def stopWaitingForJobs(self): - """Interrupts all running :py:meth:`getJob` calls, which will raise - an exception. - """ - - self.job_lock.acquire() - try: - while True: - connections = self.active_connections[:] - now = time.time() - ok = True - for connection in connections: - if connection.state == "GRAB_WAIT": - # Replies to GRAB_JOB should be fast, give up if we've - # been waiting for more than 5 seconds. - if now - connection.state_time > 5: - self._lostConnection(connection) - else: - ok = False - if ok: - break - else: - self.job_lock.release() - time.sleep(0.1) - self.job_lock.acquire() - - while self.waiting_for_jobs > 0: - self.waiting_for_jobs -= 1 - self.job_queue.put(None) - - self._updateStateMachines() - finally: - self.job_lock.release() - - def _shutdown(self): - self.job_lock.acquire() - try: - # The upstream _shutdown() will clear the "running" bool. Because - # that is a variable which is used for proper synchronization of - # the exit within getJob() which might be about to be called from a - # separate thread, it's important to call it with a proper lock - # being held. - super(Worker, self)._shutdown() - finally: - self.job_lock.release() - self.stopWaitingForJobs() - - def handleNoop(self, packet): - """Handle a NOOP packet. - - Sends a GRAB_JOB_UNIQ packet on the same connection. - GRAB_JOB_UNIQ will return jobs regardless of whether they have - been specified with a unique identifier when submitted. If - they were not, then :py:attr:`WorkerJob.unique` attribute - will be None. - - :arg Packet packet: The :py:class:`Packet` that was received. - """ - - self.job_lock.acquire() - try: - if packet.connection.state == "SLEEP": - self.log.debug("Sending GRAB_JOB_UNIQ") - self._sendGrabJobUniq(packet.connection) - packet.connection.changeState("GRAB_WAIT") - else: - self.log.debug("Received unexpecetd NOOP packet on %s" % - packet.connection) - finally: - self.job_lock.release() - - def handleNoJob(self, packet): - """Handle a NO_JOB packet. - - Sends a PRE_SLEEP packet on the same connection. - - :arg Packet packet: The :py:class:`Packet` that was received. - """ - self.job_lock.acquire() - try: - if packet.connection.state == "GRAB_WAIT": - self.log.debug("Sending PRE_SLEEP") - self._sendPreSleep(packet.connection) - packet.connection.changeState("SLEEP") - else: - self.log.debug("Received unexpected NO_JOB packet on %s" % - packet.connection) - finally: - self.job_lock.release() - - def handleJobAssign(self, packet): - """Handle a JOB_ASSIGN packet. - - Adds a WorkerJob to the internal queue to be picked up by any - threads waiting in :py:meth:`getJob`. - - :arg Packet packet: The :py:class:`Packet` that was received. - """ - - handle = packet.getArgument(0) - name = packet.getArgument(1) - arguments = packet.getArgument(2, True) - return self._handleJobAssignment(packet, handle, name, - arguments, None) - - def handleJobAssignUnique(self, packet): - """Handle a JOB_ASSIGN_UNIQ packet. - - Adds a WorkerJob to the internal queue to be picked up by any - threads waiting in :py:meth:`getJob`. - - :arg Packet packet: The :py:class:`Packet` that was received. - """ - - handle = packet.getArgument(0) - name = packet.getArgument(1) - unique = packet.getArgument(2) - if unique == b'': - unique = None - arguments = packet.getArgument(3, True) - return self._handleJobAssignment(packet, handle, name, - arguments, unique) - - def _handleJobAssignment(self, packet, handle, name, arguments, unique): - job = self.job_class(handle, name, arguments, unique) - job.connection = packet.connection - - self.job_lock.acquire() - try: - packet.connection.changeState("IDLE") - self.waiting_for_jobs -= 1 - self.log.debug("Job assigned; number of threads waiting for " - "jobs: %s" % self.waiting_for_jobs) - self.job_queue.put(job) - - self._updateStateMachines() - finally: - self.job_lock.release() - - -class Job(BaseJob): - """A job to run or being run by Gearman. - - :arg str name: The name of the job. - :arg bytes arguments: The opaque data blob to be passed to the worker - as arguments. - :arg str unique: A byte string to uniquely identify the job to Gearman - (optional). - - The following instance attributes are available: - - **name** (str) - The name of the job. Assumed to be utf-8. - **arguments** (bytes) - The opaque data blob passed to the worker as arguments. - **unique** (str or None) - The unique ID of the job (if supplied). - **handle** (bytes or None) - The Gearman job handle. None if no job handle has been received yet. - **data** (list of byte-arrays) - The result data returned from Gearman. Each packet appends an - element to the list. Depending on the nature of the data, the - elements may need to be concatenated before use. This is returned - as a snapshot copy of the data to prevent accidental attempts at - modification which will be lost. - **exception** (bytes or None) - Exception information returned from Gearman. None if no exception - has been received. - **warning** (bool) - Whether the worker has reported a warning. - **complete** (bool) - Whether the job is complete. - **failure** (bool) - Whether the job has failed. Only set when complete is True. - **numerator** (bytes or None) - The numerator of the completion ratio reported by the worker. - Only set when a status update is sent by the worker. - **denominator** (bytes or None) - The denominator of the completion ratio reported by the - worker. Only set when a status update is sent by the worker. - **fraction_complete** (float or None) - The fractional complete ratio reported by the worker. Only set when - a status update is sent by the worker. - **known** (bool or None) - Whether the job is known to Gearman. Only set by handleStatusRes() in - response to a getStatus() query. - **running** (bool or None) - Whether the job is running. Only set by handleStatusRes() in - response to a getStatus() query. - **connection** (:py:class:`Connection` or None) - The connection associated with the job. Only set after the job - has been submitted to a Gearman server. - """ - - data_type = list - - def __init__(self, name, arguments, unique=None): - super(Job, self).__init__(name, arguments, unique) - self._data = self.data_type() - self._exception = None - self.warning = False - self.complete = False - self.failure = False - self.numerator = None - self.denominator = None - self.fraction_complete = None - self.known = None - self.running = None - - @property - def binary_data(self): - for value in self._data: - if isinstance(value, six.text_type): - value = value.encode('utf-8') - yield value - - @property - def data(self): - return self._data - - @data.setter - def data(self, value): - if not isinstance(value, self.data_type): - raise ValueError( - "data attribute must be {}".format(self.data_type)) - self._data = value - - @property - def exception(self): - return self._exception - - @exception.setter - def exception(self, value): - self._exception = value - - -class TextJobArguments(object): - """Assumes utf-8 arguments in addition to name - - If one is always dealing in valid utf-8, using this job class relieves one - of the need to encode/decode constantly.""" - - def _validate_arguments(self, arguments): - pass - - @property - def arguments(self): - args = self._arguments - if isinstance(args, six.binary_type): - return args.decode('utf-8') - return args - - @arguments.setter - def arguments(self, value): - if not isinstance(value, six.binary_type): - value = value.encode('utf-8') - self._arguments = value - - -class TextJobUnique(object): - """Assumes utf-8 unique - - If one is always dealing in valid utf-8, using this job class relieves one - of the need to encode/decode constantly.""" - - @property - def unique(self): - unique = self._unique - if isinstance(unique, six.binary_type): - return unique.decode('utf-8') - return unique - - @unique.setter - def unique(self, value): - if not isinstance(value, six.binary_type): - value = value.encode('utf-8') - self._unique = value - - -class TextList(list): - def append(self, x): - if isinstance(x, six.binary_type): - x = x.decode('utf-8') - super(TextList, self).append(x) - - def extend(self, iterable): - def _iter(): - for value in iterable: - if isinstance(value, six.binary_type): - yield value.decode('utf-8') - else: - yield value - super(TextList, self).extend(_iter) - - def insert(self, i, x): - if isinstance(x, six.binary_type): - x = x.decode('utf-8') - super(TextList, self).insert(i, x) - - -class TextJob(TextJobArguments, TextJobUnique, Job): - """ Sends and receives UTF-8 arguments and data. - - Use this instead of Job when you only expect to send valid UTF-8 through - gearman. It will automatically encode arguments and work data as UTF-8, and - any jobs fetched from this worker will have their arguments and data - decoded assuming they are valid UTF-8, and thus return strings. - - Attributes and method signatures are thes ame as Job except as noted here: - - ** arguments ** (str) This will be returned as a string. - ** data ** (tuple of str) This will be returned as a tuble of strings. - - """ - - data_type = TextList - - @property - def exception(self): - exception = self._exception - if isinstance(exception, six.binary_type): - return exception.decode('utf-8') - return exception - - @exception.setter - def exception(self, value): - if not isinstance(value, six.binary_type): - value = value.encode('utf-8') - self._exception = value - - -class TextWorkerJob(TextJobArguments, TextJobUnique, WorkerJob): - """ Sends and receives UTF-8 arguments and data. - - See TextJob. sendWorkData and sendWorkWarning accept strings - and will encode them as UTF-8. - """ - def sendWorkData(self, data=''): - """Send a WORK_DATA packet to the client. - - :arg str data: The data to be sent to the client (optional). - """ - if isinstance(data, six.text_type): - data = data.encode('utf8') - return super(TextWorkerJob, self).sendWorkData(data) - - def sendWorkWarning(self, data=''): - """Send a WORK_WARNING packet to the client. - - :arg str data: The data to be sent to the client (optional). - """ - - if isinstance(data, six.text_type): - data = data.encode('utf8') - return super(TextWorkerJob, self).sendWorkWarning(data) - - def sendWorkComplete(self, data=''): - """Send a WORK_COMPLETE packet to the client. - - :arg str data: The data to be sent to the client (optional). - """ - if isinstance(data, six.text_type): - data = data.encode('utf8') - return super(TextWorkerJob, self).sendWorkComplete(data) - - def sendWorkException(self, data=''): - """Send a WORK_EXCEPTION packet to the client. - - :arg str data: The data to be sent to the client (optional). - """ - - if isinstance(data, six.text_type): - data = data.encode('utf8') - return super(TextWorkerJob, self).sendWorkException(data) - - -class TextWorker(Worker): - """ Sends and receives UTF-8 only. - - See TextJob. - - """ - - job_class = TextWorkerJob - - -class BaseBinaryJob(object): - """ For the case where non-utf-8 job names are needed. It will function - exactly like Job, except that the job name will not be decoded.""" - - @property - def name(self): - return self._name - - -class BinaryWorkerJob(BaseBinaryJob, WorkerJob): - pass - - -class BinaryJob(BaseBinaryJob, Job): - pass - - -# Below are classes for use in the server implementation: -class ServerJob(BinaryJob): - """A job record for use in a server. - - :arg str name: The name of the job. - :arg bytes arguments: The opaque data blob to be passed to the worker - as arguments. - :arg str unique: A byte string to uniquely identify the job to Gearman - (optional). - - The following instance attributes are available: - - **name** (str) - The name of the job. - **arguments** (bytes) - The opaque data blob passed to the worker as arguments. - **unique** (str or None) - The unique ID of the job (if supplied). - **handle** (bytes or None) - The Gearman job handle. None if no job handle has been received yet. - **data** (list of byte-arrays) - The result data returned from Gearman. Each packet appends an - element to the list. Depending on the nature of the data, the - elements may need to be concatenated before use. - **exception** (bytes or None) - Exception information returned from Gearman. None if no exception - has been received. - **warning** (bool) - Whether the worker has reported a warning. - **complete** (bool) - Whether the job is complete. - **failure** (bool) - Whether the job has failed. Only set when complete is True. - **numerator** (bytes or None) - The numerator of the completion ratio reported by the worker. - Only set when a status update is sent by the worker. - **denominator** (bytes or None) - The denominator of the completion ratio reported by the - worker. Only set when a status update is sent by the worker. - **fraction_complete** (float or None) - The fractional complete ratio reported by the worker. Only set when - a status update is sent by the worker. - **known** (bool or None) - Whether the job is known to Gearman. Only set by handleStatusRes() in - response to a getStatus() query. - **running** (bool or None) - Whether the job is running. Only set by handleStatusRes() in - response to a getStatus() query. - **client_connection** :py:class:`Connection` - The client connection associated with the job. - **worker_connection** (:py:class:`Connection` or None) - The worker connection associated with the job. Only set after the job - has been assigned to a worker. - """ - - def __init__(self, handle, name, arguments, client_connection, - unique=None): - super(ServerJob, self).__init__(name, arguments, unique) - self.handle = handle - self.client_connection = client_connection - self.worker_connection = None - del self.connection - - -class ServerAdminRequest(AdminRequest): - """An administrative request sent to a server.""" - - def __init__(self, connection): - super(ServerAdminRequest, self).__init__() - self.connection = connection - - def isComplete(self, data): - end_index_newline = data.find(b'\n') - if end_index_newline != -1: - self.command = data[:end_index_newline] - # Remove newline from data - x = end_index_newline + 1 - return (True, data[x:]) - else: - return (False, None) - - -class NonBlockingConnection(Connection): - """A Non-blocking connection to a Gearman Client.""" - - def __init__(self, host, port, ssl_key=None, ssl_cert=None, - ssl_ca=None, client_id='unknown'): - super(NonBlockingConnection, self).__init__( - host, port, ssl_key, - ssl_cert, ssl_ca, client_id) - self.send_queue = [] - - def connect(self): - super(NonBlockingConnection, self).connect() - if self.connected and self.conn: - self.conn.setblocking(0) - - def _readRawBytes(self, bytes_to_read): - try: - buff = self.conn.recv(bytes_to_read) - except ssl.SSLError as e: - if e.errno == ssl.SSL_ERROR_WANT_READ: - raise RetryIOError() - elif e.errno == ssl.SSL_ERROR_WANT_WRITE: - raise RetryIOError() - raise - except socket.error as e: - if e.errno == errno.EAGAIN: - # Read operation would block, we're done until - # epoll flags this connection again - raise RetryIOError() - raise - return buff - - def sendPacket(self, packet): - """Append a packet to this connection's send queue. The Client or - Server must manage actually sending the data. - - :arg :py:class:`Packet` packet The packet to send - - """ - self.log.debug("Queuing packet to %s: %s" % (self, packet)) - self.send_queue.append(packet.toBinary()) - self.sendQueuedData() - - def sendRaw(self, data): - """Append raw data to this connection's send queue. The Client or - Server must manage actually sending the data. - - :arg bytes data The raw data to send - - """ - self.log.debug("Queuing data to %s: %s" % (self, data)) - self.send_queue.append(data) - self.sendQueuedData() - - def sendQueuedData(self): - """Send previously queued data to the socket.""" - try: - while len(self.send_queue): - data = self.send_queue.pop(0) - r = 0 - try: - r = self.conn.send(data) - except ssl.SSLError as e: - if e.errno == ssl.SSL_ERROR_WANT_READ: - raise RetryIOError() - elif e.errno == ssl.SSL_ERROR_WANT_WRITE: - raise RetryIOError() - else: - raise - except socket.error as e: - if e.errno == errno.EAGAIN: - self.log.debug("Write operation on %s would block" - % self) - raise RetryIOError() - else: - raise - finally: - data = data[r:] - if data: - self.send_queue.insert(0, data) - except RetryIOError: - pass - - -class ServerConnection(NonBlockingConnection): - """A Connection to a Gearman Client.""" - - def __init__(self, addr, conn, use_ssl, client_id): - if client_id: - self.log = logging.getLogger("gear.ServerConnection.%s" % - (client_id,)) - else: - self.log = logging.getLogger("gear.ServerConnection") - self.send_queue = [] - self.admin_requests = [] - self.host = addr[0] - self.port = addr[1] - self.conn = conn - self.conn.setblocking(0) - self.input_buffer = b'' - self.need_bytes = False - self.use_ssl = use_ssl - self.client_id = None - self.functions = set() - self.related_jobs = {} - self.ssl_subject = None - if self.use_ssl: - for x in conn.getpeercert()['subject']: - if x[0][0] == 'commonName': - self.ssl_subject = x[0][1] - self.log.debug("SSL subject: %s" % self.ssl_subject) - self.changeState("INIT") - - def _getAdminRequest(self): - return ServerAdminRequest(self) - - def _putAdminRequest(self, req): - # The server does not need to keep track of admin requests - # that have been partially received; it will simply create a - # new instance the next time it tries to read. - pass - - def __repr__(self): - return '' % ( - id(self), self.client_id, self.host, self.port) - - -class Server(BaseClientServer): - """A simple gearman server implementation for testing - (not for production use). - - :arg int port: The TCP port on which to listen. - :arg str ssl_key: Path to the SSL private key. - :arg str ssl_cert: Path to the SSL certificate. - :arg str ssl_ca: Path to the CA certificate. - :arg str statsd_host: statsd hostname. None means disabled - (the default). - :arg str statsd_port: statsd port (defaults to 8125). - :arg str statsd_prefix: statsd key prefix. - :arg str client_id: The ID associated with this server. - It will be appending to the name of the logger (e.g., - gear.Server.server_id). Defaults to None (unused). - :arg ACL acl: An :py:class:`ACL` object if the server should apply - access control rules to its connections. - :arg str host: Host name or IPv4/IPv6 address to bind to. Defaults - to "whatever getaddrinfo() returns", which might be IPv4-only. - :arg bool keepalive: Whether to use TCP keepalives - :arg int tcp_keepidle: Idle time after which to start keepalives sending - :arg int tcp_keepintvl: Interval in seconds between TCP keepalives - :arg int tcp_keepcnt: Count of TCP keepalives to send before disconnect - """ - - edge_bitmask = select.EPOLLET - error_bitmask = (select.EPOLLERR | select.EPOLLHUP | edge_bitmask) - read_bitmask = (select.EPOLLIN | error_bitmask) - readwrite_bitmask = (select.EPOLLOUT | read_bitmask) - - def __init__(self, port=4730, ssl_key=None, ssl_cert=None, ssl_ca=None, - statsd_host=None, statsd_port=8125, statsd_prefix=None, - server_id=None, acl=None, host=None, keepalive=False, - tcp_keepidle=7200, tcp_keepintvl=75, tcp_keepcnt=9): - self.port = port - self.ssl_key = ssl_key - self.ssl_cert = ssl_cert - self.ssl_ca = ssl_ca - self.high_queue = [] - self.normal_queue = [] - self.low_queue = [] - self.jobs = {} - self.running_jobs = 0 - self.waiting_jobs = 0 - self.total_jobs = 0 - self.functions = set() - self.max_handle = 0 - self.acl = acl - self.connect_wake_read, self.connect_wake_write = os.pipe() - self.poll = select.epoll() - # Reverse mapping of fd -> connection - self.connection_map = {} - - self.use_ssl = False - if all([self.ssl_key, self.ssl_cert, self.ssl_ca]): - self.use_ssl = True - - # Get all valid passive listen addresses, then sort by family to prefer - # ipv6 if available. - addrs = socket.getaddrinfo(host, self.port, socket.AF_UNSPEC, - socket.SOCK_STREAM, 0, - socket.AI_PASSIVE | - socket.AI_ADDRCONFIG) - addrs.sort(key=lambda addr: addr[0], reverse=True) - for res in addrs: - af, socktype, proto, canonname, sa = res - try: - self.socket = socket.socket(af, socktype, proto) - self.socket.setsockopt(socket.SOL_SOCKET, - socket.SO_REUSEADDR, 1) - if keepalive and hasattr(socket, 'TCP_KEEPIDLE'): - self.socket.setsockopt(socket.SOL_SOCKET, - socket.SO_KEEPALIVE, 1) - self.socket.setsockopt(socket.IPPROTO_TCP, - socket.TCP_KEEPIDLE, tcp_keepidle) - self.socket.setsockopt(socket.IPPROTO_TCP, - socket.TCP_KEEPINTVL, tcp_keepintvl) - self.socket.setsockopt(socket.IPPROTO_TCP, - socket.TCP_KEEPCNT, tcp_keepcnt) - elif keepalive: - self.log.warning('Keepalive requested but not available ' - 'on this platform') - except socket.error: - self.socket = None - continue - try: - self.socket.bind(sa) - self.socket.listen(1) - except socket.error: - self.socket.close() - self.socket = None - continue - break - - if self.socket is None: - raise Exception("Could not open socket") - - if port == 0: - self.port = self.socket.getsockname()[1] - - super(Server, self).__init__(server_id) - - # Register the wake pipe so that we can break if we need to - # reconfigure connections - self.poll.register(self.wake_read, self.read_bitmask) - - if server_id: - self.log = logging.getLogger("gear.Server.%s" % (self.client_id,)) - else: - self.log = logging.getLogger("gear.Server") - - if statsd_host: - if not statsd: - self.log.error("Unable to import statsd module") - self.statsd = None - else: - self.statsd = statsd.StatsClient(statsd_host, - statsd_port, - statsd_prefix) - else: - self.statsd = None - - def _doConnectLoop(self): - while self.running: - try: - self.connectLoop() - except Exception: - self.log.exception("Exception in connect loop:") - time.sleep(1) - - def connectLoop(self): - poll = select.poll() - bitmask = (select.POLLIN | select.POLLERR | - select.POLLHUP | select.POLLNVAL) - # Register the wake pipe so that we can break if we need to - # shutdown. - poll.register(self.connect_wake_read, bitmask) - poll.register(self.socket.fileno(), bitmask) - while self.running: - ret = poll.poll() - for fd, event in ret: - if fd == self.connect_wake_read: - self.log.debug("Accept woken by pipe") - while True: - if os.read(self.connect_wake_read, 1) == b'\n': - break - return - if event & select.POLLIN: - self.log.debug("Accepting new connection") - c, addr = self.socket.accept() - if self.use_ssl: - context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) - context.verify_mode = ssl.CERT_REQUIRED - context.load_cert_chain(self.ssl_cert, self.ssl_key) - context.load_verify_locations(self.ssl_ca) - c = context.wrap_socket(c, server_side=True) - conn = ServerConnection(addr, c, self.use_ssl, - self.client_id) - self.log.info("Accepted connection %s" % (conn,)) - self.connections_condition.acquire() - try: - self.active_connections.append(conn) - self._registerConnection(conn) - self.connections_condition.notifyAll() - finally: - self.connections_condition.release() - - def readFromConnection(self, conn): - while True: - self.log.debug("Processing input on %s" % conn) - try: - p = conn.readPacket() - except RetryIOError: - # Read operation would block, we're done until - # epoll flags this connection again - return - if p: - if isinstance(p, Packet): - self.handlePacket(p) - else: - self.handleAdminRequest(p) - else: - self.log.debug("Received no data on %s" % conn) - raise DisconnectError() - - def writeToConnection(self, conn): - self.log.debug("Processing output on %s" % conn) - conn.sendQueuedData() - - def _processPollEvent(self, conn, event): - # This should do whatever is necessary to process a connection - # that has triggered a poll event. It should generally not - # raise exceptions so as to avoid restarting the poll loop. - # The exception handlers here can raise exceptions and if they - # do, it's okay, the poll loop will be restarted. - try: - if event & (select.EPOLLERR | select.EPOLLHUP): - self.log.debug("Received error event on %s: %s" % ( - conn, event)) - raise DisconnectError() - if event & (select.POLLIN | select.POLLOUT): - self.readFromConnection(conn) - self.writeToConnection(conn) - except socket.error as e: - if e.errno == errno.ECONNRESET: - self.log.debug("Connection reset by peer: %s" % (conn,)) - self._lostConnection(conn) - return - raise - except DisconnectError: - # Our inner method says we should quietly drop - # this connection - self._lostConnection(conn) - return - except Exception: - self.log.exception("Exception reading or writing " - "from %s:" % (conn,)) - self._lostConnection(conn) - return - - def _flushAllConnections(self): - # If we need to restart the poll loop, we need to make sure - # there are no pending data on any connection. Simulate poll - # in+out events on every connection. - # - # If this method raises an exception, the poll loop wil - # restart again. - # - # No need to get the lock since this is called within the poll - # loop and therefore the list in guaranteed never to shrink. - connections = self.active_connections[:] - for conn in connections: - self._processPollEvent(conn, select.POLLIN | select.POLLOUT) - - def _doPollLoop(self): - # Outer run method of poll thread. - while self.running: - try: - self._pollLoop() - except Exception: - self.log.exception("Exception in poll loop:") - - def _pollLoop(self): - # Inner method of poll loop. - self.log.debug("Preparing to poll") - # Ensure there are no pending data. - self._flushAllConnections() - while self.running: - self.log.debug("Polling %s connections" % - len(self.active_connections)) - ret = self.poll.poll() - # Since we're using edge-triggering, we need to make sure - # that every file descriptor in 'ret' is processed. - for fd, event in ret: - if fd == self.wake_read: - # This means we're exiting, so we can ignore the - # rest of 'ret'. - self.log.debug("Woken by pipe") - while True: - if os.read(self.wake_read, 1) == b'\n': - break - return - # In the unlikely event this raises an exception, the - # loop will be restarted. - conn = self.connection_map[fd] - self._processPollEvent(conn, event) - - def _shutdown(self): - super(Server, self)._shutdown() - os.write(self.connect_wake_write, b'1\n') - - def _cleanup(self): - super(Server, self)._cleanup() - self.socket.close() - os.close(self.connect_wake_read) - os.close(self.connect_wake_write) - - def _registerConnection(self, conn): - # Register the connection with the poll object - # Call while holding the connection condition - self.log.debug("Registering %s" % conn) - self.connection_map[conn.conn.fileno()] = conn - self.poll.register(conn.conn.fileno(), self.readwrite_bitmask) - - def _unregisterConnection(self, conn): - # Unregister the connection with the poll object - # Call while holding the connection condition - self.log.debug("Unregistering %s" % conn) - fd = conn.conn.fileno() - if fd not in self.connection_map: - return - try: - self.poll.unregister(fd) - except KeyError: - pass - try: - del self.connection_map[fd] - except KeyError: - pass - - def _lostConnection(self, conn): - # Called as soon as a connection is detected as faulty. - self.log.info("Marking %s as disconnected" % conn) - self.connections_condition.acquire() - self._unregisterConnection(conn) - try: - # NOTE(notmorgan): In the loop below it is possible to change the - # jobs list on the connection. In python 3 .values() is an iter not - # a static list, meaning that a change will break the for loop - # as the object being iterated on will have changed in size. - jobs = list(conn.related_jobs.values()) - if conn in self.active_connections: - self.active_connections.remove(conn) - finally: - self.connections_condition.notifyAll() - self.connections_condition.release() - for job in jobs: - if job.worker_connection == conn: - # the worker disconnected, alert the client - try: - p = Packet(constants.REQ, constants.WORK_FAIL, job.handle) - if job.client_connection: - job.client_connection.sendPacket(p) - except Exception: - self.log.exception("Sending WORK_FAIL to client after " - "worker disconnect failed:") - self._removeJob(job) - try: - conn.conn.shutdown(socket.SHUT_RDWR) - except socket.error as e: - if e.errno != errno.ENOTCONN: - self.log.exception("Unable to shutdown socket " - "for connection %s" % (conn,)) - except Exception: - self.log.exception("Unable to shutdown socket " - "for connection %s" % (conn,)) - try: - conn.conn.close() - except Exception: - self.log.exception("Unable to close socket " - "for connection %s" % (conn,)) - self._updateStats() - - def _removeJob(self, job, dequeue=True): - # dequeue is tri-state: True, False, or a specific queue - if job.client_connection: - try: - del job.client_connection.related_jobs[job.handle] - except KeyError: - pass - if job.worker_connection: - try: - del job.worker_connection.related_jobs[job.handle] - except KeyError: - pass - try: - del self.jobs[job.handle] - except KeyError: - pass - if dequeue is True: - # Search all queues for the job - try: - self.high_queue.remove(job) - except ValueError: - pass - try: - self.normal_queue.remove(job) - except ValueError: - pass - try: - self.low_queue.remove(job) - except ValueError: - pass - elif dequeue is not False: - # A specific queue was supplied - dequeue.remove(job) - # If dequeue is false, no need to remove from any queue - self.total_jobs -= 1 - if job.running: - self.running_jobs -= 1 - else: - self.waiting_jobs -= 1 - - def getQueue(self): - """Returns a copy of all internal queues in a flattened form. - - :returns: The Gearman queue. - :rtype: list of :py:class:`WorkerJob`. - """ - ret = [] - for queue in [self.high_queue, self.normal_queue, self.low_queue]: - ret += queue - return ret - - def handleAdminRequest(self, request): - self.log.info("Received admin request %s" % (request,)) - - if request.command.startswith(b'cancel job'): - self.handleCancelJob(request) - elif request.command.startswith(b'status'): - self.handleStatus(request) - elif request.command.startswith(b'workers'): - self.handleWorkers(request) - elif request.command.startswith(b'acl list'): - self.handleACLList(request) - elif request.command.startswith(b'acl grant'): - self.handleACLGrant(request) - elif request.command.startswith(b'acl revoke'): - self.handleACLRevoke(request) - elif request.command.startswith(b'acl self-revoke'): - self.handleACLSelfRevoke(request) - - self.log.debug("Finished handling admin request %s" % (request,)) - - def _cancelJob(self, request, job, queue): - if self.acl: - if not self.acl.canInvoke(request.connection.ssl_subject, - job.name): - self.log.info("Rejecting cancel job from %s for %s " - "due to ACL" % - (request.connection.ssl_subject, job.name)) - request.connection.sendRaw(b'ERR PERMISSION_DENIED\n') - return - self._removeJob(job, dequeue=queue) - self._updateStats() - request.connection.sendRaw(b'OK\n') - return - - def handleCancelJob(self, request): - words = request.command.split() - handle = words[2] - - if handle in self.jobs: - for queue in [self.high_queue, self.normal_queue, self.low_queue]: - for job in queue: - if handle == job.handle: - return self._cancelJob(request, job, queue) - request.connection.sendRaw(b'ERR UNKNOWN_JOB\n') - - def handleACLList(self, request): - if self.acl is None: - request.connection.sendRaw(b'ERR ACL_DISABLED\n') - return - for entry in self.acl.getEntries(): - l = "%s\tregister=%s\tinvoke=%s\tgrant=%s\n" % ( - entry.subject, entry.register, entry.invoke, entry.grant) - request.connection.sendRaw(l.encode('utf8')) - request.connection.sendRaw(b'.\n') - - def handleACLGrant(self, request): - # acl grant register worker .* - words = request.command.split(None, 4) - verb = words[2] - subject = words[3] - - if self.acl is None: - request.connection.sendRaw(b'ERR ACL_DISABLED\n') - return - if not self.acl.canGrant(request.connection.ssl_subject): - request.connection.sendRaw(b'ERR PERMISSION_DENIED\n') - return - try: - if verb == 'invoke': - self.acl.grantInvoke(subject, words[4]) - elif verb == 'register': - self.acl.grantRegister(subject, words[4]) - elif verb == 'grant': - self.acl.grantGrant(subject) - else: - request.connection.sendRaw(b'ERR UNKNOWN_ACL_VERB\n') - return - except ACLError as e: - self.log.info("Error in grant command: %s" % (e.message,)) - request.connection.sendRaw(b'ERR UNABLE %s\n' % (e.message,)) - return - request.connection.sendRaw(b'OK\n') - - def handleACLRevoke(self, request): - # acl revoke register worker - words = request.command.split() - verb = words[2] - subject = words[3] - - if self.acl is None: - request.connection.sendRaw(b'ERR ACL_DISABLED\n') - return - if subject != request.connection.ssl_subject: - if not self.acl.canGrant(request.connection.ssl_subject): - request.connection.sendRaw(b'ERR PERMISSION_DENIED\n') - return - try: - if verb == 'invoke': - self.acl.revokeInvoke(subject) - elif verb == 'register': - self.acl.revokeRegister(subject) - elif verb == 'grant': - self.acl.revokeGrant(subject) - elif verb == 'all': - try: - self.acl.remove(subject) - except ACLError: - pass - else: - request.connection.sendRaw(b'ERR UNKNOWN_ACL_VERB\n') - return - except ACLError as e: - self.log.info("Error in revoke command: %s" % (e.message,)) - request.connection.sendRaw(b'ERR UNABLE %s\n' % (e.message,)) - return - request.connection.sendRaw(b'OK\n') - - def handleACLSelfRevoke(self, request): - # acl self-revoke register - words = request.command.split() - verb = words[2] - - if self.acl is None: - request.connection.sendRaw(b'ERR ACL_DISABLED\n') - return - subject = request.connection.ssl_subject - try: - if verb == 'invoke': - self.acl.revokeInvoke(subject) - elif verb == 'register': - self.acl.revokeRegister(subject) - elif verb == 'grant': - self.acl.revokeGrant(subject) - elif verb == 'all': - try: - self.acl.remove(subject) - except ACLError: - pass - else: - request.connection.sendRaw(b'ERR UNKNOWN_ACL_VERB\n') - return - except ACLError as e: - self.log.info("Error in self-revoke command: %s" % (e.message,)) - request.connection.sendRaw(b'ERR UNABLE %s\n' % (e.message,)) - return - request.connection.sendRaw(b'OK\n') - - def _getFunctionStats(self): - functions = {} - for function in self.functions: - # Total, running, workers - functions[function] = [0, 0, 0] - for job in self.jobs.values(): - if job.name not in functions: - functions[job.name] = [0, 0, 0] - functions[job.name][0] += 1 - if job.running: - functions[job.name][1] += 1 - for connection in self.active_connections: - for function in connection.functions: - if function not in functions: - functions[function] = [0, 0, 0] - functions[function][2] += 1 - return functions - - def handleStatus(self, request): - functions = self._getFunctionStats() - for name, values in functions.items(): - request.connection.sendRaw( - ("%s\t%s\t%s\t%s\n" % - (name.decode('utf-8'), values[0], values[1], - values[2])).encode('utf8')) - request.connection.sendRaw(b'.\n') - - def handleWorkers(self, request): - for connection in self.active_connections: - fd = connection.conn.fileno() - ip = connection.host - client_id = connection.client_id or b'-' - functions = b' '.join(connection.functions).decode('utf8') - request.connection.sendRaw(("%s %s %s : %s\n" % - (fd, ip, client_id.decode('utf8'), - functions)) - .encode('utf8')) - request.connection.sendRaw(b'.\n') - - def wakeConnection(self, connection): - p = Packet(constants.RES, constants.NOOP, b'') - if connection.state == 'SLEEP': - connection.changeState("AWAKE") - connection.sendPacket(p) - - def wakeConnections(self, job=None): - p = Packet(constants.RES, constants.NOOP, b'') - for connection in self.active_connections: - if connection.state == 'SLEEP': - if ((job and job.name in connection.functions) or - (job is None)): - connection.changeState("AWAKE") - connection.sendPacket(p) - - def reportTimingStats(self, ptype, duration): - """Report processing times by packet type - - This method is called by handlePacket to report how long - processing took for each packet. If statsd is configured, - timing and counts are reported with the key - "prefix.packet.NAME". - - :arg bytes ptype: The packet type (one of the packet types in - constants). - :arg float duration: The time (in seconds) it took to process - the packet. - """ - if not self.statsd: - return - ptype = constants.types.get(ptype, 'UNKNOWN') - key = 'packet.%s' % ptype - self.statsd.timing(key, int(duration * 1000)) - self.statsd.incr(key) - - def _updateStats(self): - if not self.statsd: - return - - # prefix.queue.total - # prefix.queue.running - # prefix.queue.waiting - self.statsd.gauge('queue.total', self.total_jobs) - self.statsd.gauge('queue.running', self.running_jobs) - self.statsd.gauge('queue.waiting', self.waiting_jobs) - - def _handleSubmitJob(self, packet, precedence, background=False): - name = packet.getArgument(0) - unique = packet.getArgument(1) - if not unique: - unique = None - arguments = packet.getArgument(2, True) - if self.acl: - if not self.acl.canInvoke(packet.connection.ssl_subject, name): - self.log.info("Rejecting SUBMIT_JOB from %s for %s " - "due to ACL" % - (packet.connection.ssl_subject, name)) - self.sendError(packet.connection, 0, - 'Permission denied by ACL') - return - self.max_handle += 1 - handle = ('H:%s:%s' % (packet.connection.host, - self.max_handle)).encode('utf8') - if not background: - conn = packet.connection - else: - conn = None - job = ServerJob(handle, name, arguments, conn, unique) - p = Packet(constants.RES, constants.JOB_CREATED, handle) - packet.connection.sendPacket(p) - self.jobs[handle] = job - self.total_jobs += 1 - self.waiting_jobs += 1 - if not background: - packet.connection.related_jobs[handle] = job - if precedence == PRECEDENCE_HIGH: - self.high_queue.append(job) - elif precedence == PRECEDENCE_NORMAL: - self.normal_queue.append(job) - elif precedence == PRECEDENCE_LOW: - self.low_queue.append(job) - self._updateStats() - self.wakeConnections(job) - - def handleSubmitJob(self, packet): - return self._handleSubmitJob(packet, PRECEDENCE_NORMAL) - - def handleSubmitJobHigh(self, packet): - return self._handleSubmitJob(packet, PRECEDENCE_HIGH) - - def handleSubmitJobLow(self, packet): - return self._handleSubmitJob(packet, PRECEDENCE_LOW) - - def handleSubmitJobBg(self, packet): - return self._handleSubmitJob(packet, PRECEDENCE_NORMAL, - background=True) - - def handleSubmitJobHighBg(self, packet): - return self._handleSubmitJob(packet, PRECEDENCE_HIGH, background=True) - - def handleSubmitJobLowBg(self, packet): - return self._handleSubmitJob(packet, PRECEDENCE_LOW, background=True) - - def getJobForConnection(self, connection, peek=False): - for queue in [self.high_queue, self.normal_queue, self.low_queue]: - for job in queue: - if job.name in connection.functions: - if not peek: - queue.remove(job) - connection.related_jobs[job.handle] = job - job.worker_connection = connection - job.running = True - self.waiting_jobs -= 1 - self.running_jobs += 1 - self._updateStats() - return job - return None - - def handleGrabJobUniq(self, packet): - job = self.getJobForConnection(packet.connection) - if job: - self.sendJobAssignUniq(packet.connection, job) - else: - self.sendNoJob(packet.connection) - - def sendJobAssignUniq(self, connection, job): - unique = job.binary_unique - if not unique: - unique = b'' - data = b'\x00'.join((job.handle, job.name, unique, job.arguments)) - p = Packet(constants.RES, constants.JOB_ASSIGN_UNIQ, data) - connection.sendPacket(p) - - def sendNoJob(self, connection): - p = Packet(constants.RES, constants.NO_JOB, b'') - connection.sendPacket(p) - - def handlePreSleep(self, packet): - packet.connection.changeState("SLEEP") - if self.getJobForConnection(packet.connection, peek=True): - self.wakeConnection(packet.connection) - - def handleWorkComplete(self, packet): - self.handlePassthrough(packet, True) - - def handleWorkFail(self, packet): - self.handlePassthrough(packet, True) - - def handleWorkException(self, packet): - self.handlePassthrough(packet, True) - - def handleWorkData(self, packet): - self.handlePassthrough(packet) - - def handleWorkWarning(self, packet): - self.handlePassthrough(packet) - - def handleWorkStatus(self, packet): - handle = packet.getArgument(0) - job = self.jobs.get(handle) - if not job: - self.log.info("Received packet %s for unknown job" % (packet,)) - return - job.numerator = packet.getArgument(1) - job.denominator = packet.getArgument(2) - self.handlePassthrough(packet) - - def handlePassthrough(self, packet, finished=False): - handle = packet.getArgument(0) - job = self.jobs.get(handle) - if not job: - self.log.info("Received packet %s for unknown job" % (packet,)) - return - packet.code = constants.RES - if job.client_connection: - job.client_connection.sendPacket(packet) - if finished: - self._removeJob(job, dequeue=False) - self._updateStats() - - def handleSetClientID(self, packet): - name = packet.getArgument(0) - packet.connection.client_id = name - - def sendError(self, connection, code, text): - data = (str(code).encode('utf8') + b'\x00' + - str(text).encode('utf8') + b'\x00') - p = Packet(constants.RES, constants.ERROR, data) - connection.sendPacket(p) - - def handleCanDo(self, packet): - name = packet.getArgument(0) - if self.acl: - if not self.acl.canRegister(packet.connection.ssl_subject, name): - self.log.info("Ignoring CAN_DO from %s for %s due to ACL" % - (packet.connection.ssl_subject, name)) - # CAN_DO normally does not merit a response so it is - # not clear that it is appropriate to send an ERROR - # response at this point. - return - self.log.debug("Adding function %s to %s" % (name, packet.connection)) - packet.connection.functions.add(name) - self.functions.add(name) - - def handleCantDo(self, packet): - name = packet.getArgument(0) - self.log.debug("Removing function %s from %s" % - (name, packet.connection)) - packet.connection.functions.remove(name) - - def handleResetAbilities(self, packet): - self.log.debug("Resetting functions for %s" % packet.connection) - packet.connection.functions = set() - - def handleGetStatus(self, packet): - handle = packet.getArgument(0) - self.log.debug("Getting status for %s" % handle) - - known = 0 - running = 0 - numerator = b'' - denominator = b'' - job = self.jobs.get(handle) - if job: - known = 1 - if job.running: - running = 1 - numerator = job.numerator or b'' - denominator = job.denominator or b'' - - data = (handle + b'\x00' + - str(known).encode('utf8') + b'\x00' + - str(running).encode('utf8') + b'\x00' + - numerator + b'\x00' + - denominator) - p = Packet(constants.RES, constants.STATUS_RES, data) - packet.connection.sendPacket(p) diff --git a/roles/submit-log-processor-jobs/module_utils/gear_acl.py b/roles/submit-log-processor-jobs/module_utils/gear_acl.py deleted file mode 100644 index 07c9e10..0000000 --- a/roles/submit-log-processor-jobs/module_utils/gear_acl.py +++ /dev/null @@ -1,289 +0,0 @@ -# Copyright 2014 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import re - - -class ACLError(Exception): - pass - - -class ACLEntry(object): - """An access control list entry. - - :arg str subject: The SSL certificate Subject Common Name to which - the entry applies. - - :arg str register: A regular expression that matches the jobs that - connections with this certificate are permitted to register. - - :arg str invoke: A regular expression that matches the jobs that - connections with this certificate are permitted to invoke. - Also implies the permission to cancel the same set of jobs in - the queue. - - :arg boolean grant: A flag indicating whether connections with - this certificate are permitted to grant access to other - connections. Also implies the permission to revoke access - from other connections. The ability to self-revoke access is - always implied. - """ - - def __init__(self, subject, register=None, invoke=None, grant=False): - self.subject = subject - self.setRegister(register) - self.setInvoke(invoke) - self.setGrant(grant) - - def __repr__(self): - return ('' % - (self.subject, self.register, self.invoke, self.grant)) - - def isEmpty(self): - """Checks whether this entry grants any permissions at all. - - :returns: False if any permission is granted, otherwise True. - """ - if (self.register is None and - self.invoke is None and - self.grant is False): - return True - return False - - def canRegister(self, name): - """Check whether this subject is permitted to register a function. - - :arg str name: The function name to check. - :returns: A boolean indicating whether the action should be permitted. - """ - if self.register is None: - return False - if not self._register.match(name): - return False - return True - - def canInvoke(self, name): - """Check whether this subject is permitted to register a function. - - :arg str name: The function name to check. - :returns: A boolean indicating whether the action should be permitted. - """ - if self.invoke is None: - return False - if not self._invoke.match(name): - return False - return True - - def setRegister(self, register): - """Sets the functions that this subject can register. - - :arg str register: A regular expression that matches the jobs that - connections with this certificate are permitted to register. - """ - self.register = register - if register: - try: - self._register = re.compile(register) - except re.error as e: - raise ACLError('Regular expression error: %s' % (e.message,)) - else: - self._register = None - - def setInvoke(self, invoke): - """Sets the functions that this subject can invoke. - - :arg str invoke: A regular expression that matches the jobs that - connections with this certificate are permitted to invoke. - """ - self.invoke = invoke - if invoke: - try: - self._invoke = re.compile(invoke) - except re.error as e: - raise ACLError('Regular expression error: %s' % (e.message,)) - else: - self._invoke = None - - def setGrant(self, grant): - """Sets whether this subject can grant ACLs to others. - - :arg boolean grant: A flag indicating whether connections with - this certificate are permitted to grant access to other - connections. Also implies the permission to revoke access - from other connections. The ability to self-revoke access is - always implied. - """ - self.grant = grant - - -class ACL(object): - """An access control list. - - ACLs are deny-by-default. The checked actions are only allowed if - there is an explicit rule in the ACL granting permission for a - given client (identified by SSL certificate Common Name Subject) - to perform that action. - """ - - def __init__(self): - self.subjects = {} - - def add(self, entry): - """Add an ACL entry. - - :arg Entry entry: The :py:class:`ACLEntry` to add. - :raises ACLError: If there is already an entry for the subject. - """ - if entry.subject in self.subjects: - raise ACLError("An ACL entry for %s already exists" % - (entry.subject,)) - self.subjects[entry.subject] = entry - - def remove(self, subject): - """Remove an ACL entry. - - :arg str subject: The SSL certificate Subject Common Name to - remove from the ACL. - :raises ACLError: If there is no entry for the subject. - """ - if subject not in self.subjects: - raise ACLError("There is no ACL entry for %s" % (subject,)) - del self.subjects[subject] - - def getEntries(self): - """Return a list of current ACL entries. - - :returns: A list of :py:class:`ACLEntry` objects. - """ - items = list(self.subjects.items()) - items.sort(key=lambda a: a[0]) - return [x[1] for x in items] - - def canRegister(self, subject, name): - """Check whether a subject is permitted to register a function. - - :arg str subject: The SSL certificate Subject Common Name to - check against. - :arg str name: The function name to check. - :returns: A boolean indicating whether the action should be permitted. - """ - entry = self.subjects.get(subject) - if entry is None: - return False - return entry.canRegister(name) - - def canInvoke(self, subject, name): - """Check whether a subject is permitted to invoke a function. - - :arg str subject: The SSL certificate Subject Common Name to - check against. - :arg str name: The function name to check. - :returns: A boolean indicating whether the action should be permitted. - """ - entry = self.subjects.get(subject) - if entry is None: - return False - return entry.canInvoke(name) - - def canGrant(self, subject): - """Check whether a subject is permitted to grant access to others. - - :arg str subject: The SSL certificate Subject Common Name to - check against. - :returns: A boolean indicating whether the action should be permitted. - """ - entry = self.subjects.get(subject) - if entry is None: - return False - if not entry.grant: - return False - return True - - def grantInvoke(self, subject, invoke): - """Grant permission to invoke certain functions. - - :arg str subject: The SSL certificate Subject Common Name to which - the entry applies. - :arg str invoke: A regular expression that matches the jobs - that connections with this certificate are permitted to - invoke. Also implies the permission to cancel the same - set of jobs in the queue. - """ - e = self.subjects.get(subject) - if not e: - e = ACLEntry(subject) - self.add(e) - e.setInvoke(invoke) - - def grantRegister(self, subject, register): - """Grant permission to register certain functions. - - :arg str subject: The SSL certificate Subject Common Name to which - the entry applies. - :arg str register: A regular expression that matches the jobs that - connections with this certificate are permitted to register. - """ - e = self.subjects.get(subject) - if not e: - e = ACLEntry(subject) - self.add(e) - e.setRegister(register) - - def grantGrant(self, subject): - """Grant permission to grant permissions to other connections. - - :arg str subject: The SSL certificate Subject Common Name to which - the entry applies. - """ - e = self.subjects.get(subject) - if not e: - e = ACLEntry(subject) - self.add(e) - e.setGrant(True) - - def revokeInvoke(self, subject): - """Revoke permission to invoke all functions. - - :arg str subject: The SSL certificate Subject Common Name to which - the entry applies. - """ - e = self.subjects.get(subject) - if e: - e.setInvoke(None) - if e.isEmpty(): - self.remove(subject) - - def revokeRegister(self, subject): - """Revoke permission to register all functions. - - :arg str subject: The SSL certificate Subject Common Name to which - the entry applies. - """ - e = self.subjects.get(subject) - if e: - e.setRegister(None) - if e.isEmpty(): - self.remove(subject) - - def revokeGrant(self, subject): - """Revoke permission to grant permissions to other connections. - - :arg str subject: The SSL certificate Subject Common Name to which - the entry applies. - """ - e = self.subjects.get(subject) - if e: - e.setGrant(False) - if e.isEmpty(): - self.remove(subject) diff --git a/roles/submit-log-processor-jobs/module_utils/gear_constants.py b/roles/submit-log-processor-jobs/module_utils/gear_constants.py deleted file mode 100644 index 2751278..0000000 --- a/roles/submit-log-processor-jobs/module_utils/gear_constants.py +++ /dev/null @@ -1,83 +0,0 @@ -# Copyright 2013 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Protocol Constants -================== - -These are not necessary for normal API usage. See the `Gearman -protocol reference `_ for an explanation -of each of these. - -Magic Codes ------------ - -.. py:data:: REQ - - The Gearman magic code for a request. - -.. py:data:: RES - - The Gearman magic code for a response. - -Packet Types ------------- - -""" - -types = { - 1: 'CAN_DO', - 2: 'CANT_DO', - 3: 'RESET_ABILITIES', - 4: 'PRE_SLEEP', - # unused - 6: 'NOOP', - 7: 'SUBMIT_JOB', - 8: 'JOB_CREATED', - 9: 'GRAB_JOB', - 10: 'NO_JOB', - 11: 'JOB_ASSIGN', - 12: 'WORK_STATUS', - 13: 'WORK_COMPLETE', - 14: 'WORK_FAIL', - 15: 'GET_STATUS', - 16: 'ECHO_REQ', - 17: 'ECHO_RES', - 18: 'SUBMIT_JOB_BG', - 19: 'ERROR', - 20: 'STATUS_RES', - 21: 'SUBMIT_JOB_HIGH', - 22: 'SET_CLIENT_ID', - 23: 'CAN_DO_TIMEOUT', - 24: 'ALL_YOURS', - 25: 'WORK_EXCEPTION', - 26: 'OPTION_REQ', - 27: 'OPTION_RES', - 28: 'WORK_DATA', - 29: 'WORK_WARNING', - 30: 'GRAB_JOB_UNIQ', - 31: 'JOB_ASSIGN_UNIQ', - 32: 'SUBMIT_JOB_HIGH_BG', - 33: 'SUBMIT_JOB_LOW', - 34: 'SUBMIT_JOB_LOW_BG', - 35: 'SUBMIT_JOB_SCHED', - 36: 'SUBMIT_JOB_EPOCH', -} - -for i, name in types.items(): - globals()[name] = i - __doc__ += '\n.. py:data:: %s\n' % name - -REQ = b'\x00REQ' -RES = b'\x00RES' diff --git a/roles/submit-logstash-jobs/README.rst b/roles/submit-logstash-jobs/README.rst deleted file mode 100644 index eff2320..0000000 --- a/roles/submit-logstash-jobs/README.rst +++ /dev/null @@ -1,44 +0,0 @@ -Submit a log processing job to the logstash workers. - -This role examines all of the files in the log subdirectory of the job -work dir and any matching filenames are submitted to the gearman queue -for the logstash log processor, along with any tags configured for -those filenames. - -**Role Variables** - -.. zuul:rolevar:: logstash_gearman_server - :default: logstash.openstack.org - - The gearman server to use. - -.. zuul:rolevar:: logstash_processor_config - :type: dict - - The default file configuration for the logstash parser. - - This is a dictionary that contains a single entry: - - .. zuul:rolevar:: files - :type: list - - A list of files to search for in the ``work/logs/`` directory on - the executor. Each file will be compared to the entries in this - list, and if it matches, a processing job will be submitted to - the logstash processing queue, along with the tags for the - matching entry. Order is important: the first matcing is used. - This field is list of dictionaries, as follows: - - .. zuul:rolevar:: name - - The name of the file to process. This is treated as an - unanchored regular expression. To match the full path - (underneath ``work/logs``) start and end the string with - ``^`` and ``$`` respectively. - - .. zuul:rolevar:: tags - :type: list - - A list of strings indicating the logstash processing tags - associated with this file. These may be used to indicate the - file format to the parser. diff --git a/roles/submit-logstash-jobs/defaults/main.yaml b/roles/submit-logstash-jobs/defaults/main.yaml deleted file mode 100644 index 2577c6c..0000000 --- a/roles/submit-logstash-jobs/defaults/main.yaml +++ /dev/null @@ -1,88 +0,0 @@ -logstash_gearman_server: logstash.openstack.org -# For every file found in the logs directory (and its subdirs), the -# module will attempt to match the filenames below. If there is a -# match, the file is submitted to the logstash processing queue, along -# with the tags for that match. The first match wins, so be sure to -# list more specific names first. The names are un-anchored regular -# expressions (so if you need to match the root (i.e, the work/logs/ -# directory), be sure to anchor them with ^). -logstash_processor_config: - files: - - name: job-output\.txt - tags: - - console - - console.html - - name: grenade\.sh\.txt - tags: - - console - - console.html - - name: devstacklog\.txt(?!.*summary) - tags: - - console - - console.html - - name: apache/keystone\.txt - tags: - - screen - - oslofmt - - name: apache/horizon_error\.txt - tags: - - apacheerror - # TODO(clarkb) Add swift proxy logs here. - - name: syslog\.txt - tags: - - syslog - - name: tempest\.txt - tags: - - screen - - oslofmt - - name: javelin\.txt - tags: - - screen - - oslofmt - # Neutron index log files (files with messages from all test cases) - - name: dsvm-functional-index\.txt - tags: - - oslofmt - - name: dsvm-fullstack-index\.txt - tags: - - oslofmt - - name: screen-s-account\.txt - tags: - - screen - - apachecombined - - name: screen-s-container\.txt - tags: - - screen - - apachecombined - - name: screen-s-object\.txt - tags: - - screen - - apachecombined - # tripleo logs - - name: postci\.txt - tags: - - console - - postci - - name: var/log/extra/logstash\.txt - tags: - - console - - postci - - name: var/log/extra/errors\.txt - tags: - - console - - errors - # wildcard logs - - name: devstack-gate-.*\.txt - tags: - - console - - console.html - # NOTE(mriedem): Logs that are known logstash index OOM killers are - # blacklisted here until fixed. - # screen-kubelet.txt: https://bugs.launchpad.net/kuryr-kubernetes/+bug/1795067 - # screen-mistral-engine.txt: https://bugs.launchpad.net/mistral/+bug/1795068 - # screen-monasca-persister.txt: https://storyboard.openstack.org/#!/story/2003911 - # screen-ovn-northd.txt: https://bugs.launchpad.net/networking-ovn/+bug/1795069 - - name: screen-(?!(peakmem_tracker|dstat|karaf|kubelet|mistral-engine|monasca-persister|monasca-api|ovn-northd|q-svc)).*\.txt - tags: - - screen - - oslofmt diff --git a/roles/submit-logstash-jobs/meta/main.yaml b/roles/submit-logstash-jobs/meta/main.yaml deleted file mode 100644 index 9f28a12..0000000 --- a/roles/submit-logstash-jobs/meta/main.yaml +++ /dev/null @@ -1,2 +0,0 @@ -dependencies: - - role: submit-log-processor-jobs diff --git a/roles/submit-logstash-jobs/tasks/main.yaml b/roles/submit-logstash-jobs/tasks/main.yaml deleted file mode 100644 index 37c9604..0000000 --- a/roles/submit-logstash-jobs/tasks/main.yaml +++ /dev/null @@ -1,9 +0,0 @@ -- name: Submit logstash processing jobs to log processors - submit_log_processor_jobs: - gearman_server: "{{ logstash_gearman_server }}" - job: "push-log" - config: "{{ logstash_processor_config }}" - success: "{{ zuul_success }}" - host_vars: "{{ hostvars }}" - path: "{{ zuul.executor.log_root }}" - log_url: "{{ (lookup('file', zuul.executor.result_data_file) | from_json).get('data').get('zuul').get('log_url') }}" diff --git a/roles/submit-subunit-jobs/README.rst b/roles/submit-subunit-jobs/README.rst deleted file mode 100644 index e1cfbff..0000000 --- a/roles/submit-subunit-jobs/README.rst +++ /dev/null @@ -1,36 +0,0 @@ -Submit a log processing job to the subunit workers. - -This role examines all of the files in the log subdirectory of the job -work dir and any matching filenames are submitted to the gearman queue -for the subunit log processor. - -**Role Variables** - -.. zuul:rolevar:: subunit_gearman_server - :default: logstash.openstack.org - - The gearman server to use. - -.. zuul:rolevar:: subunit_processor_config - :type: dict - - The default file configuration for the subunit parser. - - This is a dictionary that contains a single entry: - - .. zuul:rolevar:: files - :type: list - - A list of files to search for in the ``work/logs/`` directory on - the executor. Each file will be compared to the entries in this - list, and if it matches, a processing job will be submitted to - the subunit processing queue, along with the tags for the - matching entry. Order is important: the first matcing is used. - This field is list of dictionaries, as follows: - - .. zuul:rolevar:: name - - The name of the file to process. This is treated as an - unanchored regular expression. To match the full path - (underneath ``work/logs``) start and end the string with - ``^`` and ``$`` respectively. diff --git a/roles/submit-subunit-jobs/defaults/main.yaml b/roles/submit-subunit-jobs/defaults/main.yaml deleted file mode 100644 index 89b5bc5..0000000 --- a/roles/submit-subunit-jobs/defaults/main.yaml +++ /dev/null @@ -1,12 +0,0 @@ -subunit_gearman_server: logstash.openstack.org -# For every file found in the logs directory (and its subdirs), the -# module will attempt to match the filenames below. If there is a -# match, the file is submitted to the subunit processing queue, along -# with the tags for that match. The first match wins, so be sure to -# list more specific names first. The names are un-anchored regular -# expressions (so if you need to match the root (i.e, the work/logs/ -# directory), be sure to anchor them with ^). -subunit_processor_config: - files: - - name: testrepository.subunit - - name: karma.subunit diff --git a/roles/submit-subunit-jobs/meta/main.yaml b/roles/submit-subunit-jobs/meta/main.yaml deleted file mode 100644 index 9f28a12..0000000 --- a/roles/submit-subunit-jobs/meta/main.yaml +++ /dev/null @@ -1,2 +0,0 @@ -dependencies: - - role: submit-log-processor-jobs diff --git a/roles/submit-subunit-jobs/tasks/main.yaml b/roles/submit-subunit-jobs/tasks/main.yaml deleted file mode 100644 index 0bc2244..0000000 --- a/roles/submit-subunit-jobs/tasks/main.yaml +++ /dev/null @@ -1,10 +0,0 @@ -- name: Submit subunit processing jobs to log processors - when: zuul.pipeline in ['gate', 'periodic', 'post'] - submit_log_processor_jobs: - gearman_server: "{{ subunit_gearman_server }}" - job: "push-subunit" - config: "{{ subunit_processor_config }}" - success: "{{ zuul_success }}" - host_vars: "{{ hostvars }}" - path: "{{ zuul.executor.log_root }}" - log_url: "{{ (lookup('file', zuul.executor.result_data_file) | from_json).get('zuul').get('log_url') }}"