Change counter to sample vocable in cm.publisher
This changes counter to sample vocable in ceilometer.publisher code Parts of the blueprint remove-counter Change-Id: I1fe0269ef9f1f98a93ef12bd5a4d55b7b1d5a5f2
This commit is contained in:
parent
32b135f1ea
commit
6f7da3a192
@ -44,4 +44,4 @@ class PublisherBase(object):
|
|||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def publish_samples(self, context, samples):
|
def publish_samples(self, context, samples):
|
||||||
"Publish counters into final conduit."
|
"Publish samples into final conduit."
|
||||||
|
@ -86,11 +86,11 @@ class FilePublisher(publisher.PublisherBase):
|
|||||||
rfh.setLevel(logging.INFO)
|
rfh.setLevel(logging.INFO)
|
||||||
self.publisher_logger.addHandler(rfh)
|
self.publisher_logger.addHandler(rfh)
|
||||||
|
|
||||||
def publish_samples(self, context, counters):
|
def publish_samples(self, context, samples):
|
||||||
"""Send a metering message for publishing
|
"""Send a metering message for publishing
|
||||||
|
|
||||||
:param context: Execution context from the service or RPC call
|
:param context: Execution context from the service or RPC call
|
||||||
:param counter: Counter from pipeline after transformation
|
:param samples: Samples from pipeline after transformation
|
||||||
"""
|
"""
|
||||||
if self.publisher_logger:
|
if self.publisher_logger:
|
||||||
self.publisher_logger.info(counters)
|
self.publisher_logger.info(samples)
|
||||||
|
@ -15,7 +15,7 @@
|
|||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
"""Publish a counter using the preferred RPC mechanism.
|
"""Publish a sample using the preferred RPC mechanism.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import hashlib
|
import hashlib
|
||||||
@ -84,23 +84,23 @@ def verify_signature(message, secret):
|
|||||||
return new_sig == old_sig
|
return new_sig == old_sig
|
||||||
|
|
||||||
|
|
||||||
def meter_message_from_counter(counter, secret):
|
def meter_message_from_counter(sample, secret):
|
||||||
"""Make a metering message ready to be published or stored.
|
"""Make a metering message ready to be published or stored.
|
||||||
|
|
||||||
Returns a dictionary containing a metering message
|
Returns a dictionary containing a metering message
|
||||||
for a notification message and a Counter instance.
|
for a notification message and a Sample instance.
|
||||||
"""
|
"""
|
||||||
msg = {'source': counter.source,
|
msg = {'source': sample.source,
|
||||||
'counter_name': counter.name,
|
'counter_name': sample.name,
|
||||||
'counter_type': counter.type,
|
'counter_type': sample.type,
|
||||||
'counter_unit': counter.unit,
|
'counter_unit': sample.unit,
|
||||||
'counter_volume': counter.volume,
|
'counter_volume': sample.volume,
|
||||||
'user_id': counter.user_id,
|
'user_id': sample.user_id,
|
||||||
'project_id': counter.project_id,
|
'project_id': sample.project_id,
|
||||||
'resource_id': counter.resource_id,
|
'resource_id': sample.resource_id,
|
||||||
'timestamp': counter.timestamp,
|
'timestamp': sample.timestamp,
|
||||||
'resource_metadata': counter.resource_metadata,
|
'resource_metadata': sample.resource_metadata,
|
||||||
'message_id': counter.id,
|
'message_id': sample.id,
|
||||||
}
|
}
|
||||||
msg['message_signature'] = compute_signature(msg, secret)
|
msg['message_signature'] = compute_signature(msg, secret)
|
||||||
return msg
|
return msg
|
||||||
@ -136,19 +136,19 @@ class RPCPublisher(publisher.PublisherBase):
|
|||||||
% self.policy)
|
% self.policy)
|
||||||
self.policy = 'default'
|
self.policy = 'default'
|
||||||
|
|
||||||
def publish_samples(self, context, counters):
|
def publish_samples(self, context, samples):
|
||||||
"""Publish counters on RPC.
|
"""Publish samples on RPC.
|
||||||
|
|
||||||
:param context: Execution context from the service or RPC call.
|
:param context: Execution context from the service or RPC call.
|
||||||
:param counters: Counters from pipeline after transformation.
|
:param samples: Samples from pipeline after transformation.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
meters = [
|
meters = [
|
||||||
meter_message_from_counter(
|
meter_message_from_counter(
|
||||||
counter,
|
sample,
|
||||||
cfg.CONF.publisher_rpc.metering_secret)
|
cfg.CONF.publisher_rpc.metering_secret)
|
||||||
for counter in counters
|
for sample in samples
|
||||||
]
|
]
|
||||||
|
|
||||||
topic = cfg.CONF.publisher_rpc.metering_topic
|
topic = cfg.CONF.publisher_rpc.metering_topic
|
||||||
@ -157,7 +157,7 @@ class RPCPublisher(publisher.PublisherBase):
|
|||||||
'version': '1.0',
|
'version': '1.0',
|
||||||
'args': {'data': meters},
|
'args': {'data': meters},
|
||||||
}
|
}
|
||||||
LOG.audit('Publishing %d counters on %s',
|
LOG.audit('Publishing %d samples on %s',
|
||||||
len(msg['args']['data']), topic)
|
len(msg['args']['data']), topic)
|
||||||
self.local_queue.append((context, topic, msg))
|
self.local_queue.append((context, topic, msg))
|
||||||
|
|
||||||
@ -171,7 +171,7 @@ class RPCPublisher(publisher.PublisherBase):
|
|||||||
'args': {'data': list(meter_list)},
|
'args': {'data': list(meter_list)},
|
||||||
}
|
}
|
||||||
topic_name = topic + '.' + meter_name
|
topic_name = topic + '.' + meter_name
|
||||||
LOG.audit('Publishing %d counters on %s',
|
LOG.audit('Publishing %d samples on %s',
|
||||||
len(msg['args']['data']), topic_name)
|
len(msg['args']['data']), topic_name)
|
||||||
self.local_queue.append((context, topic_name, msg))
|
self.local_queue.append((context, topic_name, msg))
|
||||||
|
|
||||||
@ -197,7 +197,7 @@ class RPCPublisher(publisher.PublisherBase):
|
|||||||
count = queue_length - self.max_queue_length
|
count = queue_length - self.max_queue_length
|
||||||
self.local_queue = self.local_queue[count:]
|
self.local_queue = self.local_queue[count:]
|
||||||
LOG.warn("Publisher max local_queue length is exceeded, "
|
LOG.warn("Publisher max local_queue length is exceeded, "
|
||||||
"dropping %d oldest counters", count)
|
"dropping %d oldest samples", count)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _process_queue(queue, policy):
|
def _process_queue(queue, policy):
|
||||||
@ -220,14 +220,14 @@ class RPCPublisher(publisher.PublisherBase):
|
|||||||
try:
|
try:
|
||||||
rpc.cast(context, topic, msg)
|
rpc.cast(context, topic, msg)
|
||||||
except (SystemExit, rpc.common.RPCException):
|
except (SystemExit, rpc.common.RPCException):
|
||||||
counters = sum([len(m['args']['data']) for _, _, m in queue])
|
samples = sum([len(m['args']['data']) for _, _, m in queue])
|
||||||
if policy == 'queue':
|
if policy == 'queue':
|
||||||
LOG.warn("Failed to publish %s counters, queue them",
|
LOG.warn("Failed to publish %s samples, queue them",
|
||||||
counters)
|
samples)
|
||||||
return queue
|
return queue
|
||||||
elif policy == 'drop':
|
elif policy == 'drop':
|
||||||
LOG.warn("Failed to publish %d counters, dropping them",
|
LOG.warn("Failed to publish %d samples, dropping them",
|
||||||
counters)
|
samples)
|
||||||
return []
|
return []
|
||||||
# default, occur only if rabbit_max_retries > 0
|
# default, occur only if rabbit_max_retries > 0
|
||||||
raise
|
raise
|
||||||
|
@ -15,7 +15,7 @@
|
|||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
"""Publish a counter using an UDP mechanism
|
"""Publish a sample using an UDP mechanism
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from ceilometer import publisher
|
from ceilometer import publisher
|
||||||
@ -41,23 +41,23 @@ class UDPPublisher(publisher.PublisherBase):
|
|||||||
self.socket = socket.socket(socket.AF_INET,
|
self.socket = socket.socket(socket.AF_INET,
|
||||||
socket.SOCK_DGRAM)
|
socket.SOCK_DGRAM)
|
||||||
|
|
||||||
def publish_samples(self, context, counters):
|
def publish_samples(self, context, samples):
|
||||||
"""Send a metering message for publishing
|
"""Send a metering message for publishing
|
||||||
|
|
||||||
:param context: Execution context from the service or RPC call
|
:param context: Execution context from the service or RPC call
|
||||||
:param counter: Counter from pipeline after transformation
|
:param samples: Samples from pipeline after transformation
|
||||||
"""
|
"""
|
||||||
|
|
||||||
for counter in counters:
|
for sample in samples:
|
||||||
msg = counter.as_dict()
|
msg = sample.as_dict()
|
||||||
host = self.host
|
host = self.host
|
||||||
port = self.port
|
port = self.port
|
||||||
LOG.debug(_("Publishing counter %(msg)s over UDP to "
|
LOG.debug(_("Publishing sample %(msg)s over UDP to "
|
||||||
"%(host)s:%(port)d") % {'msg': msg, 'host': host,
|
"%(host)s:%(port)d") % {'msg': msg, 'host': host,
|
||||||
'port': port})
|
'port': port})
|
||||||
try:
|
try:
|
||||||
self.socket.sendto(msgpack.dumps(msg),
|
self.socket.sendto(msgpack.dumps(msg),
|
||||||
(self.host, self.port))
|
(self.host, self.port))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.warn(_("Unable to send counter over UDP"))
|
LOG.warn(_("Unable to send sample over UDP"))
|
||||||
LOG.exception(e)
|
LOG.exception(e)
|
||||||
|
Loading…
Reference in New Issue
Block a user