Module cevast.dataset.managers.rapid
This module contains DatasetManager interface implementation of RAPID dataset source.
"""This module contains DatasetManager interface implementation of RAPID dataset source."""
import os
import logging
from collections import OrderedDict
from datetime import datetime
from typing import Tuple, Type
from cevast.certdb import CertDB
from cevast.analysis import CertAnalyser
from .manager import DatasetManager, DatasetManagerTask
from ..unifiers import RapidUnifier
from ..collectors import RapidCollector
from ..dataset import DatasetSource, Dataset, DatasetState, DatasetUnificationError
__author__ = 'Radim Podola'
log = logging.getLogger(__name__)
cli_log = logging.getLogger('CEVAST_CLI')
class RapidDatasetManager(DatasetManager):
    """DatasetManager interface implementation of RAPID dataset source."""

    _CERT_NAME_SUFFIX = 'certs'
    _HOSTS_NAME_SUFFIX = 'hosts'
    _CHAINS_NAME_SUFFIX = 'chains'
    _BROKEN_CHAINS_NAME_SUFFIX = 'broken_chains'

    dataset_source = DatasetSource.RAPID.name

    # TODO add date range
    # TODO make ports optional
    # TODO support port range in single validate or unify tasks

    def __init__(self, repository: str, date: datetime.date = datetime.today().date(),
                 ports: Tuple[str] = ('443',), cpu_cores: int = 1):
        self._repository = repository
        self._date = date
        self._ports = (ports,) if isinstance(ports, str) else ports
        self._cpu_cores = cpu_cores
        self.__date_id = date.strftime('%Y%m%d')
        log.info('RapidDatasetManager initialized with repository=%s, date=%s, ports=%s', repository, date, ports)
    def run(self, task_pipeline: Tuple[Tuple[DatasetManagerTask, dict]]) -> tuple:
        collected_datasets, unified_datasets, analysed_datasets = None, None, None
        # Sort just to ensure a valid task sequence
        task_pipeline = sorted(task_pipeline, key=lambda x: x[0])
        log.info('Started with task pipeline %s', task_pipeline)
        try:
            # Run tasks
            for task_item in task_pipeline:
                task, params = task_item
                log.info('Run task %s with parameters: %s', task, params)
                cli_log.info('Running TASK %s', task)
                # Runs collection TASK, collected datasets might be used in the next task
                if task == DatasetManagerTask.COLLECT:
                    collected_datasets = self.collect(**params)
                    log.info("Collected datasets: %s", collected_datasets)
                # Runs filtering TASK
                elif task == DatasetManagerTask.FILTER:
                    raise NotImplementedError  # Not implemented yet
                # Runs unifying TASK, unified datasets might be used in the next task
                elif task == DatasetManagerTask.UNIFY:
                    if collected_datasets:  # If some datasets were just collected in the pipeline, use these
                        unified_datasets = self.__unify(datasets=collected_datasets, **params)
                    else:
                        unified_datasets = self.unify(**params)
                    log.info("Unified datasets: %s", unified_datasets)
                # Runs analytical TASK
                elif task == DatasetManagerTask.ANALYSE:
                    if unified_datasets:  # If some datasets were just unified in the pipeline, use these
                        analysed_datasets = self.__analyse(datasets=unified_datasets, **params)
                    else:
                        analysed_datasets = self.analyse(**params)
                    log.info("Analysed datasets: %s", analysed_datasets)
        except TypeError:
            log.exception("Error when running task pipeline, are the arguments set correctly?")
        log.info("Finished")
        return collected_datasets, unified_datasets, analysed_datasets
    def collect(self, api_key: str = None) -> Tuple[Dataset]:
        log.info('Collecting started')
        cli_log.info('Collecting started')
        collector = RapidCollector(api_key)
        # Create a dummy dataset only to get the target directory
        dummy_dataset = Dataset(self._repository, self.dataset_source, self.__date_id, None)
        download_dir = dummy_dataset.path(DatasetState.COLLECTED)
        # Collect datasets
        collected = collector.collect(
            download_dir=download_dir, date=self._date, filter_ports=self._ports, filter_types=('hosts', 'certs')
        )
        # Remove duplicates (same datasets with e.g. a different suffix)
        datasets = tuple(OrderedDict.fromkeys(map(Dataset.from_full_path, collected)))
        log.info('%d datasets were downloaded', len(datasets))
        log.info('Collecting finished')
        cli_log.info('Collecting finished')
        return datasets if datasets else None

    def filter(self, methods: list = None) -> str:
        raise NotImplementedError
    def unify(self, certdb: CertDB) -> Tuple[Dataset]:
        datasets = self.__init_datasets()
        # Unify datasets
        unified = self.__unify(certdb=certdb, datasets=datasets)
        return unified if unified else None

    def analyse(self, analyser: CertAnalyser, analyser_cfg: dict) -> Tuple[Dataset]:
        datasets = self.__init_datasets()
        # Analyse datasets
        analysed = self.__analyse(datasets=datasets, analyser=analyser, analyser_cfg=analyser_cfg)
        return analysed if analysed else None

    def __init_datasets(self) -> Tuple[Dataset]:
        return tuple(Dataset(self._repository, self.dataset_source, self.__date_id, port) for port in self._ports)
    def __init_unifier(self, dataset: Dataset) -> RapidUnifier:
        certs_file = dataset.full_path(DatasetState.COLLECTED, self._CERT_NAME_SUFFIX, True)
        hosts_file = dataset.full_path(DatasetState.COLLECTED, self._HOSTS_NAME_SUFFIX, True)
        if certs_file and hosts_file:
            chain_file = dataset.full_path(DatasetState.UNIFIED, self._CHAINS_NAME_SUFFIX, physically=True)
            broken_file = dataset.full_path(DatasetState.UNIFIED, self._BROKEN_CHAINS_NAME_SUFFIX, physically=True)
            try:
                unifier = RapidUnifier(certs_file, hosts_file, chain_file, broken_file)
                log.info("Will unify dataset: %s", dataset.static_filename)
                return unifier
            except FileNotFoundError:
                log.exception("Collected dataset not found")
        return None
    def __unify(self, certdb: CertDB, datasets: Tuple[Dataset], store_log: bool = True) -> Tuple[Dataset]:
        log.info('Unifying started')
        cli_log.info('Unifying started')
        # First validate datasets and init unifiers
        unifyable, unifiers = [], []
        for dataset in datasets:
            unifier = self.__init_unifier(dataset)
            if unifier is not None:
                unifiers.append(unifier)
                unifyable.append(dataset)
        # Parse and store certificates
        for unifier in unifiers:
            try:
                cli_log.info('Parsing certificates from Dataset file <%s>', unifier.certs_dataset)
                unifier.store_certs(certdb)
            except (OSError, ValueError):
                log.exception("Error during certs dataset parsing -> rollback")
                certdb.rollback()
                raise DatasetUnificationError("Error during certs dataset parsing")
        # Now parse and store chains
        for unifier in unifiers:
            try:
                cli_log.info('Parsing host chains from Dataset file <%s>', unifier.certs_dataset)
                unifier.store_chains(certdb)
                if store_log:
                    # Store dataset unification log
                    unifier.save_unification_log(os.path.splitext(unifier.chain_file)[0] + '.log')
            except OSError:
                log.exception("Error during hosts dataset parsing -> commit")
                certdb.commit()
                raise DatasetUnificationError("Error during hosts dataset parsing")
        # Remove unified datasets
        # for dataset in unifyable:
        #     dataset.delete(DatasetState.COLLECTED)
        log.info('Unifying finished')
        cli_log.info('Unifying finished')
        return tuple(unifyable) if unifyable else None
    def __analyse(self, datasets: Tuple[Dataset], analyser: Type[CertAnalyser], analyser_cfg: dict) -> Tuple[Dataset]:
        log.info('Analysis started')
        cli_log.info('Analysis started')
        analysable = []
        for dataset in datasets:
            chain_file = dataset.full_path(DatasetState.UNIFIED, self._CHAINS_NAME_SUFFIX, True)
            if chain_file:
                analysable.append(dataset)
                filename = os.path.join(dataset.path(DatasetState.ANALYSED), dataset.static_filename)
                # Open CertAnalyser as a context manager
                with analyser(output_file=filename, processes=self._cpu_cores, **analyser_cfg) as analyser_ctx:
                    log.info("Will analyse dataset: %s", dataset.static_filename)
                    for host, chain in RapidUnifier.read_chains(chain_file):
                        analyser_ctx.schedule(host, chain)
                    # Indicate that no more data for analysis will be scheduled
                    analyser_ctx.done()
                    log.info("Dataset analysis finished")
        log.info('Analysis finished')
        cli_log.info('Analysis finished')
        return tuple(analysable) if analysable else None
Classes
class RapidDatasetManager (repository: str, date: datetime.date = datetime.date(2021, 5, 4), ports: Tuple[str] = ('443',), cpu_cores: int = 1)
DatasetManager interface implementation of RAPID dataset source.
Initialize Manager.
'repository' is the dataset repository, 'date' is the dataset date, 'ports' is a list of ports further specifying the datasets, and 'cpu_cores' is the maximum number of CPU cores that may be used.
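A minimal usage sketch follows, assuming an existing CertDB instance and a CertAnalyser class are available. The repository path, API key, and placeholder names are hypothetical, and the DatasetManagerTask import path is inferred from this module's relative imports.

from datetime import date
from cevast.dataset.managers.rapid import RapidDatasetManager
from cevast.dataset.managers.manager import DatasetManagerTask  # path inferred from 'from .manager import ...'

# Hypothetical placeholders supplied by the caller:
my_certdb = ...    # an opened cevast.certdb.CertDB implementation
MyAnalyser = ...   # a cevast.analysis.CertAnalyser class (used as a context manager)

manager = RapidDatasetManager(
    repository='/data/cevast_repository',  # hypothetical repository path
    date=date(2021, 5, 4),
    ports=('443',),
    cpu_cores=4,
)

# Each pipeline item is a (task, params) pair; params are forwarded to the
# matching collect/unify/analyse call.
pipeline = (
    (DatasetManagerTask.COLLECT, {'api_key': 'RAPID7_API_KEY'}),  # hypothetical API key
    (DatasetManagerTask.UNIFY, {'certdb': my_certdb}),
    (DatasetManagerTask.ANALYSE, {'analyser': MyAnalyser, 'analyser_cfg': {}}),
)
collected, unified, analysed = manager.run(pipeline)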
Ancestors
- DatasetManager
- abc.ABC
Inherited members