Module cevast.dataset.collectors.rapid
This module contains collector implementation for Rapid dataset source.
"""This module contains collector implementation for Rapid dataset source."""
# pylint: disable=E1101
# - known Pylint issue https://github.com/PyCQA/pylint/issues/1411
import os
import logging
import re
from datetime import datetime
from typing import List, Union, Tuple
import requests
from cevast.dataset.dataset import DatasetSource, DatasetCollectionError, Dataset
__author__ = 'Radim Podola'
log = logging.getLogger(__name__)
cli_log = logging.getLogger('CEVAST_CLI')
class RapidCollector:
"""
Class collecting datasets from Rapid7 project Sonar study: "Project Sonar: IPv4 SSL Certificates".
Data are retrieved via Open Data API (more at: https://opendata.rapid7.com/apihelp/).
To use the API to collect the datasets, one must have an account and API User Key.
Set the key to "RAPID_API_KEY" enviroment variable or provide upon initialization.
"""
dataset_source = DatasetSource.RAPID.name
_DATASETS_URL = "https://us.api.insight.rapid7.com/opendata/studies/sonar.ssl/"
_DOWNLOAD_URL = _DATASETS_URL + "{}/download/"
_QUOTA_URL = "https://us.api.insight.rapid7.com/opendata/quota/"
def __init__(self, api_key: str = None):
if api_key is None:
try:
self.__api_key = os.environ['RAPID_API_KEY']
except KeyError:
log.error('RAPID_API_KEY is not set in enviroment!')
raise ValueError('RAPID_API_KEY is not provided nor set in the enviroment!')
else:
self.__api_key = api_key
# TODO add date_range strategy via paramater date_since=None
def collect(self, download_dir: str = '.', date: datetime.date = datetime.today().date(),
filter_ports: Union[Tuple[str], str] = '443',
filter_types: Union[Tuple[str], str] = ('hosts', 'certs')) -> Tuple[str]:
"""
Download the newest dataset by the given date and store it into `download_dir` directory.
If `date` is not specified, the newest dataset ever is downloaded by default.
Datasets can be filtered by PORT and TYPE via paramaters `filter_ports` and `filter_types`.
If a filter parameter is empty or None, the filter is not applied.
Tuple with downloaded dataset's paths is returned.
Function first query Open Data API and get fresh list of all datasets. If filter parameters
are given, list if filtered accordingly. Datasets are provided chronologically in a list
from the newest to the oldest ones. List is iterated from the newest until the closest
dataset to given date is reached.
Most of the dataset names has a fixed format: "{YYYYmmdd}/...._{PORT}_{TYPE}.gz", where
{YYYYmmdd} is the date when the dataset was collected,
{PORT} is port on which the certificates were retrieved,
{TYPE} is type of the dataset [hosts, endpoints, names, certs]
e.g. 20180213/2018-02-13-1518483602-https_get_443_certs.gz or
20170410_ssl_443_names.gz
Older datasets with different formats are not supported now.
"""
def match_filters(dataset: str) -> bool:
match = re.match(dataset_name_rgx, dataset)
if not match:
return False
if filter_ports and match.group('port') not in filter_ports:
return False
if filter_types and match.group('type') not in filter_types:
return False
return True
if isinstance(filter_ports, str):
filter_ports = (filter_ports,)
if isinstance(filter_types, str):
filter_types = (filter_types,)
dataset_name_rgx = r"^(?P<date>\d{8}).*_(?P<port>\d+)_(?P<type>\w+)\.gz$"
datasets_to_download = {}
target_date = None
log.info('Start collecting Rapid datasets with paramaters:')
log.info(' date=%s, ports=%s, types=%s to directory %s', date, filter_ports, filter_types, download_dir)
# Filter the datasets
datasets = list(filter(match_filters, self.get_datasets()))
log.debug('Filtered datasets: %s', datasets)
for dataset in datasets:
if target_date is None and date >= datetime.strptime(dataset[:8], '%Y%m%d').date():
# Found the target date
target_date = dataset[:8]
if target_date is not None:
match = re.match(dataset_name_rgx, dataset)
if match.group('date') != target_date:
break # Another date encountered, we have all datasets now -> break
path = os.path.join(
download_dir,
Dataset.format_filename(match.group('date'), match.group('port'), match.group('type') + '.gz'),
)
datasets_to_download[dataset] = path
# Download the datasets
for dataset_file, path in datasets_to_download.items():
log.info('Download dataset file <%s> to <%s>.', dataset_file, path)
if os.path.exists(path):
log.info('Dataset is already downloaded.')
else:
cli_log.info('Will download dataset file <%s>', dataset_file)
self.__download(dataset_file, path)
cli_log.info('Dataset file <%s> downloaded', dataset_file)
log.info('Rapid datasets collected.')
return tuple(datasets_to_download.values())
def get_datasets(self) -> List[str]:
"""
Get fresh list of datasets in the project study.
"""
req = requests.get(self._DATASETS_URL, headers={"X-Api-Key": self.__api_key})
if req.status_code != requests.codes.ok:
log.error('HTTP error <%s> when retrieving dataset list.', req.status_code)
return []
return req.json().get('sonarfile_set', [])
def get_quota(self) -> dict:
"""Get account download quota information."""
req = requests.get(self._QUOTA_URL, headers={"X-Api-Key": self.__api_key})
if req.status_code != requests.codes.ok:
log.error('HTTP error <%s> when retrieving download quota information.', req.status_code)
return {}
return req.json()
def quota_left(self) -> int:
"""Return how many download quota is left for the day."""
return self.get_quota().get('quota_left', 0)
def __download(self, dataset_name: str, target_filename: str):
"""Download the dataset from the project study."""
# Retrieve download URL of the dataset archive
req = requests.get(self._DOWNLOAD_URL.format(dataset_name), headers={"X-Api-Key": self.__api_key})
if req.status_code != requests.codes.ok:
log.error('HTTP error <%s> when retrieving download URL.', req.status_code)
retry = req.headers.get('Retry-After', 0)
raise DatasetCollectionError('Dataset download failed. Quota might exceeded, try again after {}[s].'.format(retry))
url = req.json().get('url', None)
if not url:
raise DatasetCollectionError('Dataset download failed.')
# Download the dataset archive
with requests.get(url, stream=True) as stream_f:
if stream_f.status_code != requests.codes.ok:
raise DatasetCollectionError('Dataset download failed with HTTP error <{}>.'.format(stream_f.status_code))
# Dataset file might have size of GBs, so read and write by increments
log.debug('Downloading dataset of size %sB.', stream_f.headers['Content-Length'])
with open(target_filename, 'wb') as w_obj:
for chunk in stream_f.iter_content(chunk_size=8192):
w_obj.write(chunk)
# TODO make some progress logs (25%, 50%, 75%)
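The TODO above could be addressed by comparing the bytes written against the Content-Length header and logging at quarter milestones. A minimal sketch under that assumption (the helper name and its wiring into __download are hypothetical, not part of the module):

import logging

import requests

log = logging.getLogger(__name__)

def write_stream_with_progress(stream_f: requests.Response, target_filename: str, chunk_size: int = 8192):
    """Write a streamed HTTP response to disk, logging progress at 25%, 50% and 75%."""
    total = int(stream_f.headers.get('Content-Length', 0))
    written = 0
    next_milestone = 25
    with open(target_filename, 'wb') as w_obj:
        for chunk in stream_f.iter_content(chunk_size=chunk_size):
            w_obj.write(chunk)
            written += len(chunk)
            # Content-Length may be missing or zero; progress logging is skipped in that case
            while total and next_milestone < 100 and written * 100 >= next_milestone * total:
                log.info('Download progress: %d%%', next_milestone)
                next_milestone += 25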
Classes
class RapidCollector (api_key: str = None)
Class collecting datasets from Rapid7 project Sonar study: "Project Sonar: IPv4 SSL Certificates".
Data are retrieved via the Open Data API (more at: https://opendata.rapid7.com/apihelp/). To use the API to collect the datasets, one must have an account and an API User Key. Set the key in the "RAPID_API_KEY" environment variable or provide it upon initialization.
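For illustration, a typical collection run might look like the sketch below (the key value and download directory are placeholders):

import os

from cevast.dataset.collectors.rapid import RapidCollector

# Either export the key in the environment before initialization ...
os.environ['RAPID_API_KEY'] = '<your API User Key>'
collector = RapidCollector()
# ... or pass it explicitly
collector = RapidCollector(api_key='<your API User Key>')

# Download the newest 'hosts' and 'certs' datasets for port 443 (the defaults)
paths = collector.collect(download_dir='.')
print(paths)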
Class variables
var dataset_source
Methods
def collect(self, download_dir: str = '.', date: datetime.date = datetime.today().date(), filter_ports: Union[Tuple[str], str] = '443', filter_types: Union[Tuple[str], str] = ('hosts', 'certs')) ‑> Tuple[str]
Download the newest dataset by the given date and store it in the `download_dir` directory. If `date` is not specified, the newest available dataset is downloaded by default. Datasets can be filtered by PORT and TYPE via the parameters `filter_ports` and `filter_types`. If a filter parameter is empty or None, the filter is not applied. A tuple with the downloaded datasets' paths is returned.
The function first queries the Open Data API to get a fresh list of all datasets. If filter parameters are given, the list is filtered accordingly. Datasets are provided chronologically, from the newest to the oldest. The list is iterated from the newest entry until the dataset closest to the given date is reached.
Most dataset names have a fixed format: "{YYYYmmdd}/...._{PORT}_{TYPE}.gz", where {YYYYmmdd} is the date when the dataset was collected, {PORT} is the port on which the certificates were retrieved, {TYPE} is the type of the dataset [hosts, endpoints, names, certs], e.g. 20180213/2018-02-13-1518483602-https_get_443_certs.gz or 20170410_ssl_443_names.gz. Older datasets with different formats are currently not supported.
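To make the naming convention concrete, the following sketch runs the same regular expression the collector uses internally over the two example names from the docstring:

import re

dataset_name_rgx = r"^(?P<date>\d{8}).*_(?P<port>\d+)_(?P<type>\w+)\.gz$"

for name in ('20180213/2018-02-13-1518483602-https_get_443_certs.gz',
             '20170410_ssl_443_names.gz'):
    match = re.match(dataset_name_rgx, name)
    print(match.group('date'), match.group('port'), match.group('type'))
# -> 20180213 443 certs
# -> 20170410 443 names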
def get_datasets(self) ‑> List[str]
Get a fresh list of datasets in the project study.
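A quick way to inspect what the study currently offers (assumes a valid API key; the slice is only for brevity):

from cevast.dataset.collectors.rapid import RapidCollector

collector = RapidCollector(api_key='<your API User Key>')
names = collector.get_datasets()
print(len(names), 'datasets available')
print(names[:3])  # the newest entries come first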
def get_quota(self) ‑> dict
Get account download quota information.
def quota_left(self) ‑> int
Return how much download quota is left for the day.
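Since downloads count against a daily quota, it may be worth checking quota_left() before calling collect(). A minimal sketch (the exact shape of the quota dict beyond 'quota_left' is not documented here):

from cevast.dataset.collectors.rapid import RapidCollector

collector = RapidCollector(api_key='<your API User Key>')
print(collector.get_quota())  # full quota information as returned by the API
if collector.quota_left() > 0:
    collector.collect()
else:
    print('Daily download quota exhausted.')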