Module cevast.analysis

This package contains analytical functions and tools for quantitative analysis of certificate datasets.

Expand source code
"""This package contains analytical functions and tools for quantitative analysis of certificate datasets."""

__version__ = '1.1'
__author__ = 'Radim Podola'
__all__ = (
    'CertAnalyser',
    'ChainValidator',
)

from .cert_analyser import CertAnalyser
from .chain_validator import ChainValidator

Sub-modules

cevast.analysis.cert_analyser

This module contains CertAnalyser interface.

cevast.analysis.chain_validator

This module contains ChainValidator implementation of CertAnalyser interface.

cevast.analysis.cli

Group of CLI commands used for certificate analysis …

cevast.analysis.methods

This module provides access to certificate chain validation clients and other analytical modules …

cevast.analysis.modules

Classes

class CertAnalyser (output_file: str, processes: int, **kwargs)

An abstract class defining the CertAnalyser interface, which makes it possible to perform various analytical tasks on certificates. The design targets parallelism but can also be used as a single-threaded process: analytical tasks can be run with either multithreading or multiprocessing, depending on whether the bottleneck is I/O or CPU cost.

The class implements the mandatory methods of the context manager interface, so it can (and is recommended to) be used in a with statement. This way one can be sure that the pool is safely cleaned up.

output_file is the file the results will be written to, processes is the maximum number of additional workers (threads or processes) [0=single thread], kwargs are optional keyword arguments specific to the concrete implementation.

TODO pass list of actual analytical methods that should be run?
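
A minimal usage sketch; SomeAnalyser stands for a hypothetical concrete implementation and scan_results for an illustrative iterable of (host, chain) pairs, neither of which is part of this package:

# SomeAnalyser is a hypothetical concrete CertAnalyser implementation,
# scan_results an illustrative iterable of (host, chain) pairs.
with SomeAnalyser('results', processes=4) as analyser:
    for host, chain in scan_results:
        analyser.schedule(host, chain)  # enqueue one host and its certificate chain
    analyser.done()                     # wait for all workers and flush the output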

Initialize CertAnalyser.

Expand source code
class CertAnalyser(ABC):
    """
    An abstract CertAnalyser class an interface design that provides possibility to perform
    various analytical tasks with certificates. Design aims for using parallelism but can also be
    used as a single thread process. With such design one can use either multithreading or
    multiprocessing to run analytical tasks, depends what performance insufficiency is targeted
    (I/O vs CPU operating costs).

    Class implements mandatory methods of context manager interface so can be (and is recommended)
    to use with `with` statement. This way one can be sure that pool is safely cleaned-up.

    `output_file` is a file where results will be written,
    `processes` is maximum number of additional workers (threads or processes) [0=single thread],
    `kwargs` are optional key arguments special to concrete implementation.

    TODO pass list of actual analytical methods that should be run?
    """

    def __init__(self, output_file: str, processes: int, **kwargs):
        """Initialize CertAnalyser."""

    @abstractmethod
    def schedule(self, host: str, chain: List[str]) -> None:
        """Enqueue host and certificate chain for analysis."""

    @abstractmethod
    def done(self) -> None:
        """
        Indicate that no more data will be scheduled to analyse and clean up context.
        If multithreading/multiprocessing is used, this function will close-up the pool and wait
        for all workers to finish.
        """

    @abstractmethod
    def __enter__(self):
        """Return self."""

    @abstractmethod
    def __exit__(self, exc_type, exc_value, traceback):
        """Safely clean-up context."""

Ancestors

  • abc.ABC

Subclasses

  • ChainValidator

Methods

def done(self) ‑> NoneType

Indicate that no more data will be scheduled for analysis and clean up the context. If multithreading/multiprocessing is used, this function closes the pool and waits for all workers to finish.

Expand source code
@abstractmethod
def done(self) -> None:
    """
    Indicate that no more data will be scheduled to analyse and clean up context.
    If multithreading/multiprocessing is used, this function will close-up the pool and wait
    for all workers to finish.
    """
def schedule(self, host: str, chain: List[str]) ‑> NoneType

Enqueue host and certificate chain for analysis.

Expand source code
@abstractmethod
def schedule(self, host: str, chain: List[str]) -> None:
    """Enqueue host and certificate chain for analysis."""
class ChainValidator (output_file: str, processes: int, **kwargs)

A CertAnalyser implementation that validates certificate chains. The validation function accepts a host name and a list of certificate IDs (fingerprints); the certificates are looked up in the provided CertDB.

Result is stored as a CSV file in the following format:
{host, validation method 1, validation method 2, validation method N, chain}

Hint

Such format can be easily analyzed. E.g. to count the occurrences of each error code one could use: awk -F "\"*,\"*" '{print $2}' cevast_repo/RAPID/VALIDATED/20200616_12443.csv | sort | uniq -c
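
The same count can also be obtained in Python with a short standard-library sketch (the file path is taken from the awk example above):

import csv
from collections import Counter

with open('cevast_repo/RAPID/VALIDATED/20200616_12443.csv') as csv_file:
    reader = csv.reader(csv_file)
    next(reader, None)  # skip the header row written by ChainValidator
    # the second column holds the result of the first validation method
    counts = Counter(row[1] for row in reader if len(row) > 1)
print(counts.most_common())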

Special key arguments: [mandatory] certdb is an instance of CertDB from which the certificates will be taken. [mandatory] reference_date is a datetime.date used as the reference time for validation. [optional] export_dir is a directory used for temporary operations with certificates; the directory is cleaned up upon calling done. [optional] methods is a list of validation methods to use.
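
A usage sketch, assuming certdb is an already constructed CertDB instance and scan_results an illustrative iterable of (host, chain) pairs; reference_date is required by the constructor (see the source below):

import datetime

# certdb is assumed to be an existing CertDB instance,
# scan_results an illustrative iterable of (host, chain) pairs.
with ChainValidator('20200616_validated', processes=4,
                    certdb=certdb,
                    reference_date=datetime.date(2020, 6, 16)) as validator:
    for host, chain in scan_results:
        validator.schedule(host, chain)
    validator.done()  # results end up in 20200616_validated.csv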

Initialize CertAnalyser.

Expand source code
class ChainValidator(CertAnalyser):
    """
    CertAnalyser implementation that validates certificate chains. Validation function
    accepts host name and list of certificate IDs (fingerprints). Those certificates are
    searched in provided CertDB.

    Result is stored as CSV file in following format:
    {host, validation method 1, validation method 2, validation method N, chain}
    .. hint::
       Such format can be easily analyzed. E.g. to count number of each error code one could use:
       awk -F "\"*,\"*" '{print $2}' cevast_repo/RAPID/VALIDATED/20200616_12443.csv | sort | uniq -c

    Special key arguments:
    [mandatory] `certdb` is an instance of CertDB, where the certificates will be taken from.
    [optional] `export_dir` is a directory that will be used for temporary operations
        with certificates. Directory will be clean-up upon calling `done`.
    [optional] `methods` is a list with validation methods to use.
    """

    def __init__(self, output_file: str, processes: int, **kwargs):
        # Init common arguments
        self.__single = processes == 0
        self.__out = open(output_file + '.csv', 'w')

        # Init validation methods
        self.__methods = kwargs.get('methods', None)
        if self.__methods is None:
            methods = get_all()
        else:
            methods = [get(name) for name in self.__methods]
        if not methods:
            raise ValueError("No validation methods are available -> nothing to do")
        # write validation header
        self.__out.write("{},{},{}\n".format('HOST', ",".join(show()), "CHAIN"))

        # Init special arguments
        self.__certdb: CertDB = kwargs.get('certdb', None)
        if self.__certdb is None:
            raise ValueError('Mandatory certdb argument must be provided within kwargs.')
        self.__export_dir = kwargs.get('export_dir', None)
        if self.__export_dir is None:
            self.__export_dir = './tmp_chain_validator/'
            os.makedirs(self.__export_dir, exist_ok=True)
            self.__cleanup_export_dir = True
        else:
            self.__cleanup_export_dir = False
        self.__reference_date: datetime.date = kwargs.get('reference_date', None)
        if self.__reference_date is None:
            raise ValueError('Mandatory reference_date argument must be provided within kwargs.')
        log.info("Reference date: {0}, ({1})".format(self.__reference_date, int(self.__reference_date.strftime("%s"))))

        self.__lock = multiprocessing.Lock()

        # Initialize pool and workers
        if not self.__single:
            self.__pool = multiprocessing.Pool(processes,
                                               initializer=ChainValidator.__init_worker,
                                               initargs=(self.__certdb,
                                                         self.__export_dir,
                                                         methods,
                                                         self.__reference_date,
                                                         self.__lock,
                                                         True))
        else:
            ChainValidator.__init_worker(self.__certdb, self.__export_dir, methods, self.__reference_date, self.__lock)

        log.info("ChainValidator created: output_file=%s, processes=%d", output_file, processes)

    @staticmethod
    def __init_worker(certdb: CertDB, tmp_dir: str, methods: list, reference_date: datetime.date,
                      lock: multiprocessing.Lock, ignore_sigint: bool = False):
        """Create and initialize global variables used in validate method. {Not nice, but working well
        with multiprocessing pool -> sharing instance of CertDB - object is not copied because of copy-on-write fork()}
        """
        global WORKER_CERTDB
        global WORKER_TMP_DIR
        global VALIDATION_METHODS
        global REFERENCE_DATE
        global LOCK
        WORKER_CERTDB = certdb
        WORKER_TMP_DIR = tmp_dir
        VALIDATION_METHODS = methods
        REFERENCE_DATE = reference_date
        LOCK = lock
        if ignore_sigint:
            # let worker processes ignore SIGINT, parent will clean up the pool via terminate()
            signal.signal(signal.SIGINT, signal.SIG_IGN)

    def schedule(self, host: str, chain: List[str]) -> None:
        if self.__single:
            self.__out.write(ChainValidator._validate(host, chain))
        else:
            self.__pool.apply_async(ChainValidator._validate, args=(host, chain), callback=self.__out.write)

    def done(self) -> None:
        # Wait for workers to finish
        if not self.__single:
            self.__pool.close()
            self.__pool.join()
        # Close output file
        self.__out.flush()
        self.__out.close()
        # Clean up own export dir
        if self.__cleanup_export_dir:
            shutil.rmtree(self.__export_dir)

    @staticmethod
    def _validate(host: str, chain: List[str]) -> str:
        """
        Validation function of single validation task. Return formatted result.
        `host` is host name,
        `chain` is list of certificate IDs forming SSL Certificate Chain (starting with server certificate).
        """
        result = []
        pems = []

        # check if already exported first
        LOCK.acquire()
        try:
            for cert in chain:
                # TODO make some structure to not overload single directory
                path = WORKER_TMP_DIR + make_PEM_filename(cert)
                if not os.path.exists(path):
                    try:
                        path = WORKER_CERTDB.export(cert, WORKER_TMP_DIR, False)
                    except CertNotAvailableError:
                        log.info("HOST <%s> has broken chain", host)
                        return ""
                pems.append(path)
        finally:
            LOCK.release()

        validation_method_arguments = {"reference_time": int(REFERENCE_DATE.strftime("%s"))}

        # Call validation methods
        for method in VALIDATION_METHODS:
            result.append("|".join([str(item).replace(",", ";") for item in method(pems, **validation_method_arguments)]))

        return "{},{},{}\n".format(host, ",".join(result), ",".join(chain))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if not self.__single:
            self.__pool.terminate()
        self.__out.close()

Ancestors

  • CertAnalyser
  • abc.ABC

Inherited members

  • CertAnalyser: done, schedule