Module cevast.dataset.unifiers.rapid
This module contains the implementation of the RAPID dataset source unifier.
Expand source code
"""This module contains implementation of RAPID dataset source unifier."""
import os
import gzip
import logging
import json
from contextlib import ExitStack
from cevast.certdb import CertDB
from cevast.utils import BASE64_to_PEM
from ..dataset import DatasetSource
# Author of this module.
__author__ = 'Radim Podola'
# Module-level logger named after this module; used by all log calls below.
log = logging.getLogger(__name__)
# TODO: add a method to recalculate broken chains
class RapidUnifier:
    """
    A unifier class that unifies the RAPID dataset.

    The class parses the RAPID dataset files, stores the parsed certificates
    into CertDB and builds certificate chains per host. Chains are written in
    the unified dataset format: one gzip-compressed text line per host of the
    form ``host,sha_1,sha_2,...``.
    """

    dataset_source = DatasetSource.RAPID

    def __init__(self, certs_dataset: str, hosts_dataset: str, chain_file: str, broken_chain_file: str = None):
        """
        Validate the dataset files and initialize the unifier.

        Args:
            certs_dataset: path to the gzip-compressed certificates dataset.
            hosts_dataset: path to the gzip-compressed hosts dataset.
            chain_file: path of the output file for the unified chains.
            broken_chain_file: optional path of a separate output file for
                chains whose certificates are not all found in CertDB.

        Raises:
            FileNotFoundError: if either dataset path is not an existing file.
        """
        # Check dataset files (certificates first, then hosts)
        for dataset in (certs_dataset, hosts_dataset):
            if not os.path.isfile(dataset):
                raise FileNotFoundError(dataset)
        # Initialize unifier
        log.info('Initializing unifier for dataset files (%s:%s)', certs_dataset, hosts_dataset)
        self._certs_dataset = certs_dataset
        self._hosts_dataset = hosts_dataset
        self._chain_file = chain_file
        self._broken_chain_file = broken_chain_file
        # Initialize dataset unification log
        self.__unification_log = {
            'total_certs': 0,
            'total_hosts': 0,
            'total_host_certs': 0,
            'broken_chains': 0,
        }

    @property
    def certs_dataset(self) -> str:
        """Getter property of certs dataset."""
        return self._certs_dataset

    @property
    def hosts_dataset(self) -> str:
        """Getter property of hosts dataset."""
        return self._hosts_dataset

    @property
    def chain_file(self) -> str:
        """Getter property of chain file."""
        return self._chain_file

    @property
    def unification_log(self) -> dict:
        """Getter property of unification log (the internal dict itself, not a copy)."""
        return self.__unification_log

    @staticmethod
    def parse_certs(dataset: str) -> tuple:
        """
        Generator parsing certificates from dataset one by one.

        Every line of the gzip-compressed text file is split on ',' and the
        whitespace-stripped fields are yielded as a list. Callers in this class
        unpack each record as ``cert_id, certificate``.
        """
        log.info('Start parsing certificates from dataset: %s', dataset)
        with gzip.open(dataset, 'rt') as r_file:
            for line in r_file:
                yield [x.strip() for x in line.split(',')]

    @staticmethod
    def parse_chains(dataset: str) -> tuple:
        """
        Generator parsing host certificate records and building chains from dataset one by one.

        Each non-blank line must hold exactly two comma-separated fields
        (host, certificate id). Consecutive records with the same host are
        grouped and a tuple ('host IP', [certificate chain]) is yielded for
        each group. Every yielded chain is an independent list, so callers may
        keep it. Nothing is yielded for a dataset without records.

        Raises:
            ValueError: if a non-blank line does not have exactly two fields.
        """
        log.info('Start parsing and building host chains from dataset: %s', dataset)
        chain = []
        last = None
        with gzip.open(dataset, 'rt') as r_file:
            for line in r_file:
                if not line.strip():
                    # Blank lines carry no record
                    continue
                curr, sha = [x.strip() for x in line.split(',')]
                if last is not None and curr != last:
                    yield last, chain
                    # Start a fresh list; the yielded one now belongs to the caller
                    chain = []
                # Building the chain
                chain.append(sha)
                last = curr
        # Flush the final host, unless the dataset held no records at all
        if last is not None:
            yield last, chain

    # TODO this one might be generic
    @staticmethod
    def read_chains(dataset: str) -> tuple:
        """
        Generator reading certificate chains from unified dataset one by one.

        Tuple ('host IP', [certificate chain]) is returned for each line, where
        the host is the first comma-separated field and the chain is the rest.
        """
        log.info('Start reading certificate chains from dataset: %s', dataset)
        with gzip.open(dataset, 'rt') as r_file:
            for line in r_file:
                read_line = line.strip().split(',')
                yield read_line[0], read_line[1:]

    def store_certs(self, certdb: CertDB) -> None:
        """
        Parses certificates from dataset and stores them into CertDB.

        Each certificate is passed through BASE64_to_PEM before insertion and
        counted in the 'total_certs' entry of the unification log.
        """
        for sha, cert in self.parse_certs(self._certs_dataset):
            certdb.insert(sha, BASE64_to_PEM(cert))
            self.__unification_log['total_certs'] += 1

    def store_chains(self, certdb: CertDB) -> None:
        """
        Parses and builds certificate chains from dataset and stores them into the unified `chain_file` file.

        If `broken_chain_file` is provided, the chains for which
        `certdb.exists_all(chain)` is false are stored into this separate file
        and counted in 'broken_chains'. Otherwise every chain goes to
        `chain_file` and 'broken_chains' is set to -1 (not evaluated).
        """
        if not self._broken_chain_file:
            self.__unification_log['broken_chains'] = -1

        with ExitStack() as stack:
            f_full_chains = stack.enter_context(gzip.open(self._chain_file, 'wt'))
            # Stays None when broken chains are not separated
            f_broken_chains = None
            if self._broken_chain_file:
                f_broken_chains = stack.enter_context(gzip.open(self._broken_chain_file, 'wt'))

            for host, chain in self.parse_chains(self._hosts_dataset):
                self.__unification_log['total_host_certs'] += len(chain)
                self.__unification_log['total_hosts'] += 1
                line = host + ',' + ','.join(chain) + '\n'
                # Try to find all the certificates in DB (only when broken chains are separated)
                if f_broken_chains is not None and not certdb.exists_all(chain):
                    self.__unification_log['broken_chains'] += 1
                    f_broken_chains.write(line)
                else:
                    f_full_chains.write(line)

    def save_unification_log(self, filename: str) -> None:
        """Save unification log to filename as indented JSON with sorted keys."""
        log_str = json.dumps(self.unification_log, sort_keys=True, indent=4)
        log.info('Saving unification log: %s', filename)
        with open(filename, 'w') as outfile:
            outfile.write(log_str)
Classes
class RapidUnifier (certs_dataset: str, hosts_dataset: str, chain_file: str, broken_chain_file: str = None)
-
A unifier class that unifies RAPID dataset.
Class parses RAPID dataset, stores the parsed certificates into CertDB and builds certificate chains. Chains are written as unified dataset format.
Expand source code
class RapidUnifier:
    """
    A unifier class that unifies the RAPID dataset.

    The class parses the RAPID dataset files, stores the parsed certificates
    into CertDB and builds certificate chains per host. Chains are written in
    the unified dataset format: one gzip-compressed text line per host of the
    form ``host,sha_1,sha_2,...``.
    """

    dataset_source = DatasetSource.RAPID

    def __init__(self, certs_dataset: str, hosts_dataset: str, chain_file: str, broken_chain_file: str = None):
        """
        Validate the dataset files and initialize the unifier.

        Args:
            certs_dataset: path to the gzip-compressed certificates dataset.
            hosts_dataset: path to the gzip-compressed hosts dataset.
            chain_file: path of the output file for the unified chains.
            broken_chain_file: optional path of a separate output file for
                chains whose certificates are not all found in CertDB.

        Raises:
            FileNotFoundError: if either dataset path is not an existing file.
        """
        # Check dataset files (certificates first, then hosts)
        for dataset in (certs_dataset, hosts_dataset):
            if not os.path.isfile(dataset):
                raise FileNotFoundError(dataset)
        # Initialize unifier
        log.info('Initializing unifier for dataset files (%s:%s)', certs_dataset, hosts_dataset)
        self._certs_dataset = certs_dataset
        self._hosts_dataset = hosts_dataset
        self._chain_file = chain_file
        self._broken_chain_file = broken_chain_file
        # Initialize dataset unification log
        self.__unification_log = {
            'total_certs': 0,
            'total_hosts': 0,
            'total_host_certs': 0,
            'broken_chains': 0,
        }

    @property
    def certs_dataset(self) -> str:
        """Getter property of certs dataset."""
        return self._certs_dataset

    @property
    def hosts_dataset(self) -> str:
        """Getter property of hosts dataset."""
        return self._hosts_dataset

    @property
    def chain_file(self) -> str:
        """Getter property of chain file."""
        return self._chain_file

    @property
    def unification_log(self) -> dict:
        """Getter property of unification log (the internal dict itself, not a copy)."""
        return self.__unification_log

    @staticmethod
    def parse_certs(dataset: str) -> tuple:
        """
        Generator parsing certificates from dataset one by one.

        Every line of the gzip-compressed text file is split on ',' and the
        whitespace-stripped fields are yielded as a list. Callers in this class
        unpack each record as ``cert_id, certificate``.
        """
        log.info('Start parsing certificates from dataset: %s', dataset)
        with gzip.open(dataset, 'rt') as r_file:
            for line in r_file:
                yield [x.strip() for x in line.split(',')]

    @staticmethod
    def parse_chains(dataset: str) -> tuple:
        """
        Generator parsing host certificate records and building chains from dataset one by one.

        Each non-blank line must hold exactly two comma-separated fields
        (host, certificate id). Consecutive records with the same host are
        grouped and a tuple ('host IP', [certificate chain]) is yielded for
        each group. Every yielded chain is an independent list, so callers may
        keep it. Nothing is yielded for a dataset without records.

        Raises:
            ValueError: if a non-blank line does not have exactly two fields.
        """
        log.info('Start parsing and building host chains from dataset: %s', dataset)
        chain = []
        last = None
        with gzip.open(dataset, 'rt') as r_file:
            for line in r_file:
                if not line.strip():
                    # Blank lines carry no record
                    continue
                curr, sha = [x.strip() for x in line.split(',')]
                if last is not None and curr != last:
                    yield last, chain
                    # Start a fresh list; the yielded one now belongs to the caller
                    chain = []
                # Building the chain
                chain.append(sha)
                last = curr
        # Flush the final host, unless the dataset held no records at all
        if last is not None:
            yield last, chain

    # TODO this one might be generic
    @staticmethod
    def read_chains(dataset: str) -> tuple:
        """
        Generator reading certificate chains from unified dataset one by one.

        Tuple ('host IP', [certificate chain]) is returned for each line, where
        the host is the first comma-separated field and the chain is the rest.
        """
        log.info('Start reading certificate chains from dataset: %s', dataset)
        with gzip.open(dataset, 'rt') as r_file:
            for line in r_file:
                read_line = line.strip().split(',')
                yield read_line[0], read_line[1:]

    def store_certs(self, certdb: CertDB) -> None:
        """
        Parses certificates from dataset and stores them into CertDB.

        Each certificate is passed through BASE64_to_PEM before insertion and
        counted in the 'total_certs' entry of the unification log.
        """
        for sha, cert in self.parse_certs(self._certs_dataset):
            certdb.insert(sha, BASE64_to_PEM(cert))
            self.__unification_log['total_certs'] += 1

    def store_chains(self, certdb: CertDB) -> None:
        """
        Parses and builds certificate chains from dataset and stores them into the unified `chain_file` file.

        If `broken_chain_file` is provided, the chains for which
        `certdb.exists_all(chain)` is false are stored into this separate file
        and counted in 'broken_chains'. Otherwise every chain goes to
        `chain_file` and 'broken_chains' is set to -1 (not evaluated).
        """
        if not self._broken_chain_file:
            self.__unification_log['broken_chains'] = -1

        with ExitStack() as stack:
            f_full_chains = stack.enter_context(gzip.open(self._chain_file, 'wt'))
            # Stays None when broken chains are not separated
            f_broken_chains = None
            if self._broken_chain_file:
                f_broken_chains = stack.enter_context(gzip.open(self._broken_chain_file, 'wt'))

            for host, chain in self.parse_chains(self._hosts_dataset):
                self.__unification_log['total_host_certs'] += len(chain)
                self.__unification_log['total_hosts'] += 1
                line = host + ',' + ','.join(chain) + '\n'
                # Try to find all the certificates in DB (only when broken chains are separated)
                if f_broken_chains is not None and not certdb.exists_all(chain):
                    self.__unification_log['broken_chains'] += 1
                    f_broken_chains.write(line)
                else:
                    f_full_chains.write(line)

    def save_unification_log(self, filename: str) -> None:
        """Save unification log to filename as indented JSON with sorted keys."""
        log_str = json.dumps(self.unification_log, sort_keys=True, indent=4)
        log.info('Saving unification log: %s', filename)
        with open(filename, 'w') as outfile:
            outfile.write(log_str)
Class variables
var dataset_source
Static methods
def parse_certs(dataset: str) ‑> tuple
-
Generator parsing certificates from dataset one by one. Tuple ('cert_id', 'certificate') is returned for each parsed certificate.
Expand source code
@staticmethod
def parse_certs(dataset: str) -> tuple:
    """
    Lazily read certificate records from a gzip-compressed text dataset.

    Every line is split on ',' and the whitespace-stripped fields are
    yielded as a list, one list per line.
    """
    log.info('Start parsing certificates from dataset: %s', dataset)
    with gzip.open(dataset, 'rt') as source:
        for record in source:
            fields = record.split(',')
            yield list(map(str.strip, fields))
def parse_chains(dataset: str) ‑> tuple
-
Generator parsing host certificate records and building chains from dataset one by one. Tuple ('host IP', [certificate chain]) is returned for each parsed host IP.
Expand source code
@staticmethod
def parse_chains(dataset: str) -> tuple:
    """
    Generator parsing host certificate records and building chains from dataset one by one.

    Each non-blank line must hold exactly two comma-separated fields
    (host, certificate id). Consecutive records with the same host are
    grouped and a tuple ('host IP', [certificate chain]) is yielded for
    each group. Every yielded chain is an independent list, so callers may
    keep it. Nothing is yielded for a dataset without records.

    Raises:
        ValueError: if a non-blank line does not have exactly two fields.
    """
    log.info('Start parsing and building host chains from dataset: %s', dataset)
    chain = []
    last = None
    with gzip.open(dataset, 'rt') as r_file:
        for line in r_file:
            if not line.strip():
                # Blank lines carry no record
                continue
            curr, sha = [x.strip() for x in line.split(',')]
            if last is not None and curr != last:
                yield last, chain
                # Start a fresh list; the yielded one now belongs to the caller
                chain = []
            # Building the chain
            chain.append(sha)
            last = curr
    # Flush the final host, unless the dataset held no records at all
    if last is not None:
        yield last, chain
def read_chains(dataset: str) ‑> tuple
-
Generator reading certificate chains from unified dataset one by one. Tuple ('host IP', [certificate chain]) is returned for each host.
Expand source code
@staticmethod
def read_chains(dataset: str) -> tuple:
    """
    Lazily read certificate chains from a gzip-compressed unified dataset.

    For every line a tuple (first comma-separated field, list of the
    remaining fields) is yielded, i.e. ('host IP', [certificate chain]).
    """
    log.info('Start reading certificate chains from dataset: %s', dataset)
    with gzip.open(dataset, 'rt') as source:
        for record in source:
            host, *chain = record.strip().split(',')
            yield host, chain
Instance variables
var certs_dataset : str
-
Getter property of certs dataset.
Expand source code
@property
def certs_dataset(self) -> str:
    """Read-only access to the stored certificates dataset path."""
    return self._certs_dataset
var chain_file : str
-
Getter property of chain file.
Expand source code
@property
def chain_file(self) -> str:
    """Read-only access to the stored chain file path."""
    return self._chain_file
var hosts_dataset : str
-
Getter property of hosts dataset.
Expand source code
@property
def hosts_dataset(self) -> str:
    """Read-only access to the stored hosts dataset path."""
    return self._hosts_dataset
var unification_log : dict
-
Getter property of unification log.
Expand source code
@property
def unification_log(self) -> dict:
    """Read-only access to the unification log dict (the stored object, not a copy)."""
    return self.__unification_log
Methods
def save_unification_log(self, filename: str) ‑> NoneType
-
Save unification log to filename.
Expand source code
def save_unification_log(self, filename: str) -> None:
    """
    Write the unification log to `filename`.

    The log is serialized as JSON with sorted keys and 4-space indentation
    before the target file is opened for writing.
    """
    serialized = json.dumps(self.unification_log, indent=4, sort_keys=True)
    log.info('Saving unification log: %s', filename)
    with open(filename, 'w') as target:
        target.write(serialized)
def store_certs(self, certdb: CertDB) ‑> NoneType
-
Parses certificates from dataset and stores them into CertDB.
Expand source code
def store_certs(self, certdb: CertDB) -> None:
    """
    Insert every certificate of the certs dataset into `certdb`.

    Each parsed record is unpacked into (sha, cert); the certificate is
    passed through BASE64_to_PEM, inserted under its sha, and counted in
    the 'total_certs' entry of the unification log.
    """
    for record in self.parse_certs(self._certs_dataset):
        sha, cert = record
        pem = BASE64_to_PEM(cert)
        certdb.insert(sha, pem)
        self.__unification_log['total_certs'] += 1
def store_chains(self, certdb: CertDB) ‑> NoneType
-
Parses and builds certificate chains from dataset and stores them into the unified `chain_file` file.
If `broken_chain_file` is provided, the chains that are not available (neither in the dataset nor in the CertDB) are stored into this separate file.
Expand source code
def store_chains(self, certdb: CertDB) -> None:
    """
    Build host certificate chains from the hosts dataset and write them out.

    Every chain is written as a 'host,sha,...' line into the gzip-compressed
    `chain_file`. When `broken_chain_file` is set, chains for which
    `certdb.exists_all(chain)` is false go to that file instead and are
    counted in 'broken_chains'; when it is not set, 'broken_chains' is
    set to -1.
    """
    if not self._broken_chain_file:
        self.__unification_log['broken_chains'] = -1

    with ExitStack() as stack:
        full_out = stack.enter_context(gzip.open(self._chain_file, 'wt'))
        # Stays None when broken chains are not written separately
        broken_out = None
        if self._broken_chain_file:
            broken_out = stack.enter_context(gzip.open(self._broken_chain_file, 'wt'))

        for host, chain in self.parse_chains(self._hosts_dataset):
            self.__unification_log['total_host_certs'] += len(chain)
            self.__unification_log['total_hosts'] += 1
            record = host + ',' + ','.join(chain) + '\n'
            # The DB lookup happens only when broken chains are separated
            if broken_out is not None and not certdb.exists_all(chain):
                self.__unification_log['broken_chains'] += 1
                broken_out.write(record)
            else:
                full_out.write(record)