- Added logic to populate generic rawdata and glance rawdata

- Moved the responsibilty to save rawdata to the notification classes
- Notification are now created based on exchange instead of routing_key since routing_keys
  may not be unique across services
- Separate consumers are now created for every exchange specified in the config
- Each consumer is started in a separate process
- Introduced notification factory and the config module
This commit is contained in:
Manali Latkar 2013-06-24 08:52:04 +05:30
parent 4281133aac
commit 3f6542f049
17 changed files with 899 additions and 352 deletions

View File

@ -7,7 +7,11 @@
"rabbit_userid": "rabbit", "rabbit_userid": "rabbit",
"rabbit_password": "rabbit", "rabbit_password": "rabbit",
"rabbit_virtual_host": "/", "rabbit_virtual_host": "/",
"exit_on_exception": true "exit_on_exception": true,
"topics": {
"nova": ["monitor.info", "monitor.error"],
"glance": ["monitor_glance.info", "monitor_glance.error"]
}
}, },
{ {
"name": "east_coast.prod.cell1", "name": "east_coast.prod.cell1",
@ -17,6 +21,10 @@
"rabbit_userid": "rabbit", "rabbit_userid": "rabbit",
"rabbit_password": "rabbit", "rabbit_password": "rabbit",
"rabbit_virtual_host": "/", "rabbit_virtual_host": "/",
"exit_on_exception": false "exit_on_exception": false,
"topics": {
"nova": ["monitor.info", "monitor.error"],
"glance": ["monitor_glance.info", "monitor_glance.error"]
}
}] }]
} }

View File

@ -140,6 +140,8 @@ INSTALLED_APPS = (
'south' 'south'
) )
SOUTH_TESTS_MIGRATE = False
ALLOWED_HOSTS = ['*'] ALLOWED_HOSTS = ['*']
# A sample logging configuration. The only tangible logging # A sample logging configuration. The only tangible logging

View File

@ -20,7 +20,7 @@ def get_or_create_deployment(name):
return models.Deployment.objects.get_or_create(name=name) return models.Deployment.objects.get_or_create(name=name)
def create_rawdata(**kwargs): def create_nova_rawdata(**kwargs):
imagemeta_fields = ['os_architecture', 'os_version', imagemeta_fields = ['os_architecture', 'os_version',
'os_distro', 'rax_options'] 'os_distro', 'rax_options']
imagemeta_kwargs = \ imagemeta_kwargs = \
@ -35,6 +35,7 @@ def create_rawdata(**kwargs):
return rawdata return rawdata
def create_lifecycle(**kwargs): def create_lifecycle(**kwargs):
return models.Lifecycle(**kwargs) return models.Lifecycle(**kwargs)
@ -88,4 +89,18 @@ def create_instance_exists(**kwargs):
def save(obj): def save(obj):
obj.save() obj.save()
def create_glance_rawdata(**kwargs):
rawdata = models.GlanceRawData(**kwargs)
rawdata.save()
return rawdata
def create_generic_rawdata(**kwargs):
rawdata = models.GenericRawData(**kwargs)
rawdata.save()
return rawdata

View File

@ -1,8 +1,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import copy import copy
from south.v2 import DataMigration from south.v2 import DataMigration
from stacktach.notification import Notification from stacktach.notification import notification_factory
from stacktach.views import NOTIFICATIONS
try: try:
import ujson as json import ujson as json
@ -43,8 +42,7 @@ class Migration(DataMigration):
json_dict = json.loads(json_message) json_dict = json.loads(json_message)
routing_key = json_dict[0] routing_key = json_dict[0]
body = json_dict[1] body = json_dict[1]
notification = NOTIFICATIONS[routing_key](body) return notification_factory(body, None, routing_key, json_message, None)
return notification
def forwards(self, orm): def forwards(self, orm):
# Note: Don't use "from appname.models import ModelName". # Note: Don't use "from appname.models import ModelName".
@ -79,7 +77,7 @@ class Migration(DataMigration):
exists_update_count += 1 exists_update_count += 1
print "Updated %s records in InstanceExists" % exists_update_count print "Updated %s records in InstanceExists" % exists_update_count
print "\nStarted updating records in InstacnceUsages" print "\nStarted updating records in InstanceUsages"
usages = orm.InstanceUsage.objects.all().values('request_id') usages = orm.InstanceUsage.objects.all().values('request_id')
usages_update_count = 0 usages_update_count = 0
for usage in usages: for usage in usages:

View File

@ -0,0 +1,211 @@
# -*- coding: utf-8 -*-
import datetime
from south.db import db
from south.v2 import SchemaMigration
from django.db import models
class Migration(SchemaMigration):
def forwards(self, orm):
# Adding model 'GlanceRawData'
db.create_table(u'stacktach_glancerawdata', (
(u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('deployment', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['stacktach.Deployment'])),
('owner', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)),
('json', self.gf('django.db.models.fields.TextField')()),
('routing_key', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)),
('when', self.gf('django.db.models.fields.DecimalField')(max_digits=20, decimal_places=6, db_index=True)),
('publisher', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=100, null=True, blank=True)),
('event', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)),
('service', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)),
('host', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=100, null=True, blank=True)),
('instance', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)),
('request_id', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)),
('uuid', self.gf('django.db.models.fields.CharField')(max_length=50)),
('status', self.gf('django.db.models.fields.CharField')(default='queued', max_length=50, db_index=True)),
('image_type', self.gf('django.db.models.fields.IntegerField')(default=0, null=True, db_index=True)),
))
db.send_create_signal(u'stacktach', ['GlanceRawData'])
# Adding model 'GenericRawData'
db.create_table(u'stacktach_genericrawdata', (
(u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('deployment', self.gf('django.db.models.fields.related.ForeignKey')(to=orm['stacktach.Deployment'])),
('tenant', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)),
('json', self.gf('django.db.models.fields.TextField')()),
('routing_key', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)),
('image_type', self.gf('django.db.models.fields.IntegerField')(default=0, null=True, db_index=True)),
('when', self.gf('django.db.models.fields.DecimalField')(max_digits=20, decimal_places=6, db_index=True)),
('publisher', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=100, null=True, blank=True)),
('event', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)),
('service', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)),
('host', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=100, null=True, blank=True)),
('instance', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)),
('request_id', self.gf('django.db.models.fields.CharField')(db_index=True, max_length=50, null=True, blank=True)),
))
db.send_create_signal(u'stacktach', ['GenericRawData'])
def backwards(self, orm):
# Deleting model 'GlanceRawData'
db.delete_table(u'stacktach_glancerawdata')
# Deleting model 'GenericRawData'
db.delete_table(u'stacktach_genericrawdata')
models = {
u'stacktach.deployment': {
'Meta': {'object_name': 'Deployment'},
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'name': ('django.db.models.fields.CharField', [], {'max_length': '50'})
},
u'stacktach.genericrawdata': {
'Meta': {'object_name': 'GenericRawData'},
'deployment': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['stacktach.Deployment']"}),
'event': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'host': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '100', 'null': 'True', 'blank': 'True'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'image_type': ('django.db.models.fields.IntegerField', [], {'default': '0', 'null': 'True', 'db_index': 'True'}),
'instance': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'json': ('django.db.models.fields.TextField', [], {}),
'publisher': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '100', 'null': 'True', 'blank': 'True'}),
'request_id': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'routing_key': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'service': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'tenant': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'when': ('django.db.models.fields.DecimalField', [], {'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'})
},
u'stacktach.glancerawdata': {
'Meta': {'object_name': 'GlanceRawData'},
'deployment': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['stacktach.Deployment']"}),
'event': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'host': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '100', 'null': 'True', 'blank': 'True'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'image_type': ('django.db.models.fields.IntegerField', [], {'default': '0', 'null': 'True', 'db_index': 'True'}),
'instance': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'json': ('django.db.models.fields.TextField', [], {}),
'owner': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'publisher': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '100', 'null': 'True', 'blank': 'True'}),
'request_id': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'routing_key': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'service': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'status': ('django.db.models.fields.CharField', [], {'default': "'queued'", 'max_length': '50', 'db_index': 'True'}),
'uuid': ('django.db.models.fields.CharField', [], {'max_length': '50'}),
'when': ('django.db.models.fields.DecimalField', [], {'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'})
},
u'stacktach.instancedeletes': {
'Meta': {'object_name': 'InstanceDeletes'},
'deleted_at': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'instance': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'launched_at': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}),
'raw': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['stacktach.RawData']", 'null': 'True'})
},
u'stacktach.instanceexists': {
'Meta': {'object_name': 'InstanceExists'},
'audit_period_beginning': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}),
'audit_period_ending': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}),
'delete': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'null': 'True', 'to': u"orm['stacktach.InstanceDeletes']"}),
'deleted_at': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}),
'fail_reason': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '300', 'null': 'True', 'blank': 'True'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'instance': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'instance_type_id': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'launched_at': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}),
'message_id': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'os_architecture': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
'os_distro': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
'os_version': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
'raw': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'null': 'True', 'to': u"orm['stacktach.RawData']"}),
'rax_options': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
'send_status': ('django.db.models.fields.IntegerField', [], {'default': '0', 'null': 'True', 'db_index': 'True'}),
'status': ('django.db.models.fields.CharField', [], {'default': "'pending'", 'max_length': '50', 'db_index': 'True'}),
'tenant': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'usage': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'null': 'True', 'to': u"orm['stacktach.InstanceUsage']"})
},
u'stacktach.instanceusage': {
'Meta': {'object_name': 'InstanceUsage'},
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'instance': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'instance_type_id': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'launched_at': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}),
'os_architecture': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
'os_distro': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
'os_version': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
'rax_options': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
'request_id': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'tenant': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'})
},
u'stacktach.jsonreport': {
'Meta': {'object_name': 'JsonReport'},
'created': ('django.db.models.fields.DecimalField', [], {'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'json': ('django.db.models.fields.TextField', [], {}),
'name': ('django.db.models.fields.CharField', [], {'max_length': '50', 'db_index': 'True'}),
'period_end': ('django.db.models.fields.DateTimeField', [], {'db_index': 'True'}),
'period_start': ('django.db.models.fields.DateTimeField', [], {'db_index': 'True'}),
'version': ('django.db.models.fields.IntegerField', [], {'default': '1'})
},
u'stacktach.lifecycle': {
'Meta': {'object_name': 'Lifecycle'},
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'instance': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'last_raw': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['stacktach.RawData']", 'null': 'True'}),
'last_state': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'last_task_state': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'})
},
u'stacktach.rawdata': {
'Meta': {'object_name': 'RawData'},
'deployment': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['stacktach.Deployment']"}),
'event': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'host': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '100', 'null': 'True', 'blank': 'True'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'image_type': ('django.db.models.fields.IntegerField', [], {'default': '0', 'null': 'True', 'db_index': 'True'}),
'instance': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'json': ('django.db.models.fields.TextField', [], {}),
'old_state': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '20', 'null': 'True', 'blank': 'True'}),
'old_task': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '30', 'null': 'True', 'blank': 'True'}),
'publisher': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '100', 'null': 'True', 'blank': 'True'}),
'request_id': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'routing_key': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'service': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'state': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '20', 'null': 'True', 'blank': 'True'}),
'task': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '30', 'null': 'True', 'blank': 'True'}),
'tenant': ('django.db.models.fields.CharField', [], {'db_index': 'True', 'max_length': '50', 'null': 'True', 'blank': 'True'}),
'when': ('django.db.models.fields.DecimalField', [], {'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'})
},
u'stacktach.rawdataimagemeta': {
'Meta': {'object_name': 'RawDataImageMeta'},
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'os_architecture': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
'os_distro': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
'os_version': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'}),
'raw': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['stacktach.RawData']"}),
'rax_options': ('django.db.models.fields.TextField', [], {'null': 'True', 'blank': 'True'})
},
u'stacktach.requesttracker': {
'Meta': {'object_name': 'RequestTracker'},
'completed': ('django.db.models.fields.BooleanField', [], {'default': 'False', 'db_index': 'True'}),
'duration': ('django.db.models.fields.DecimalField', [], {'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'last_timing': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['stacktach.Timing']", 'null': 'True'}),
'lifecycle': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['stacktach.Lifecycle']"}),
'request_id': ('django.db.models.fields.CharField', [], {'max_length': '50', 'db_index': 'True'}),
'start': ('django.db.models.fields.DecimalField', [], {'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'})
},
u'stacktach.timing': {
'Meta': {'object_name': 'Timing'},
'diff': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6', 'db_index': 'True'}),
'end_raw': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'null': 'True', 'to': u"orm['stacktach.RawData']"}),
'end_when': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'lifecycle': ('django.db.models.fields.related.ForeignKey', [], {'to': u"orm['stacktach.Lifecycle']"}),
'name': ('django.db.models.fields.CharField', [], {'max_length': '50', 'db_index': 'True'}),
'start_raw': ('django.db.models.fields.related.ForeignKey', [], {'related_name': "'+'", 'null': 'True', 'to': u"orm['stacktach.RawData']"}),
'start_when': ('django.db.models.fields.DecimalField', [], {'null': 'True', 'max_digits': '20', 'decimal_places': '6'})
}
}
complete_apps = ['stacktach']

View File

@ -1,18 +1,3 @@
# Copyright 2012 - Dark Secret Software Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from django import forms from django import forms
from django.db import models from django.db import models
@ -24,6 +9,34 @@ class Deployment(models.Model):
return self.name return self.name
class GenericRawData(models.Model):
deployment = models.ForeignKey(Deployment)
tenant = models.CharField(max_length=50, null=True, blank=True,
db_index=True)
json = models.TextField()
routing_key = models.CharField(max_length=50, null=True,
blank=True, db_index=True)
image_type = models.IntegerField(null=True, default=0, db_index=True)
when = models.DecimalField(max_digits=20, decimal_places=6,
db_index=True)
publisher = models.CharField(max_length=100, null=True,
blank=True, db_index=True)
event = models.CharField(max_length=50, null=True,
blank=True, db_index=True)
service = models.CharField(max_length=50, null=True,
blank=True, db_index=True)
host = models.CharField(max_length=100, null=True,
blank=True, db_index=True)
instance = models.CharField(max_length=50, null=True,
blank=True, db_index=True)
request_id = models.CharField(max_length=50, null=True,
blank=True, db_index=True)
@staticmethod
def get_name():
return GenericRawData.__name__
class RawData(models.Model): class RawData(models.Model):
deployment = models.ForeignKey(Deployment) deployment = models.ForeignKey(Deployment)
tenant = models.CharField(max_length=50, null=True, blank=True, tenant = models.CharField(max_length=50, null=True, blank=True,
@ -58,6 +71,10 @@ class RawData(models.Model):
def __repr__(self): def __repr__(self):
return "%s %s %s" % (self.event, self.instance, self.state) return "%s %s %s" % (self.event, self.instance, self.state)
@staticmethod
def get_name():
return RawData.__name__
class RawDataImageMeta(models.Model): class RawDataImageMeta(models.Model):
raw = models.ForeignKey(RawData, null=False) raw = models.ForeignKey(RawData, null=False)
@ -158,6 +175,7 @@ class InstanceExists(models.Model):
(RECONCILED, 'Passed Verification After Reconciliation'), (RECONCILED, 'Passed Verification After Reconciliation'),
(FAILED, 'Failed Verification'), (FAILED, 'Failed Verification'),
] ]
instance = models.CharField(max_length=50, null=True, instance = models.CharField(max_length=50, null=True,
blank=True, db_index=True) blank=True, db_index=True)
launched_at = models.DecimalField(null=True, max_digits=20, launched_at = models.DecimalField(null=True, max_digits=20,
@ -238,5 +256,49 @@ class JsonReport(models.Model):
json = models.TextField() json = models.TextField()
class GlanceRawData(models.Model):
ACTIVE = 'active'
DELETED = 'deleted'
KILLED = 'killed'
PENDING_DELETE = 'pending_delete'
QUEUED = 'queued'
SAVING = 'saving'
STATUS_CHOICES = [
(ACTIVE, 'Active'),
(DELETED, 'Deleted'),
(KILLED, 'Killed'),
(PENDING_DELETE, 'Pending delete'),
(QUEUED, 'Queued'),
(SAVING, 'Saving'),
]
deployment = models.ForeignKey(Deployment)
owner = models.CharField(max_length=50, null=True, blank=True,
db_index=True)
json = models.TextField()
routing_key = models.CharField(max_length=50, null=True, blank=True,
db_index=True)
when = models.DecimalField(max_digits=20, decimal_places=6, db_index=True)
publisher = models.CharField(max_length=100, null=True,
blank=True, db_index=True)
event = models.CharField(max_length=50, null=True, blank=True,
db_index=True)
service = models.CharField(max_length=50, null=True, blank=True,
db_index=True)
host = models.CharField(max_length=100, null=True, blank=True,
db_index=True)
instance = models.CharField(max_length=50, null=True, blank=True,
db_index=True)
request_id = models.CharField(max_length=50, null=True, blank=True,
db_index=True)
uuid = models.CharField(max_length=50)
status = models.CharField(max_length=50, db_index=True,
choices=STATUS_CHOICES, default=QUEUED)
image_type = models.IntegerField(null=True, default=0, db_index=True)
@staticmethod
def get_name():
return GlanceRawData.__name__
def get_model_fields(model): def get_model_fields(model):
return model._meta.fields return model._meta.fields

View File

@ -1,25 +1,37 @@
# Copyright (c) 2013 - Rackspace Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
from stacktach import utils from stacktach import utils
from stacktach import image_type from stacktach import image_type
from stacktach import db
class Notification(object): class Notification(object):
def __init__(self, body): def __init__(self, body, deployment, routing_key, json):
self.body = body self.body = body
self.request_id = body.get('_context_request_id', "") self.request_id = body.get('_context_request_id', "")
self.deployment = deployment
self.routing_key = routing_key
self.json = json
self.payload = body.get('payload', {}) self.payload = body.get('payload', {})
self.state = self.payload.get('state', "")
self.old_state = self.payload.get('old_state', "")
self.old_task = self.payload.get('old_task_state', "")
self.task = self.payload.get('new_task_state', "")
self.image_type = image_type.get_numeric_code(self.payload)
self.publisher = self.body['publisher_id'] self.publisher = self.body['publisher_id']
self.event = self.body['event_type'] self.event = self.body['event_type']
image_meta = self.payload.get('image_meta', {})
self.os_architecture = image_meta.get('org.openstack__1__architecture',
'')
self.os_distro = image_meta.get('org.openstack__1__os_distro', '')
self.os_version = image_meta.get('org.openstack__1__os_version', '')
self.rax_options = image_meta.get('com.rackspace__1__options', '')
@property @property
def when(self): def when(self):
@ -29,30 +41,24 @@ class Notification(object):
when = utils.str_time_to_unix(when) when = utils.str_time_to_unix(when)
return when return when
def rawdata_kwargs(self, deployment, routing_key, json): @property
return { def service(self):
'deployment': deployment, parts = self.publisher.split('.')
'routing_key': routing_key, return parts[0]
'event': self.event,
'publisher': self.publisher, @property
'json': json, def host(self):
'state': self.state, host = None
'old_state': self.old_state, parts = self.publisher.split('.')
'task': self.task, if len(parts) > 1:
'old_task': self.old_task, host = ".".join(parts[1:])
'image_type': self.image_type, return host
'when': self.when,
'publisher': self.publisher, @property
'service': self.service, def tenant(self):
'host': self.host, tenant = self.body.get('_context_project_id', None)
'instance': self.instance, tenant = self.payload.get('tenant_id', tenant)
'request_id': self.request_id, return tenant
'tenant': self.tenant,
'os_architecture': self.os_architecture,
'os_distro': self.os_distro,
'os_version': self.os_version,
'rax_options': self.rax_options
}
@property @property
def instance(self): def instance(self):
@ -65,6 +71,71 @@ class Notification(object):
instance = self.payload.get('instance', {}).get('uuid') instance = self.payload.get('instance', {}).get('uuid')
return instance return instance
def save(self):
return db.create_generic_rawdata(deployment=self.deployment,
routing_key=self.routing_key,
tenant=self.tenant,
json=self.json,
when=self.when,
publisher=self.publisher,
event=self.event,
service=self.service,
host=self.host,
instance=self.instance,
request_id=self.request_id)
class GlanceNotification(Notification):
def __init__(self, body, deployment, routing_key, json):
super(GlanceNotification, self).__init__(body, deployment,
routing_key, json)
self.properties = self.payload.get('properties', {})
self.image_type = image_type.get_numeric_code(self.payload)
self.status = self.payload.get('status', None)
self.uuid = self.payload.get('id', None)
@property
def owner(self):
return self.payload.get('owner', None)
@property
def instance(self):
return self.properties.get('instance_uuid', None)
def save(self):
db.create_glance_rawdata(deployment=self.deployment,
routing_key=self.routing_key,
owner=self.owner,
json=self.json,
when=self.when,
publisher=self.publisher,
event=self.event,
service=self.service,
host=self.host,
instance=self.instance,
request_id=self.request_id,
image_type=self.image_type,
status=self.status,
uuid=self.uuid)
class NovaNotification(Notification):
def __init__(self, body, deployment, routing_key, json):
super(NovaNotification, self).__init__(body, deployment, routing_key,
json)
self.state = self.payload.get('state', "")
self.old_state = self.payload.get('old_state', "")
self.old_task = self.payload.get('old_task_state', "")
self.task = self.payload.get('new_task_state', "")
self.image_type = image_type.get_numeric_code(self.payload)
image_meta = self.payload.get('image_meta', {})
self.os_architecture = image_meta.get('org.openstack__1__architecture',
'')
self.os_distro = image_meta.get('org.openstack__1__os_distro', '')
self.os_version = image_meta.get('org.openstack__1__os_version', '')
self.rax_options = image_meta.get('com.rackspace__1__options', '')
@property @property
def host(self): def host(self):
host = None host = None
@ -78,8 +149,31 @@ class Notification(object):
parts = self.publisher.split('.') parts = self.publisher.split('.')
return parts[0] return parts[0]
@property def save(self):
def tenant(self): return db.create_nova_rawdata(deployment=self.deployment,
tenant = self.body.get('_context_project_id', None) routing_key=self.routing_key,
tenant = self.payload.get('tenant_id', tenant) tenant=self.tenant,
return tenant json=self.json,
when=self.when,
publisher=self.publisher,
event=self.event,
service=self.service,
host=self.host,
instance=self.instance,
request_id=self.request_id,
state=self.state,
old_state=self.old_state,
task=self.task,
old_task=self.old_task,
os_architecture=self.os_architecture,
os_distro=self.os_distro,
os_version=self.os_version,
rax_options=self.rax_options)
def notification_factory(body, deployment, routing_key, json, exchange):
if exchange == 'nova':
return NovaNotification(body, deployment, routing_key, json)
if exchange == "glance":
return GlanceNotification(body, deployment, routing_key, json)
return Notification(body, deployment, routing_key, json)

View File

@ -19,39 +19,106 @@
# IN THE SOFTWARE. # IN THE SOFTWARE.
from datetime import datetime from datetime import datetime
import unittest from django.test import TransactionTestCase
import db import db
from stacktach.datetime_to_decimal import dt_to_decimal from stacktach.datetime_to_decimal import dt_to_decimal
from stacktach.models import RawDataImageMeta from stacktach.models import RawDataImageMeta
from stacktach.models import GenericRawData
from stacktach.models import GlanceRawData
from stacktach.models import RawData from stacktach.models import RawData
from stacktach.models import get_model_fields from stacktach.models import get_model_fields
class RawDataImageMetaDbTestCase(unittest.TestCase): class RawDataImageMetaDbTestCase(TransactionTestCase):
def test_create_raw_data_should_populate_rawdata_and_rawdata_imagemeta(self): def test_create_raw_data_should_populate_rawdata_and_rawdata_imagemeta(self):
deployment = db.get_or_create_deployment('deployment1')[0] deployment = db.get_or_create_deployment('deployment1')[0]
kwargs = { kwargs = {
'deployment': deployment, 'deployment': deployment,
'when': dt_to_decimal(datetime.utcnow()), 'when': dt_to_decimal(datetime.utcnow()),
'tenant': '1', 'json': '{}', 'routing_key': 'monitor.info', 'tenant': '1',
'state': 'verifying', 'old_state': 'pending', 'json': '{}',
'old_task': '', 'task': '', 'image_type': 1, 'routing_key': 'monitor.info',
'publisher': '', 'event': 'compute.instance.exists', 'state': 'verifying',
'service': '', 'host': '', 'instance': '1234-5678-9012-3456', 'old_state': 'pending',
'request_id': '1234', 'os_architecture': 'x86', 'os_version': '1', 'old_task': 'building',
'os_distro': 'windows', 'rax_options': '2'} 'task': 'saving',
'image_type': 1,
'publisher': 'publisher',
'event': 'compute.instance.exists',
'service': 'compute',
'host': 'host',
'instance': '1234-5678-9012-3456',
'request_id': '1234',
'os_architecture': 'x86',
'os_version': '1',
'os_distro': 'windows',
'rax_options': '2'}
rawdata = db.create_rawdata(**kwargs) rawdata = db.create_nova_rawdata(**kwargs)
for field in get_model_fields(RawData): for field in get_model_fields(RawData):
if field.name != 'id': if field.name != 'id':
self.assertEquals(getattr(rawdata, field.name), self.assertEquals(getattr(rawdata, field.name),
kwargs[field.name]) kwargs[field.name])
raw_image_meta = RawDataImageMeta.objects.all()[0] raw_image_meta = RawDataImageMeta.objects.filter(raw_id=rawdata.id)[0]
self.assertEquals(raw_image_meta.raw, rawdata)
self.assertEquals(raw_image_meta.os_architecture, self.assertEquals(raw_image_meta.os_architecture,
kwargs['os_architecture']) kwargs['os_architecture'])
self.assertEquals(raw_image_meta.os_version, kwargs['os_version']) self.assertEquals(raw_image_meta.os_version, kwargs['os_version'])
self.assertEquals(raw_image_meta.os_distro, kwargs['os_distro']) self.assertEquals(raw_image_meta.os_distro, kwargs['os_distro'])
self.assertEquals(raw_image_meta.rax_options, kwargs['rax_options']) self.assertEquals(raw_image_meta.rax_options, kwargs['rax_options'])
class GlanceRawDataTestCase(TransactionTestCase):
def test_create_rawdata_should_populate_glance_rawdata(self):
deployment = db.get_or_create_deployment('deployment1')[0]
kwargs = {
'deployment': deployment,
'when': dt_to_decimal(datetime.utcnow()),
'owner': '1234567',
'json': '{}',
'routing_key': 'glance_monitor.info',
'image_type': 1,
'publisher': 'publisher',
'event': 'event',
'service': 'service',
'host': 'host',
'instance': '1234-5678-9012-3456',
'request_id': '1234',
'uuid': '1234-5678-0912-3456',
'status': 'active',
}
db.create_glance_rawdata(**kwargs)
rawdata = GlanceRawData.objects.all().order_by('-id')[0]
for field in get_model_fields(GlanceRawData):
if field.name != 'id':
self.assertEquals(getattr(rawdata, field.name),
kwargs[field.name])
class GenericRawDataTestCase(TransactionTestCase):
def test_create_generic_rawdata_should_populate_generic_rawdata(self):
deployment = db.get_or_create_deployment('deployment1')[0]
kwargs = {
'deployment': deployment,
'when': dt_to_decimal(datetime.utcnow()),
'tenant': '1234567',
'json': '{}',
'routing_key': 'monitor.info',
'image_type': 1,
'publisher': 'publisher',
'event': 'event',
'service': 'service',
'host': 'host',
'instance': '1234-5678-9012-3456',
'request_id': '1234'}
db.create_generic_rawdata(**kwargs)
rawdata = GenericRawData.objects.all()[0]
for field in get_model_fields(GenericRawData):
if field.name != 'id':
self.assertEquals(getattr(rawdata, field.name),
kwargs[field.name])

View File

@ -12,7 +12,7 @@ from stacktach import db as stackdb
from stacktach import models from stacktach import models
from stacktach import stacklog from stacktach import stacklog
from stacktach import utils from stacktach import utils
from stacktach.notification import Notification from stacktach import notification
STACKDB = stackdb STACKDB = stackdb
@ -25,13 +25,6 @@ def log_warn(msg):
LOG.warn(msg) LOG.warn(msg)
# routing_key : handler
NOTIFICATIONS = {
'monitor.info': Notification,
'monitor.error': Notification}
def start_kpi_tracking(lifecycle, raw): def start_kpi_tracking(lifecycle, raw):
"""Start the clock for kpi timings when we see an instance.update """Start the clock for kpi timings when we see an instance.update
coming in from an api node.""" coming in from an api node."""
@ -279,10 +272,8 @@ def _process_exists(raw, body):
values['tenant'] = payload['tenant_id'] values['tenant'] = payload['tenant_id']
image_meta = payload.get('image_meta', {}) image_meta = payload.get('image_meta', {})
values['rax_options'] = image_meta.get('com.rackspace__1__options', '') values['rax_options'] = image_meta.get('com.rackspace__1__options', '')
os_arch = image_meta.get('org.openstack__1__architecture', '') values['os_architecture'] = image_meta.get('org.openstack__1__architecture', '')
values['os_architecture'] = os_arch values['os_version'] = image_meta.get('org.openstack__1__os_version', '')
os_version = image_meta.get('org.openstack__1__os_version', '')
values['os_version'] = os_version
values['os_distro'] = image_meta.get('org.openstack__1__os_distro', '') values['os_distro'] = image_meta.get('org.openstack__1__os_distro', '')
deleted_at = payload.get('deleted_at') deleted_at = payload.get('deleted_at')
@ -327,26 +318,29 @@ def aggregate_usage(raw, body):
USAGE_PROCESS_MAPPING[raw.event](raw, body) USAGE_PROCESS_MAPPING[raw.event](raw, body)
def process_raw_data(deployment, args, json_args): def process_raw_data(deployment, args, json_args, exchange):
"""This is called directly by the worker to add the event to the db.""" """This is called directly by the worker to add the event to the db."""
db.reset_queries() db.reset_queries()
routing_key, body = args routing_key, body = args
record = None notif = notification.notification_factory(body, deployment, routing_key,
notification = NOTIFICATIONS[routing_key](body) json_args, exchange)
if notification: return notif.save()
values = notification.rawdata_kwargs(deployment, routing_key, json_args)
if not values:
return record
record = STACKDB.create_rawdata(**values)
return record
def post_process(raw, body): def post_process_rawdata(raw, body):
aggregate_lifecycle(raw) aggregate_lifecycle(raw)
aggregate_usage(raw, body) aggregate_usage(raw, body)
def post_process_glancerawdata(raw, body):
pass
def post_process_genericrawdata(raw, body):
pass
def _post_process_raw_data(rows, highlight=None): def _post_process_raw_data(rows, highlight=None):
for row in rows: for row in rows:
if "error" in row.routing_key: if "error" in row.routing_key:

33
tests/unit/test_models.py Normal file
View File

@ -0,0 +1,33 @@
# Copyright (c) 2013 - Rackspace Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
import unittest
from stacktach.models import RawData, GlanceRawData, GenericRawData
class ModelsTestCase(unittest.TestCase):
def test_get_name_for_rawdata(self):
self.assertEquals(RawData.get_name(), 'RawData')
def test_get_name_for_glancerawdata(self):
self.assertEquals(GlanceRawData.get_name(), 'GlanceRawData')
def test_get_name_for_genericrawdata(self):
self.assertEquals(GenericRawData.get_name(), 'GenericRawData')

View File

@ -18,163 +18,167 @@
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE. # IN THE SOFTWARE.
from decimal import Decimal
import unittest import unittest
import mox
from stacktach import notification, utils
from stacktach.notification import Notification from stacktach.notification import Notification
from tests.unit.utils import REQUEST_ID_1, TENANT_ID_1, INSTANCE_ID_1 from stacktach.notification import GlanceNotification
from stacktach import db
from tests.unit.utils import REQUEST_ID_1
from tests.unit.utils import TIMESTAMP_1
from tests.unit.utils import TENANT_ID_1
from tests.unit.utils import INSTANCE_ID_1
class NovaNotificationTestCase(unittest.TestCase):
def setUp(self):
self.mox = mox.Mox()
def tearDown(self):
self.mox.UnsetStubs()
def test_factory_should_return_nova_notification_for_nova_exchange(
self):
body = {}
deployment = "1"
json = "{}"
routing_key = "monitor.info"
self.mox.StubOutWithMock(notification, 'NovaNotification')
notification.NovaNotification(body, deployment, routing_key, json)
self.mox.ReplayAll()
notification.notification_factory(body, deployment, routing_key, json,
'nova')
self.mox.VerifyAll()
def test_factory_should_return_glance_notification_for_glance_exchange(
self):
body = {}
deployment = "1"
json = "{}"
routing_key = "monitor_glance.info"
self.mox.StubOutWithMock(notification, 'GlanceNotification')
notification.GlanceNotification(body, deployment, routing_key, json)
self.mox.ReplayAll()
notification.notification_factory(body, deployment, routing_key, json,
'glance')
self.mox.VerifyAll()
def test_factory_should_return_notification_for_unknown_exchange(
self):
body = {}
deployment = "1"
json = "{}"
routing_key = "unknown.info"
self.mox.StubOutWithMock(notification, 'Notification')
notification.Notification(body, deployment, routing_key, json)
self.mox.ReplayAll()
notification.notification_factory(body, deployment, routing_key, json,
'unknown_exchange')
self.mox.VerifyAll()
class GlanceNotificationTestCase(unittest.TestCase):
def setUp(self):
self.mox = mox.Mox()
def tearDown(self):
self.mox.UnsetStubs()
def test_save_should_persist_glance_rawdata_to_database(self):
body = {
"event_type": "image.upload",
"timestamp": "2013-06-20 17:31:57.939614",
"publisher_id": "glance-api01-r2961.global.preprod-ord.ohthree.com",
"payload": {
"status": "saving",
"properties": {
"image_type": "snapshot",
"instance_uuid": INSTANCE_ID_1,
},
"owner": TENANT_ID_1,
"id": "2df2ccf6-bc1b-4853-aab0-25fda346b3bb",
}
}
deployment = "1"
routing_key = "glance_monitor.info"
json = '{["routing_key", {%s}]}' % body
self.mox.StubOutWithMock(db, 'create_glance_rawdata')
db.create_glance_rawdata(
deployment="1",
owner=TENANT_ID_1,
json=json,
routing_key=routing_key,
when=utils.str_time_to_unix("2013-06-20 17:31:57.939614"),
publisher="glance-api01-r2961.global.preprod-ord.ohthree.com",
event="image.upload",
service="glance-api01-r2961",
host="global.preprod-ord.ohthree.com",
instance=INSTANCE_ID_1,
request_id=None,
image_type=0,
status="saving",
uuid="2df2ccf6-bc1b-4853-aab0-25fda346b3bb")
self.mox.ReplayAll()
notification = GlanceNotification(body, deployment, routing_key,
json)
notification.save()
self.mox.VerifyAll()
class NotificationTestCase(unittest.TestCase): class NotificationTestCase(unittest.TestCase):
def setUp(self):
self.mox = mox.Mox()
def test_rawdata_kwargs(self): def tearDown(self):
message = { self.mox.UnsetStubs()
'event_type': 'compute.instance.create.start',
'publisher_id': 'compute.cpu1-n01.example.com', def test_save_should_persist_generic_rawdata_to_database(self):
body = {
"event_type": "image.upload",
'_context_request_id': REQUEST_ID_1, '_context_request_id': REQUEST_ID_1,
'_context_project_id': TENANT_ID_1, '_context_project_id': TENANT_ID_1,
'timestamp': '2013-06-12 06:30:52.790476', "timestamp": TIMESTAMP_1,
'payload': { "publisher_id": "glance-api01-r2961.global.preprod-ord.ohthree.com",
"payload": {
'instance_id': INSTANCE_ID_1, 'instance_id': INSTANCE_ID_1,
'state': 'active', "status": "saving",
'old_state': 'building', "container_format": "ovf",
'old_task_state': 'build', "properties": {
"new_task_state": 'rebuild_spawning', "image_type": "snapshot",
'image_meta': { },
'image_type': 'base', "tenant": "5877054",
'org.openstack__1__architecture': 'x64',
'org.openstack__1__os_distro': 'com.microsoft.server',
'org.openstack__1__os_version': '2008.2',
'com.rackspace__1__options': '36'
}
} }
} }
kwargs = Notification(message).rawdata_kwargs('1', 'monitor.info', 'json') deployment = "1"
routing_key = "generic_monitor.info"
json = '{["routing_key", {%s}]}' % body
self.mox.StubOutWithMock(db, 'create_generic_rawdata')
db.create_generic_rawdata(
deployment="1",
tenant=TENANT_ID_1,
json=json,
routing_key=routing_key,
when=utils.str_time_to_unix(TIMESTAMP_1),
publisher="glance-api01-r2961.global.preprod-ord.ohthree.com",
event="image.upload",
service="glance-api01-r2961",
host="global.preprod-ord.ohthree.com",
instance=INSTANCE_ID_1,
request_id=REQUEST_ID_1)
self.assertEquals(kwargs['host'], 'cpu1-n01.example.com') self.mox.ReplayAll()
self.assertEquals(kwargs['deployment'], '1')
self.assertEquals(kwargs['routing_key'], 'monitor.info')
self.assertEquals(kwargs['tenant'], TENANT_ID_1)
self.assertEquals(kwargs['json'], 'json')
self.assertEquals(kwargs['state'], 'active')
self.assertEquals(kwargs['old_state'], 'building')
self.assertEquals(kwargs['old_task'], 'build')
self.assertEquals(kwargs['task'], 'rebuild_spawning')
self.assertEquals(kwargs['image_type'], 1)
self.assertEquals(kwargs['when'], Decimal('1371018652.790476'))
self.assertEquals(kwargs['publisher'], 'compute.cpu1-n01.example.com')
self.assertEquals(kwargs['event'], 'compute.instance.create.start')
self.assertEquals(kwargs['request_id'], REQUEST_ID_1)
def test_rawdata_kwargs_missing_image_meta(self): notification = Notification(body, deployment, routing_key, json)
message = { notification.save()
'event_type': 'compute.instance.create.start', self.mox.VerifyAll()
'publisher_id': 'compute.cpu1-n01.example.com',
'_context_request_id': REQUEST_ID_1,
'_context_project_id': TENANT_ID_1,
'timestamp': '2013-06-12 06:30:52.790476',
'payload': {
'instance_id': INSTANCE_ID_1,
'state': 'active',
'old_state': 'building',
'old_task_state': 'build',
"new_task_state": 'rebuild_spawning',
'image_meta': {
'image_type': 'base',
}
}
}
kwargs = Notification(message).rawdata_kwargs('1', 'monitor.info', 'json')
self.assertEquals(kwargs['host'], 'cpu1-n01.example.com')
self.assertEquals(kwargs['deployment'], '1')
self.assertEquals(kwargs['routing_key'], 'monitor.info')
self.assertEquals(kwargs['tenant'], TENANT_ID_1)
self.assertEquals(kwargs['json'], 'json')
self.assertEquals(kwargs['state'], 'active')
self.assertEquals(kwargs['old_state'], 'building')
self.assertEquals(kwargs['old_task'], 'build')
self.assertEquals(kwargs['task'], 'rebuild_spawning')
self.assertEquals(kwargs['image_type'], 1)
self.assertEquals(kwargs['when'], Decimal('1371018652.790476'))
self.assertEquals(kwargs['publisher'], 'compute.cpu1-n01.example.com')
self.assertEquals(kwargs['event'], 'compute.instance.create.start')
self.assertEquals(kwargs['request_id'], REQUEST_ID_1)
def test_rawdata_kwargs_for_message_with_no_host(self):
message = {
'event_type': 'compute.instance.create.start',
'publisher_id': 'compute',
'_context_request_id': REQUEST_ID_1,
'_context_project_id': TENANT_ID_1,
'timestamp': '2013-06-12 06:30:52.790476',
'payload': {
'instance_id': INSTANCE_ID_1,
'state': 'active',
'old_state': 'building',
'old_task_state': 'build',
"new_task_state": 'rebuild_spawning',
'image_meta': {
'image_type': 'base',
'org.openstack__1__architecture': 'x64',
'org.openstack__1__os_distro': 'com.microsoft.server',
'org.openstack__1__os_version': '2008.2',
'com.rackspace__1__options': '36'
}
}
}
kwargs = Notification(message).rawdata_kwargs('1', 'monitor.info', 'json')
self.assertEquals(kwargs['host'], None)
self.assertEquals(kwargs['deployment'], '1')
self.assertEquals(kwargs['routing_key'], 'monitor.info')
self.assertEquals(kwargs['tenant'], TENANT_ID_1)
self.assertEquals(kwargs['json'], 'json')
self.assertEquals(kwargs['state'], 'active')
self.assertEquals(kwargs['old_state'], 'building')
self.assertEquals(kwargs['old_task'], 'build')
self.assertEquals(kwargs['task'], 'rebuild_spawning')
self.assertEquals(kwargs['image_type'], 1)
self.assertEquals(kwargs['when'], Decimal('1371018652.790476'))
self.assertEquals(kwargs['publisher'], 'compute')
self.assertEquals(kwargs['event'], 'compute.instance.create.start')
self.assertEquals(kwargs['request_id'], REQUEST_ID_1)
def test_rawdata_kwargs_for_message_with_exception(self):
message = {
'event_type': 'compute.instance.create.start',
'publisher_id': 'compute.cpu1-n01.example.com',
'_context_request_id': REQUEST_ID_1,
'_context_project_id': TENANT_ID_1,
'timestamp': '2013-06-12 06:30:52.790476',
'payload': {
'exception': {'kwargs':{'uuid': INSTANCE_ID_1}},
'instance_id': INSTANCE_ID_1,
'state': 'active',
'old_state': 'building',
'old_task_state': 'build',
"new_task_state": 'rebuild_spawning',
'image_meta': {
'image_type': 'base',
'org.openstack__1__architecture': 'x64',
'org.openstack__1__os_distro': 'com.microsoft.server',
'org.openstack__1__os_version': '2008.2',
'com.rackspace__1__options': '36'
}
}
}
kwargs = Notification(message).rawdata_kwargs('1', 'monitor.info', 'json')
self.assertEquals(kwargs['host'], 'cpu1-n01.example.com')
self.assertEquals(kwargs['deployment'], '1')
self.assertEquals(kwargs['routing_key'], 'monitor.info')
self.assertEquals(kwargs['tenant'], TENANT_ID_1)
self.assertEquals(kwargs['json'], 'json')
self.assertEquals(kwargs['state'], 'active')
self.assertEquals(kwargs['old_state'], 'building')
self.assertEquals(kwargs['old_task'], 'build')
self.assertEquals(kwargs['task'], 'rebuild_spawning')
self.assertEquals(kwargs['image_type'], 1)
self.assertEquals(kwargs['when'], Decimal('1371018652.790476'))
self.assertEquals(kwargs['publisher'], 'compute.cpu1-n01.example.com')
self.assertEquals(kwargs['event'], 'compute.instance.create.start')
self.assertEquals(kwargs['request_id'], REQUEST_ID_1)

View File

@ -37,6 +37,7 @@ from utils import INSTANCE_TYPE_ID_1
from utils import DUMMY_TIME from utils import DUMMY_TIME
from utils import INSTANCE_TYPE_ID_2 from utils import INSTANCE_TYPE_ID_2
from stacktach import stacklog from stacktach import stacklog
from stacktach import notification
from stacktach import views from stacktach import views
@ -59,54 +60,45 @@ class StacktachRawParsingTestCase(unittest.TestCase):
dict = { dict = {
'timestamp': when, 'timestamp': when,
} }
args = ('monitor.info', dict) routing_key = 'monitor.info'
args = (routing_key, dict)
json_args = json.dumps(args) json_args = json.dumps(args)
raw_values = { mock_record = self.mox.CreateMockAnything()
'deployment': deployment,
'when': utils.decimal_utc(datetime.datetime.strptime(when, '%Y-%m-%d %H:%M:%S.%f')),
'host': 'api',
'routing_key': 'monitor.info',
'json': json_args
}
old_info_handler = views.NOTIFICATIONS['monitor.info']
mock_notification = self.mox.CreateMockAnything() mock_notification = self.mox.CreateMockAnything()
mock_notification.rawdata_kwargs(deployment, 'monitor.info', json_args).AndReturn(raw_values) mock_notification.save().AndReturn(mock_record)
views.NOTIFICATIONS['monitor.info'] = lambda message_body: mock_notification self.mox.StubOutWithMock(notification, 'notification_factory')
exchange = 'nova'
views.STACKDB.create_rawdata(**raw_values) notification.notification_factory(dict, deployment, routing_key,
json_args, exchange).AndReturn(
mock_notification)
self.mox.ReplayAll() self.mox.ReplayAll()
views.process_raw_data(deployment, args, json_args)
self.mox.VerifyAll()
views.NOTIFICATIONS['monitor.info'] = old_info_handler self.assertEquals(
views.process_raw_data(deployment, args, json_args, exchange),
mock_record)
self.mox.VerifyAll()
def test_process_raw_data_old_timestamp(self): def test_process_raw_data_old_timestamp(self):
deployment = self.mox.CreateMockAnything() deployment = self.mox.CreateMockAnything()
when = '2013-1-25T13:38:23.123' when = '2013-1-25T13:38:23.123'
dict = { dict = {
'_context_timestamp': when, '_context_timestamp': when,
} }
routing_key = 'monitor.info'
args = ('monitor.info', dict) args = ('monitor.info', dict)
json_args = json.dumps(args[1]) json_args = json.dumps(args[1])
raw_values = {
'deployment': deployment,
'when': utils.decimal_utc(datetime.datetime.strptime(when, '%Y-%m-%dT%H:%M:%S.%f')),
'host': 'api',
'routing_key': 'monitor.info',
'json': json_args
}
old_info_handler = views.NOTIFICATIONS['monitor.info']
mock_notification = self.mox.CreateMockAnything()
mock_notification.rawdata_kwargs(deployment, 'monitor.info', json_args).AndReturn(raw_values)
views.NOTIFICATIONS['monitor.info'] = lambda message_body: mock_notification
views.STACKDB.create_rawdata(**raw_values) mock_notification = self.mox.CreateMockAnything()
mock_notification.save()
self.mox.StubOutWithMock(notification, 'notification_factory')
exchange = 'nova'
notification.notification_factory(dict, deployment, routing_key,
json_args, exchange).AndReturn(mock_notification)
self.mox.ReplayAll() self.mox.ReplayAll()
views.process_raw_data(deployment, args, json_args)
views.process_raw_data(deployment, args, json_args, exchange)
self.mox.VerifyAll() self.mox.VerifyAll()
views.NOTIFICATIONS['monitor.info'] = old_info_handler
class StacktachLifecycleTestCase(unittest.TestCase): class StacktachLifecycleTestCase(unittest.TestCase):
def setUp(self): def setUp(self):

View File

@ -22,15 +22,14 @@ import json
import unittest import unittest
import kombu import kombu
import kombu.entity
import kombu.connection
import mox import mox
from stacktach import db, views from stacktach import db
from stacktach import views
import worker.worker as worker import worker.worker as worker
class NovaConsumerTestCase(unittest.TestCase): class ConsumerTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self.mox = mox.Mox() self.mox = mox.Mox()
@ -47,9 +46,10 @@ class NovaConsumerTestCase(unittest.TestCase):
consumer = self.mox.CreateMockAnything() consumer = self.mox.CreateMockAnything()
created_consumers.append(consumer) created_consumers.append(consumer)
return consumer return consumer
self.mox.StubOutWithMock(worker.NovaConsumer, '_create_exchange') self.mox.StubOutWithMock(worker.Consumer, '_create_exchange')
self.mox.StubOutWithMock(worker.NovaConsumer, '_create_queue') self.mox.StubOutWithMock(worker.Consumer, '_create_queue')
consumer = worker.NovaConsumer('test', None, None, True, {}) consumer = worker.Consumer('test', None, None, True, {}, "nova",
["monitor.info", "monitor.error"])
exchange = self.mox.CreateMockAnything() exchange = self.mox.CreateMockAnything()
consumer._create_exchange('nova', 'topic').AndReturn(exchange) consumer._create_exchange('nova', 'topic').AndReturn(exchange)
info_queue = self.mox.CreateMockAnything() info_queue = self.mox.CreateMockAnything()
@ -71,7 +71,8 @@ class NovaConsumerTestCase(unittest.TestCase):
def test_create_exchange(self): def test_create_exchange(self):
args = {'key': 'value'} args = {'key': 'value'}
consumer = worker.NovaConsumer('test', None, None, True, args) consumer = worker.Consumer('test', None, None, True, args, 'nova',
["monitor.info", "monitor.error"])
self.mox.StubOutClassWithMocks(kombu.entity, 'Exchange') self.mox.StubOutClassWithMocks(kombu.entity, 'Exchange')
exchange = kombu.entity.Exchange('nova', type='topic', exclusive=False, exchange = kombu.entity.Exchange('nova', type='topic', exclusive=False,
@ -87,7 +88,8 @@ class NovaConsumerTestCase(unittest.TestCase):
queue = kombu.Queue('name', exchange, auto_delete=False, durable=True, queue = kombu.Queue('name', exchange, auto_delete=False, durable=True,
exclusive=False, routing_key='routing.key', exclusive=False, routing_key='routing.key',
queue_arguments={}) queue_arguments={})
consumer = worker.NovaConsumer('test', None, None, True, {}) consumer = worker.Consumer('test', None, None, True, {}, 'nova',
["monitor.info", "monitor.error"])
self.mox.ReplayAll() self.mox.ReplayAll()
actual_queue = consumer._create_queue('name', exchange, 'routing.key', actual_queue = consumer._create_queue('name', exchange, 'routing.key',
exclusive=False, exclusive=False,
@ -103,7 +105,8 @@ class NovaConsumerTestCase(unittest.TestCase):
queue = kombu.Queue('name', exchange, auto_delete=False, durable=True, queue = kombu.Queue('name', exchange, auto_delete=False, durable=True,
exclusive=False, routing_key='routing.key', exclusive=False, routing_key='routing.key',
queue_arguments=queue_args) queue_arguments=queue_args)
consumer = worker.NovaConsumer('test', None, None, True, queue_args) consumer = worker.Consumer('test', None, None, True, queue_args,
'nova', ["monitor.info", "monitor.error"])
self.mox.ReplayAll() self.mox.ReplayAll()
actual_queue = consumer._create_queue('name', exchange, 'routing.key', actual_queue = consumer._create_queue('name', exchange, 'routing.key',
exclusive=False, exclusive=False,
@ -114,21 +117,29 @@ class NovaConsumerTestCase(unittest.TestCase):
def test_process(self): def test_process(self):
deployment = self.mox.CreateMockAnything() deployment = self.mox.CreateMockAnything()
raw = self.mox.CreateMockAnything() raw = self.mox.CreateMockAnything()
raw.get_name().AndReturn('RawData')
message = self.mox.CreateMockAnything() message = self.mox.CreateMockAnything()
consumer = worker.NovaConsumer('test', None, deployment, True, {}) exchange = 'nova'
consumer = worker.Consumer('test', None, deployment, True, {},
exchange, ["monitor.info", "monitor.error"])
routing_key = 'monitor.info' routing_key = 'monitor.info'
message.delivery_info = {'routing_key': routing_key} message.delivery_info = {'routing_key': routing_key}
body_dict = {u'key': u'value'} body_dict = {u'key': u'value'}
message.body = json.dumps(body_dict) message.body = json.dumps(body_dict)
mock_post_process_method = self.mox.CreateMockAnything()
mock_post_process_method(raw, body_dict)
old_handler = worker.POST_PROCESS_METHODS
worker.POST_PROCESS_METHODS["RawData"] = mock_post_process_method
self.mox.StubOutWithMock(views, 'process_raw_data', self.mox.StubOutWithMock(views, 'process_raw_data',
use_mock_anything=True) use_mock_anything=True)
args = (routing_key, body_dict) args = (routing_key, body_dict)
views.process_raw_data(deployment, args, json.dumps(args))\ views.process_raw_data(deployment, args, json.dumps(args), exchange) \
.AndReturn(raw) .AndReturn(raw)
message.ack() message.ack()
self.mox.StubOutWithMock(views, 'post_process')
views.post_process(raw, body_dict)
self.mox.StubOutWithMock(consumer, '_check_memory', self.mox.StubOutWithMock(consumer, '_check_memory',
use_mock_anything=True) use_mock_anything=True)
consumer._check_memory() consumer._check_memory()
@ -136,13 +147,16 @@ class NovaConsumerTestCase(unittest.TestCase):
consumer._process(message) consumer._process(message)
self.assertEqual(consumer.processed, 1) self.assertEqual(consumer.processed, 1)
self.mox.VerifyAll() self.mox.VerifyAll()
worker.POST_PROCESS_METHODS["RawData"] = old_handler
def test_process_no_raw_dont_ack(self): def test_process_no_raw_dont_ack(self):
deployment = self.mox.CreateMockAnything() deployment = self.mox.CreateMockAnything()
raw = self.mox.CreateMockAnything() raw = self.mox.CreateMockAnything()
message = self.mox.CreateMockAnything() message = self.mox.CreateMockAnything()
consumer = worker.NovaConsumer('test', None, deployment, True, {}) exchange = 'nova'
consumer = worker.Consumer('test', None, deployment, True, {},
exchange, ["monitor.info", "monitor.error"])
routing_key = 'monitor.info' routing_key = 'monitor.info'
message.delivery_info = {'routing_key': routing_key} message.delivery_info = {'routing_key': routing_key}
body_dict = {u'key': u'value'} body_dict = {u'key': u'value'}
@ -150,8 +164,8 @@ class NovaConsumerTestCase(unittest.TestCase):
self.mox.StubOutWithMock(views, 'process_raw_data', self.mox.StubOutWithMock(views, 'process_raw_data',
use_mock_anything=True) use_mock_anything=True)
args = (routing_key, body_dict) args = (routing_key, body_dict)
views.process_raw_data(deployment, args, json.dumps(args))\ views.process_raw_data(deployment, args, json.dumps(args), exchange) \
.AndReturn(None) .AndReturn(None)
self.mox.StubOutWithMock(consumer, '_check_memory', self.mox.StubOutWithMock(consumer, '_check_memory',
use_mock_anything=True) use_mock_anything=True)
consumer._check_memory() consumer._check_memory()
@ -168,7 +182,9 @@ class NovaConsumerTestCase(unittest.TestCase):
'rabbit_port': 5672, 'rabbit_port': 5672,
'rabbit_userid': 'rabbit', 'rabbit_userid': 'rabbit',
'rabbit_password': 'rabbit', 'rabbit_password': 'rabbit',
'rabbit_virtual_host': '/' 'rabbit_virtual_host': '/',
"services": ["nova"],
"topics": {"nova": ["monitor.info", "monitor.error"]}
} }
self.mox.StubOutWithMock(db, 'get_or_create_deployment') self.mox.StubOutWithMock(db, 'get_or_create_deployment')
deployment = self.mox.CreateMockAnything() deployment = self.mox.CreateMockAnything()
@ -187,13 +203,15 @@ class NovaConsumerTestCase(unittest.TestCase):
kombu.connection.BrokerConnection(**params).AndReturn(conn) kombu.connection.BrokerConnection(**params).AndReturn(conn)
conn.__enter__().AndReturn(conn) conn.__enter__().AndReturn(conn)
conn.__exit__(None, None, None).AndReturn(None) conn.__exit__(None, None, None).AndReturn(None)
self.mox.StubOutClassWithMocks(worker, 'NovaConsumer') self.mox.StubOutClassWithMocks(worker, 'Consumer')
consumer = worker.NovaConsumer(config['name'], conn, deployment, exchange = 'nova'
config['durable_queue'], {}) consumer = worker.Consumer(config['name'], conn, deployment,
config['durable_queue'], {}, exchange,
["monitor.info", "monitor.error"])
consumer.run() consumer.run()
worker.continue_running().AndReturn(False) worker.continue_running().AndReturn(False)
self.mox.ReplayAll() self.mox.ReplayAll()
worker.run(config) worker.run(config, exchange)
self.mox.VerifyAll() self.mox.VerifyAll()
def test_run_queue_args(self): def test_run_queue_args(self):
@ -205,7 +223,9 @@ class NovaConsumerTestCase(unittest.TestCase):
'rabbit_userid': 'rabbit', 'rabbit_userid': 'rabbit',
'rabbit_password': 'rabbit', 'rabbit_password': 'rabbit',
'rabbit_virtual_host': '/', 'rabbit_virtual_host': '/',
'queue_arguments': {'x-ha-policy': 'all'} 'queue_arguments': {'x-ha-policy': 'all'},
"services": ["nova"],
"topics": {"nova": ["monitor.info", "monitor.error"]}
} }
self.mox.StubOutWithMock(db, 'get_or_create_deployment') self.mox.StubOutWithMock(db, 'get_or_create_deployment')
deployment = self.mox.CreateMockAnything() deployment = self.mox.CreateMockAnything()
@ -224,12 +244,14 @@ class NovaConsumerTestCase(unittest.TestCase):
kombu.connection.BrokerConnection(**params).AndReturn(conn) kombu.connection.BrokerConnection(**params).AndReturn(conn)
conn.__enter__().AndReturn(conn) conn.__enter__().AndReturn(conn)
conn.__exit__(None, None, None).AndReturn(None) conn.__exit__(None, None, None).AndReturn(None)
self.mox.StubOutClassWithMocks(worker, 'NovaConsumer') self.mox.StubOutClassWithMocks(worker, 'Consumer')
consumer = worker.NovaConsumer(config['name'], conn, deployment, exchange = 'nova'
config['durable_queue'], consumer = worker.Consumer(config['name'], conn, deployment,
config['queue_arguments']) config['durable_queue'],
config['queue_arguments'], exchange,
["monitor.info", "monitor.error"])
consumer.run() consumer.run()
worker.continue_running().AndReturn(False) worker.continue_running().AndReturn(False)
self.mox.ReplayAll() self.mox.ReplayAll()
worker.run(config) worker.run(config, exchange)
self.mox.VerifyAll() self.mox.VerifyAll()

View File

@ -52,6 +52,7 @@ OS_ARCH_2 = "x64"
OS_VERSION_1 = "1" OS_VERSION_1 = "1"
OS_VERSION_2 = "2" OS_VERSION_2 = "2"
TIMESTAMP_1 = "2013-06-20 17:31:57.939614"
def decimal_utc(t = datetime.datetime.utcnow()): def decimal_utc(t = datetime.datetime.utcnow()):
return dt.dt_to_decimal(t) return dt.dt_to_decimal(t)

43
worker/config.py Normal file
View File

@ -0,0 +1,43 @@
# Copyright (c) 2013 - Rackspace Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
import json
import os
config_filename = os.environ.get('STACKTACH_DEPLOYMENTS_FILE',
'stacktach_worker_config.json')
try:
from local_settings import *
config_filename = STACKTACH_DEPLOYMENTS_FILE
except ImportError:
pass
config = None
with open(config_filename, "r") as f:
config = json.load(f)
def deployments():
return config['deployments']
def topics():
return config['topics']

View File

@ -1,9 +1,9 @@
import json
import os import os
import signal import signal
import sys import sys
from multiprocessing import Process from multiprocessing import Process
from worker import config
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir, os.pardir)) os.pardir, os.pardir))
@ -12,14 +12,6 @@ if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'stacktach')):
import worker.worker as worker import worker.worker as worker
config_filename = os.environ.get('STACKTACH_DEPLOYMENTS_FILE',
'stacktach_worker_config.json')
try:
from local_settings import *
config_filename = STACKTACH_DEPLOYMENTS_FILE
except ImportError:
pass
processes = [] processes = []
@ -35,18 +27,15 @@ def kill_time(signal, frame):
if __name__ == '__main__': if __name__ == '__main__':
config = None
with open(config_filename, "r") as f:
config = json.load(f)
deployments = config['deployments'] for deployment in config.deployments():
for deployment in deployments:
if deployment.get('enabled', True): if deployment.get('enabled', True):
process = Process(target=worker.run, args=(deployment,)) for exchange in deployment.get('topics').keys():
process.daemon = True process = Process(target=worker.run, args=(deployment,
process.start() exchange,))
processes.append(process) process.daemon = True
process.start()
processes.append(process)
signal.signal(signal.SIGINT, kill_time) signal.signal(signal.SIGINT, kill_time)
signal.signal(signal.SIGTERM, kill_time) signal.signal(signal.SIGTERM, kill_time)
signal.pause() signal.pause()

View File

@ -17,12 +17,13 @@
# to set TENANT_ID and URL to point to your StackTach web server. # to set TENANT_ID and URL to point to your StackTach web server.
import datetime import datetime
import kombu
import kombu.entity
import kombu.mixins
import sys import sys
import time import time
import kombu
import kombu.mixins
try: try:
import ujson as json import ujson as json
except ImportError: except ImportError:
@ -41,8 +42,9 @@ stacklog.set_default_logger_name('worker')
LOG = stacklog.get_logger() LOG = stacklog.get_logger()
class NovaConsumer(kombu.mixins.ConsumerMixin): class Consumer(kombu.mixins.ConsumerMixin):
def __init__(self, name, connection, deployment, durable, queue_arguments): def __init__(self, name, connection, deployment, durable, queue_arguments,
exchange, topics):
self.connection = connection self.connection = connection
self.deployment = deployment self.deployment = deployment
self.durable = durable self.durable = durable
@ -52,6 +54,8 @@ class NovaConsumer(kombu.mixins.ConsumerMixin):
self.pmi = None self.pmi = None
self.processed = 0 self.processed = 0
self.total_processed = 0 self.total_processed = 0
self.topics = topics
self.exchange = exchange
def _create_exchange(self, name, type, exclusive=False, auto_delete=False): def _create_exchange(self, name, type, exclusive=False, auto_delete=False):
return kombu.entity.Exchange(name, type=type, exclusive=exclusive, return kombu.entity.Exchange(name, type=type, exclusive=exclusive,
@ -66,14 +70,12 @@ class NovaConsumer(kombu.mixins.ConsumerMixin):
routing_key=routing_key) routing_key=routing_key)
def get_consumers(self, Consumer, channel): def get_consumers(self, Consumer, channel):
nova_exchange = self._create_exchange("nova", "topic") exchange = self._create_exchange(self.exchange, "topic")
nova_queues = [ queues = [self._create_queue(topic, exchange, topic)
self._create_queue('monitor.info', nova_exchange, 'monitor.info'), for topic in self.topics]
self._create_queue('monitor.error', nova_exchange, 'monitor.error')
]
return [Consumer(queues=nova_queues, callbacks=[self.on_nova])] return [Consumer(queues=queues, callbacks=[self.on_nova])]
def _process(self, message): def _process(self, message):
routing_key = message.delivery_info['routing_key'] routing_key = message.delivery_info['routing_key']
@ -81,14 +83,13 @@ class NovaConsumer(kombu.mixins.ConsumerMixin):
body = str(message.body) body = str(message.body)
args = (routing_key, json.loads(body)) args = (routing_key, json.loads(body))
asJson = json.dumps(args) asJson = json.dumps(args)
# save raw and ack the message # save raw and ack the message
raw = views.process_raw_data(self.deployment, args, asJson) raw = views.process_raw_data(self.deployment, args, asJson, self.exchange)
if raw: if raw:
self.processed += 1 self.processed += 1
message.ack() message.ack()
views.post_process(raw, args[1]) POST_PROCESS_METHODS[raw.get_name()](raw, args[1])
self._check_memory() self._check_memory()
@ -140,7 +141,7 @@ def exit_or_sleep(exit=False):
time.sleep(5) time.sleep(5)
def run(deployment_config): def run(deployment_config, exchange):
name = deployment_config['name'] name = deployment_config['name']
host = deployment_config.get('rabbit_host', 'localhost') host = deployment_config.get('rabbit_host', 'localhost')
port = deployment_config.get('rabbit_port', 5672) port = deployment_config.get('rabbit_port', 5672)
@ -150,11 +151,13 @@ def run(deployment_config):
durable = deployment_config.get('durable_queue', True) durable = deployment_config.get('durable_queue', True)
queue_arguments = deployment_config.get('queue_arguments', {}) queue_arguments = deployment_config.get('queue_arguments', {})
exit_on_exception = deployment_config.get('exit_on_exception', False) exit_on_exception = deployment_config.get('exit_on_exception', False)
topics = deployment_config.get('topics', {})
deployment, new = db.get_or_create_deployment(name) deployment, new = db.get_or_create_deployment(name)
print "Starting worker for '%s'" % name print "Starting worker for '%s %s'" % (name, exchange)
LOG.info("%s: %s %s %s %s" % (name, host, port, user_id, virtual_host)) LOG.info("%s: %s %s %s %s %s" % (name, exchange, host, port, user_id,
virtual_host))
params = dict(hostname=host, params = dict(hostname=host,
port=port, port=port,
@ -166,21 +169,30 @@ def run(deployment_config):
# continue_running() is used for testing # continue_running() is used for testing
while continue_running(): while continue_running():
try: try:
LOG.debug("Processing on '%s'" % name) LOG.debug("Processing on '%s %s'" % (name, exchange))
with kombu.connection.BrokerConnection(**params) as conn: with kombu.connection.BrokerConnection(**params) as conn:
try: try:
consumer = NovaConsumer(name, conn, deployment, durable, consumer = Consumer(name, conn, deployment, durable,
queue_arguments) queue_arguments, exchange,
topics[exchange])
consumer.run() consumer.run()
except Exception as e: except Exception as e:
LOG.error("!!!!Exception!!!!") LOG.error("!!!!Exception!!!!")
LOG.exception("name=%s, exception=%s. Reconnecting in 5s" % LOG.exception("name=%s, exchange=%s, exception=%s. "
(name, e)) "Reconnecting in 5s" %
(name, exchange, e))
exit_or_sleep(exit_on_exception) exit_or_sleep(exit_on_exception)
LOG.debug("Completed processing on '%s'" % name) LOG.debug("Completed processing on '%s %s'" % (name, exchange))
except: except:
LOG.error("!!!!Exception!!!!") LOG.error("!!!!Exception!!!!")
e = sys.exc_info()[0] e = sys.exc_info()[0]
msg = "Uncaught exception: deployment=%s, exception=%s. Retrying in 5s" msg = "Uncaught exception: deployment=%s, exchange=%s, " \
LOG.exception(msg % (name, e)) "exception=%s. Retrying in 5s"
LOG.exception(msg % (name, exchange, e))
exit_or_sleep(exit_on_exception) exit_or_sleep(exit_on_exception)
POST_PROCESS_METHODS = {
'RawData': views.post_process_rawdata,
'GlanceRawData': views.post_process_glancerawdata,
'GenericRawData': views.post_process_genericrawdata
}