Module cevast.dataset.unifiers
This package provides tools for unification of certificate datasets.
Expand source code
"""This package provides tools for unification of certificate datasets."""
__all__ = ['RapidUnifier']
__author__ = 'Radim Podola'
from .rapid import RapidUnifier
Sub-modules
cevast.dataset.unifiers.rapid
-
This module contains implementation of RAPID dataset source unifier.
Classes
class RapidUnifier (certs_dataset: str, hosts_dataset: str, chain_file: str, broken_chain_file: str = None)
-
A unifier class that unifies RAPID dataset.
Class parses RAPID dataset, stores the parsed certificates into CertDB and builds certificate chains. Chains are written as unified dataset format.
Expand source code
class RapidUnifier:
    """
    A unifier class that unifies RAPID dataset.

    Class parses RAPID dataset, stores the parsed certificates into CertDB
    and builds certificate chains. Chains are written as unified dataset format.
    """

    # Dataset source handled by this unifier.
    dataset_source = DatasetSource.RAPID

    def __init__(self, certs_dataset: str, hosts_dataset: str, chain_file: str, broken_chain_file: str = None):
        """
        Initialize the unifier for a pair of RAPID dataset files.

        `certs_dataset`: path to the gzipped certificates dataset.
        `hosts_dataset`: path to the gzipped host records dataset.
        `chain_file`: target path for the unified certificate chains.
        `broken_chain_file`: optional target path for chains whose certificates
            are not all available; when None, broken chains are not separated.
        Raises FileNotFoundError when either dataset file does not exist.
        """
        # Check dataset files
        if not os.path.isfile(certs_dataset):
            raise FileNotFoundError(certs_dataset)
        if not os.path.isfile(hosts_dataset):
            raise FileNotFoundError(hosts_dataset)
        # Initialize unifier
        log.info('Initializing unifier for dataset files (%s:%s)', certs_dataset, hosts_dataset)
        self._certs_dataset = certs_dataset
        self._hosts_dataset = hosts_dataset
        self._chain_file = chain_file
        self._broken_chain_file = broken_chain_file
        # Initialize dataset unification log
        self.__unification_log = {
            'total_certs': 0,
            'total_hosts': 0,
            'total_host_certs': 0,
            'broken_chains': 0,
        }

    @property
    def certs_dataset(self) -> str:
        """Getter property of certs dataset."""
        return self._certs_dataset

    @property
    def hosts_dataset(self) -> str:
        """Getter property of hosts dataset."""
        return self._hosts_dataset

    @property
    def chain_file(self) -> str:
        """Getter property of chain file."""
        return self._chain_file

    @property
    def unification_log(self) -> dict:
        """Getter property of unification log."""
        return self.__unification_log

    @staticmethod
    def parse_certs(dataset: str) -> tuple:
        """
        Generator parsing certificates from dataset one by one.
        Tuple ('cert_id', 'certificate') is returned for each parsed certificate.
        """
        log.info('Start parsing certificates from dataset: %s', dataset)
        with gzip.open(dataset, 'rt') as r_file:
            for line in r_file:
                yield [x.strip() for x in line.split(',')]

    @staticmethod
    def parse_chains(dataset: str) -> tuple:
        """
        Generator parsing host certificate records and building chains from dataset one by one.
        Tuple ('host IP', [certificate chain]) is returned for each parsed host IP.
        """
        log.info('Start parsing and building host chains from dataset: %s', dataset)
        chain = []
        last = None
        with gzip.open(dataset, 'rt') as r_file:
            for line in r_file:
                curr, sha = [x.strip() for x in line.split(',')]
                if last and curr != last:
                    yield last, chain
                    # BUGFIX: rebind a fresh list instead of chain.clear().
                    # clear() mutates the list object just yielded to the
                    # consumer, so e.g. list(parse_chains(...)) ended up with
                    # aliased, emptied chains.
                    chain = []
                # Building the chain
                chain.append(sha)
                last = curr
        # BUGFIX: guard the trailing yield — an empty dataset previously
        # yielded a bogus (None, []) record that crashed chain writing.
        if last is not None:
            yield last, chain

    # TODO this one might be generic
    @staticmethod
    def read_chains(dataset: str) -> tuple:
        """
        Generator reading certificate chains from unified dataset one by one.
        Tuple ('host IP', [certificate chain]) is returned for each host.
        """
        log.info('Start reading certificate chains from dataset: %s', dataset)
        with gzip.open(dataset, 'rt') as r_file:
            for line in r_file:
                read_line = line.strip().split(',')
                yield read_line[0], read_line[1:]

    def store_certs(self, certdb: CertDB) -> None:
        """Parses certificates from dataset and stores them into CertDB."""
        for sha, cert in self.parse_certs(self._certs_dataset):
            certdb.insert(sha, BASE64_to_PEM(cert))
            self.__unification_log['total_certs'] += 1

    def store_chains(self, certdb: CertDB) -> None:
        """
        Parses and builds certificate chains from dataset and stores them into the unified
        `chain_file` file. If `broken_chain_file` is provided, the chains that are not
        available (in the dataset nor the CertDB) are stored into this separate file.
        """
        def write_chain(host: str, chain: list):
            # Closure over f_full_chains / f_broken_chains opened below;
            # the handles exist by the time this is called.
            self.__unification_log['total_hosts'] += 1
            line = host + ',' + ','.join(chain) + '\n'
            if self._broken_chain_file:
                # Try to find all the certificates in DB
                if certdb.exists_all(chain):
                    f_full_chains.write(line)
                else:
                    self.__unification_log['broken_chains'] += 1
                    f_broken_chains.write(line)
            else:
                f_full_chains.write(line)

        if not self._broken_chain_file:
            # -1 marks "broken chains not tracked" in the unification log.
            self.__unification_log['broken_chains'] = -1
        with ExitStack() as stack:
            f_full_chains = stack.enter_context(gzip.open(self._chain_file, 'wt'))
            if self._broken_chain_file:
                f_broken_chains = stack.enter_context(gzip.open(self._broken_chain_file, 'wt'))
            for host, chain in self.parse_chains(self._hosts_dataset):
                self.__unification_log['total_host_certs'] += len(chain)
                # Writing chain
                write_chain(host, chain)

    def save_unification_log(self, filename: str) -> None:
        """Save unification log to filename."""
        log_str = json.dumps(self.unification_log, sort_keys=True, indent=4)
        log.info('Saving unification log: %s', filename)
        with open(filename, 'w') as outfile:
            outfile.write(log_str)
Class variables
var dataset_source
Static methods
def parse_certs(dataset: str) ‑> tuple
-
Generator parsing certificates from dataset one by one. Tuple ('cert_id', 'certificate') is returned for each parsed certificate.
Expand source code
@staticmethod def parse_certs(dataset: str) -> tuple: """ Generator parsing certificates from dataset one by one. Tuple ('cert_id', 'certificate') is returned for each parsed certificated. """ log.info('Start parsing certificates from dataset: %s', dataset) with gzip.open(dataset, 'rt') as r_file: for line in r_file: yield [x.strip() for x in line.split(',')]
def parse_chains(dataset: str) ‑> tuple
-
Generator parsing host certificate records and building chains from dataset one by one. Tuple ('host IP', [certificate chain]) is returned for each parsed host IP.
Expand source code
@staticmethod def parse_chains(dataset: str) -> tuple: """ Generator parsing host certificate records and building chains from dataset one by one. Tuple ('host IP', [certificate chain]) is returned for each parsed host IP. """ log.info('Start parsing and building host chains from dataset: %s', dataset) chain = [] last = None with gzip.open(dataset, 'rt') as r_file: for line in r_file: curr, sha = [x.strip() for x in line.split(',')] if last and curr != last: yield last, chain chain.clear() # Building the chain chain.append(sha) last = curr yield last, chain
def read_chains(dataset: str) ‑> tuple
-
Generator reading certificate chains from unified dataset one by one. Tuple ('host IP', [certificate chain]) is returned for each host.
Expand source code
@staticmethod def read_chains(dataset: str) -> tuple: """ Generator reading certificate chains from unified dataset one by one. Tuple ('host IP', [certificate chain]) is returned for each host. """ log.info('Start reading certificate chains from dataset: %s', dataset) with gzip.open(dataset, 'rt') as r_file: for line in r_file: read_line = line.strip().split(',') yield read_line[0], read_line[1:]
Instance variables
var certs_dataset : str
-
Getter property of certs dataset.
Expand source code
@property def certs_dataset(self) -> str: """Getter property of certs dataset.""" return self._certs_dataset
var chain_file : str
-
Getter property of chain file.
Expand source code
@property def chain_file(self) -> str: """Getter property of chain file.""" return self._chain_file
var hosts_dataset : str
-
Getter property of hosts dataset.
Expand source code
@property def hosts_dataset(self) -> str: """Getter property of hosts dataset.""" return self._hosts_dataset
var unification_log : dict
-
Getter property of unification log.
Expand source code
@property def unification_log(self) -> dict: """Getter property of unification log.""" return self.__unification_log
Methods
def save_unification_log(self, filename: str) ‑> NoneType
-
Save unification log to filename.
Expand source code
def save_unification_log(self, filename: str) -> None: """Save unification log to filename.""" log_str = json.dumps(self.unification_log, sort_keys=True, indent=4) log.info('Saving unification log: %s', filename) with open(filename, 'w') as outfile: outfile.write(log_str)
def store_certs(self, certdb: CertDB) ‑> NoneType
-
Parses certificates from dataset and stores them into CertDB.
Expand source code
def store_certs(self, certdb: CertDB) -> None: """Parses certificates from dataset and stores them into CertDB.""" for sha, cert in self.parse_certs(self._certs_dataset): certdb.insert(sha, BASE64_to_PEM(cert)) self.__unification_log['total_certs'] += 1
def store_chains(self, certdb: CertDB) ‑> NoneType
-
Parses and builds certificate chains from dataset and stores them into the unified
chain_file
file. If
broken_chain_file
is provided, the chains that are not available (in the dataset nor the CertDB) are stored into this separate file.
Expand source code
def store_chains(self, certdb: CertDB) -> None: """ Parses and builds certificate chains from dataset and stores them into the unified `chain_file` file. If `broken_chain_file` is provided, the chains that are not available (in the dataset nor the CertDB) are stored into this separate file. """ def write_chain(host: str, chain: list): self.__unification_log['total_hosts'] += 1 line = host + ',' + ','.join(chain) + '\n' if self._broken_chain_file: # Try to find all the certificates in DB if certdb.exists_all(chain): f_full_chains.write(line) else: self.__unification_log['broken_chains'] += 1 f_broken_chains.write(line) else: f_full_chains.write(line) if not self._broken_chain_file: self.__unification_log['broken_chains'] = -1 with ExitStack() as stack: f_full_chains = stack.enter_context(gzip.open(self._chain_file, 'wt')) if self._broken_chain_file: f_broken_chains = stack.enter_context(gzip.open(self._broken_chain_file, 'wt')) for host, chain in self.parse_chains(self._hosts_dataset): self.__unification_log['total_host_certs'] += len(chain) # Writing chain write_chain(host, chain)