from __future__ import unicode_literals
import re
from .metrics_core import Metric
from .samples import Sample

try:
    import StringIO
except ImportError:
    # Python 3
    import io as StringIO


def text_string_to_metric_families(text):
    """Parse Prometheus text format from a unicode string.

    See text_fd_to_metric_families.
    """
    for metric_family in text_fd_to_metric_families(StringIO.StringIO(text)):
        yield metric_family
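
# Illustrative usage (a sketch, not executed here; the output shape assumes the
# Metric and Sample classes imported above):
#
#   for family in text_string_to_metric_families(u'my_gauge 2.5\n'):
#       for sample in family.samples:
#           print(sample.name, sample.labels, sample.value)
#   # -> my_gauge {} 2.5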


ESCAPE_SEQUENCES = {
    '\\\\': '\\',
    '\\n': '\n',
    '\\"': '"',
}


def replace_escape_sequence(match):
    return ESCAPE_SEQUENCES[match.group(0)]


HELP_ESCAPING_RE = re.compile(r'\\[\\n]')
ESCAPING_RE = re.compile(r'\\[\\n"]')


def _replace_help_escaping(s):
    return HELP_ESCAPING_RE.sub(replace_escape_sequence, s)


def _replace_escaping(s):
    return ESCAPING_RE.sub(replace_escape_sequence, s)


def _is_character_escaped(s, charpos):
    num_bslashes = 0
    while (charpos > num_bslashes and
           s[charpos - 1 - num_bslashes] == '\\'):
        num_bslashes += 1
    return num_bslashes % 2 == 1
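
# Illustrative behaviour of the escaping helpers above (a sketch, not executed here):
#
#   _replace_help_escaping(r'first\nsecond') == 'first\nsecond'  # the '\n' becomes a real newline
#   _replace_escaping(r'say \"hi\"') == 'say "hi"'
#   _is_character_escaped(r'a\"b', 2)  # True: the quote at index 2 is preceded by one backslash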


def _parse_labels(labels_string):
    labels = {}

    # Return if we don't have valid labels
    if "=" not in labels_string:
        return labels

    escaping = False
    if "\\" in labels_string:
        escaping = True

    # Copy original labels
    sub_labels = labels_string
    try:
        # Process one label at a time
        while sub_labels:
            # The label name is before the equal
            value_start = sub_labels.index("=")
            label_name = sub_labels[:value_start]
            sub_labels = sub_labels[value_start + 1:].lstrip()
            # Find the first quote after the equal
            quote_start = sub_labels.index('"') + 1
            value_substr = sub_labels[quote_start:]

            # Find the last unescaped quote
            i = 0
            while i < len(value_substr):
                i = value_substr.index('"', i)
                if not _is_character_escaped(value_substr, i):
                    break
                i += 1

            # The label value is between the first and last quote
            quote_end = i + 1
            label_value = sub_labels[quote_start:quote_end]
            # Replace escaping if needed
            if escaping:
                label_value = _replace_escaping(label_value)
            labels[label_name.strip()] = label_value

            # Remove the processed label from the sub-slice for next iteration
            sub_labels = sub_labels[quote_end + 1:]
            next_comma = sub_labels.find(",") + 1
            sub_labels = sub_labels[next_comma:].lstrip()

        return labels

    except ValueError:
        raise ValueError("Invalid labels: %s" % labels_string)
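
# Illustrative behaviour on well-formed label text (a sketch, not executed here):
#
#   _parse_labels('method="post",code="200"')
#   # -> {'method': 'post', 'code': '200'}
#   _parse_labels(r'msg="say \"hi\""')
#   # -> {'msg': 'say "hi"'}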


# If we have multiple values only consider the first
def _parse_value_and_timestamp(s):
    s = s.lstrip()
    separator = " "
    if separator not in s:
        separator = "\t"
    values = [value.strip() for value in s.split(separator) if value.strip()]
    if not values:
        return float(s), None
    value = float(values[0])
    timestamp = (float(values[-1]) / 1000) if len(values) > 1 else None
    return value, timestamp
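
# Illustrative behaviour (a sketch, not executed here):
#
#   _parse_value_and_timestamp('1027 1395066363000')
#   # -> (1027.0, 1395066363.0)  the trailing timestamp is converted from ms to s
#   _parse_value_and_timestamp('4.2')
#   # -> (4.2, None)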


def _parse_sample(text):
    # Detect the labels in the text
    try:
        label_start, label_end = text.index("{"), text.rindex("}")
        # The name is before the labels
        name = text[:label_start].strip()
        # We ignore the starting curly brace
        label = text[label_start + 1:label_end]
        # The value is after the label end (ignoring curly brace and space)
        value, timestamp = _parse_value_and_timestamp(text[label_end + 2:])
        return Sample(name, _parse_labels(label), value, timestamp)

    # We don't have labels
    except ValueError:
        # Detect what separator is used
        separator = " "
        if separator not in text:
            separator = "\t"
        name_end = text.index(separator)
        name = text[:name_end]
        # The value is after the name
        value, timestamp = _parse_value_and_timestamp(text[name_end:])
        return Sample(name, {}, value, timestamp)
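
# Illustrative behaviour (a sketch, not executed here; the exact repr depends on
# the Sample namedtuple defined in .samples):
#
#   _parse_sample('http_requests_total{code="200"} 1027 1395066363000')
#   # -> Sample('http_requests_total', {'code': '200'}, 1027.0, 1395066363.0)
#   _parse_sample('up 1')
#   # -> Sample('up', {}, 1.0, None)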


def text_fd_to_metric_families(fd):
    """Parse Prometheus text format from a file descriptor.

    This is a laxer parser than the main Go parser, so successful parsing
    does not imply that the parsed text meets the specification.

    Yields Metric objects.
    """
    name = ''
    documentation = ''
    typ = 'untyped'
    samples = []
    allowed_names = []

    def build_metric(name, documentation, typ, samples):
        # Munge counters into OpenMetrics representation
        # used internally.
        if typ == 'counter':
            if name.endswith('_total'):
                name = name[:-6]
            else:
                new_samples = []
                for s in samples:
                    new_samples.append(Sample(s[0] + '_total', *s[1:]))
                samples = new_samples
        metric = Metric(name, documentation, typ)
        metric.samples = samples
        return metric
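
    # Illustrative effect of the munging above (a sketch, not executed here):
    #
    #   build_metric('http_requests_total', 'doc', 'counter',
    #                [Sample('http_requests_total', {}, 1.0, None)])
    #   # -> Metric named 'http_requests' whose single sample is still
    #   #    called 'http_requests_total'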

    for line in fd:
        line = line.strip()

        if line.startswith('#'):
            parts = line.split(None, 3)
            if len(parts) < 2:
                continue
            if parts[1] == 'HELP':
                if parts[2] != name:
                    if name != '':
                        yield build_metric(name, documentation, typ, samples)
                    # New metric
                    name = parts[2]
                    typ = 'untyped'
                    samples = []
                    allowed_names = [parts[2]]
                if len(parts) == 4:
                    documentation = _replace_help_escaping(parts[3])
                else:
                    documentation = ''
            elif parts[1] == 'TYPE':
                if parts[2] != name:
                    if name != '':
                        yield build_metric(name, documentation, typ, samples)
                    # New metric
                    name = parts[2]
                    documentation = ''
                    samples = []
                typ = parts[3]
                allowed_names = {
                    'counter': [''],
                    'gauge': [''],
                    'summary': ['_count', '_sum', ''],
                    'histogram': ['_count', '_sum', '_bucket'],
                }.get(typ, [''])
                allowed_names = [name + n for n in allowed_names]
            else:
                # Ignore other comment tokens
                pass
        elif line == '':
            # Ignore blank lines
            pass
        else:
            sample = _parse_sample(line)
            if sample.name not in allowed_names:
                if name != '':
                    yield build_metric(name, documentation, typ, samples)
                # New metric, yield immediately as untyped singleton
                name = ''
                documentation = ''
                typ = 'untyped'
                samples = []
                allowed_names = []
                yield build_metric(sample[0], documentation, typ, [sample])
            else:
                samples.append(sample)

    if name != '':
        yield build_metric(name, documentation, typ, samples)
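

# Illustrative end-to-end usage (a sketch, not executed here; 'family.type' and
# 'family.samples' come from the Metric class imported above):
#
#   import io
#   text = ('# HELP http_requests_total Total requests.\n'
#           '# TYPE http_requests_total counter\n'
#           'http_requests_total{code="200"} 1027 1395066363000\n')
#   for family in text_fd_to_metric_families(io.StringIO(text)):
#       print(family.name, family.type)       # -> http_requests counter
#       for s in family.samples:
#           print(s.name, s.labels, s.value)  # -> http_requests_total {'code': '200'} 1027.0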