Module cevast.dataset

This package is a collection of tools for working with certificate datasets.

Expand source code
"""This package is a collection of tools for working with certificate datasets."""

__all__ = [
    'DatasetSource',
    'DatasetState',
    'Dataset',
    'DatasetRepository',
    'DatasetCollectionError',
    'DatasetInvalidError',
    'DatasetUnificationError',
    'DatasetManagerFactory',
    'DatasetManager',
    'DatasetManagerTask',
]
__version__ = '1.1'
__author__ = 'Radim Podola'

from .dataset import (
    DatasetSource,
    DatasetState,
    Dataset,
    DatasetRepository,
    DatasetCollectionError,
    DatasetInvalidError,
    DatasetUnificationError,
)
from .manager_factory import DatasetManagerFactory
from .managers import DatasetManagerTask, DatasetManager

Sub-modules

cevast.dataset.cli

Group of CLI commands used for Dataset management and tasks …

cevast.dataset.collectors

This package provides tools for collecting certificate datasets.

cevast.dataset.dataset

This module contains structures and classes logically related to certificate datasets.

cevast.dataset.filters

This package provides tools for certificate dataset filtering.

cevast.dataset.manager_factory

This module contains the DatasetManager factory implementation.

cevast.dataset.managers

This package provides certificate dataset managers …

cevast.dataset.unifiers

This package provides tools for unification of certificate datasets.

Classes

class Dataset (repository: str, source: Union[DatasetSource, str], date_id: str, port: Union[str, int], extension: str = 'gz')

Class representing a single dataset and providing an interface to the dataset on the filesystem.

A dataset is identified by its source, state and filename, where the filename consists of a mandatory date_id, a port number and an optional suffix. date_id represents the date (or date range) when the dataset was created (certificates were collected and added to the dataset) as a string in the format "YYYYMMDD", port is the application port on which the data was collected, and suffix can qualify the dataset in various ways and is used to distinguish files internally. source, date_id and port are static identifiers provided upon object initialization.

The dataset state is a dynamic identifier that forms the last part of the dataset's complete identification at any given time. Each dataset can be found in 1-N of the following generalized states:

- COLLECTED
- FILTERED
- UNIFIED
- ANALYSED

Full Dataset path template: {repository}/{source}/{state}/{date_id}[_{port}][_suffix].{extension}

Initialize the static identifiers
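
For illustration, a minimal usage sketch of the identifier model; the repository path /data/repo is a placeholder and must already exist on disk, otherwise the initializer raises DatasetInvalidError:

from cevast.dataset import Dataset, DatasetState

ds = Dataset(repository='/data/repo', source='RAPID',
             date_id='20200601', port=443)

print(ds.static_filename)                    # 20200601_443
# The state completes the path template at call time:
print(ds.full_path(DatasetState.COLLECTED))
# /data/repo/RAPID/COLLECTED/20200601_443.gz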

Expand source code
class Dataset:
    """
    Class representing a single dataset and providing an interface to the dataset on the filesystem.

    A dataset is identified by `source`, `state` and filename, where the filename consists of a mandatory
    `date_id`, a `port` number and an optional suffix. date_id represents the date (or date range) when
    the dataset was created (certificates were collected and added to the dataset) as a string in the format
    "YYYYMMDD", port is the application port on which the data was collected, and suffix can qualify the dataset
    in various ways and is used to distinguish files internally. `source`, `date_id` and `port`
    are static identifiers provided upon object initialization.

    The dataset state is a dynamic identifier that forms the last part of its complete identification at any given time.
    Each dataset can be found at 1-N of the following generalized states:
        - COLLECTED
        - FILTERED
        - UNIFIED
        - ANALYSED

    Full Dataset path template: {repository}/{source}/{state}/{date_id}[_{port}][_suffix].{extension}
    """

    def __init__(self, repository: str, source: Union[DatasetSource, str],
                 date_id: str, port: Union[str, int], extension: str = 'gz'):
        """Initialize the static identifiers"""
        # Validate and init dataset repository
        if not os.path.exists(repository):
            raise DatasetInvalidError("Repository %s not found" % repository)
        self._repository = os.path.abspath(repository)
        # Validate and init dataset source
        if not DatasetSource.validate(source):
            raise DatasetInvalidError("Dataset source %s is not valid." % source)
        self._source = str(source)
        self._date_id = date_id
        self._port = str(port) if port is not None else ''
        self._extension = extension

    @property
    def source(self) -> str:
        """Get the Dataset source."""
        return self._source

    @property
    def date(self) -> str:
        """Get the DATE ID."""
        return self._date_id

    @property
    def port(self) -> str:
        """Get the PORT."""
        return self._port

    @property
    def extension(self) -> str:
        """Get the extension."""
        return self._extension

    @property
    def static_filename(self) -> str:
        """Get the static part of dataset filename."""
        return Dataset.format_filename(self._date_id, self._port)

    @classmethod
    def from_full_path(cls, path: str) -> 'Dataset':
        """
        Initialize a Dataset object from the given path,
        or return None if the object cannot be initialized.
        """
        template = r"^(?P<repo>\S+)[/\\](?P<source>\S+)[/\\](?P<state>\S+)[/\\](?P<date>\d{8})(_(?P<port>\d+))?(_\S+)?\.(?P<ext>\S+)$"
        match = re.match(template, path)
        if not match:
            return None
        try:
            return cls(repository=match.group('repo'),
                       source=match.group('source'),
                       date_id=match.group('date'),
                       port=match.group('port'),
                       extension=match.group('ext'),
                       )
        except DatasetInvalidError:
            log.exception("Cannot initialize Dataset class from the given path.")
            return None

    @staticmethod
    def format_filename(date: str, port: str = '', suffix: str = '') -> str:
        """Format dataset filename."""
        if port and suffix:
            return "{}_{}_{}".format(date, port, suffix)
        if port or suffix:
            return "{}_{}".format(date, port or suffix)
        return date

    def path(self, state: Union[DatasetState, str], physically: bool = True) -> str:
        """
        Assemble and return the path to the dataset in the given state.
        If the `physically` flag is set and the path does not exist, create it.
        """
        # Validate dataset state
        if not DatasetState.validate(state):
            raise DatasetInvalidError("Dataset state %s is not valid." % state)

        path = os.path.join(self._repository, self._source, str(state))

        if physically and not os.path.exists(path):
            log.info("Path <%s> does not exist yet, will be created.", path)
            os.makedirs(path)
        return path

    def full_path(self, state: Union[DatasetState, str], suffix: str = '',
                  check_if_exists: bool = False, physically: bool = False) -> str:
        """
        Assemble and return the full path to the dataset file in the given state, including a custom suffix.
        Return None if `check_if_exists` is set and the file does not exist.
        """
        filename = Dataset.format_filename(self._date_id, self._port, suffix)
        path = os.path.join(self.path(state, physically), filename + '.' + self._extension)
        if check_if_exists and not os.path.exists(path):
            return None
        return path

    def delete(self, state: Union[DatasetState, str]) -> None:
        """Delete the dataset of given state from the repository."""
        path = self.path(state, False)
        if not os.path.exists(path):
            return
        for file in directory_with_prefix(path, Dataset.format_filename(self._date_id, self._port)):
            log.debug("Will delete dataset <%s>.", file)
            os.remove(file)
        if not os.listdir(path):
            log.info("No more datasets in state <%s>, directory will be deleted.", state)
            os.rmdir(path)

    def purge(self) -> None:
        """Delete all datasets of specified source from the repository."""
        shutil.rmtree(os.path.join(self._repository, self._source), ignore_errors=True)

    def get(self, state: Union[DatasetState, str], suffix: str = '', full_path: bool = False) -> Tuple[str]:
        """Return all datasets stored in the dataset repository matching the paramaters."""
        filename = Dataset.format_filename(self._date_id, self._port, suffix)
        path = self.path(state, False)
        return tuple(directory_with_prefix(path, filename, not full_path))

    def exists(self, state: Union[DatasetState, str]) -> bool:
        """Test if the dataset exists in given state."""
        path = self.path(state, False)
        for _ in directory_with_prefix(path, Dataset.format_filename(self._date_id, self._port)):
            return True
        return False

    def exists_any(self) -> bool:
        """Test if any dataset (of the specified type) exists."""
        for state in DatasetState:
            if self.exists(state):
                return True
        return False

    def move(self, state: Union[DatasetState, str], source: str, format_name: bool = True) -> None:
        """
        Move the source file to the repository directory of the dataset in the given state.
        If `format_name` is true, the name is formatted.
        """
        if os.path.exists(source):
            path = self.path(state)
            filename = os.path.basename(source)
            if format_name:
                filename = Dataset.format_filename(self._date_id, self._port, filename)
            shutil.move(os.path.abspath(source), os.path.join(path, filename))

    def __str__(self):
        return os.path.join(self._repository, self._source, "{}", Dataset.format_filename(self._date_id, self._port))

    def __repr__(self):
        return "<%s.%s source=%s, date_id=%s, port=%s>" % (
            self.__class__.__module__,
            self.__class__.__qualname__,
            self._source,
            self._date_id,
            self._port,
        )

    def __eq__(self, other):
        if not isinstance(other, Dataset):
            # don't attempt to compare against unrelated types
            return NotImplemented

        return self._port == other.port and self._date_id == other.date and self._source == other.source

    def __hash__(self):
        return hash((self._port, self._date_id, self._source))

Static methods

def format_filename(date: str, port: str = '', suffix: str = '') ‑> str

Format dataset filename.

Expand source code
@staticmethod
def format_filename(date: str, port: str = '', suffix: str = '') -> str:
    """Format dataset filename."""
    if port and suffix:
        return "{}_{}_{}".format(date, port, suffix)
    if port or suffix:
        return "{}_{}".format(date, port or suffix)
    return date
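
The formatting rules follow directly from the source above; a sketch with illustrative values:

Dataset.format_filename('20200601')                # '20200601'
Dataset.format_filename('20200601', '443')         # '20200601_443'
Dataset.format_filename('20200601', '443', 'raw')  # '20200601_443_raw'
Dataset.format_filename('20200601', suffix='raw')  # '20200601_raw'
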
def from_full_path(path: str) ‑> Dataset

Initialize a Dataset object from the given path, or return None if the object cannot be initialized.

Expand source code
@classmethod
def from_full_path(cls, path: str) -> 'Dataset':
    """
    Initialize a Dataset object from the given path,
    or return None if the object cannot be initialized.
    """
    template = r"^(?P<repo>\S+)[/\\](?P<source>\S+)[/\\](?P<state>\S+)[/\\](?P<date>\d{8})(_(?P<port>\d+))?(_\S+)?\.(?P<ext>\S+)$"
    match = re.match(template, path)
    if not match:
        return None
    try:
        return cls(repository=match.group('repo'),
                   source=match.group('source'),
                   date_id=match.group('date'),
                   port=match.group('port'),
                   extension=match.group('ext'),
                   )
    except DatasetInvalidError:
        log.exception("Cannot initialize Dataset class from the given path.")
        return None
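
A usage sketch; /data/repo is a placeholder repository that must exist on disk, otherwise the constructor raises and None is returned:

ds = Dataset.from_full_path('/data/repo/RAPID/COLLECTED/20200601_443.gz')
if ds is not None:
    print(ds.source, ds.date, ds.port)          # RAPID 20200601 443
print(Dataset.from_full_path('no-separators'))  # None (pattern does not match)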

Instance variables

var date : str

Get the DATE ID.

Expand source code
@property
def date(self) -> str:
    """Get the DATE ID."""
    return self._date_id
var extension : str

Get the extension.

Expand source code
@property
def extension(self) -> str:
    """Get the extension."""
    return self._extension
var port : str

Get the PORT.

Expand source code
@property
def port(self) -> str:
    """Get the PORT."""
    return self._port
var source : str

Get the Dataset source.

Expand source code
@property
def source(self) -> str:
    """Get the Dataset source."""
    return self._source
var static_filename : str

Get the static part of dataset filename.

Expand source code
@property
def static_filename(self) -> str:
    """Get the static part of dataset filename."""
    return Dataset.format_filename(self._date_id, self._port)

Methods

def delete(self, state: Union[DatasetState, str]) ‑> NoneType

Delete the dataset of the given state from the repository.

Expand source code
def delete(self, state: Union[DatasetState, str]) -> None:
    """Delete the dataset of given state from the repository."""
    path = self.path(state, False)
    if not os.path.exists(path):
        return
    for file in directory_with_prefix(path, Dataset.format_filename(self._date_id, self._port)):
        log.debug("Will delete dataset <%s>.", file)
        os.remove(file)
    if not os.listdir(path):
        log.info("No more datasets in state <%s>, directory will be deleted.", state)
        os.rmdir(path)
def exists(self, state: Union[DatasetState, str]) ‑> bool

Test if the dataset exists in the given state.

Expand source code
def exists(self, state: Union[DatasetState, str]) -> bool:
    """Test if the dataset exists in given state."""
    path = self.path(state, False)
    for _ in directory_with_prefix(path, Dataset.format_filename(self._date_id, self._port)):
        return True
    return False
def exists_any(self) ‑> bool

Test if any dataset (of the specified type) exists.

Expand source code
def exists_any(self) -> bool:
    """Test if any dataset (of the specified type) exists."""
    for state in DatasetState:
        if self.exists(state):
            return True
    return False
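
A short sketch of guarding work with the existence checks, reusing the hypothetical ds object and DatasetState import from the earlier sketch:

if ds.exists(DatasetState.UNIFIED):
    print("already unified, nothing to do")
elif ds.exists(DatasetState.COLLECTED):
    print("collected, ready to be unified")

if not ds.exists_any():
    print("dataset not found in any state")
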
def full_path(self, state: Union[DatasetState, str], suffix: str = '', check_if_exists: bool = False, physically: bool = False) ‑> str

Assemble and return the full path to the dataset file in the given state, including a custom suffix. Return None if check_if_exists is set and the file does not exist.

Expand source code
def full_path(self, state: Union[DatasetState, str], suffix: str = '',
              check_if_exists: bool = False, physically: bool = False) -> str:
    """
    Assemble and return the full path to the dataset file in the given state, including a custom suffix.
    Return None if `check_if_exists` is set and the file does not exist.
    """
    filename = Dataset.format_filename(self._date_id, self._port, suffix)
    path = os.path.join(self.path(state, physically), filename + '.' + self._extension)
    if check_if_exists and not os.path.exists(path):
        return None
    return path
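
Both flags default to False, so by default the path is only assembled. A sketch reusing the hypothetical ds from above:

# Assemble only; nothing is created or checked:
ds.full_path(DatasetState.FILTERED, suffix='applied')
# -> /data/repo/RAPID/FILTERED/20200601_443_applied.gz

# Return None unless the file actually exists:
ds.full_path(DatasetState.FILTERED, check_if_exists=True)

# Create the state directory while assembling the path:
ds.full_path(DatasetState.FILTERED, physically=True)
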
def get(self, state: Union[DatasetState, str], suffix: str = '', full_path: bool = False) ‑> Tuple[str]

Return all datasets stored in the dataset repository matching the parameters.

Expand source code
def get(self, state: Union[DatasetState, str], suffix: str = '', full_path: bool = False) -> Tuple[str]:
    """Return all datasets stored in the dataset repository matching the paramaters."""
    filename = Dataset.format_filename(self._date_id, self._port, suffix)
    path = self.path(state, False)
    return tuple(directory_with_prefix(path, filename, not full_path))
def move(self, state: Union[DatasetState, str], source: str, format_name: bool = True) ‑> NoneType

Move the source file to the repository directory of the dataset in the given state. If format_name is true, the name is formatted.

Expand source code
def move(self, state: Union[DatasetState, str], source: str, format_name: bool = True) -> None:
    """
    Move the source file to the repository directory of the dataset in the given state.
    If `format_name` is true, the name is formatted.
    """
    if os.path.exists(source):
        path = self.path(state)
        filename = os.path.basename(source)
        if format_name:
            filename = Dataset.format_filename(self._date_id, self._port, filename)
        shutil.move(os.path.abspath(source), os.path.join(path, filename))
def path(self, state: Union[DatasetState, str], physically: bool = True) ‑> str

Assemble and return the path to the dataset in the given state. If the physically flag is set and the path does not exist, create it.

Expand source code
def path(self, state: Union[DatasetState, str], physically: bool = True) -> str:
    """
    Assemble and return the path to the dataset in the given state.
    If the `physically` flag is set and the path does not exist, create it.
    """
    # Validate dataset state
    if not DatasetState.validate(state):
        raise DatasetInvalidError("Dataset state %s is not valid." % state)

    path = os.path.join(self._repository, self._source, str(state))

    if physically and not os.path.exists(path):
        log.info("Path <%s> does not exist yet, will be created.", path)
        os.makedirs(path)
    return path
def purge(self) ‑> NoneType

Delete all datasets of specified source from the repository.

Expand source code
def purge(self) -> None:
    """Delete all datasets of specified source from the repository."""
    shutil.rmtree(os.path.join(self._repository, self._source), ignore_errors=True)
class DatasetCollectionError (*args, **kwargs)

Raised when dataset collection fails.

Expand source code
class DatasetCollectionError(ValueError):
    """Raised when dataset collection fails."""

Ancestors

  • builtins.ValueError
  • builtins.Exception
  • builtins.BaseException
class DatasetInvalidError (*args, **kwargs)

Raised when the dataset has an invalid identifier.

Expand source code
class DatasetInvalidError(ValueError):
    """Raised when the dataset has an invalid identifier."""

Ancestors

  • builtins.ValueError
  • builtins.Exception
  • builtins.BaseException
class DatasetManager (repository: str, date: datetime.date = datetime.today().date(), ports: Tuple[str] = ('443',), cpu_cores: int = 1)

An abstract DatasetManager class representing an interface that can be used to perform various tasks with a certificate dataset.

For the manager to perform a task, a repository path and a date must be provided. The date works as an identifier of the dataset, even though it doesn't need to match exactly - the newest dataset by that date is identified. Additionally, a port number can be used to specify the dataset further.

DatasetManager supports performing tasks independently or running a series of tasks at once via the run method (useful for tasks that would be complex and/or long-lasting if run separately). Running a series might also be better optimized.

Initialize the manager. repository is the dataset repository, date identifies the dataset, ports is a list of ports further specifying the datasets, and cpu_cores is the maximum number of CPU cores that may be used.
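
A minimal sketch of what a concrete manager might look like. It is purely illustrative: the real implementations live in cevast.dataset.managers, and registering this class under RAPID would compete with the real RAPID manager.

from datetime import datetime
from typing import Tuple

from cevast.dataset import DatasetManager, DatasetManagerTask, DatasetSource


class SketchManager(DatasetManager):
    # A plain class attribute satisfies the abstract `dataset_source`
    # property; DatasetManagerFactory keys the class under str() of it.
    dataset_source = DatasetSource.RAPID

    def __init__(self, repository: str, date=datetime.today().date(),
                 ports: Tuple[str] = ('443',), cpu_cores: int = 1):
        self._repository = repository
        self._date = date
        self._ports = ports
        self._cpu_cores = cpu_cores

    def run(self, task_pipline):
        for task, cfg in task_pipline:
            if task == DatasetManagerTask.COLLECT:
                self.collect(**cfg)
            # ... dispatch FILTER/UNIFY/ANALYSE analogously

    def collect(self, api_key: str = None):
        return ()  # would download raw data and return the collected Datasets

    def filter(self, methods: list = None):
        return ()

    def unify(self, certdb):
        return ()

    def analyse(self, analyser, analyser_cfg: dict):
        return ()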

Expand source code
class DatasetManager(ABC):
    """
    An abstract DatasetManager class representing an interface that can be used to perform
    various tasks with a certificate dataset.

    For the manager to perform a task, a repository path and a date must be provided. The date works
    as an identifier of the dataset, even though it doesn't need to match exactly
    - the newest dataset by that date is identified. Additionally, a port number can be
    used to specify the dataset further.

    DatasetManager supports performing tasks independently or running a series of tasks at once
    via the `run` method (useful for tasks that would be complex and/or long-lasting if run
    separately). Running a series might also be better optimized.
    """

    @property
    @abstractclassmethod
    def dataset_source(cls) -> DatasetSource:
        """
        Dataset source property used to identify a manager specification.
        """

    @abstractmethod
    def __init__(self, repository: str, date: datetime.date = datetime.today().date(),
                 ports: Tuple[str] = ('443',), cpu_cores: int = 1):
        """
        Initialize the manager.
        `repository` is the dataset repository,
        `date` identifies the dataset,
        `ports` is a list of ports further specifying the datasets,
        `cpu_cores` is the maximum number of CPU cores that may be used.
        """

    @abstractmethod
    def run(self, task_pipline: Tuple[Tuple[DatasetManagerTask, dict]]) -> None:
        """
        Run a series of tasks.
        `task_pipline` is a tuple composed of the required tasks in the form of ('task', 'cfg') pairs, where:
            - 'task' is a supported DatasetManagerTask,
            - 'cfg' is a dictionary of parameters that will be passed to the individual task methods.
        The caller must ensure that the 'cfg' parameters match the task method's signature.
        TODO make cfg dict optional
        """

    @abstractmethod
    def collect(self, api_key: str = None) -> Tuple[Dataset]:
        """
        Collect a dataset.
        `api_key` is an API access key that might be needed to retrieve the datasets (depends on the source implementation).
        Return tuple of collected Datasets.
        """

    @abstractmethod
    def filter(self, methods: list = None) -> Tuple[Dataset]:
        """
        Filter a dataset with the given methods.
        Return tuple of filtered Datasets.
        """

    @abstractmethod
    def unify(self, certdb: CertDB) -> Tuple[Dataset]:
        """
        Unify a dataset.
        `certdb` is the CertDB instance to work with (parsed certificates will be inserted into it).
        Return tuple of unified Datasets.
        """

    @abstractmethod
    def analyse(self, analyser: Type[CertAnalyser], analyser_cfg: dict) -> Tuple[Dataset]:
        """
        Analyse a dataset with the given analyser.
        `analyser` is a CertAnalyser class,
        `analyser_cfg` is a dictionary with analyser parameters that will be passed to the initializer.

        Call to analyser is performed like this: analyser(cert_chain, **analyser_cfg).

        Return tuple of analysed Datasets.
        """

Ancestors

  • abc.ABC

Instance variables

var dataset_source

Dataset source property used to identify a manager specification.

Methods

def analyse(self, analyser: Type[CertAnalyser], analyser_cfg: dict) ‑> Tuple[Dataset]

Analyse a dataset with the given analyser. analyser is a CertAnalyser class, analyser_cfg is a dictionary with analyser parameters that will be passed to the initializer.

Call to analyser is performed like this: analyser(cert_chain, **analyser_cfg).

Return tuple of analysed Datasets.

Expand source code
@abstractmethod
def analyse(self, analyser: Type[CertAnalyser], analyser_cfg: dict) -> Tuple[Dataset]:
    """
    Analyse a dataset with the given analyser.
    `analyser` is a CertAnalyser class,
    `analyser_cfg` is a dictionary with analyser parameters that will be passed to the initializer.

    Call to analyser is performed like this: analyser(cert_chain, **analyser_cfg).

    Return tuple of analysed Datasets.
    """
def collect(self, api_key: str = None) ‑> Tuple[Dataset]

Collect a dataset. api_key is an API access key that might be needed to retrieve the datasets (depends on the source implementation). Return tuple of collected Datasets.

Expand source code
@abstractmethod
def collect(self, api_key: str = None) -> Tuple[Dataset]:
    """
    Collect a dataset.
    `api_key` is an API access key that might be needed to retrieve the datasets (depends on the source implementation).
    Return tuple of collected Datasets.
    """
def filter(self, methods: list = None) ‑> Tuple[Dataset]

Filter a dataset with the given methods. Return tuple of filtered Datasets.

Expand source code
@abstractmethod
def filter(self, methods: list = None) -> Tuple[Dataset]:
    """
    Filter a dataset with the given methods.
    Return tuple of filtered Datasets.
    """
def run(self, task_pipline: Tuple[Tuple[DatasetManagerTask, dict]]) ‑> NoneType

Run a series of tasks. task_pipline is a tuple composed of the required tasks in the form of ('task', 'cfg') pairs, where:

- 'task' is a supported DatasetManagerTask,
- 'cfg' is a dictionary of parameters that will be passed to the individual task methods.

The caller must ensure that the 'cfg' parameters match the task method's signature. TODO make cfg dict optional

Expand source code
@abstractmethod
def run(self, task_pipline: Tuple[Tuple[DatasetManagerTask, dict]]) -> None:
    """
    Run a series of tasks.
    `task_pipline` is a tuple composed of the required tasks in the form of ('task', 'cfg') pairs, where:
        - 'task' is a supported DatasetManagerTask,
        - 'cfg' is a dictionary of parameters that will be passed to the individual task methods.
    The caller must ensure that the 'cfg' parameters match the task method's signature.
    TODO make cfg dict optional
    """
def unify(self, certdb: CertDB) ‑> Tuple[Dataset]

Unify a dataset. certdb is the CertDB instance to work with (parsed certificates will be inserted into it). Return tuple of unified Datasets.

Expand source code
@abstractmethod
def unify(self, certdb: CertDB) -> Tuple[Dataset]:
    """
    Unify a dataset.
    `certdb` is the CertDB instance to work with (parsed certificates will be inserted into it).
    Return tuple of unified Datasets.
    """
class DatasetManagerFactory

Factory class providing the specific DatasetManager class based on DatasetSource.

Important: DatasetManager classes are registered automatically. To add a new specialized DatasetManager class, it must inherit from DatasetManager and be placed in the "cevast.dataset.managers" package.

Expand source code
class DatasetManagerFactory:
    """
    Factory class providing the specific DatasetManager class based on DatasetSource.

    .. IMPORTANT:: DatasetManager classes are registered automatically. To add a new
       specialized DatasetManager class, it must inherit from DatasetManager and be placed
       in the "cevast.dataset.managers" package.
    """

    __classes = {}

    @classmethod
    def __load_classes(cls):
        """
        Automatically initialize lookup dictionary with subclasses of DatasetManager class
        that are visible to the Python interpreter (obtained from `type.__subclasses__()`).
        """
        for manager_class in managers.DatasetManager.__subclasses__():
            if hasattr(manager_class, 'dataset_source'):
                cls.__classes[str(manager_class.dataset_source)] = manager_class

    @classmethod
    def get_manager(cls, dataset_source: Union[DatasetSource, str]) -> Type[managers.DatasetManager]:
        """Return a corresponding DatasetManager class based on `dataset_source`."""
        if not cls.__classes:
            cls.__load_classes()

        # Validate and init dataset manager
        if not DatasetSource.validate(dataset_source):
            raise DatasetInvalidError("Dataset source %s is not valid." % dataset_source)
        if str(dataset_source) not in cls.__classes:
            raise DatasetInvalidError("Dataset source %s has no manager." % dataset_source)

        return cls.__classes[str(dataset_source)]

Static methods

def get_manager(dataset_source: Union[DatasetSource, str]) ‑> Type[DatasetManager]

Return a corresponding DatasetManager class based on dataset_source.

Expand source code
@classmethod
def get_manager(cls, dataset_source: Union[DatasetSource, str]) -> Type[managers.DatasetManager]:
    """Return a corresponding DatasetManager class based on `dataset_source`."""
    if not cls.__classes:
        cls.__load_classes()

    # Validate and init dataset manager
    if not DatasetSource.validate(dataset_source):
        raise DatasetInvalidError("Dataset source %s is not valid." % dataset_source)
    if str(dataset_source) not in cls.__classes:
        raise DatasetInvalidError("Dataset source %s has no manager." % dataset_source)

    return cls.__classes[str(dataset_source)]
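
A usage sketch; the repository path passed to the returned class is a placeholder:

from cevast.dataset import DatasetManagerFactory, DatasetSource

# The enum member and its string name are interchangeable:
manager_cls = DatasetManagerFactory.get_manager(DatasetSource.RAPID)
manager_cls = DatasetManagerFactory.get_manager('RAPID')

manager = manager_cls(repository='/data/repo')
# An unknown or unmanaged source raises DatasetInvalidError.
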
class DatasetManagerTask (value, names=None, *, module=None, qualname=None, type=None, start=1)

Enumeration of DatasetManager Tasks

Expand source code
class DatasetManagerTask(IntEnum):
    """Enumeration of DatasetManager Tasks"""

    COLLECT = 1
    FILTER = 2
    UNIFY = 3
    ANALYSE = 4

    @classmethod
    def validate(cls, state: Union['DatasetManagerTask', str]) -> bool:
        """Validate DatasetManagerTask."""
        if isinstance(state, cls):
            return state in cls
        if isinstance(state, str):
            return state in cls.__members__
        return False

    def __str__(self):
        return str(self.name)

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var ANALYSE
var COLLECT
var FILTER
var UNIFY

Static methods

def validate(state: Union[DatasetManagerTask, str]) ‑> bool

Validate DatasetManagerTask.

Expand source code
@classmethod
def validate(cls, state: Union['DatasetManagerTask', str]) -> bool:
    """Validate DatasetManagerTask."""
    if isinstance(state, cls):
        return state in cls
    if isinstance(state, str):
        return state in cls.__members__
    return False
class DatasetRepository (repository: str)

Wrapper around the whole dataset repository providing overview and abstraction of the storage system.
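
A usage sketch; /data/repo is a placeholder and must exist, otherwise FileNotFoundError is raised:

from cevast.dataset import DatasetRepository, DatasetSource, DatasetState

repo = DatasetRepository('/data/repo')

# Nested dict keyed by source, then state: {'RAPID': {'COLLECTED': (...,)}}
overview = repo.get(source=DatasetSource.RAPID, state=DatasetState.COLLECTED)

repo.dump()  # print the formatted overview of the whole repository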

Expand source code
class DatasetRepository:
    """
    Wrapper around the whole dataset repository providing overview and abstraction of the storage system.
    """

    def __init__(self, repository: str):
        if repository and os.path.exists(repository):
            self.repository = os.path.abspath(repository)
        else:
            raise FileNotFoundError("Dataset Repository %s not found." % repository)

    def dumps(self, source: Union[DatasetSource, str] = None,
              state: Union[DatasetState, str] = None, dataset_id: str = '') -> str:
        """
        Return string representation of the specified dataset repository.
        The parameters represent the output filter options.
        """
        repo = self.get(source, state, dataset_id)
        repo_str = ''
        for d_src, d_states in repo.items():
            repo_str += '{:<8}: '.format(d_src)
            first_state = True

            for d_state, d_datasets in d_states.items():
                if first_state:
                    first_state = False
                else:
                    repo_str += " " * 10
                repo_str += "{:<10}: ".format(d_state)

                first_dataset = True
                for dataset in d_datasets:
                    if first_dataset:
                        repo_str += dataset + "\n"
                        first_dataset = False
                    else:
                        repo_str += " " * 22 + dataset + "\n"

        return repo_str

    def dump(self, source: Union[DatasetSource, str] = None,
             state: Union[DatasetState, str] = None, dataset_id: str = '') -> None:
        """
        Print string representation of the specified dataset repository to the STDOUT.
        The parameters represent the output filter options.
        """
        print(self.dumps(source, state, dataset_id))

    def get(self, source: Union[DatasetSource, str] = None,
            state: Union[DatasetState, str] = None, dataset_id: str = '') -> dict:
        """
        Return dictionary representation of the specified dataset repository.
        The parameters represent the output filter options.
        """

        def get_source() -> dict:
            ret_src = {}
            states = [state] if state else DatasetState
            # Iterate through filtered states and get its datasets
            for d_state in states:
                ret_state = dataset_path.get(d_state)
                if ret_state:
                    ret_src[str(d_state)] = ret_state
            return ret_src

        # Validate dataset source
        if source and not DatasetSource.validate(source):
            raise DatasetInvalidError("Dataset source %s is not valid." % source)
        # Validate dataset state
        if state and not DatasetState.validate(state):
            raise DatasetInvalidError("Dataset state %s is not valid." % state)

        ret_repo = {}
        sources = [source] if source else DatasetSource
        # Iterate through filtered sources and get its states
        for d_src in sources:
            dataset_path = Dataset(self.repository, d_src, dataset_id, None)
            ret_src = get_source()
            if ret_src:
                ret_repo[str(d_src)] = ret_src

        return ret_repo

    def __str__(self):
        return self.dumps()

Methods

def dump(self, source: Union[DatasetSource, str] = None, state: Union[DatasetState, str] = None, dataset_id: str = '') ‑> NoneType

Print string representation of the specified dataset repository to the STDOUT. The parameters represent the output filter options.

Expand source code
def dump(self, source: Union[DatasetSource, str] = None,
         state: Union[DatasetState, str] = None, dataset_id: str = '') -> None:
    """
    Print string representation of the specified dataset repository to the STDOUT.
    The parameters represent the output filter options.
    """
    print(self.dumps(source, state, dataset_id))
def dumps(self, source: Union[DatasetSource, str] = None, state: Union[DatasetState, str] = None, dataset_id: str = '') ‑> str

Return string representation of the specified dataset repository. The parameters represent the output filter options.

Expand source code
def dumps(self, source: Union[DatasetSource, str] = None,
          state: Union[DatasetState, str] = None, dataset_id: str = '') -> str:
    """
    Return string representation of the specified dataset repository.
    The parameters represent the output filter options.
    """
    repo = self.get(source, state, dataset_id)
    repo_str = ''
    for d_src, d_states in repo.items():
        repo_str += '{:<8}: '.format(d_src)
        first_state = True

        for d_state, d_datasets in d_states.items():
            if first_state:
                first_state = False
            else:
                repo_str += " " * 10
            repo_str += "{:<10}: ".format(d_state)

            first_dataset = True
            for dataset in d_datasets:
                if first_dataset:
                    repo_str += dataset + "\n"
                    first_dataset = False
                else:
                    repo_str += " " * 22 + dataset + "\n"

    return repo_str
def get(self, source: Union[DatasetSource, str] = None, state: Union[DatasetState, str] = None, dataset_id: str = '') ‑> dict

Return dictionary representation of the specified dataset repository. The parameters represent the output filter options.

Expand source code
def get(self, source: Union[DatasetSource, str] = None,
        state: Union[DatasetState, str] = None, dataset_id: str = '') -> dict:
    """
    Return dictionary representation of the specified dataset repository.
    The parameters represent the output filter options.
    """

    def get_source() -> dict:
        ret_src = {}
        states = [state] if state else DatasetState
        # Iterate through filtered states and get its datasets
        for d_state in states:
            ret_state = dataset_path.get(d_state)
            if ret_state:
                ret_src[str(d_state)] = ret_state
        return ret_src

    # Validate dataset source
    if source and not DatasetSource.validate(source):
        raise DatasetInvalidError("Dataset source %s is not valid." % source)
    # Validate dataset state
    if state and not DatasetState.validate(state):
        raise DatasetInvalidError("Dataset state %s is not valid." % state)

    ret_repo = {}
    sources = [source] if source else DatasetSource
    # Iterate through filtered sources and get its states
    for d_src in sources:
        dataset_path = Dataset(self.repository, d_src, dataset_id, None)
        ret_src = get_source()
        if ret_src:
            ret_repo[str(d_src)] = ret_src

    return ret_repo
class DatasetSource (value, names=None, *, module=None, qualname=None, type=None, start=1)

Enumeration class of all supported Dataset sources.

Expand source code
class DatasetSource(IntEnum):
    """Enumaration class of all supported Dataset sources."""

    RAPID = 1
    CENSYS = 2

    @classmethod
    def validate(cls, source: Union['DatasetSource', str]) -> bool:
        """Validate DatasetSource."""
        if isinstance(source, cls):
            return source in cls
        if isinstance(source, str):
            return source in cls.__members__
        return False

    def __str__(self):
        return str(self.name)

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var CENSYS
var RAPID

Static methods

def validate(source: Union[DatasetSource, str]) ‑> bool

Validate DatasetSource.

Expand source code
@classmethod
def validate(cls, source: Union['DatasetSource', str]) -> bool:
    """Validate DatasetSource."""
    if isinstance(source, cls):
        return source in cls
    if isinstance(source, str):
        return source in cls.__members__
    return False
class DatasetState (value, names=None, *, module=None, qualname=None, type=None, start=1)

Enumeration class of all supported Dataset states.

Expand source code
class DatasetState(IntEnum):
    """Enumaration class of all supported Dataset states."""

    COLLECTED = 1  # Dataset was collected and is available in a raw format
    FILTERED = 2  # Dataset was filtered
    UNIFIED = 3  # Dataset was unified to internal format, certificates were stored to CertDB
    ANALYSED = 4  # Dataset was already run through analysis, result might be available

    @classmethod
    def validate(cls, state: Union['DatasetState', str]) -> bool:
        """Validate DatasetState."""
        if isinstance(state, cls):
            return state in cls
        if isinstance(state, str):
            return state in cls.__members__
        return False

    def __str__(self):
        return str(self.name)

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var ANALYSED
var COLLECTED
var FILTERED
var UNIFIED

Static methods

def validate(state: Union[DatasetState, str]) ‑> bool

Validate DatasetState.

Expand source code
@classmethod
def validate(cls, state: Union['DatasetState', str]) -> bool:
    """Validate DatasetState."""
    if isinstance(state, cls):
        return state in cls
    if isinstance(state, str):
        return state in cls.__members__
    return False
class DatasetUnificationError (*args, **kwargs)

Raised when dataset unification fails.

Expand source code
class DatasetUnificationError(ValueError):
    """Raised when dataset unification fails."""

Ancestors

  • builtins.ValueError
  • builtins.Exception
  • builtins.BaseException