Module cevast.dataset
This package is a collection of tools for working with certificate datasets.
Expand source code
"""This package is a collection of tools for working with certificate datasets."""
__all__ = [
'DatasetSource',
'DatasetState',
'Dataset',
'DatasetRepository',
'DatasetCollectionError',
'DatasetInvalidError',
'DatasetUnificationError',
'DatasetManagerFactory',
'DatasetManager',
'DatasetManagerTask',
]
__version__ = '1.1'
__author__ = 'Radim Podola'
from .dataset import (
DatasetSource,
DatasetState,
Dataset,
DatasetRepository,
DatasetCollectionError,
DatasetInvalidError,
DatasetUnificationError,
)
from .manager_factory import DatasetManagerFactory
from .managers import DatasetManagerTask, DatasetManager
Sub-modules
cevast.dataset.cli
-
Group of CLI commands used for Dataset management and tasks …
cevast.dataset.collectors
-
This package provides tools for collecting certificate datasets.
cevast.dataset.dataset
-
This module contains structures and classes logically related to certificate datasets.
cevast.dataset.filters
-
This package provides tools for filtering certificate datasets.
cevast.dataset.manager_factory
-
This module contains DatasetManager factory implementation.
cevast.dataset.managers
-
This package provides certificate dataset managers …
cevast.dataset.unifiers
-
This package provides tools for unification of certificate datasets.
Classes
class Dataset (repository: str, source: Union[DatasetSource, str], date_id: str, port: Union[str, int], extension: str = 'gz')
-
Class representing a single dataset and providing an interface to the dataset on the filesystem.
A Dataset is identified by source, state and filename, where the filename consists of a mandatory date_id, a port number and an optional suffix. date_id represents the date (or date range) when the dataset was created (certificates were collected and added to the dataset) and is a string in the format "YYYYMMDD", port is the application port on which the data was collected, and the suffix can specify the dataset in various ways and is used to distinguish the files internally. source, date_id and port are static identifiers provided upon object initialization.
The dataset state is a dynamic identifier that forms the last part of its complete identification at a given time. Each dataset can be found in 1-N of the following generalized states:
- COLLECTED
- FILTERED
- UNIFIED
- ANALYSED
Full Dataset path template: {repository}/{source}/{state}/{date_id}[_{port}][_suffix].{extension}
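For illustration, a minimal sketch of how the identifiers map onto that template. The repository path and identifier values are hypothetical, the repository directory must already exist, and paths are shown POSIX-style:

from cevast.dataset import Dataset

dataset = Dataset('/data/repo', 'RAPID', '20200101', 443)   # hypothetical repository and identifiers
dataset.static_filename   # '20200101_443' - the static part of the filename
str(dataset)              # '/data/repo/RAPID/{}/20200101_443' - the state fills the placeholder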
Initialize the static identifiers
Expand source code
class Dataset:
    """
    Class representing a single dataset and providing an interface to the dataset on the filesystem.

    Dataset is identified by `source`, `state` and filename where filename consists of mandatory `date_id`,
    `port` number and optional suffix. date_id represents the date (or date range) when the dataset was created
    (certificates were collected and added to the dataset) and is a string in format "YYYYMMDD", port is the
    application port on which the data was collected, suffix can specify the dataset in various ways and is
    used to distinguish the files internally.
    `source`, `date_id` and `port` are static identifiers provided upon object initialization.
    The dataset state is a dynamic identifier that is the last part of its complete identification at the time.
    Each dataset can be found at 1-N of the following generalized states:
        - COLLECTED
        - FILTERED
        - UNIFIED
        - ANALYSED

    Full Dataset path template: {repository}/{source}/{state}/{date_id}[_{port}][_suffix].{extension}
    """

    def __init__(self, repository: str, source: Union[DatasetSource, str], date_id: str,
                 port: Union[str, int], extension: str = 'gz'):
        """Initialize the static identifiers"""
        # Validate and init dataset repository
        if not os.path.exists(repository):
            raise DatasetInvalidError("Repository %s not found" % repository)
        self._repository = os.path.abspath(repository)
        # Validate and init dataset source
        if not DatasetSource.validate(source):
            raise DatasetInvalidError("Dataset source %s is not valid." % source)
        self._source = str(source)
        self._date_id = date_id
        self._port = str(port) if port is not None else ''
        self._extension = extension

    @property
    def source(self) -> str:
        """Get the Dataset source."""
        return self._source

    @property
    def date(self) -> str:
        """Get the DATE ID."""
        return self._date_id

    @property
    def port(self) -> str:
        """Get the PORT."""
        return self._port

    @property
    def extension(self) -> str:
        """Get the extension."""
        return self._extension

    @property
    def static_filename(self) -> str:
        """Get the static part of dataset filename."""
        return Dataset.format_filename(self._date_id, self._port)

    @classmethod
    def from_full_path(cls, path: str) -> 'Dataset':
        """
        Initialize Dataset object from the given path, or return None if object cannot be initialized.
        """
        template = r"^(?P<repo>\S+)[/\\](?P<source>\S+)[/\\](?P<state>\S+)[/\\](?P<date>\d{8})(_(?P<port>\d+))?(_\S+)?\.(?P<ext>\S+)$"
        match = re.match(template, path)
        if not match:
            return None
        try:
            return cls(repository=match.group('repo'),
                       source=match.group('source'),
                       date_id=match.group('date'),
                       port=match.group('port'),
                       extension=match.group('ext'),
                       )
        except DatasetInvalidError:
            log.exception("Cannot initialize Dataset class from the given path.")
            return None

    @staticmethod
    def format_filename(date: str, port: str = '', suffix: str = '') -> str:
        """Format dataset filename."""
        if port and suffix:
            return "{}_{}_{}".format(date, port, suffix)
        if port or suffix:
            return "{}_{}".format(date, port or suffix)
        return date

    def path(self, state: Union[DatasetState, str], physically: bool = True) -> str:
        """
        Assemble and return path to the dataset in given state.
        If `physically` flag is set and path does not exist, create it.
        """
        # Validate dataset state
        if not DatasetState.validate(state):
            raise DatasetInvalidError("Dataset state %s is not valid." % state)
        path = os.path.join(self._repository, self._source, str(state))
        if physically and not os.path.exists(path):
            log.info("Path <%s> does not exist yet, will be created.", path)
            os.makedirs(path)
        return path

    def full_path(self, state: Union[DatasetState, str], suffix: str = '',
                  check_if_exists: bool = False, physically: bool = False) -> str:
        """
        Assemble and return full path to the dataset file in given state including custom suffix.
        Return None if `check_if_exists` and file does not exist.
        """
        filename = Dataset.format_filename(self._date_id, self._port, suffix)
        path = os.path.join(self.path(state, physically), filename + '.' + self._extension)
        if check_if_exists and not os.path.exists(path):
            return None
        return path

    def delete(self, state: Union[DatasetState, str]) -> None:
        """Delete the dataset of given state from the repository."""
        path = self.path(state, False)
        if not os.path.exists(path):
            return
        for file in directory_with_prefix(path, Dataset.format_filename(self._date_id, self._port)):
            log.debug("Will delete dataset <%s>.", file)
            os.remove(file)
        if not os.listdir(path):
            log.info("No more datasets in state <%s>, directory will be deleted.", state)
            os.rmdir(path)

    def purge(self) -> None:
        """Delete all datasets of specified source from the repository."""
        shutil.rmtree(os.path.join(self._repository, self._source), ignore_errors=True)

    def get(self, state: Union[DatasetState, str], suffix: str = '', full_path: bool = False) -> Tuple[str]:
        """Return all datasets stored in the dataset repository matching the parameters."""
        filename = Dataset.format_filename(self._date_id, self._port, suffix)
        path = self.path(state, False)
        return tuple(directory_with_prefix(path, filename, not full_path))

    def exists(self, state: Union[DatasetState, str]) -> bool:
        """Test if the dataset exists in given state."""
        path = self.path(state, False)
        for _ in directory_with_prefix(path, Dataset.format_filename(self._date_id, self._port)):
            return True
        return False

    def exists_any(self) -> bool:
        """Test if any dataset (of the specified type) exists."""
        for state in DatasetState:
            if self.exists(state):
                return True
        return False

    def move(self, state: Union[DatasetState, str], source: str, format_name: bool = True) -> None:
        """
        Move the source file to the repository of the dataset of given state.
        If `format_name` is true, then name is formatted.
        """
        if os.path.exists(source):
            path = self.path(state)
            filename = os.path.basename(source)
            if format_name:
                filename = Dataset.format_filename(self._date_id, self._port, filename)
            shutil.move(os.path.abspath(source), os.path.join(path, filename))

    def __str__(self):
        return os.path.join(self._repository, self._source, "{}",
                            Dataset.format_filename(self._date_id, self._port))

    def __repr__(self):
        return "<%s.%s source=%s, date_id=%s, port=%s>" % (
            self.__class__.__module__,
            self.__class__.__qualname__,
            self._source,
            self._date_id,
            self._port,
        )

    def __eq__(self, other):
        if not isinstance(other, Dataset):
            # don't attempt to compare against unrelated types
            return NotImplemented
        return self._port == other.port and self._date_id == other.date and self._source == other.source

    def __hash__(self):
        return hash((self._port, self._date_id, self._source))
Static methods
def format_filename(date: str, port: str = '', suffix: str = '') ‑> str
-
Format dataset filename.
Expand source code
@staticmethod
def format_filename(date: str, port: str = '', suffix: str = '') -> str:
    """Format dataset filename."""
    if port and suffix:
        return "{}_{}_{}".format(date, port, suffix)
    if port or suffix:
        return "{}_{}".format(date, port or suffix)
    return date
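A few illustrative calls (return values shown as comments, following directly from the source above):

from cevast.dataset import Dataset

Dataset.format_filename('20200101', '443', 'chains')   # '20200101_443_chains'
Dataset.format_filename('20200101', '443')             # '20200101_443'
Dataset.format_filename('20200101')                    # '20200101'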
def from_full_path(path: str) ‑> Dataset
-
Initialize Dataset object from the given path, or return None if object cannot be initialized.
Expand source code
@classmethod
def from_full_path(cls, path: str) -> 'Dataset':
    """
    Initialize Dataset object from the given path, or return None if object cannot be initialized.
    """
    template = r"^(?P<repo>\S+)[/\\](?P<source>\S+)[/\\](?P<state>\S+)[/\\](?P<date>\d{8})(_(?P<port>\d+))?(_\S+)?\.(?P<ext>\S+)$"
    match = re.match(template, path)
    if not match:
        return None
    try:
        return cls(repository=match.group('repo'),
                   source=match.group('source'),
                   date_id=match.group('date'),
                   port=match.group('port'),
                   extension=match.group('ext'),
                   )
    except DatasetInvalidError:
        log.exception("Cannot initialize Dataset class from the given path.")
        return None
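A sketch of parsing a full dataset path back into a Dataset. The path is hypothetical; the repository directory ('/data/repo' here) must exist, otherwise the constructor fails and None is returned:

from cevast.dataset import Dataset

dataset = Dataset.from_full_path('/data/repo/RAPID/COLLECTED/20200101_443.gz')
if dataset is not None:
    print(dataset.source, dataset.date, dataset.port)   # RAPID 20200101 443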
Instance variables
var date : str
-
Get the DATE ID.
Expand source code
@property
def date(self) -> str:
    """Get the DATE ID."""
    return self._date_id
var extension : str
-
Get the extension.
Expand source code
@property
def extension(self) -> str:
    """Get the extension."""
    return self._extension
var port : str
-
Get the PORT.
Expand source code
@property
def port(self) -> str:
    """Get the PORT."""
    return self._port
var source : str
-
Get the Dataset source.
Expand source code
@property
def source(self) -> str:
    """Get the Dataset source."""
    return self._source
var static_filename : str
-
Get the static part of dataset filename.
Expand source code
@property
def static_filename(self) -> str:
    """Get the static part of dataset filename."""
    return Dataset.format_filename(self._date_id, self._port)
Methods
def delete(self, state: Union[DatasetState, str]) ‑> NoneType
-
Delete the dataset of given state from the repository.
Expand source code
def delete(self, state: Union[DatasetState, str]) -> None:
    """Delete the dataset of given state from the repository."""
    path = self.path(state, False)
    if not os.path.exists(path):
        return
    for file in directory_with_prefix(path, Dataset.format_filename(self._date_id, self._port)):
        log.debug("Will delete dataset <%s>.", file)
        os.remove(file)
    if not os.listdir(path):
        log.info("No more datasets in state <%s>, directory will be deleted.", state)
        os.rmdir(path)
def exists(self, state: Union[DatasetState, str]) ‑> bool
-
Test if the dataset exists in given state.
Expand source code
def exists(self, state: Union[DatasetState, str]) -> bool:
    """Test if the dataset exists in given state."""
    path = self.path(state, False)
    for _ in directory_with_prefix(path, Dataset.format_filename(self._date_id, self._port)):
        return True
    return False
def exists_any(self) ‑> bool
-
Test if any dataset (of the specified type) exists.
Expand source code
def exists_any(self) -> bool:
    """Test if any dataset (of the specified type) exists."""
    for state in DatasetState:
        if self.exists(state):
            return True
    return False
def full_path(self, state: Union[DatasetState, str], suffix: str = '', check_if_exists: bool = False, physically: bool = False) ‑> str
-
Assemble and return the full path to the dataset file in the given state, including a custom suffix. Return None if check_if_exists is set and the file does not exist.
Expand source code
def full_path(self, state: Union[DatasetState, str], suffix: str = '',
              check_if_exists: bool = False, physically: bool = False) -> str:
    """
    Assemble and return full path to the dataset file in given state including custom suffix.
    Return None if `check_if_exists` and file does not exist.
    """
    filename = Dataset.format_filename(self._date_id, self._port, suffix)
    path = os.path.join(self.path(state, physically), filename + '.' + self._extension)
    if check_if_exists and not os.path.exists(path):
        return None
    return path
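A short sketch of assembling file paths, assuming a POSIX filesystem and an existing '/data/repo' directory (all values are hypothetical):

from cevast.dataset import Dataset, DatasetState

dataset = Dataset('/data/repo', 'RAPID', '20200101', 443)
dataset.full_path(DatasetState.COLLECTED)
# '/data/repo/RAPID/COLLECTED/20200101_443.gz' - assembled, not checked on disk
dataset.full_path('COLLECTED', suffix='chains', check_if_exists=True)
# '/data/repo/RAPID/COLLECTED/20200101_443_chains.gz', or None if the file does not exist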
def get(self, state: Union[DatasetState, str], suffix: str = '', full_path: bool = False) ‑> Tuple[str]
-
Return all datasets stored in the dataset repository matching the parameters.
Expand source code
def get(self, state: Union[DatasetState, str], suffix: str = '', full_path: bool = False) -> Tuple[str]:
    """Return all datasets stored in the dataset repository matching the parameters."""
    filename = Dataset.format_filename(self._date_id, self._port, suffix)
    path = self.path(state, False)
    return tuple(directory_with_prefix(path, filename, not full_path))
def move(self, state: Union[DatasetState, str], source: str, format_name: bool = True) ‑> NoneType
-
Move the source file to the repository of the dataset of the given state. If format_name is true, the name is formatted.
Expand source code
def move(self, state: Union[DatasetState, str], source: str, format_name: bool = True) -> None:
    """
    Move the source file to the repository of the dataset of given state.
    If `format_name` is true, then name is formatted.
    """
    if os.path.exists(source):
        path = self.path(state)
        filename = os.path.basename(source)
        if format_name:
            filename = Dataset.format_filename(self._date_id, self._port, filename)
        shutil.move(os.path.abspath(source), os.path.join(path, filename))
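For example, a freshly downloaded file could be placed into the repository like this (both paths are hypothetical and must exist):

from cevast.dataset import Dataset, DatasetState

dataset = Dataset('/data/repo', 'RAPID', '20200101', 443)
dataset.move(DatasetState.COLLECTED, '/tmp/download.gz')
# The file ends up as /data/repo/RAPID/COLLECTED/20200101_443_download.gz;
# the COLLECTED directory is created if it does not exist yet.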
def path(self, state: Union[DatasetState, str], physically: bool = True) ‑> str
-
Assemble and return the path to the dataset in the given state. If the physically flag is set and the path does not exist, create it.
Expand source code
def path(self, state: Union[DatasetState, str], physically: bool = True) -> str:
    """
    Assemble and return path to the dataset in given state.
    If `physically` flag is set and path does not exist, create it.
    """
    # Validate dataset state
    if not DatasetState.validate(state):
        raise DatasetInvalidError("Dataset state %s is not valid." % state)
    path = os.path.join(self._repository, self._source, str(state))
    if physically and not os.path.exists(path):
        log.info("Path <%s> does not exist yet, will be created.", path)
        os.makedirs(path)
    return path
def purge(self) ‑> NoneType
-
Delete all datasets of specified source from the repository.
Expand source code
def purge(self) -> None:
    """Delete all datasets of specified source from the repository."""
    shutil.rmtree(os.path.join(self._repository, self._source), ignore_errors=True)
class DatasetCollectionError (*args, **kwargs)
-
Raised when dataset collection fails.
Expand source code
class DatasetCollectionError(ValueError):
    """Raised when dataset collection fails."""
Ancestors
- builtins.ValueError
- builtins.Exception
- builtins.BaseException
class DatasetInvalidError (*args, **kwargs)
-
Raised when the dataset has an invalid identifier.
Expand source code
class DatasetInvalidError(ValueError):
    """Raised when the dataset has an invalid identifier."""
Ancestors
- builtins.ValueError
- builtins.Exception
- builtins.BaseException
class DatasetManager (repository: str, date: datetime.date = datetime.date(2021, 5, 4), ports: Tuple[str] = ('443',), cpu_cores: int = 1)
-
An abstract DatasetManager class representing an interface that can be used to perform various tasks with a certificate dataset.
For the Manager to perform a task, a repository path and a date must be provided. The date works as an identifier of the dataset even though it does not need to match exactly - the newest dataset by that date is identified. Additionally, a port number might be used to specify the dataset further.
DatasetManager offers performing tasks independently or running a series of tasks at once by the run method (useful for tasks that would be rather complex and/or long-lasting when run separately). Running a series might also be better optimized.
Initialize Manager. repository is the dataset repository, 'date' is the date, 'ports' is a list of ports further specifying the datasets, 'cpu_cores' is the maximum number of CPU cores that might be used.
Expand source code
class DatasetManager(ABC):
    """
    An abstract DatasetManager class representing an interface that can be used to perform
    various tasks with a certificate dataset.

    For Manager to perform a task, a repository path and date must be provided. Date works as
    an identifier of the dataset even though the date does not need to match exactly - the
    newest dataset by that date is identified. Additionally a port number might be used to
    specify the dataset further.

    DatasetManager offers performing tasks independently or running a series of tasks at once
    by `run` method (useful for performing tasks that would be rather complex and/or
    long-lasting running separately). Running a series might also be better optimized.
    """

    @property
    @abstractclassmethod
    def dataset_source(cls) -> DatasetSource:
        """Dataset source property used to identify a manager specification."""

    @abstractmethod
    def __init__(self, repository: str, date: datetime.date = datetime.today().date(),
                 ports: Tuple[str] = ('443',), cpu_cores: int = 1):
        """
        Initialize Manager.
        `repository` is dataset repository, 'date' is date,
        'ports' is list of ports more specifying datasets,
        'cpu_cores' is maximum number of CPU cores that might be used.
        """

    @abstractmethod
    def run(self, task_pipline: Tuple[Tuple[DatasetManagerTask, dict]]) -> None:
        """
        Run a series of tasks.
        `task_pipline` is a tuple composed of the required tasks in form of pairs ('task', 'cfg'), where:
            - 'task' is supported DatasetManagerTask,
            - 'cfg' is a dictionary of parameters that will be passed to individual task methods.
        Caller function must ensure that 'cfg' parameters match task method's declaration.
        TODO make cfg dict optional
        """

    @abstractmethod
    def collect(self, api_key: str = None) -> Tuple[Dataset]:
        """
        Collect a dataset.
        `api_key` is API access key that might be needed to retrieve datasets (depends on source implementation).
        Return tuple of collected Datasets.
        """

    @abstractmethod
    def filter(self, methods: list = None) -> Tuple[Dataset]:
        """
        Filter a dataset with given methods.
        Return tuple of filtered Datasets.
        """

    @abstractmethod
    def unify(self, certdb: CertDB) -> Tuple[Dataset]:
        """
        Unify a dataset.
        `certdb` is CertDB instance to work with (to insert parsed certificates to).
        Return tuple of unified Datasets.
        """

    @abstractmethod
    def analyse(self, analyser: Type[CertAnalyser], analyser_cfg: dict) -> Tuple[Dataset]:
        """
        Analyse a dataset with given analyser.
        `analyser` is a CertAnalyser class, `analyser_cfg` is a dictionary with analyser parameters
        that will be passed to the initializer.
        Call to analyser is performed like this: analyser(cert_chain, **analyser_cfg).
        Return tuple of analysed Datasets.
        """
Ancestors
- abc.ABC
Subclasses
Instance variables
var dataset_source
-
Dataset source property used to identify a manager specification.
Methods
def analyse(self, analyser: Type[CertAnalyser], analyser_cfg: dict) ‑> Tuple[Dataset]
-
Analyse a dataset with the given analyser. analyser is a CertAnalyser class, analyser_cfg is a dictionary with analyser parameters that will be passed to the initializer.
The call to the analyser is performed like this: analyser(cert_chain, **analyser_cfg).
Return a tuple of analysed Datasets.
Expand source code
@abstractmethod
def analyse(self, analyser: Type[CertAnalyser], analyser_cfg: dict) -> Tuple[Dataset]:
    """
    Analyse a dataset with given analyser.
    `analyser` is a CertAnalyser class, `analyser_cfg` is a dictionary with analyser parameters
    that will be passed to the initializer.
    Call to analyser is performed like this: analyser(cert_chain, **analyser_cfg).
    Return tuple of analysed Datasets.
    """
def collect(self, api_key: str = None) ‑> Tuple[Dataset]
-
Collect a dataset. api_key is an API access key that might be needed to retrieve datasets (depends on the source implementation). Return a tuple of collected Datasets.
Expand source code
@abstractmethod
def collect(self, api_key: str = None) -> Tuple[Dataset]:
    """
    Collect a dataset.
    `api_key` is API access key that might be needed to retrieve datasets (depends on source implementation).
    Return tuple of collected Datasets.
    """
def filter(self, methods: list = None) ‑> Tuple[Dataset]
-
Filter a dataset with given methods. Return tuple of filtered Datasets.
Expand source code
@abstractmethod
def filter(self, methods: list = None) -> Tuple[Dataset]:
    """
    Filter a dataset with given methods.
    Return tuple of filtered Datasets.
    """
def run(self, task_pipline: Tuple[Tuple[DatasetManagerTask, dict]]) ‑> NoneType
-
Run a series of tasks.
task_pipline is a tuple composed of the required tasks in the form of pairs ('task', 'cfg'), where 'task' is a supported DatasetManagerTask and 'cfg' is a dictionary of parameters that will be passed to the individual task methods. The caller must ensure that the 'cfg' parameters match the task method's declaration. TODO make cfg dict optional
Expand source code
@abstractmethod
def run(self, task_pipline: Tuple[Tuple[DatasetManagerTask, dict]]) -> None:
    """
    Run a series of tasks.
    `task_pipline` is a tuple composed of the required tasks in form of pairs ('task', 'cfg'), where:
        - 'task' is supported DatasetManagerTask,
        - 'cfg' is a dictionary of parameters that will be passed to individual task methods.
    Caller function must ensure that 'cfg' parameters match task method's declaration.
    TODO make cfg dict optional
    """
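A sketch of a task pipeline, assuming a concrete manager obtained from DatasetManagerFactory that keeps the abstract initializer signature (the repository path, API key and filter name are hypothetical):

from cevast.dataset import DatasetManagerFactory, DatasetManagerTask

manager_cls = DatasetManagerFactory.get_manager('RAPID')
manager = manager_cls('/data/repo', ports=('443',), cpu_cores=2)

# Each entry is a (task, cfg) pair; the cfg keys must match the task method's parameters.
pipeline = (
    (DatasetManagerTask.COLLECT, {'api_key': 'secret-key'}),        # passed to collect()
    (DatasetManagerTask.FILTER, {'methods': ['example_filter']}),   # hypothetical filter name, passed to filter()
)
manager.run(pipeline)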
def unify(self, certdb: CertDB) ‑> Tuple[Dataset]
-
Unify a dataset. certdb is the CertDB instance to work with (to insert parsed certificates to). Return a tuple of unified Datasets.
Expand source code
@abstractmethod
def unify(self, certdb: CertDB) -> Tuple[Dataset]:
    """
    Unify a dataset.
    `certdb` is CertDB instance to work with (to insert parsed certificates to).
    Return tuple of unified Datasets.
    """
class DatasetManagerFactory
-
Factory class providing the specific DatasetManager class based on DatasetSource.
Important: DatasetManager classes are registered automatically. To add a new
specialized DatasetManager class, it must inherit DatasetManager and be placed in "cevast.dataset.managers" package.
Expand source code
class DatasetManagerFactory:
    """
    Factory class providing the specific DatasetManager class based on DatasetSource.

    .. IMPORTANT:: DatasetManager classes are registered automatically. To add a new
       specialized DatasetManager class, it must inherit DatasetManager and be placed
       in "cevast.dataset.managers" package.
    """

    __classes = {}

    @classmethod
    def __load_classes(cls):
        """
        Automatically initialize lookup dictionary with subclasses of DatasetManager class
        that are visible to the Python interpreter (obtained from `type.__subclasses__()`).
        """
        for manager_class in managers.DatasetManager.__subclasses__():
            if hasattr(manager_class, 'dataset_source'):
                cls.__classes[str(manager_class.dataset_source)] = manager_class

    @classmethod
    def get_manager(cls, dataset_source: Union[DatasetSource, str]) -> Type[managers.DatasetManager]:
        """Return a corresponding DatasetManager class based on `dataset_source`."""
        if not cls.__classes:
            cls.__load_classes()
        # Validate and init dataset manager
        if not DatasetSource.validate(dataset_source):
            raise DatasetInvalidError("Dataset source %s is not valid." % dataset_source)
        if str(dataset_source) not in cls.__classes:
            raise DatasetInvalidError("Dataset source %s has no manager." % dataset_source)
        return cls.__classes[str(dataset_source)]
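A hypothetical skeleton of such a specialized manager (all names are illustrative; a real implementation must live in the cevast.dataset.managers package to be discovered, and would implement the task methods fully):

from datetime import datetime
from typing import Tuple
from cevast.dataset import DatasetManager, DatasetSource

class ExampleDatasetManager(DatasetManager):
    """Illustrative manager skeleton for the RAPID source."""
    dataset_source = DatasetSource.RAPID   # key under which the factory registers this class

    def __init__(self, repository: str, date=datetime.today().date(),
                 ports: Tuple[str] = ('443',), cpu_cores: int = 1):
        self._repository, self._date, self._ports, self._cores = repository, date, ports, cpu_cores

    def run(self, task_pipline): ...
    def collect(self, api_key=None): ...
    def filter(self, methods=None): ...
    def unify(self, certdb): ...
    def analyse(self, analyser, analyser_cfg): ...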
Static methods
def get_manager(dataset_source: Union[DatasetSource, str]) ‑> Type[DatasetManager]
-
Return a corresponding DatasetManager class based on dataset_source.
Expand source code
@classmethod
def get_manager(cls, dataset_source: Union[DatasetSource, str]) -> Type[managers.DatasetManager]:
    """Return a corresponding DatasetManager class based on `dataset_source`."""
    if not cls.__classes:
        cls.__load_classes()
    # Validate and init dataset manager
    if not DatasetSource.validate(dataset_source):
        raise DatasetInvalidError("Dataset source %s is not valid." % dataset_source)
    if str(dataset_source) not in cls.__classes:
        raise DatasetInvalidError("Dataset source %s has no manager." % dataset_source)
    return cls.__classes[str(dataset_source)]
class DatasetManagerTask (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Enumeration of DatasetManager Tasks
Expand source code
class DatasetManagerTask(IntEnum):
    """Enumeration of DatasetManager Tasks"""
    COLLECT = 1
    FILTER = 2
    UNIFY = 3
    ANALYSE = 4

    @classmethod
    def validate(cls, state: Union['DatasetManagerTask', str]) -> bool:
        """Validate DatasetManagerTask."""
        if isinstance(state, cls):
            return state in cls
        if isinstance(state, str):
            return state in cls.__members__
        return False

    def __str__(self):
        return str(self.name)
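Validation accepts either an enum member or its name, as follows directly from the source above:

from cevast.dataset import DatasetManagerTask

DatasetManagerTask.validate(DatasetManagerTask.UNIFY)   # True
DatasetManagerTask.validate('COLLECT')                  # True - the member name is accepted
DatasetManagerTask.validate('EXPORT')                   # False - not a known task
str(DatasetManagerTask.ANALYSE)                         # 'ANALYSE'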
Ancestors
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var ANALYSE
var COLLECT
var FILTER
var UNIFY
Static methods
def validate(state: Union[_ForwardRef('DatasetManagerTask'), str]) ‑> bool
-
Validate DatasetManagerTask.
Expand source code
@classmethod
def validate(cls, state: Union['DatasetManagerTask', str]) -> bool:
    """Validate DatasetManagerTask."""
    if isinstance(state, cls):
        return state in cls
    if isinstance(state, str):
        return state in cls.__members__
    return False
class DatasetRepository (repository: str)
-
Wrapper around the whole dataset repository providing overview and abstraction of the storage system.
Expand source code
class DatasetRepository:
    """
    Wrapper around the whole dataset repository providing overview and abstraction
    of the storage system.
    """

    def __init__(self, repository: str):
        if repository and os.path.exists(repository):
            self.repository = os.path.abspath(repository)
        else:
            raise FileNotFoundError("Dataset Repository %s not found." % repository)

    def dumps(self, source: Union[DatasetSource, str] = None,
              state: Union[DatasetState, str] = None, dataset_id: str = '') -> str:
        """
        Return string representation of the specified dataset repository.
        The parameters represent the output filter options.
        """
        repo = self.get(source, state, dataset_id)
        repo_str = ''
        for d_src, d_states in repo.items():
            repo_str += '{:<8}: '.format(d_src)
            first_state = True
            for d_state, d_datasets in d_states.items():
                if first_state:
                    first_state = False
                else:
                    repo_str += " " * 10
                repo_str += "{:<10}: ".format(d_state)
                first_dataset = True
                for dataset in d_datasets:
                    if first_dataset:
                        repo_str += dataset + "\n"
                        first_dataset = False
                    else:
                        repo_str += " " * 22 + dataset + "\n"
        return repo_str

    def dump(self, source: Union[DatasetSource, str] = None,
             state: Union[DatasetState, str] = None, dataset_id: str = '') -> None:
        """
        Print string representation of the specified dataset repository to the STDOUT.
        The parameters represent the output filter options.
        """
        print(self.dumps(source, state, dataset_id))

    def get(self, source: Union[DatasetSource, str] = None,
            state: Union[DatasetState, str] = None, dataset_id: str = '') -> dict:
        """
        Return dictionary representation of the specified dataset repository.
        The parameters represent the output filter options.
        """
        def get_source() -> dict:
            ret_src = {}
            states = [state] if state else DatasetState
            # Iterate through filtered states and get its datasets
            for d_state in states:
                ret_state = dataset_path.get(d_state)
                if ret_state:
                    ret_src[str(d_state)] = ret_state
            return ret_src

        # Validate dataset source
        if source and not DatasetSource.validate(source):
            raise DatasetInvalidError("Dataset source %s is not valid." % source)
        # Validate dataset state
        if state and not DatasetState.validate(state):
            raise DatasetInvalidError("Dataset state %s is not valid." % state)

        ret_repo = {}
        sources = [source] if source else DatasetSource
        # Iterate through filtered sources and get its states
        for d_src in sources:
            dataset_path = Dataset(self.repository, d_src, dataset_id, None)
            ret_src = get_source()
            if ret_src:
                ret_repo[str(d_src)] = ret_src
        return ret_repo

    def __str__(self):
        return self.dumps()
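A short usage sketch, assuming an existing repository at the hypothetical path '/data/repo' (the returned content depends entirely on what is stored there):

from cevast.dataset import DatasetRepository

repo = DatasetRepository('/data/repo')          # raises FileNotFoundError if the path is missing
overview = repo.get(source='RAPID', state='COLLECTED')
# e.g. {'RAPID': {'COLLECTED': ('20200101_443.gz',)}} - empty dict if nothing matches
repo.dump()                                     # print the formatted overview to STDOUT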
Methods
def dump(self, source: Union[DatasetSource, str] = None, state: Union[DatasetState, str] = None, dataset_id: str = '') ‑> NoneType
-
Print string representation of the specified dataset repository to the STDOUT. The parameters represent the output filter options.
Expand source code
def dump(self, source: Union[DatasetSource, str] = None,
         state: Union[DatasetState, str] = None, dataset_id: str = '') -> None:
    """
    Print string representation of the specified dataset repository to the STDOUT.
    The parameters represent the output filter options.
    """
    print(self.dumps(source, state, dataset_id))
def dumps(self, source: Union[DatasetSource, str] = None, state: Union[DatasetState, str] = None, dataset_id: str = '') ‑> str
-
Return string representation of the specified dataset repository. The parameters represent the output filter options.
Expand source code
def dumps(self, source: Union[DatasetSource, str] = None,
          state: Union[DatasetState, str] = None, dataset_id: str = '') -> str:
    """
    Return string representation of the specified dataset repository.
    The parameters represent the output filter options.
    """
    repo = self.get(source, state, dataset_id)
    repo_str = ''
    for d_src, d_states in repo.items():
        repo_str += '{:<8}: '.format(d_src)
        first_state = True
        for d_state, d_datasets in d_states.items():
            if first_state:
                first_state = False
            else:
                repo_str += " " * 10
            repo_str += "{:<10}: ".format(d_state)
            first_dataset = True
            for dataset in d_datasets:
                if first_dataset:
                    repo_str += dataset + "\n"
                    first_dataset = False
                else:
                    repo_str += " " * 22 + dataset + "\n"
    return repo_str
def get(self, source: Union[DatasetSource, str] = None, state: Union[DatasetState, str] = None, dataset_id: str = '') ‑> dict
-
Return dictionary representation of the specified dataset repository. The parameters represent the output filter options.
Expand source code
def get(self, source: Union[DatasetSource, str] = None,
        state: Union[DatasetState, str] = None, dataset_id: str = '') -> dict:
    """
    Return dictionary representation of the specified dataset repository.
    The parameters represent the output filter options.
    """
    def get_source() -> dict:
        ret_src = {}
        states = [state] if state else DatasetState
        # Iterate through filtered states and get its datasets
        for d_state in states:
            ret_state = dataset_path.get(d_state)
            if ret_state:
                ret_src[str(d_state)] = ret_state
        return ret_src

    # Validate dataset source
    if source and not DatasetSource.validate(source):
        raise DatasetInvalidError("Dataset source %s is not valid." % source)
    # Validate dataset state
    if state and not DatasetState.validate(state):
        raise DatasetInvalidError("Dataset state %s is not valid." % state)

    ret_repo = {}
    sources = [source] if source else DatasetSource
    # Iterate through filtered sources and get its states
    for d_src in sources:
        dataset_path = Dataset(self.repository, d_src, dataset_id, None)
        ret_src = get_source()
        if ret_src:
            ret_repo[str(d_src)] = ret_src
    return ret_repo
class DatasetSource (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Enumeration class of all supported Dataset sources.
Expand source code
class DatasetSource(IntEnum):
    """Enumeration class of all supported Dataset sources."""
    RAPID = 1
    CENSYS = 2

    @classmethod
    def validate(cls, source: Union['DatasetSource', str]) -> bool:
        """Validate DatasetSource."""
        if isinstance(source, cls):
            return source in cls
        if isinstance(source, str):
            return source in cls.__members__
        return False

    def __str__(self):
        return str(self.name)
Ancestors
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var CENSYS
var RAPID
Static methods
def validate(source: Union[_ForwardRef('DatasetSource'), str]) ‑> bool
-
Validate DatasetSource.
Expand source code
@classmethod
def validate(cls, source: Union['DatasetSource', str]) -> bool:
    """Validate DatasetSource."""
    if isinstance(source, cls):
        return source in cls
    if isinstance(source, str):
        return source in cls.__members__
    return False
class DatasetState (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Enumeration class of all supported Dataset states.
Expand source code
class DatasetState(IntEnum):
    """Enumeration class of all supported Dataset states."""
    COLLECTED = 1  # Dataset was collected and is available in a raw format
    FILTERED = 2   # Dataset was filtered
    UNIFIED = 3    # Dataset was unified to internal format, certificates were stored to CertDB
    ANALYSED = 4   # Dataset was already run through analysis, result might be available

    @classmethod
    def validate(cls, state: Union['DatasetState', str]) -> bool:
        """Validate DatasetState."""
        if isinstance(state, cls):
            return state in cls
        if isinstance(state, str):
            return state in cls.__members__
        return False

    def __str__(self):
        return str(self.name)
Ancestors
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var ANALYSED
var COLLECTED
var FILTERED
var UNIFIED
Static methods
def validate(state: Union[_ForwardRef('DatasetState'), str]) ‑> bool
-
Validate DatasetState.
Expand source code
@classmethod
def validate(cls, state: Union['DatasetState', str]) -> bool:
    """Validate DatasetState."""
    if isinstance(state, cls):
        return state in cls
    if isinstance(state, str):
        return state in cls.__members__
    return False
class DatasetUnificationError (*args, **kwargs)
-
Raised when dataset unification fails.
Expand source code
class DatasetUnificationError(ValueError):
    """Raised when dataset unification fails."""
Ancestors
- builtins.ValueError
- builtins.Exception
- builtins.BaseException