# Copyright 2013 Rackspace Australia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import git
import logging
import os
import select
import shutil
import subprocess
import swiftclient
import time


class GitRepository(object):

    """ Manage a git repository for our uses """

    log = logging.getLogger("lib.utils.GitRepository")

    def __init__(self, remote_url, local_path):
        """Clone *remote_url* into *local_path* if needed and open it."""
        self.remote_url = remote_url
        self.local_path = local_path
        self._ensure_cloned()

        self.repo = git.Repo(self.local_path)

    def _ensure_cloned(self):
        """Clone the remote into ``local_path`` if that path does not exist."""
        if not os.path.exists(self.local_path):
            self.log.debug("Cloning from %s to %s",
                           self.remote_url, self.local_path)
            git.Repo.clone_from(self.remote_url, self.local_path)

    def fetch(self, ref):
        """Fetch *ref* from the ``origin`` remote."""
        # The git.remote.fetch method may read in git progress info and
        # interpret it improperly causing an AssertionError. Because the
        # data was fetched properly subsequent fetches don't seem to fail.
        # So try again if an AssertionError is caught.
        origin = self.repo.remotes.origin
        self.log.debug("Fetching %s from %s", ref, origin)

        try:
            origin.fetch(ref)
        except AssertionError:
            origin.fetch(ref)

    def checkout(self, ref):
        """Run ``git checkout <ref>`` and return the command's output."""
        self.log.debug("Checking out %s", ref)
        return self.repo.git.checkout(ref)

    def reset(self):
        """Return the clone to a pristine copy of the remote.

        Updates from ``origin``, force-points every local branch at its
        ``origin`` counterpart, resets index and working tree to
        ``origin/HEAD``, then removes untracked files and directories,
        including ignored ones (``git clean -x -f -d``).
        """
        self._ensure_cloned()
        self.log.debug("Resetting repository %s", self.local_path)
        self.update()
        origin = self.repo.remotes.origin
        for ref in origin.refs:
            if ref.remote_head == 'HEAD':
                continue
            self.repo.create_head(ref.remote_head, ref, force=True)

        # Reset to remote HEAD (usually origin/master)
        self.repo.head.reference = origin.refs['HEAD']
        self.repo.head.reset(index=True, working_tree=True)
        self.repo.git.clean('-x', '-f', '-d')

    def update(self):
        """Update from ``origin`` and reopen the repository object."""
        self._ensure_cloned()
        self.log.debug("Updating repository %s", self.local_path)
        origin = self.repo.remotes.origin
        origin.update()
        # If the remote repository is repacked, the repo object's
        # cache may be out of date. Specifically, it caches whether
        # to check the loose or packed DB for a given SHA. Further,
        # if there was no pack or loose directory to start with, the
        # repo object may not even have a database for it. Avoid
        # these problems by recreating the repo object.
        self.repo = git.Repo(self.local_path)


# (name, path) pairs followed by execute_to_log by default. A tuple, so the
# shared default cannot be mutated between calls.
DEFAULT_WATCH_LOGS = (
    ('[syslog]', '/var/log/syslog'),
    ('[sqlslo]', '/var/log/mysql/slow-queries.log'),
    ('[sqlerr]', '/var/log/mysql/error.log'),
)

# Seconds without any logged line after which a "[heartbeat]" is written.
HEARTBEAT_INTERVAL = 30

# Maximum number of bytes read from one descriptor per poll pass.
READ_CHUNK_SIZE = 1024 * 1024


def _to_text(data):
    """Return *data* as a native ``str``.

    On Python 2 ``os.read`` returns ``str``, which is passed through
    unchanged. On Python 3 it returns ``bytes``, which are decoded as UTF-8
    with undecodable bytes replaced.
    """
    if not isinstance(data, str):
        data = data.decode('utf-8', 'replace')
    return data


def execute_to_log(cmd, logfile, timeout=-1, watch_logs=DEFAULT_WATCH_LOGS,
                   heartbeat=True):
    """ Executes a command and logs the STDOUT/STDERR and output of any
    supplied watch_logs from logs into a new logfile

    watch_logs is a list of tuples with (name,file)

    :param cmd: shell command string; run with ``shell=True`` and with
        stderr merged into stdout.
    :param logfile: path of the log file to append to; its parent
        directory is created if missing.
    :param timeout: seconds after which the command is sent SIGKILL and a
        "[timeout]" line is logged; ``<= 0`` disables the timeout.
    :param watch_logs: iterable of ``(name, path)`` pairs. Only text
        appended to each file while the command runs is logged, prefixed
        with *name*. Files that cannot be opened are noted and skipped.
    :param heartbeat: when true, write "[heartbeat]" after
        ``HEARTBEAT_INTERVAL`` seconds without any logged line.

    NOTE(review): lines are emitted at INFO on the 'execute_to_log' logger,
    which sets no level of its own. If the effective level is above INFO
    they are dropped. Confirm the application configures logging to match.
    """
    logdir = os.path.dirname(logfile)
    # A bare filename has no directory part; os.makedirs('') would raise.
    if logdir and not os.path.isdir(logdir):
        os.makedirs(logdir)

    logger = logging.getLogger('execute_to_log')
    log_handler = logging.FileHandler(logfile)
    log_handler.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
    logger.addHandler(log_handler)

    descriptors = {}
    watch_fds = []
    p = None
    try:
        for watch_file in watch_logs:
            name, path = watch_file[0], watch_file[1]
            try:
                fd = os.open(path, os.O_RDONLY)
            except OSError as e:
                logger.info('[watch] unable to open %s: %s', path, e)
                continue
            watch_fds.append(fd)
            # Only follow text appended from now on.
            os.lseek(fd, 0, os.SEEK_END)
            descriptors[fd] = dict(name=name, poll=select.POLLIN, lines=b'')

        start_time = time.time()
        # stderr is merged into the stdout pipe. Nothing ever reads a
        # separate stderr pipe, and ' 2>&1' alone misses the shell's own
        # diagnostics.
        p = subprocess.Popen(
            cmd + ' 2>&1', shell=True,
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        descriptors[p.stdout.fileno()] = dict(
            name='[output]', poll=(select.POLLIN | select.POLLHUP), lines=b'')

        poll_obj = select.poll()
        for fd, descriptor in descriptors.items():
            poll_obj.register(fd, descriptor['poll'])

        # Mutable holder so the nested process() can update the timer.
        # Python 2 has no ``nonlocal``, and ``global`` would write to a
        # module-level name the loop below never reads.
        state = {'last_heartbeat': time.time()}

        def process(fd):
            """Read pending data from *fd* and log each complete line.

            Returns True if any bytes were read.
            """
            chunk = os.read(fd, READ_CHUNK_SIZE)
            if not chunk:
                return False
            descriptor = descriptors[fd]
            descriptor['lines'] += chunk
            # Avoid partial lines by only processing input with breaks.
            # Splitting bytes before decoding never cuts a multi-byte UTF-8
            # character, because 0x0A cannot occur inside one.
            if b'\n' in descriptor['lines']:
                elems = descriptor['lines'].split(b'\n')
                # Take all but the partial line
                for raw in elems[:-1]:
                    if raw:
                        logger.info('%s %s', descriptor['name'],
                                    _to_text(raw))
                        state['last_heartbeat'] = time.time()
                # Place the partial line back into lines to be processed
                descriptor['lines'] = elems[-1]
            return True

        def read_ready():
            """Process each ready descriptor once; True if any data read."""
            got_data = False
            for fd, _flag in poll_obj.poll(0):
                if process(fd):
                    got_data = True
            return got_data

        killed = False
        while p.poll() is None:
            if (timeout > 0 and not killed and
                    time.time() - start_time > timeout):
                # Append to logfile
                logger.info("[timeout]")
                p.kill()
                # Log and kill once; the loop ends when the child is reaped.
                killed = True

            got_data = read_ready()

            if (heartbeat and
                    time.time() - state['last_heartbeat'] >
                    HEARTBEAT_INTERVAL):
                # Append to logfile
                logger.info("[heartbeat]")
                state['last_heartbeat'] = time.time()

            if not got_data:
                # Regular files always poll as readable, so poll(0) never
                # blocks. Back off briefly instead of spinning the CPU.
                time.sleep(0.1)

        # Do one last write to get the remaining lines
        read_ready()

        # Log any trailing text that was not newline-terminated.
        for descriptor in descriptors.values():
            if descriptor['lines']:
                logger.info('%s %s', descriptor['name'],
                            _to_text(descriptor['lines']))
                descriptor['lines'] = b''

        logger.info('[script exit code = %d]' % p.returncode)
    finally:
        for fd in watch_fds:
            os.close(fd)
        if p is not None:
            p.stdout.close()
        # Detach the handler so later calls do not also write into this
        # logfile, and release its file handle.
        logger.removeHandler(log_handler)
        log_handler.close()


def push_file(job_name, file_path, publish_config):
    """ Push a log file to a server. Returns the public URL

    ``publish_config['type']`` selects the backend: the module-level
    function named ``<type>_push_file`` (e.g. ``'swift'`` ->
    ``swift_push_file``) is called with the same arguments, and its result
    is returned. Returns None when no such backend exists.
    """
    method = publish_config['type'] + '_push_file'
    # Backends are module-level functions, so resolve them from globals();
    # locals() here would only contain this function's own variables.
    pusher = globals().get(method)
    if callable(pusher):
        return pusher(job_name, file_path, publish_config)
    return None


def swift_push_file(job_name, file_path, swift_config):
    """ Push a log file to a swift server.

    The object is named ``<job_name>_<basename of file_path>`` and stored
    in ``swift_config['container']``. Returns the value of
    ``Connection.put_object`` (presumably the object's etag, not a URL;
    TODO confirm against the swiftclient version in use).
    """
    # Binary mode so the upload is byte-for-byte identical to the file.
    with open(file_path, 'rb') as fd:
        name = job_name + '_' + os.path.basename(file_path)
        con = swiftclient.client.Connection(swift_config['authurl'],
                                            swift_config['user'],
                                            swift_config['apikey'])
        obj = con.put_object(swift_config['container'], name, fd)
        return obj


def local_push_file(job_name, file_path, local_config):
    """ Copy the file locally somewhere sensible

    Copies *file_path* into ``<local_config['path']>/<job_name>/`` (creating
    the directory if needed) and returns the destination file path.
    """
    dest_dir = os.path.join(local_config['path'], job_name)
    # Several files may be pushed for the same job; tolerate an existing dir.
    if not os.path.isdir(dest_dir):
        os.makedirs(dest_dir)
    dest_file = os.path.join(dest_dir, os.path.basename(file_path))
    shutil.copyfile(file_path, dest_file)
    return dest_file


def scp_push_file(job_name, file_path, local_config):
    """ Copy the file remotely over ssh

    Not implemented yet: does nothing and returns None.
    """
    # TODO: implement the scp backend.
    pass