Module cevast.dataset.dataset
This module contains structures and classes logically related to certificate datasets.
Source code
"""
This module contains structures and classes logically related to certificate datasets.
"""
import os
import logging
import shutil
import re
from typing import Union, Tuple
from enum import IntEnum
from cevast.utils import directory_with_prefix
log = logging.getLogger(__name__)
class DatasetCollectionError(ValueError):
"""Raised when dataset collection fails."""
class DatasetUnificationError(ValueError):
"""Raised when dataset unification fails."""
class DatasetInvalidError(ValueError):
"""Raised when the dataset has an invalid identifier."""
class DatasetSource(IntEnum):
"""Enumaration class of all supported Dataset sources."""
RAPID = 1
CENSYS = 2
@classmethod
def validate(cls, source: Union['DatasetSource', str]) -> bool:
"""Validate DatasetSource."""
if isinstance(source, cls):
return source in cls
if isinstance(source, str):
return source in cls.__members__
return False
def __str__(self):
return str(self.name)
class DatasetState(IntEnum):
"""Enumaration class of all supported Dataset states."""
COLLECTED = 1 # Dataset was collected and is available in a raw format
FILTERED = 2 # Dataset was filtered
UNIFIED = 3 # Dataset was unified to internal format, certificates were stored to CertDB
ANALYSED = 4 # Dataset was already run through analysis, result might be available
@classmethod
def validate(cls, state: Union['DatasetState', str]) -> bool:
"""Validate DatasetState."""
if isinstance(state, cls):
return state in cls
if isinstance(state, str):
return state in cls.__members__
return False
def __str__(self):
return str(self.name)
class Dataset:
"""
Class representing a single dataset and providing an interface to the dataset on the filesystem.
Dataset is identified by `source`, `state` and filename, where the filename consists of a mandatory
`date_id`, `port` number and an optional suffix. date_id represents the date (or date range) when
the dataset was created (certificates were collected and added to the dataset) and is a string in the
format "YYYYMMDD"; port is the application port on which the data was collected; suffix can specify
the dataset in various ways and is used to distinguish the files internally. `source`, `date_id` and `port`
are static identifiers provided upon object initialization.
The dataset state is a dynamic identifier that forms the last part of its complete identification at any given time.
Each dataset can be found in 1-N of the following generalized states:
- COLLECTED
- FILTERED
- UNIFIED
- ANALYSED
Full Dataset path template: {repository}/{source}/{state}/{date_id}[_{port}][_suffix].{extension}
"""
def __init__(self, repository: str, source: Union[DatasetSource, str],
date_id: str, port: Union[str, int], extension: str = 'gz'):
"""Initialize the static identifiers"""
# Validate and init dataset repository
if not os.path.exists(repository):
raise DatasetInvalidError("Repository %s not found" % repository)
self._repository = os.path.abspath(repository)
# Validate and init dataset source
if not DatasetSource.validate(source):
raise DatasetInvalidError("Dataset source %s is not valid." % source)
self._source = str(source)
self._date_id = date_id
self._port = str(port) if port is not None else ''
self._extension = extension
@property
def source(self) -> str:
"""Get the Dataset source."""
return self._source
@property
def date(self) -> str:
"""Get the DATE ID."""
return self._date_id
@property
def port(self) -> str:
"""Get the PORT."""
return self._port
@property
def extension(self) -> str:
"""Get the extension."""
return self._extension
@property
def static_filename(self) -> str:
"""Get the static part of dataset filename."""
return Dataset.format_filename(self._date_id, self._port)
@classmethod
def from_full_path(cls, path: str) -> 'Dataset':
"""
Initialize Dataset object from the given path,
or return None if object cannot be initialized.
"""
template = r"^(?P<repo>\S+)[/\\](?P<source>\S+)[/\\](?P<state>\S+)[/\\](?P<date>\d{8})(_(?P<port>\d+))?(_\S+)?\.(?P<ext>\S+)$"
match = re.match(template, path)
if not match:
return None
try:
return cls(repository=match.group('repo'),
source=match.group('source'),
date_id=match.group('date'),
port=match.group('port'),
extension=match.group('ext'),
)
except DatasetInvalidError:
log.exception("Cannot initialize Dataset class from the given path.")
return None
@staticmethod
def format_filename(date: str, port: str = '', suffix: str = '') -> str:
"""Format dataset filename."""
if port and suffix:
return "{}_{}_{}".format(date, port, suffix)
if port or suffix:
return "{}_{}".format(date, port or suffix)
return date
def path(self, state: Union[DatasetState, str], physically: bool = True) -> str:
"""
Assemble and return the path to the dataset in given state.
If the `physically` flag is set and the path does not exist, create it.
"""
# Validate dataset state
if not DatasetState.validate(state):
raise DatasetInvalidError("Dataset state %s is not valid." % state)
path = os.path.join(self._repository, self._source, str(state))
if physically and not os.path.exists(path):
log.info("Path <%s> does not exist yet, will be created.", path)
os.makedirs(path)
return path
def full_path(self, state: Union[DatasetState, str], suffix: str = '',
check_if_exists: bool = False, physically: bool = False) -> str:
"""
Assemble and return the full path to the dataset file in given state, including a custom suffix.
Return None if `check_if_exists` is set and the file does not exist.
"""
filename = Dataset.format_filename(self._date_id, self._port, suffix)
path = os.path.join(self.path(state, physically), filename + '.' + self._extension)
if check_if_exists and not os.path.exists(path):
return None
return path
def delete(self, state: Union[DatasetState, str]) -> None:
"""Delete the dataset of given state from the repository."""
path = self.path(state, False)
if not os.path.exists(path):
return
for file in directory_with_prefix(path, Dataset.format_filename(self._date_id, self._port)):
log.debug("Will delete dataset <%s>.", file)
os.remove(file)
if not os.listdir(path):
log.info("No more datasets in state <%s>, directory will be deleted.", state)
os.rmdir(path)
def purge(self) -> None:
"""Delete all datasets of specified source from the repository."""
shutil.rmtree(os.path.join(self._repository, self._source), ignore_errors=True)
def get(self, state: Union[DatasetState, str], suffix: str = '', full_path: bool = False) -> Tuple[str, ...]:
"""Return all datasets stored in the dataset repository matching the paramaters."""
filename = Dataset.format_filename(self._date_id, self._port, suffix)
path = self.path(state, False)
return tuple(directory_with_prefix(path, filename, not full_path))
def exists(self, state: Union[DatasetState, str]) -> bool:
"""Test if the dataset exists in given state."""
path = self.path(state, False)
for _ in directory_with_prefix(path, Dataset.format_filename(self._date_id, self._port)):
return True
return False
def exists_any(self) -> bool:
"""Test if any dataset (of the specified type) exists."""
for state in DatasetState:
if self.exists(state):
return True
return False
def move(self, state: Union[DatasetState, str], source: str, format_name: bool = True) -> None:
"""
Move the source file to the repository of the dataset of given state.
If `format_name` is true, the filename is formatted from the dataset's static identifiers,
with the original basename used as the suffix.
"""
if os.path.exists(source):
path = self.path(state)
filename = os.path.basename(source)
if format_name:
filename = Dataset.format_filename(self._date_id, self._port, filename)
shutil.move(os.path.abspath(source), os.path.join(path, filename))
def __str__(self):
return os.path.join(self._repository, self._source, "{}", Dataset.format_filename(self._date_id, self._port))
def __repr__(self):
return "<%s.%s source=%s, date_id=%s, port=%s>" % (
self.__class__.__module__,
self.__class__.__qualname__,
self._source,
self._date_id,
self._port,
)
def __eq__(self, other):
if not isinstance(other, Dataset):
# don't attempt to compare against unrelated types
return NotImplemented
return self._port == other.port and self._date_id == other.date and self._source == other.source
def __hash__(self):
return hash((self._port, self._date_id, self._source))
class DatasetRepository:
"""
Wrapper around the whole dataset repository providing overview and abstraction of the storage system.
"""
def __init__(self, repository: str):
if repository and os.path.exists(repository):
self.repository = os.path.abspath(repository)
else:
raise FileNotFoundError("Dataset Repository %s not found." % repository)
def dumps(self, source: Union[DatasetSource, str] = None,
state: Union[DatasetState, str] = None, dataset_id: str = '') -> str:
"""
Return string representation of the specified dataset repository.
The parameters represent the output filter options.
"""
repo = self.get(source, state, dataset_id)
repo_str = ''
for d_src, d_states in repo.items():
repo_str += '{:<8}: '.format(d_src)
first_state = True
for d_state, d_datasets in d_states.items():
if first_state:
first_state = False
else:
repo_str += " " * 10
repo_str += "{:<10}: ".format(d_state)
first_dataset = True
for dataset in d_datasets:
if first_dataset:
repo_str += dataset + "\n"
first_dataset = False
else:
repo_str += " " * 22 + dataset + "\n"
return repo_str
def dump(self, source: Union[DatasetSource, str] = None,
state: Union[DatasetState, str] = None, dataset_id: str = '') -> None:
"""
Print string representation of the specified dataset repository to the STDOUT.
The parameters represent the output filter options.
"""
print(self.dumps(source, state, dataset_id))
def get(self, source: Union[DatasetSource, str] = None,
state: Union[DatasetState, str] = None, dataset_id: str = '') -> dict:
"""
Return dictionary representation of the specified dataset repository.
The parameters represent the output filter options.
"""
def get_source() -> dict:
ret_src = {}
states = [state] if state else DatasetState
# Iterate through filtered states and get its datasets
for d_state in states:
ret_state = dataset_path.get(d_state)
if ret_state:
ret_src[str(d_state)] = ret_state
return ret_src
# Validate dataset source
if source and not DatasetSource.validate(source):
raise DatasetInvalidError("Dataset source %s is not valid." % source)
# Validate dataset state
if state and not DatasetState.validate(state):
raise DatasetInvalidError("Dataset state %s is not valid." % state)
ret_repo = {}
sources = [source] if source else DatasetSource
# Iterate through filtered sources and get its states
for d_src in sources:
dataset_path = Dataset(self.repository, d_src, dataset_id, None)
ret_src = get_source()
if ret_src:
ret_repo[str(d_src)] = ret_src
return ret_repo
def __str__(self):
return self.dumps()
Classes
class Dataset (repository: str, source: Union[DatasetSource, str], date_id: str, port: Union[str, int], extension: str = 'gz')
-
Class representing a single dataset and providing an interface to the dataset on the filesystem.
Dataset is identified by `source`, `state` and filename, where the filename consists of a mandatory `date_id`, `port` number and an optional suffix. date_id represents the date (or date range) when the dataset was created (certificates were collected and added to the dataset) and is a string in the format "YYYYMMDD"; port is the application port on which the data was collected; suffix can specify the dataset in various ways and is used to distinguish the files internally. `source`, `date_id` and `port` are static identifiers provided upon object initialization.
The dataset state is a dynamic identifier that forms the last part of its complete identification at any given time. Each dataset can be found in 1-N of the following generalized states:
- COLLECTED
- FILTERED
- UNIFIED
- ANALYSED
Full Dataset path template: {repository}/{source}/{state}/{date_id}[_{port}][_suffix].{extension}
Initialize the static identifiers
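A minimal usage sketch (the date, port and suffix are made-up values; a temporary directory stands in for a real repository):

import tempfile
from cevast.dataset.dataset import Dataset, DatasetSource, DatasetState

repo = tempfile.mkdtemp()  # the repository root must already exist
dataset = Dataset(repo, DatasetSource.RAPID, '20200601', 443)
print(dataset.static_filename)                                 # 20200601_443
print(dataset.path(DatasetState.COLLECTED, physically=False))  # <repo>/RAPID/COLLECTED
print(dataset.full_path(DatasetState.COLLECTED, suffix='certs'))
# <repo>/RAPID/COLLECTED/20200601_443_certs.gz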
Static methods
def format_filename(date: str, port: str = '', suffix: str = '') ‑> str
-
Format dataset filename.
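The joining rules are easiest to see on made-up values:

from cevast.dataset.dataset import Dataset

Dataset.format_filename('20200601')                  # '20200601'
Dataset.format_filename('20200601', '443')           # '20200601_443'
Dataset.format_filename('20200601', suffix='certs')  # '20200601_certs'
Dataset.format_filename('20200601', '443', 'certs')  # '20200601_443_certs'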
def from_full_path(path: str) ‑> Dataset
-
Initialize Dataset object from the given path, or return None if object cannot be initialized.
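A quick sketch of parsing a path (hypothetical path components; the repository root must exist, otherwise None is returned):

import tempfile
from cevast.dataset.dataset import Dataset

repo = tempfile.mkdtemp()
dataset = Dataset.from_full_path(repo + '/RAPID/COLLECTED/20200601_443.gz')
if dataset is not None:
    print(dataset.source, dataset.date, dataset.port)  # RAPID 20200601 443
# Note: the parsing regex uses \S+, so paths containing whitespace do not match.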
Instance variables
var date : str
-
Get the DATE ID.
var extension : str
-
Get the extension.
var port : str
-
Get the PORT.
var source : str
-
Get the Dataset source.
var static_filename : str
-
Get the static part of dataset filename.
Methods
def delete(self, state: Union[DatasetState, str]) ‑> NoneType
-
Delete the dataset of given state from the repository.
def exists(self, state: Union[DatasetState, str]) ‑> bool
-
Test if the dataset exists in given state.
def exists_any(self) ‑> bool
-
Test if any dataset (of the specified type) exists.
def full_path(self, state: Union[DatasetState, str], suffix: str = '', check_if_exists: bool = False, physically: bool = False) ‑> str
-
Assemble and return the full path to the dataset file in given state, including a custom suffix. Return None if `check_if_exists` is set and the file does not exist.
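A short sketch of both behaviours (made-up identifiers, temporary repository):

import tempfile
from cevast.dataset.dataset import Dataset, DatasetState

dataset = Dataset(tempfile.mkdtemp(), 'RAPID', '20200601', 443)
print(dataset.full_path(DatasetState.UNIFIED, suffix='hosts'))
# <repo>/RAPID/UNIFIED/20200601_443_hosts.gz  (assembled regardless of existence)
print(dataset.full_path(DatasetState.UNIFIED, check_if_exists=True))
# None -- no such file has been created yet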
def get(self, state: Union[DatasetState, str], suffix: str = '', full_path: bool = False) ‑> Tuple[str, ...]
-
Return all datasets stored in the dataset repository matching the parameters.
def move(self, state: Union[DatasetState, str], source: str, format_name: bool = True) ‑> NoneType
-
Move the source file to the repository of the dataset of given state. If `format_name` is true, the filename is formatted from the dataset's static identifiers, with the original basename used as the suffix.
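A sketch of the renaming behaviour (made-up file and identifiers):

import os
import tempfile
from cevast.dataset.dataset import Dataset, DatasetState

repo = tempfile.mkdtemp()
dataset = Dataset(repo, 'RAPID', '20200601', 443)

src = os.path.join(tempfile.mkdtemp(), 'scan.gz')  # stand-in for a collected file
open(src, 'w').close()

dataset.move(DatasetState.COLLECTED, src)
# The file now lives at <repo>/RAPID/COLLECTED/20200601_443_scan.gz --
# the original basename (extension included) becomes the suffix.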
def path(self, state: Union[DatasetState, str], physically: bool = True) ‑> str
-
Assemble and return the path to the dataset in given state. If the `physically` flag is set and the path does not exist, create it.
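For illustration (temporary repository, made-up identifiers):

import tempfile
from cevast.dataset.dataset import Dataset, DatasetState

dataset = Dataset(tempfile.mkdtemp(), 'RAPID', '20200601', 443)
print(dataset.path(DatasetState.COLLECTED, physically=False))  # path assembled, nothing created
print(dataset.path('COLLECTED'))  # state given as a string; the directory is created on demand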
def purge(self) ‑> NoneType
-
Delete all datasets of specified source from the repository.
class DatasetCollectionError (*args, **kwargs)
-
Raised when dataset collection fails.
Ancestors
- builtins.ValueError
- builtins.Exception
- builtins.BaseException
class DatasetInvalidError (*args, **kwargs)
-
Raised when the dataset has an invalid identifier.
Ancestors
- builtins.ValueError
- builtins.Exception
- builtins.BaseException
class DatasetRepository (repository: str)
-
Wrapper around the whole dataset repository providing overview and abstraction of the storage system.
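A minimal sketch, assuming `directory_with_prefix` simply yields nothing for directories that do not exist (the rest of the module relies on the same behaviour):

import tempfile
from cevast.dataset.dataset import DatasetRepository

repo = DatasetRepository(tempfile.mkdtemp())  # must point at an existing directory
print(repo.get())    # {} -- a fresh repository holds no datasets
print(repo.dumps())  # empty string for the same reason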
Methods
def dump(self, source: Union[DatasetSource, str] = None, state: Union[DatasetState, str] = None, dataset_id: str = '') ‑> NoneType
-
Print string representation of the specified dataset repository to the STDOUT. The parameters represent the output filter options.
def dumps(self, source: Union[DatasetSource, str] = None, state: Union[DatasetState, str] = None, dataset_id: str = '') ‑> str
-
Return string representation of the specified dataset repository. The parameters represent the output filter options.
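The nested layout aligns sources and states into columns; a repository holding two hypothetical RAPID datasets would render as (entries made up for illustration):

RAPID   : COLLECTED : 20200601_443.gz
          UNIFIED   : 20200601_443.gz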
def get(self, source: Union[DatasetSource, str] = None, state: Union[DatasetState, str] = None, dataset_id: str = '') ‑> dict
-
Return dictionary representation of the specified dataset repository. The parameters represent the output filter options.
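The result is nested as {source: {state: (filenames,)}}; a sketch with hypothetical entries:

import tempfile
from cevast.dataset.dataset import DatasetRepository

repo = DatasetRepository(tempfile.mkdtemp())
overview = repo.get(source='RAPID', dataset_id='20200601')
# Shape of a non-empty result (entries are hypothetical):
# {'RAPID': {'COLLECTED': ('20200601_443.gz',), 'UNIFIED': ('20200601_443.gz',)}}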
class DatasetSource (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Enumeration class of all supported Dataset sources.
Ancestors
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var CENSYS
var RAPID
Static methods
def validate(source: Union[DatasetSource, str]) ‑> bool
-
Validate DatasetSource.
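A few illustrative calls:

from cevast.dataset.dataset import DatasetSource

DatasetSource.validate(DatasetSource.CENSYS)  # True
DatasetSource.validate('RAPID')               # True  (member name as a string)
DatasetSource.validate('rapid')               # False (lookup is case-sensitive)
DatasetSource.validate(42)                    # False (unsupported type)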
class DatasetState (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Enumeration class of all supported Dataset states.
Ancestors
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var ANALYSED
var COLLECTED
var FILTERED
var UNIFIED
Static methods
def validate(state: Union[DatasetState, str]) ‑> bool
-
Validate DatasetState.
class DatasetUnificationError (*args, **kwargs)
-
Raised when dataset unification fails.
Ancestors
- builtins.ValueError
- builtins.Exception
- builtins.BaseException