Module cevast.dataset.managers

This package provides certificate dataset managers.

Import and add your DatasetManager implementation into all for identification by DatasetManagerFactory.

Expand source code
"""
This package provides certificate dataset managers.

Import and add your DatasetManager implementation into __all__ for identification by DatasetManagerFactory.
"""

__all__ = ['DatasetManager', 'DatasetManagerTask', 'RapidDatasetManager']
__author__ = 'Radim Podola'

from .manager import DatasetManager, DatasetManagerTask
from .rapid import RapidDatasetManager

Sub-modules

cevast.dataset.managers.manager

This module contains DatasetManager interface.

cevast.dataset.managers.rapid

This module contains DatasetManager interface implementation of RAPID dataset source.

Classes

class DatasetManager (repository: str, date:  = datetime.date(2021, 5, 4), ports: Tuple[str] = ('443',), cpu_cores: int = 1)

An abstract DatasetManager class representing an interface that can be used to perform various tasks with a certificate dataset.

For Manager to perform a task, a repository path and date must be provided. Date works as an identifier of the dataset even though the date don't need to match exactly - the newest dataset by that date is identified. Additionally a port number might be used to more specify the dataset.

DatasetManager offers performing tasks independently or running a series of tasks at once by run method (usefull for performing tasks that would be rather complex and/or long-lasting running separatelly). Running a series might also be more optimized.

Initialize Manager. repository is dataset repository, 'date' is date, 'ports' is list of ports more specifying datasets, 'cpu_cores' is maximum number of CPU cores that might be used.

Expand source code
class DatasetManager(ABC):
    """
    An abstract DatasetManager class representing an interface that can be used to perform
    various tasks with a certificate dataset.

    For Manager to perform a task, a repository path and date must be provided. Date works
    as an identifier of the dataset even though the date don't need to match exactly
     - the newest dataset by that date is identified. Additionally a port number might be
    used to more specify the dataset.

    DatasetManager offers performing tasks independently or running a series of tasks at once
    by `run` method (usefull for performing tasks that would be rather complex and/or long-lasting
    running separatelly). Running a series might also be more optimized.
    """

    @property
    @abstractclassmethod
    def dataset_source(cls) -> DatasetSource:
        """
        Dataset source property used to identify a manager specification.
        """

    @abstractmethod
    def __init__(self, repository: str, date: datetime.date = datetime.today().date(),
                 ports: Tuple[str] = ('443',), cpu_cores: int = 1):
        """
        Initialize Manager.
        `repository` is dataset repository,
        'date' is date,
        'ports' is list of ports more specifying datasets,
        'cpu_cores' is maximum number of CPU cores that might be used.
        """

    @abstractmethod
    def run(self, task_pipline: Tuple[Tuple[DatasetManagerTask, dict]]) -> None:
        """
        Run a series of tasks.
        `task_pipline` is tuple composed of the required tasks in form of pairs ('task', 'cfg'), where:
            - 'task' is supported DatasetManagerTask,
            - 'cfg' is dictionary filled of parameters that will be passed to individual task methods.
        Caller function must ensure that 'cfg' parameters match task method's declaration.
        TODO make cfg dict optional
        """

    @abstractmethod
    def collect(self, api_key: str = None) -> Tuple[Dataset]:
        """
        Collect a dataset.
        `api_key` is API access key that might be needed to retrieve datasets (depends on source implementation).
        Return tuple of collected Datasets.
        """

    @abstractmethod
    def filter(self, methods: list = None) -> Tuple[Dataset]:
        """
        Filter a dataset with given methods.
        Return tuple of filtered Datasets.
        """

    @abstractmethod
    def unify(self, certdb: CertDB) -> Tuple[Dataset]:
        """
        Unify a dataset.
        `certdb` is CertDB instance to work with (to insert parsed certificates to).
        Return tuple of unified Datasets.
        """

    @abstractmethod
    def analyse(self, analyser: Type[CertAnalyser], analyser_cfg: dict) -> Tuple[Dataset]:
        """
        Analyse a dataset with given analyser.
        `analyser` is a CertAnalyser class,
        `analyser_cfg` is a dictionary with analyser paramaters that will be passed to the initializer.

        Call to analyser is performed like this: analyser(cert_chain, **analyser_cfg).

        Return tuple of analysed Datasets.
        """

Ancestors

  • abc.ABC

Subclasses

Instance variables

var dataset_source

A decorator indicating abstract classmethods.

Similar to abstractmethod.

Usage

class C(metaclass=ABCMeta): @abstractclassmethod def my_abstract_classmethod(cls, …): …

'abstractclassmethod' is deprecated. Use 'classmethod' with 'abstractmethod' instead.

Methods

def analyse(self, analyser: Type[CertAnalyser], analyser_cfg: dict) ‑> Tuple[Dataset]

Analyse a dataset with given analyser. analyser is a CertAnalyser class, analyser_cfg is a dictionary with analyser paramaters that will be passed to the initializer.

Call to analyser is performed like this: analyser(cert_chain, **analyser_cfg).

Return tuple of analysed Datasets.

Expand source code
@abstractmethod
def analyse(self, analyser: Type[CertAnalyser], analyser_cfg: dict) -> Tuple[Dataset]:
    """
    Analyse a dataset with given analyser.
    `analyser` is a CertAnalyser class,
    `analyser_cfg` is a dictionary with analyser paramaters that will be passed to the initializer.

    Call to analyser is performed like this: analyser(cert_chain, **analyser_cfg).

    Return tuple of analysed Datasets.
    """
def collect(self, api_key: str = None) ‑> Tuple[Dataset]

Collect a dataset. api_key is API access key that might be needed to retrieve datasets (depends on source implementation). Return tuple of collected Datasets.

Expand source code
@abstractmethod
def collect(self, api_key: str = None) -> Tuple[Dataset]:
    """
    Collect a dataset.
    `api_key` is API access key that might be needed to retrieve datasets (depends on source implementation).
    Return tuple of collected Datasets.
    """
def filter(self, methods: list = None) ‑> Tuple[Dataset]

Filter a dataset with given methods. Return tuple of filtered Datasets.

Expand source code
@abstractmethod
def filter(self, methods: list = None) -> Tuple[Dataset]:
    """
    Filter a dataset with given methods.
    Return tuple of filtered Datasets.
    """
def run(self, task_pipline: Tuple[Tuple[DatasetManagerTask, dict]]) ‑> NoneType

Run a series of tasks. task_pipline is tuple composed of the required tasks in form of pairs ('task', 'cfg'), where: - 'task' is supported DatasetManagerTask, - 'cfg' is dictionary filled of parameters that will be passed to individual task methods. Caller function must ensure that 'cfg' parameters match task method's declaration. TODO make cfg dict optional

Expand source code
@abstractmethod
def run(self, task_pipline: Tuple[Tuple[DatasetManagerTask, dict]]) -> None:
    """
    Run a series of tasks.
    `task_pipline` is tuple composed of the required tasks in form of pairs ('task', 'cfg'), where:
        - 'task' is supported DatasetManagerTask,
        - 'cfg' is dictionary filled of parameters that will be passed to individual task methods.
    Caller function must ensure that 'cfg' parameters match task method's declaration.
    TODO make cfg dict optional
    """
def unify(self, certdb: CertDB) ‑> Tuple[Dataset]

Unify a dataset. certdb is CertDB instance to work with (to insert parsed certificates to). Return tuple of unified Datasets.

Expand source code
@abstractmethod
def unify(self, certdb: CertDB) -> Tuple[Dataset]:
    """
    Unify a dataset.
    `certdb` is CertDB instance to work with (to insert parsed certificates to).
    Return tuple of unified Datasets.
    """
class DatasetManagerTask (value, names=None, *, module=None, qualname=None, type=None, start=1)

Enumeration of DatasetManager Tasks

Expand source code
class DatasetManagerTask(IntEnum):
    """Enumeration of DatasetManager Tasks"""

    COLLECT = 1
    FILTER = 2
    UNIFY = 3
    ANALYSE = 4

    @classmethod
    def validate(cls, state: Union['DatasetManagerTask', str]) -> bool:
        """Validate DatasetManagerTask."""
        if isinstance(state, cls):
            return state in cls
        if isinstance(state, str):
            return state in cls.__members__
        return False

    def __str__(self):
        return str(self.name)

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var ANALYSE
var COLLECT
var FILTER
var UNIFY

Static methods

def validate(state: Union[_ForwardRef('DatasetManagerTask'), str]) ‑> bool

Validate DatasetManagerTask.

Expand source code
@classmethod
def validate(cls, state: Union['DatasetManagerTask', str]) -> bool:
    """Validate DatasetManagerTask."""
    if isinstance(state, cls):
        return state in cls
    if isinstance(state, str):
        return state in cls.__members__
    return False
class RapidDatasetManager (repository: str, date:  = datetime.date(2021, 5, 4), ports: Tuple[str] = ('443',), cpu_cores: int = 1)

DatasetManager interface implementation of RAPID dataset source.

Initialize Manager. repository is dataset repository, 'date' is date, 'ports' is list of ports more specifying datasets, 'cpu_cores' is maximum number of CPU cores that might be used.

Expand source code
class RapidDatasetManager(DatasetManager):
    """DatasetManager interface implementation of RAPID dataset source."""

    _CERT_NAME_SUFFIX = 'certs'
    _HOSTS_NAME_SUFFIX = 'hosts'
    _CHAINS_NAME_SUFFIX = 'chains'
    _BROKEN_CHAINS_NAME_SUFFIX = 'broken_chains'

    dataset_source = DatasetSource.RAPID.name

    # TODO add date range
    # TODO make ports optional
    # TODO support port range in single validate or unify tasks
    def __init__(self, repository: str, date: datetime.date = datetime.today().date(),
                 ports: Tuple[str] = ('443',), cpu_cores: int = 1):
        self._repository = repository
        self._date = date
        self._ports = (ports,) if isinstance(ports, str) else ports
        self._cpu_cores = cpu_cores
        self.__date_id = date.strftime('%Y%m%d')
        log.info('RapidDatasetManager initialized with repository=%s, date=%s, ports=%s', repository, date, ports)

    def run(self, task_pipline: Tuple[Tuple[DatasetManagerTask, dict]]) -> None:
        collected_datasets, unified_datasets, analysed_datasets = None, None, None
        # Sort just to ensure valid sequence
        task_pipline = sorted(task_pipline, key=lambda x: x[0])
        log.info('Started with task pipeline %s', task_pipline)
        try:
            # Run tasks
            for task_item in task_pipline:
                task, params = task_item
                log.info('Run task %s with parameters: %s', task, params)
                cli_log.info('Running TASK %s', task)
                # Runs collection TASK, collected datasets might be used in next task
                if task == DatasetManagerTask.COLLECT:
                    collected_datasets = self.collect(**params)
                    log.info("Collected datasets: %s", collected_datasets)

                # Runs filtering TASK
                elif task == DatasetManagerTask.FILTER:
                    raise NotImplementedError  # Not implemented yet

                # Runs unifying TASK, unified datasets might be used in next task
                elif task == DatasetManagerTask.UNIFY:
                    if collected_datasets:  # If some datasets were just collected in pipeline, use these
                        unified_datasets = self.__unify(datasets=collected_datasets, **params)
                    else:
                        unified_datasets = self.unify(**params)
                    log.info("unified datasets: %s", unified_datasets)

                # Runs analytical TASK
                elif task == DatasetManagerTask.ANALYSE:
                    if unified_datasets:
                        analysed_datasets = self.__analyse(datasets=unified_datasets, **params)
                    else:  # If some datasets were just unified in pipeline, use these
                        analysed_datasets = self.analyse(**params)
                    log.info("Analysed datasets: %s", analysed_datasets)
        except TypeError:
            log.exception("Error when running task pipeline, are the arguments set correctly?")
        log.info("Finished")
        return collected_datasets, unified_datasets, analysed_datasets

    def collect(self, api_key: str = None) -> Tuple[Dataset]:
        log.info('Collecting started')
        cli_log.info('Collecting started')
        collector = RapidCollector(api_key)
        # Create dummy dataset only to get target dir
        dummy_dataset = Dataset(self._repository, self.dataset_source, self.__date_id, None)
        download_dir = dummy_dataset.path(DatasetState.COLLECTED)
        # Collect datasets
        collected = collector.collect(
            download_dir=download_dir, date=self._date, filter_ports=self._ports, filter_types=('hosts', 'certs')
        )
        # Remove duplicates (same datasets with e.g. different suffix)
        datasets = tuple(OrderedDict.fromkeys(map(Dataset.from_full_path, collected)))
        log.info('%d dataset were downloaded', len(datasets))
        log.info('Collecting finished')
        cli_log.info('Collecting finished')
        return datasets if datasets else None

    def filter(self, methods: list = None) -> str:
        raise NotImplementedError

    def unify(self, certdb: CertDB) -> Tuple[Dataset]:
        datasets = self.__init_datasets()
        # Unify datasets
        unified = self.__unify(certdb=certdb, datasets=datasets)
        return unified if unified else None

    def analyse(self, analyser: CertAnalyser, analyser_cfg: dict) -> Tuple[Dataset]:
        datasets = self.__init_datasets()
        # Analyse datasets
        analysed = self.__analyse(datasets=datasets, analyser=analyser, analyser_cfg=analyser_cfg)
        return analysed if analysed else None

    def __init_datasets(self) -> Tuple[Dataset]:
        return tuple(Dataset(self._repository, self.dataset_source, self.__date_id, port) for port in self._ports)

    def __init_unifier(self, dataset: Dataset) -> RapidUnifier:
        certs_file = dataset.full_path(DatasetState.COLLECTED, self._CERT_NAME_SUFFIX, True)
        hosts_file = dataset.full_path(DatasetState.COLLECTED, self._HOSTS_NAME_SUFFIX, True)
        if certs_file and hosts_file:
            chain_file = dataset.full_path(DatasetState.UNIFIED, self._CHAINS_NAME_SUFFIX, physically=True)
            broken_file = dataset.full_path(DatasetState.UNIFIED, self._BROKEN_CHAINS_NAME_SUFFIX, physically=True)
            try:
                unifier = RapidUnifier(certs_file, hosts_file, chain_file, broken_file)
                log.info("Will unify dataset: %s", dataset.static_filename)
                return unifier
            except FileNotFoundError:
                log.exception("Collected dataset not found")
        return None

    def __unify(self, certdb: CertDB, datasets: Tuple[Dataset], store_log: bool = True) -> Tuple[Dataset]:
        log.info('Unifying started')
        cli_log.info('Unifying started')
        # First validate datasets and init unifiers
        unifyable, unifiers = [], []
        for dataset in datasets:
            unifier = self.__init_unifier(dataset)
            if unifier is not None:
                unifiers.append(unifier)
                unifyable.append(dataset)
        # Parse and store certificates
        for unifier in unifiers:
            try:
                cli_log.info('Parsing certificates from Dataset file <%s>', unifier.certs_dataset)
                unifier.store_certs(certdb)
            except (OSError, ValueError):
                log.exception("Error during certs dataset parsing -> rollback")
                certdb.rollback()
                raise DatasetUnificationError("Error during certs dataset parsing")
        # Now parse and store chains
        for unifier in unifiers:
            try:
                cli_log.info('Parsing host chains from Dataset file <%s>', unifier.certs_dataset)
                unifier.store_chains(certdb)
                if store_log:
                    # Store dataset unification log
                    unifier.save_unification_log(os.path.splitext(unifier.chain_file)[0] + '.log')
            except OSError:
                log.exception("Error during hosts dataset parsing -> commit")
                certdb.commit()
                raise DatasetUnificationError("Error during hosts dataset parsing")
        # Remove unified datasets
        # for dataset in unifyable:
        #    dataset.delete(DatasetState.COLLECTED)
        log.info('Unifying finished')
        cli_log.info('Unifying finished')
        return tuple(unifyable) if unifyable else None

    def __analyse(self, datasets: Tuple[Dataset], analyser: Type[CertAnalyser], analyser_cfg: dict) -> Tuple[Dataset]:
        log.info('Analysis started')
        cli_log.info('Analysis started')
        analysable = []

        for dataset in datasets:
            chain_file = dataset.full_path(DatasetState.UNIFIED, self._CHAINS_NAME_SUFFIX, True)
            if chain_file:
                analysable.append(dataset)
                filename = os.path.join(dataset.path(DatasetState.ANALYSED), dataset.static_filename)
                # Open CertAnalyser as context manager
                with analyser(output_file=filename, processes=self._cpu_cores, **analyser_cfg) as analyser_ctx:
                    log.info("Will analyse dataset: %s", dataset.static_filename)
                    for host, chain in RapidUnifier.read_chains(chain_file):
                        analyser_ctx.schedule(host, chain)
                    # Indicate that no more data for analysis will be scheduled
                    analyser_ctx.done()
                    log.info("Dataset analysis finished")

        log.info('Analysis finished')
        cli_log.info('Analysis finished')
        return tuple(analysable) if analysable else None

Ancestors

Inherited members