Source code for p3exporter.collector

"""Entry point for collector sub module."""
import inflection
import logging
import re

from importlib import import_module
from prometheus_client.core import REGISTRY


[docs]class CollectorConfig(object): """Class that provide all the logic needed for configuration handling.""" def __init__(self, **kwargs): """Initialize instance variables. All configuration parameters are handed over as separate arguments. :raises Exception: Raises an exception if credentials are not well configured. """ self.exporter_name = kwargs.pop('exporter_name', None) self.collectors = kwargs.pop('collectors', []) self.collector_opts = kwargs.pop('collector_opts', {})
[docs]class CollectorBase(object): """Base class for all collectors. This class will provide methods that do generic work. """ def __init__(self, config: CollectorConfig): """Instantiate a CollectorBase object.""" self.collector_name = self.collector_name_from_class self.opts = config.collector_opts.pop(self.collector_name, {}) @property def collector_name_from_class(self): """Convert class name to controller name. The class name must follow naming convention: * camelized string * starts with camelized module name * ends with 'Collector' This will convert <Name>Collector class name to <name> collector name. Examples for valid names: * MyCollector => my * FooBarCollector => foo_bar * FooBarBazCollector => foo_bar_baz :return: collector name in snake case :rtype: string """ class_name = re.sub(r'(?<=[a-z])[A-Z]|[A-Z](?=[^A-Z])', r'_\g<0>', self.__class__.__name__).lower().strip('_') class_name_parts = class_name.split('_')[0:-1] return '_'.join(class_name_parts)
[docs]class Collector(object): """Base class to load collectors. All collectors have to be placed inside the directory `collector`. You have to follow the naming convention: 1. Place the collector code in a <name>.py file (e.g. `my.py`) 2. Within the file <name>.py` a class <Name>Collector (e.g. `MyController`) needs to be defined. This is the main collector class which will be imported, instantiate and registered automatically. """ def __init__(self, config: CollectorConfig): """Instantiate an CollectorBase object.""" for c in config.collectors: try: collector_module = import_module("p3exporter.collector.{}".format(c), package=None) collector_class = getattr(collector_module, "{0}Collector".format(inflection.camelize(c))) collector = collector_class(config) REGISTRY.register(collector) logging.info("Collector '{0}' was loaded and registred successfully".format(c)) except ModuleNotFoundError as e: logging.warning("Collector '{0}' not loaded: {1}".format(c, e.msg)) except AttributeError as e: logging.warning("Collector '{0}' not loaded: {1}".format(c, e))