Module cevast.certdb

CertDB is a database managing X.509 certificates.

Expand source code
"""CertDB is a database managing X.509 certificates."""

__all__ = (
    'CertDB',
    'CertDBReadOnly',
    'CertFileDB',
    'CertFileDBReadOnly',
    'CertNotAvailableError',
    'CertInvalidError',
    'CompositeCertDB',
    'CompositeCertDBReadOnly',
)
__version__ = '1.1'
__author__ = 'Radim Podola'

from .cert_db import CertNotAvailableError, CertInvalidError, CertDB, CertDBReadOnly
from .cert_file_db import CertFileDBReadOnly, CertFileDB
from .composite_cert_db import CompositeCertDB, CompositeCertDBReadOnly

Sub-modules

cevast.certdb.cert_db

This module provides interface of CertDB class …

cevast.certdb.cert_file_db

This module contains implementation of CertFileDB …

cevast.certdb.cli

Group of CLI commands used for CertDB management and access …

cevast.certdb.composite_cert_db

This module contains implementation of CompositeCertDB …

Classes

class CertDB

Abstract class representing CertDB interface.

Expand source code
class CertDB(CertDBReadOnly):
    """Abstract class representing CertDB interface."""

    @abstractmethod
    def insert(self, cert_id: str, cert: str) -> None:
        """
        Insert the certificate to the database under `cert_id` identifier.

        Inserted certificate is not persisted immediately but
        remains in the current open transaction until commit or rollback.
        The expected format of the certificate is PEM.
        """

    @abstractmethod
    def delete(self, cert_id: str) -> None:
        """
        Delete the certificate from the database.

        Persisted certificate is not deleted immediately but
        remains until commit or rollback. Certificate inserted
        in the current transaction is deleted immediately.
        """

    @abstractmethod
    def rollback(self) -> None:
        """
        Revert the changes made by the current transaction.

        All inserted certificates waiting to persist are removed.
        All deleted certificates in the current transaction stay untouched.
        """

    @abstractmethod
    def commit(self) -> Tuple[int, int]:
        """
        Apply the changes made by the current transaction.

        All inserted certificates waiting to persist are persisted.
        All deleted certificates in the current transaction are permanently removed.
        Return a tuple of counts: (number of inserted, number of deleted).
        """

Ancestors

Subclasses

Methods

def commit(self) ‑> Tuple[int, int]

Apply the changes made by the current transaction.

All inserted certificates waiting to persist are persisted. All deleted certificates in the current transaction are permanently removed. Return tuple of numbers (number of inserted; number of deleted)

Expand source code
@abstractmethod
def commit(self) -> Tuple[int, int]:
    """
    Apply the changes made by the current transaction.

    All inserted certificates waiting to persist are persisted.
    All deleted certificates in the current transaction are permanently removed.
    Return a tuple of counts: (number of inserted, number of deleted).
    """
def delete(self, cert_id: str) ‑> NoneType

Delete the certificate from the database.

Persisted certificate is not deleted immediately but remains until commit or rollback. Certificate inserted in the current transaction is deleted immediately.

Expand source code
@abstractmethod
def delete(self, cert_id: str) -> None:
    """
    Delete the certificate from the database.

    Persisted certificate is not deleted immediately but
    remains until commit or rollback. Certificate inserted
    in the current transaction is deleted immediately.
    """
def insert(self, cert_id: str, cert: str) ‑> NoneType

Insert the certificate to the database under cert_id identifier.

Inserted certificate is not persisted immediately but remains in the current open transaction until commit or rollback. The expected format of the certificate is PEM.

Expand source code
@abstractmethod
def insert(self, cert_id: str, cert: str) -> None:
    """
    Insert the certificate to the database under `cert_id` identifier.

    Inserted certificate is not persisted immediately but
    remains in the current open transaction until commit or rollback.
    The expected format of the certificate is PEM.
    """
def rollback(self) ‑> NoneType

Revert the changes made by the current transaction.

All inserted certificates waiting to persist are removed. All deleted certificates in the current transaction stay untouched.

Expand source code
@abstractmethod
def rollback(self) -> None:
    """
    Revert the changes made by the current transaction.

    All inserted certificates waiting to persist are removed.
    All certificates deleted in the current transaction stay untouched.
    """

Inherited members

class CertDBReadOnly

Abstract class representing read-only CertDB interface.

Expand source code
class CertDBReadOnly(ABC):
    """Abstract class representing read-only CertDB interface."""

    @abstractmethod
    def get(self, cert_id: str) -> str:
        """
        Retrieve a certificate from the database.

        `cert_id` is the certificate identifier.
        Certificate is returned in a PEM format.
        Raise CertNotAvailableError if the certificate is not found.
        """

    @abstractmethod
    def export(self, cert_id: str, target_dir: str, copy_if_exists: bool = True) -> str:
        """
        Export a certificate from the database and save it as a PEM file.

        `cert_id` is the certificate identifier,
        `target_dir` is the target directory.
        If `copy_if_exists` is false and the file already exists (e.g. temporary in open transaction),
        the file is not copied to the target directory, instead the existing file path is returned.

        Full path of the certificate file is returned.
        Raise CertNotAvailableError if the certificate is not found.
        """

    @abstractmethod
    def exists(self, cert_id: str) -> bool:
        """
        Test whether a certificate exists in the database.

        `cert_id` is the certificate identifier.
        """

    @abstractmethod
    def exists_all(self, cert_ids: list) -> bool:
        """
        Test that all certificates exist in the database.

        `cert_ids` is a list of certificate identifiers.
        """

Ancestors

  • abc.ABC

Subclasses

Methods

def exists(self, cert_id: str) ‑> bool

Test whether a certificate exists in the database.

cert_id is the certificate identifier.

Expand source code
@abstractmethod
def exists(self, cert_id: str) -> bool:
    """
    Test whether a certificate exists in the database.

    `cert_id` is the certificate identifier.
    """
def exists_all(self, cert_ids: list) ‑> bool

Test that all certificates exist in the database.

cert_ids is a list of certificate identifiers.

Expand source code
@abstractmethod
def exists_all(self, cert_ids: list) -> bool:
    """
    Test that all certificates exist in the database.

    `cert_ids` is a list of certificate identifiers.
    Return True only if every listed certificate exists.
    """
def export(self, cert_id: str, target_dir: str, copy_if_exists: bool = True) ‑> str

Export a certificate from the database and save it as a PEM file.

cert_id is the certificate identifier, target_dir is the target directory. If copy_if_exists is false and file already exists (e.g. temporary in open transaction), the file is not copied to the target directory, instead the existing file path is returned.

Full path of the certificate file is returned. Raise CertNotAvailableError if the certificate is not found.

Expand source code
@abstractmethod
def export(self, cert_id: str, target_dir: str, copy_if_exists: bool = True) -> str:
    """
    Export a certificate from the database and save it as a PEM file.

    `cert_id` is the certificate identifier,
    `target_dir` is the target directory.
    If `copy_if_exists` is false and the file already exists (e.g. temporary in open transaction),
    the file is not copied to the target directory, instead the existing file path is returned.

    Full path of the certificate file is returned.
    Raise CertNotAvailableError if the certificate is not found.
    """
def get(self, cert_id: str) ‑> str

Retrieve a certificate from the database.

cert_id is the certificate identifier. Certificate is returned in a PEM format. Raise CertNotAvailableError if the certificate is not found.

Expand source code
@abstractmethod
def get(self, cert_id: str) -> str:
    """
    Retrieve a certificate from the database.

    `cert_id` is the certificate identifier.
    The certificate is returned in PEM format.
    Raise CertNotAvailableError if the certificate is not found.
    """
class CertFileDB (storage: str, cpu_cores: int = 1)

CertDB interface implementation which uses files and file system properties as a storage mechanism.

storage is path to the database storage. cpu_cores is max number of CPU cores that might be used.

Expand source code
class CertFileDB(CertDB, CertFileDBReadOnly):
    """
    CertDB interface implementation which uses files
    and a file system properties as a storage mechanism.

    `storage` is path to the database storage.
    `cpu_cores` is max number of CPU cores that might be used.
    """

    def __init__(self, storage: str, cpu_cores: int = 1):
        """
        Initialize the read-only part of the database and start an empty transaction.

        `storage` is path to the database storage.
        `cpu_cores` is max number of CPU cores that might be used.
        """
        CertFileDBReadOnly.__init__(self, storage)
        # Certificates scheduled for removal upon commit, grouped by block
        self._to_delete: dict = {}
        # Certificates waiting to be persisted upon commit, grouped by block
        self._to_insert: dict = {}
        # Upper bound of CPU cores that can be used (-1 is max limit by hardware)
        core_limit = int(cpu_cores)
        self.__cpu_cores = core_limit
        log.info('Will use %d CPUs', core_limit)

    def get(self, cert_id: str) -> str:
        """
        Return the certificate in PEM format, honouring the open transaction.

        A certificate inserted in the open transaction is read from its temporary file,
        a certificate scheduled for delete raises CertNotAvailableError,
        otherwise the persisted storage is consulted.
        """
        # Uncommitted insert -> certificate still lives as a plain file
        if self._is_in_transaction(cert_id, self._to_insert):
            temp_path = self._get_block_path(cert_id) + cert_id
            with open(temp_path, 'r') as temp_file:
                log.debug('<%s> found in open transaction', temp_path)
                pem = temp_file.read()
            return pem
        # Not scheduled for delete -> fall back to the persisted storage
        if not self._is_in_transaction(cert_id, self._to_delete):
            return CertFileDBReadOnly.get(self, cert_id)
        log.info('<%s> was deleted in current transaction', cert_id)
        raise CertNotAvailableError(cert_id)

    def export(self, cert_id: str, target_dir: str, copy_if_exists: bool = True) -> str:
        """
        Export the certificate as a PEM file, honouring the open transaction.

        For a certificate inserted in the open transaction the temporary file is either
        returned directly (`copy_if_exists` false) or copied to `target_dir`.
        A certificate scheduled for delete raises CertNotAvailableError,
        otherwise the persisted storage is consulted.
        Full path of the certificate file is returned.
        """
        if self._is_in_transaction(cert_id, self._to_insert):
            # Certificate exists as a temporary file (transaction still open)
            temp_path = self._get_block_path(cert_id) + cert_id
            log.debug('<%s> found in open transaction', temp_path)
            if copy_if_exists:
                exported_path = os.path.join(target_dir, make_PEM_filename(cert_id))
                shutil.copyfile(temp_path, exported_path)
                return exported_path
            return temp_path
        if not self._is_in_transaction(cert_id, self._to_delete):
            # Neither inserted nor deleted in this transaction -> look into persisted storage
            return CertFileDBReadOnly.export(self, cert_id, target_dir, copy_if_exists)
        log.info('<%s> was deleted in current transaction', cert_id)
        raise CertNotAvailableError(cert_id)

    def exists(self, cert_id: str) -> bool:
        """
        Test whether the certificate exists, honouring the open transaction.

        Inserts of the open transaction count as existing, scheduled deletes as missing.
        The persisted storage is only inspected for identifiers present in the DB index.
        """
        # Uncommitted insert wins
        if self._is_in_transaction(cert_id, self._to_insert):
            log.debug('<%s> exists in open transaction', cert_id)
            return True
        # Uncommitted delete hides a persisted certificate
        if self._is_in_transaction(cert_id, self._to_delete):
            log.info('<%s> was deleted in current transaction', cert_id)
            return False
        # Index lookup first, the archive is opened only for indexed certificates
        return cert_id in self._certs_in_db and CertFileDBReadOnly.exists(self, cert_id)

    def insert(self, cert_id: str, cert: str) -> None:
        """
        Store the certificate in a temporary file and register it for insert upon commit.

        Raise CertInvalidError if `cert_id` or `cert` is empty.
        An already existing temporary file is left as it is.
        """
        if not (cert_id and cert):
            raise CertInvalidError('cert_id <{}> or cert <{}> invalid'.format(cert_id, cert))

        block_dir = self._get_block_path(cert_id)
        temp_path = block_dir + cert_id

        def write_temp_file() -> None:
            with open(temp_path, 'w') as temp_file:
                temp_file.write(cert)

        if os.path.exists(temp_path):
            log.info('Certificate %s already exists', temp_path)
        else:
            try:
                write_temp_file()
            except FileNotFoundError:
                # Block directory is missing -> create it and write again
                os.makedirs(block_dir, exist_ok=True)
                write_temp_file()
        # Register the certificate so that commit persists it
        self._add_to_transaction(cert_id, self._to_insert)
        log.debug('Certificate %s inserted to block %s', cert_id, block_dir)

    def delete(self, cert_id: str) -> None:
        """
        Delete the certificate, either immediately or upon commit.

        A certificate inserted in the open transaction is removed right away together with
        its temporary file; any other certificate is scheduled for delete upon commit.
        Raise CertInvalidError if `cert_id` is empty.
        """
        if not cert_id:
            raise CertInvalidError('cert_id <{}> invalid'.format(cert_id))

        if not self._is_in_transaction(cert_id, self._to_insert):
            # Schedule the delete, it is applied by commit
            self._add_to_transaction(cert_id, self._to_delete)
            log.debug('Certificate %s will be deleted upon commit', cert_id)
        else:
            # Uncommitted insert -> drop it from the transaction and remove its file now
            temp_path = self._get_block_path(cert_id) + cert_id
            self._remove_from_transaction(cert_id, self._to_insert)
            os.remove(temp_path)
            log.debug('Certificate %s deleted from open transaction', cert_id)

        # The identifier must no longer be reported by the EXISTS cache
        self._cache.discard(cert_id)

    def rollback(self) -> None:
        """
        Revert the open transaction.

        Temporary files of inserted certificates are removed, both transaction
        dictionaries are emptied and empty folders are cleaned up.
        """
        log.info('Rollback started')
        # Drop temporary files of certificates that were never committed
        for block_id, pending_certs in self._to_insert.items():
            block_dir = self._get_block_path(block_id)
            for pending_cert in pending_certs:
                os.remove(block_dir + pending_cert)
        self._to_insert.clear()
        self._to_delete.clear()
        # Removing the files may have left empty block folders behind
        remove_empty_folders(self.storage)
        log.info('Rollback finished')

    def commit(self) -> Tuple[int, int]:
        """
        Apply the open transaction and return (number of inserted, number of deleted).

        Deletes are applied before inserts. Blocks are processed one by one when a single
        CPU core is configured, otherwise the work is handed over to the asynchronous variant.
        Afterwards the DB index is updated and written, the transaction is emptied and,
        if `maintain_info` is set, the commit is recorded in the META file.
        """
        log.info('Commit started')

        if self.__cpu_cores == 1:
            # Handle delete first because sequence matter
            cnt_deleted = sum(
                CertFileDB.delete_certs(self._get_block_archive(block), certs)
                for block, certs in self._to_delete.items()
            )
            # Now handle insert
            cnt_inserted = sum(
                CertFileDB.persist_certs(self._get_block_path(block), self._get_block_archive(block), certs)
                for block, certs in self._to_insert.items()
            )
        else:
            cnt_inserted, cnt_deleted = self.__commit_async()

        self._update_index()
        self._write_index()

        self._to_delete.clear()
        self._to_insert.clear()
        log.info('Deleted %d certificates', cnt_deleted)
        log.info('Inserted %d certificates', cnt_inserted)
        # Clean up empty folders
        remove_empty_folders(self.storage)  # TODO seems not working properly in benchmark
        # Write commit info
        if self._params['maintain_info']:
            self.__write_commit_info(cnt_inserted, cnt_deleted)
        log.info('Commit finished')
        return cnt_inserted, cnt_deleted

    def __commit_async(self) -> Tuple[int, int]:
        """
        Accomplish the same as commit() but with use of multiprocessing.Pool
        of asynchronous workers to persist/delete multiple certificate blocks in parallel.

        Return tuple of numbers (number of inserted; number of deleted).
        An exception raised by a worker is re-raised here; the pool is closed and
        joined in every case so no worker processes are left behind.
        """
        # Non-positive limit -> None, i.e. let multiprocessing choose the number of workers
        cpus = self.__cpu_cores if self.__cpu_cores > 0 else None

        pool = mp.Pool(cpus)
        try:
            # Handle delete first because sequence matter; all delete results are
            # collected before any insert task is submitted
            delete_results = [
                pool.apply_async(CertFileDB.delete_certs, args=(self._get_block_archive(block), certs))
                for block, certs in self._to_delete.items()
            ]
            cnt_deleted = sum(result.get() for result in delete_results)
            # Now handle insert
            insert_results = [
                pool.apply_async(
                    CertFileDB.persist_certs, args=(self._get_block_path(block), self._get_block_archive(block), certs)
                )
                for block, certs in self._to_insert.items()
            ]
            cnt_inserted = sum(result.get() for result in insert_results)
        finally:
            # close() + join() lets already submitted tasks finish instead of killing a
            # worker in the middle of an archive write, and always reaps the workers
            pool.close()
            pool.join()

        return cnt_inserted, cnt_deleted

    # static so it can be used in the async pool (or find a way how to use a private method)
    @staticmethod
    def delete_certs(block_archive: str, certs: set) -> int:
        """
        Delete persisted certificates from block archive.

        `block_archive` is path to the block zip archive,
        `certs` is set of certificate identifiers to delete.
        The archive is rewritten without the deleted members; if no member
        remains, the archive is removed altogether.
        Return number of deleted certificates.
        """
        cnt_deleted = 0
        if certs and os.path.exists(block_archive):
            kept_any = False
            new_block_archive = block_archive + '_new'
            with ZipFile(block_archive, 'r', ZIP_DEFLATED) as zin,\
                 ZipFile(new_block_archive, 'w', ZIP_DEFLATED) as zout:
                for name in zin.namelist():
                    # Member name without extension is compared against the identifiers
                    if os.path.splitext(name)[0] in certs:
                        cnt_deleted += 1
                    else:
                        zout.writestr(name, zin.read(name))
                        kept_any = True
            if kept_any:
                # Swap in one step so the block archive never disappears in between
                os.replace(new_block_archive, block_archive)
            else:
                # Nothing left in the block -> drop the empty rewrite and the original
                os.remove(new_block_archive)
                os.remove(block_archive)

        log.debug('Deleted %d certificates from block %s', cnt_deleted, block_archive)
        return cnt_deleted

    # static so it can be used in the async pool
    @staticmethod
    def persist_certs(block_path: str, block_archive: str, certs: set) -> int:
        """
        Persist certificates to block archive.

        `block_path` is path to the block directory holding the temporary certificate files,
        `block_archive` is path to the block zip archive,
        `certs` is set of certificate identifiers to persist.
        Certificates already present in the archive are not written again.
        Every temporary file is removed once handled.
        Return number of certificates written to the archive.
        """
        if not certs:
            log.debug('Nothing to insert from block %s', block_path)
            return 0
        cnt_inserted = 0
        append = os.path.exists(block_archive)
        if append:
            log.debug('Appending to archive: %s', block_archive)
        else:
            log.debug('Creating archive: %s', block_archive)

        # TODO compare performance for higher compresslevel
        with ZipFile(block_archive, "a" if append else "w", ZIP_DEFLATED) as zout:
            # Set gives constant-time duplicate checks; a new archive has no members
            persisted_certs = set(zout.namelist()) if append else set()

            for cert in certs:
                cert_file = block_path + cert
                if cert not in persisted_certs:  # do not insert duplicates
                    zout.write(cert_file, cert)
                    cnt_inserted += 1
                os.remove(cert_file)

        log.debug('Persisted %d certificates from block %s', cnt_inserted, block_path)
        return cnt_inserted

    def _is_in_transaction(self, cert_id: str, trans_dict: dict) -> bool:
        return cert_id in trans_dict.get(self._get_block_id(cert_id), {})

    def _add_to_transaction(self, cert_id: str, trans_dict: dict) -> None:
        block_id = self._get_block_id(cert_id)
        try:
            trans_dict[block_id].add(cert_id)
        except KeyError:
            trans_dict[block_id] = set()
            trans_dict[block_id].add(cert_id)

    def _remove_from_transaction(self, cert_id: str, trans_dict: dict) -> None:
        block_id = self._get_block_id(cert_id)
        try:
            trans_dict[block_id].discard(cert_id)
            if not trans_dict[block_id]:
                del trans_dict[block_id]
        except KeyError:
            pass

    def __write_commit_info(self, inserted: int, deleted: int) -> None:
        """
        Record a commit in the META file.

        `inserted` and `deleted` are the certificate counts of the commit.
        The INFO section gets an updated certificate total and last commit date,
        the HISTORY section gets a new entry numbered after the existing ones.
        A missing META file is started with an empty INFO section.
        """
        meta_path = os.path.join(self.storage, self.META_FILENAME)
        if os.path.exists(meta_path):
            meta = toml.load(meta_path, OrderedDict)
        else:
            # Create META file content
            info = OrderedDict()
            info['owner'] = ""
            info['description'] = ""
            info['created'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S%Z')
            meta = OrderedDict()
            meta['INFO'] = info
        # Update DB INFO
        info = meta['INFO']
        info['number_of_certificates'] = info.get('number_of_certificates', 0) + inserted - deleted
        info['last_commit'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S%Z')
        # Append commit HISTORY
        history = meta.setdefault('HISTORY', OrderedDict())
        commit_nr = str(len(history) + 1)
        entry = OrderedDict()
        entry['date'] = info['last_commit']
        entry['inserted'] = inserted
        entry['deleted'] = deleted
        history[commit_nr] = entry

        log.debug(meta)
        with open(meta_path, 'w') as meta_file:
            toml.dump(meta, meta_file)

    def _update_index(self):
        """
        Bring the in-memory DB index in line with the open transaction.

        Identifiers scheduled for delete are removed first, identifiers
        scheduled for insert are added afterwards.
        """
        log.info('Updating DB index')

        # Shrink the index in place rather than building a new set for every block
        for certs in self._to_delete.values():
            self._certs_in_db.difference_update(certs)

        self._certs_in_db.update(*self._to_insert.values())

        log.info('DB index now contains %d hashes', len(self._certs_in_db))

    def _write_index(self):
        """Write the in-memory DB index to the index file, one identifier per line."""
        log.info('Writing DB index')

        with open(self._index_path, 'wt') as index_file:
            index_file.writelines('{0}\n'.format(cert_hash) for cert_hash in self._certs_in_db)

        log.info('DB index is written')

Ancestors

Static methods

def delete_certs(block_archive: str, certs: set) ‑> int

Delete persisted certificates from block archive

Expand source code
@staticmethod
def delete_certs(block_archive: str, certs: set) -> int:
    """
    Delete persisted certificates from block archive.

    `block_archive` is path to the block zip archive,
    `certs` is set of certificate identifiers to delete.
    The archive is rewritten without the deleted members; if no member
    remains, the archive is removed altogether.
    Return number of deleted certificates.
    """
    cnt_deleted = 0
    if certs and os.path.exists(block_archive):
        kept_any = False
        new_block_archive = block_archive + '_new'
        with ZipFile(block_archive, 'r', ZIP_DEFLATED) as zin,\
             ZipFile(new_block_archive, 'w', ZIP_DEFLATED) as zout:
            for name in zin.namelist():
                # Member name without extension is compared against the identifiers
                if os.path.splitext(name)[0] in certs:
                    cnt_deleted += 1
                else:
                    zout.writestr(name, zin.read(name))
                    kept_any = True
        if kept_any:
            # Swap in one step so the block archive never disappears in between
            os.replace(new_block_archive, block_archive)
        else:
            # Nothing left in the block -> drop the empty rewrite and the original
            os.remove(new_block_archive)
            os.remove(block_archive)

    log.debug('Deleted %d certificates from block %s', cnt_deleted, block_archive)
    return cnt_deleted
def persist_certs(block_path: str, block_archive: str, certs: set) ‑> int

Persist certificates to block archive

Expand source code
@staticmethod
def persist_certs(block_path: str, block_archive: str, certs: set) -> int:
    """
    Persist certificates to block archive.

    `block_path` is path to the block directory holding the temporary certificate files,
    `block_archive` is path to the block zip archive,
    `certs` is set of certificate identifiers to persist.
    Certificates already present in the archive are not written again.
    Every temporary file is removed once handled.
    Return number of certificates written to the archive.
    """
    if not certs:
        log.debug('Nothing to insert from block %s', block_path)
        return 0
    cnt_inserted = 0
    append = os.path.exists(block_archive)
    if append:
        log.debug('Appending to archive: %s', block_archive)
    else:
        log.debug('Creating archive: %s', block_archive)

    # TODO compare performance for higher compresslevel
    with ZipFile(block_archive, "a" if append else "w", ZIP_DEFLATED) as zout:
        # Set gives constant-time duplicate checks; a new archive has no members
        persisted_certs = set(zout.namelist()) if append else set()

        for cert in certs:
            cert_file = block_path + cert
            if cert not in persisted_certs:  # do not insert duplicates
                zout.write(cert_file, cert)
                cnt_inserted += 1
            os.remove(cert_file)

    log.debug('Persisted %d certificates from block %s', cnt_inserted, block_path)
    return cnt_inserted

Inherited members

class CertFileDBReadOnly (storage: str)

CertDBReadOnly interface implementation which uses files and file system properties as a storage mechanism.

storage is path to the database storage.

Expand source code
class CertFileDBReadOnly(CertDBReadOnly):
    """
    CertDBReadOnly interface implementation which uses files
    and a file system properties as a storage mechanism.

    `storage` is path to the database storage.
    """

    CONF_FILENAME = 'CertFileDB.toml'
    META_FILENAME = '.CertFileDB-META.toml'
    INDEX_FILENAME = 'CertFileDBIndex'

    @staticmethod
    def setup(storage_path: str, structure_level: int = 3, cert_format: str = 'PEM',
              desc: str = 'CertFileDB', owner: str = '', maintain_info: bool = True) -> None:
        """
        Setup CertFileDB storage directory with the given parameters.
        `storage_path` is path to the database root directory,
        `structure_level` is hierarchy level of certificate blocks (non-negative integer),
        `cert_format` is used format of stored certificates,
        `desc` is database description,
        `owner` is database owner,
        `maintain_info` is flag whether to maintain META file or not.

        Directory, configuration and META file are created.
        Raise ValueError for wrong parameters or if DB already exists.
        """
        storage_path = os.path.abspath(storage_path)
        config_path = os.path.join(storage_path, CertFileDB.CONF_FILENAME)
        meta_path = os.path.join(storage_path, CertFileDB.META_FILENAME)
        if os.path.exists(config_path):
            raise ValueError('CertFileDB already exists')
        if not isinstance(structure_level, int):
            raise ValueError('structure_level must be an integer')
        if structure_level < 0:
            # A negative level yields an empty block id and no block directories,
            # so refuse it before anything is written to disk
            raise ValueError('structure_level must not be negative')
        os.makedirs(storage_path, exist_ok=True)
        # Create configuration file
        config = OrderedDict()
        config['PARAMETERS'] = OrderedDict()
        config['PARAMETERS']['storage'] = storage_path
        config['PARAMETERS']['structure_level'] = structure_level
        config['PARAMETERS']['cert_format'] = cert_format
        config['PARAMETERS']['compression_method'] = 'ZIP_DEFLATED'
        config['PARAMETERS']['maintain_info'] = maintain_info
        with open(config_path, 'w') as cfg_file:
            toml.dump(config, cfg_file)
        # Create META file
        meta = OrderedDict()
        meta['INFO'] = OrderedDict()
        meta['INFO']['owner'] = owner
        meta['INFO']['description'] = desc
        meta['INFO']['created'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S%Z')
        with open(meta_path, 'w') as meta_file:
            toml.dump(meta, meta_file)

        log.info('CertFileDB was setup:\n%s', config)

    def __init__(self, storage: str):
        """
        Open an existing CertFileDB.

        `storage` is path to the database storage.
        The configuration file is read from the storage and the DB index is loaded.
        Raise ValueError if the configuration file is not found.
        """
        # Get config
        self.storage = os.path.abspath(storage)
        config_path = os.path.join(self.storage, self.CONF_FILENAME)
        try:
            config = toml.load(config_path)
        except FileNotFoundError as err:
            # Keep the original error as the cause for easier troubleshooting
            raise ValueError(
                'CertFileDB <{}> does not exists -> call CertFileDB.setup() first'.format(config_path)
            ) from err
        self._params = config['PARAMETERS']
        log.info('Found CertFileDB <%s>:\n%s', config_path, config)
        # Init DB instance
        log.info('Initializing %s transaction...', self.__class__.__name__)
        # hashes of certificates stored in DB
        self._certs_in_db: set = set()
        # Set maintaining all known certificate IDs for better EXISTS performance
        self._cache: set = set()
        # Pre-compute index used for block_id
        self._block_id_index = self._params['structure_level'] + 1
        # Redefine _get_block_id method for special case with structure_level = 0
        if self._params['structure_level'] == 0:
            fixed_block_id = os.path.basename(self.storage)
            self._get_block_id = lambda _: fixed_block_id

        self._index_path = os.path.join(self.storage, self.INDEX_FILENAME)
        self._load_index()

    def get(self, cert_id: str) -> str:
        """
        Read the certificate from its block archive and return it as text.

        Raise CertNotAvailableError if the archive or the certificate is missing;
        the EXISTS cache is emptied when it still listed the certificate.
        """
        try:
            archive_path = self._get_block_archive(cert_id)
            with ZipFile(archive_path, 'r', ZIP_DEFLATED) as archive, archive.open(cert_id) as member:
                log.debug('<%s> found persisted in zip <%s>', cert_id, archive_path)
                return member.read().decode('utf-8')
        except (KeyError, FileNotFoundError):
            # Missing archive or member is reported below
            pass

        log.info('<%s> not found', cert_id)
        cache_is_stale = cert_id in self._cache
        if cache_is_stale:
            self._cache.clear()  # Cache seems to be invalidated, so clear it
        raise CertNotAvailableError(cert_id)

    def export(self, cert_id: str, target_dir: str, copy_if_exists: bool = True) -> str:
        """
        Extract the certificate from its block archive into `target_dir` as a PEM file.

        Full path of the extracted file is returned.
        Raise CertNotAvailableError if the archive or the certificate is missing;
        the EXISTS cache is emptied when it still listed the certificate.
        """
        try:
            archive_path = self._get_block_archive(cert_id)
            with ZipFile(archive_path, 'r', ZIP_DEFLATED) as archive:
                member_info = archive.getinfo(cert_id)
                # Extract under the PEM file name instead of the bare identifier
                member_info.filename = make_PEM_filename(cert_id)
                archive.extract(member_info, target_dir)
                log.debug('<%s> found persisted in zip <%s>', cert_id, archive_path)
                exported_path = os.path.join(target_dir, make_PEM_filename(cert_id))
                return exported_path
        except (KeyError, FileNotFoundError):
            # Missing archive or member is reported below
            pass

        log.info('<%s> not found', cert_id)
        cache_is_stale = cert_id in self._cache
        if cache_is_stale:
            self._cache.clear()  # Cache seems to be invalidated, so clear it
        raise CertNotAvailableError(cert_id)

    def exists(self, cert_id: str) -> bool:
        """
        Test whether the certificate is persisted.

        The EXISTS cache is consulted first; a certificate found in its block
        archive is remembered in the cache.
        """
        if cert_id in self._cache:
            log.debug('<%s> found in cache', cert_id)
            return True
        # Cache miss -> look into the block archive
        try:
            archive_path = self._get_block_archive(cert_id)
            with ZipFile(archive_path, 'r', ZIP_DEFLATED) as archive:
                archive.getinfo(cert_id)  # raises KeyError for an unknown member
                log.debug('<%s> exists persisted <%s>', cert_id, archive_path)
                self._cache.add(cert_id)
                return True
        except (KeyError, FileNotFoundError):
            # Missing archive or member means the certificate does not exist
            pass

        log.debug('<%s> does not exist', cert_id)
        return False

    def exists_all(self, cert_ids: list) -> bool:
        for cert_id in cert_ids:
            if not self.exists(cert_id):
                return False

        return True

    def _get_block_path(self, cert_or_block_id: str) -> str:
        """Return full block path of certificate or block id"""
        paths = [cert_or_block_id[: 2 + i] for i in range(self._params['structure_level'])]
        return "/".join([self._params['storage']] + paths) + '/'

    def _get_block_id(self, cert_id: str) -> str:  # pylint: disable=E0202
        return cert_id[: self._block_id_index]

    def _get_block_archive(self, cert_or_block_id: str) -> str:
        block_path = self._get_block_path(cert_or_block_id)
        return block_path + self._get_block_id(cert_or_block_id) + '.zip'

    def _load_index(self):
        """Fill the in-memory DB index from the index file, if the file exists."""
        if not os.path.isfile(self._index_path):
            log.info('DB index does not exist yet')
            return

        log.info('Loading DB index')

        # One identifier per line, surrounding whitespace is stripped
        with open(self._index_path, 'rt') as index_file:
            self._certs_in_db.update(line.strip() for line in index_file)

        log.info('Loaded {0} hashes'.format(len(self._certs_in_db)))

Ancestors

Subclasses

Class variables

var CONF_FILENAME
var INDEX_FILENAME
var META_FILENAME

Static methods

def setup(storage_path: str, structure_level: int = 3, cert_format: str = 'PEM', desc: str = 'CertFileDB', owner: str = '', maintain_info: bool = True) ‑> NoneType

Set up the CertFileDB storage directory with the given parameters: storage_path is the path to the database root directory, structure_level is the hierarchy level of certificate blocks, cert_format is the format of stored certificates, desc is the database description, owner is the database owner, and maintain_info is a flag telling whether to maintain the META file or not.

The directory, the configuration file and the META file are created. Raises ValueError for wrong parameters or if the DB already exists.

Expand source code
@staticmethod
def setup(storage_path: str, structure_level: int = 3, cert_format: str = 'PEM',
          desc: str = 'CertFileDB', owner: str = '', maintain_info: bool = True) -> None:
    """
    Set up a CertFileDB storage directory with the given parameters.

    `storage_path` is path to the database root directory,
    `structure_level` is hierarchy level of certificate blocks,
    `cert_format` is used format of stored certificates,
    `desc` is database description,
    `owner` is database owner,
    `maintain_info` is flag whether to maintain META file or not.

    The directory, the configuration file and the META file are created;
    the META file is written regardless of `maintain_info`, which is only
    recorded in the configuration.
    Raise ValueError if the DB already exists or if `structure_level`
    is not an integer.
    """
    root_dir = os.path.abspath(storage_path)
    config_path = os.path.join(root_dir, CertFileDB.CONF_FILENAME)
    meta_path = os.path.join(root_dir, CertFileDB.META_FILENAME)

    # An existing configuration file marks an already initialised database
    if os.path.exists(config_path):
        raise ValueError('CertFileDB already exists')
    if not isinstance(structure_level, int):
        raise ValueError('structure_level must be an integer')

    os.makedirs(root_dir, exist_ok=True)

    # Create configuration file
    config = OrderedDict([
        ('PARAMETERS', OrderedDict([
            ('storage', root_dir),
            ('structure_level', structure_level),
            ('cert_format', cert_format),
            ('compression_method', 'ZIP_DEFLATED'),
            ('maintain_info', maintain_info),
        ])),
    ])
    with open(config_path, 'w') as cfg_file:
        toml.dump(config, cfg_file)

    # Create META file
    # NOTE(review): datetime.now() is naive, so '%Z' renders as an empty
    # string here -- confirm whether a timezone was meant to be recorded.
    meta = OrderedDict([
        ('INFO', OrderedDict([
            ('owner', owner),
            ('description', desc),
            ('created', datetime.now().strftime('%Y-%m-%d %H:%M:%S%Z')),
        ])),
    ])
    with open(meta_path, 'w') as meta_file:
        toml.dump(meta, meta_file)

    log.info('CertFileDB was setup:\n%s', config)

Inherited members

class CertInvalidError (*args, **kwargs)

Raised when the certificate has an invalid identifier or an invalid structure

Expand source code
class CertInvalidError(ValueError):
    """Raised when a certificate has an invalid identifier or an invalid structure."""

Ancestors

  • builtins.ValueError
  • builtins.Exception
  • builtins.BaseException
class CertNotAvailableError (*args, **kwargs)

Raised when the certificate is not available in database

Expand source code
class CertNotAvailableError(Exception):
    """Raised when a requested certificate is not available in the database."""

Ancestors

  • builtins.Exception
  • builtins.BaseException
class CompositeCertDB

Composite manager of CertDB components.

Expand source code
class CompositeCertDB(CertDB, CompositeCertDBReadOnly):
    """
    Composite manager of CertDB components.

    Both CertDB and CertDBReadOnly components can be registered, but the
    modifying operations (`insert`, `delete`, `rollback`, `commit`) are
    forwarded only to the registered components that are CertDB instances.
    """

    def __init__(self):  # pylint: disable=W0231
        log.info('Initializing CompositeCertDB composite manager...')
        # List managing all registered components (CertDB and CertDBReadOnly)
        self._children: List[Union[CertDB, CertDBReadOnly]] = []
        # List managing only the registered CertDB components,
        # i.e. the ones the modifying operations are forwarded to
        self.__io_allowed: List[CertDB] = []

    def register(self, certdb: Union[CertDB, CertDBReadOnly]) -> None:
        """
        Add component object to the composite manager.

        A component that is a CertDB instance is additionally tracked as
        a target of the modifying operations. Registering an already
        registered component does nothing.
        """
        if certdb not in self._children:
            self._children.append(certdb)
            if isinstance(certdb, CertDB):
                self.__io_allowed.append(certdb)

    def unregister(self, certdb: Union[CertDB, CertDBReadOnly]) -> None:
        """
        Remove component object from the composite manager.

        Unregistering a component that is not registered does nothing.
        """
        try:
            self._children.remove(certdb)
        except ValueError:
            pass
        try:
            self.__io_allowed.remove(certdb)
        except ValueError:
            pass

    def insert(self, cert_id: str, cert: str) -> None:
        """Insert the certificate into every registered CertDB component."""
        for child in self.__io_allowed:
            child.insert(cert_id, cert)

    def delete(self, cert_id: str) -> None:
        """Delete the certificate from every registered CertDB component."""
        for child in self.__io_allowed:
            child.delete(cert_id)

    def rollback(self) -> None:
        """Roll back the current transaction of every registered CertDB component."""
        for child in self.__io_allowed:
            child.rollback()

    def commit(self) -> Tuple[int, int]:
        """
        Commit the current transaction of every registered CertDB component.

        Return the (inserted, deleted) pair reported by the last committed
        component, or (0, 0) when no CertDB component is registered.
        """
        # NOTE(review): the counts are overwritten per component rather than
        # aggregated -- confirm that reporting only the last one is intended.
        cnt_inserted, cnt_deleted = 0, 0
        for child in self.__io_allowed:
            cnt_inserted, cnt_deleted = child.commit()
        return cnt_inserted, cnt_deleted

Ancestors

Methods

def register(self, certdb: CertDBReadOnly) ‑> NoneType

Add component object to the composite manager.

A component that is a CertDB instance is additionally tracked as a target of the modifying operations (insert, delete, rollback, commit).

Expand source code
def register(self, certdb: Union[CertDB, CertDBReadOnly]) -> None:
    """
    Add component object to the composite manager.

    A component that is a CertDB instance is additionally tracked as
    a target of the modifying operations. Registering an already
    registered component does nothing.
    """
    if certdb in self._children:
        return
    self._children.append(certdb)
    if isinstance(certdb, CertDB):
        self.__io_allowed.append(certdb)

Inherited members

class CompositeCertDBReadOnly

Composite manager of CertDBReadOnly components.

Expand source code
class CompositeCertDBReadOnly(CertDBReadOnly):
    """
    Composite manager of CertDBReadOnly components.

    Every read operation is delegated to the registered components in
    registration order; the first component able to serve it wins.
    """

    def __init__(self):
        log.info('Initializing CompositeCertDBReadOnly composite manager...')
        # List managing all registered CertDBReadOnly components
        self._children: List[CertDBReadOnly] = []

    def register(self, certdb: CertDBReadOnly) -> None:
        """Add component object to the composite manager."""
        if certdb not in self._children:
            self._children.append(certdb)

    def unregister(self, certdb: CertDBReadOnly) -> None:
        """Remove component object from the composite manager."""
        try:
            self._children.remove(certdb)
        except ValueError:
            pass

    def is_registered(self, certdb: CertDBReadOnly) -> bool:
        """Test if component object is registered in the composite manager."""
        return certdb in self._children

    def get(self, cert_id: str) -> str:
        """
        Return the certificate from the first component that has it.

        Raise CertNotAvailableError if no registered component has it.
        """
        for child in self._children:
            try:
                return child.get(cert_id)
            except CertNotAvailableError:
                # Not in this component, try the next one
                continue
        raise CertNotAvailableError(cert_id)

    def export(self, cert_id: str, target_dir: str, copy_if_exists: bool = True) -> str:
        """
        Export the certificate using the first component that has it.

        The arguments are passed through to the component's `export` and
        its return value is returned unchanged.
        Raise CertNotAvailableError if no registered component has it.
        """
        for child in self._children:
            try:
                return child.export(cert_id, target_dir, copy_if_exists)
            except CertNotAvailableError:
                # Not in this component, try the next one
                continue
        raise CertNotAvailableError(cert_id)

    def exists(self, cert_id: str) -> bool:
        """Tell whether at least one registered component has the certificate."""
        return any(child.exists(cert_id) for child in self._children)

    def exists_all(self, cert_ids: list) -> bool:
        """
        Tell whether every certificate in `cert_ids` exists in some component.

        Different certificates may be found in different components.
        An empty `cert_ids` yields True.
        """
        return all(self.exists(cert_id) for cert_id in cert_ids)

Ancestors

Subclasses

Methods

def is_registered(self, certdb: CertDBReadOnly) ‑> bool

Test if component object is registered in the composite manager.

Expand source code
def is_registered(self, certdb: CertDBReadOnly) -> bool:
    """Tell whether `certdb` is currently registered in the composite manager."""
    registered = self._children
    return certdb in registered
def register(self, certdb: CertDBReadOnly) ‑> NoneType

Add component object to the composite manager.

Expand source code
def register(self, certdb: CertDBReadOnly) -> None:
    """
    Add component object to the composite manager.

    Registering an already registered component does nothing.
    """
    if certdb in self._children:
        return
    self._children.append(certdb)
def unregister(self, certdb: CertDBReadOnly) ‑> NoneType

Remove component object from the composite manager.

Expand source code
def unregister(self, certdb: CertDBReadOnly) -> None:
    """
    Remove component object from the composite manager.

    Unregistering a component that is not registered does nothing.
    """
    if certdb in self._children:
        self._children.remove(certdb)

Inherited members