Kanji
・ Cloud engineer / freelance ・ Born in 1993 ・ Born in Ehime Prefecture / Lives in Shibuya-ku, Tokyo ・ AWS history 5 years Profile details
Table of Contents
Prowler is an open-source security tool for AWS, Azure, Google Cloud, and Kubernetes that performs security best practices assessments, audits, incident response, continuous monitoring, hardening, and forensics readiness, and also performs remediation. There is Prowler CLI (Command Line Interface) called Prowler Open Source and a service on top of it called Prowler SaaS. Source: GitHub - prowler-cloud/prowler
Prowler is an open-source security tool for AWS, Azure, Google Cloud, and Kubernetes that performs security best practices assessments, audits, incident response, continuous monitoring, hardening, and forensics readiness, and also performs remediation. There is Prowler CLI (Command Line Interface) called Prowler Open Source and a service on top of it called Prowler SaaS.
The custom checks folder must contain one subfolder per check, each subfolder must be named after the check and contain the following: An empty init .py: This tells Python to treat this check folder as a package. check_name.py: Contains the logic of the check. check_name.metadata.json: Contains the metadata of the check. The check name must start with the service name followed by an underscore (e.g., ec2_instance_public_ip). For more details on how to write checks, refer to the “Developer Guide”. Source: Miscellaneous - Prowler Documentation
The custom checks folder must contain one subfolder per check, each subfolder must be named after the check and contain the following:
An empty init .py: This tells Python to treat this check folder as a package. check_name.py: Contains the logic of the check. check_name.metadata.json: Contains the metadata of the check. The check name must start with the service name followed by an underscore (e.g., ec2_instance_public_ip).
For more details on how to write checks, refer to the “Developer Guide”.
${parent_folder_name}/ ∟ ${AWS_service_name}_${check_name} ∟ __init__.py ∟ ${AWS_service_name}_${check_name}.py ∟ ${AWS_service_name}_${check_name}.metadata.json # Example file structure: Check if alternative contact information is registered for the account custom-checks-folder/ ∟ account_alternative_contact_information_is_registered/ ∟ __init__.py ∟ account_alternative_contact_information_is_registered.py ∟ account_alternative_contact_information_is_registered.metadata.json
${parent_folder_name} specifies an appropriate folder name. Create a folder named ${AWS_service_name}_${check_name} directly under the parent folder.
${parent_folder_name}
${AWS_service_name}_${check_name}
The parent folder can be placed in any nested structure. However, the folder specified as an argument must be the path of the parent folder containing the check folder.
Multiple ${AWS_service_name}_${check_name} folders can be created.
${AWS_service_name} must be one of the following. The detailed reason is explained later. (As of v3.12.1)
${AWS_service_name}
${check_name} specifies the name of the check in lower snake case.
${check_name}
Next, I will explain the content of the files.
The content of the files is created with reference to prowler/prowler/providers/aws/services at master · prowler-cloud/prowler .
__init__.py is an empty file. There is no need to write any processing.
__init__.py
${AWS_service_name}_${check_name}.py describes the logic of the check.
${AWS_service_name}_${check_name}.py
For example, to create a check to see if alternative contact information is registered for the account, it is described as follows.
Check_Report_AWS
region
resource_arn
resource_id
status
status_extended
resource_tags
account_client
prowler/providers/aws/services/${AWS_service_name}/${AWS_service_name}_client.py
prowler/providers/aws/services/${AWS_service_name}/${AWS_service_name}_service.py
${AWS_service_name}_service.py
import json from prowler.lib.logger import logger from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.account.account_client import account_client class account_alternative_contact_information_is_registered(Check): def execute(self): desired_alternative_contact_count = account_client.audit_config.get("desired_alternative_contact_count", 1) findings = [] report = Check_Report_AWS(self.metadata()) # For each Prowler check we MUST fill the following # Check_Report_AWS fields: # - region # - resource_id # - resource_arn # - status # - status_extended # - resource_tags (optional) logger.debug(f"account_client.contact_names: {account_client.contact_names}") logger.debug(f"account_client.contact_phone_numbers: {account_client.contact_phone_numbers}") logger.debug(f"account_client.contact_emails: {account_client.contact_emails}") contact_emails = account_client.contact_emails contact_emails.discard(None) report = Check_Report_AWS(self.metadata()) report.region = account_client.region report.resource_arn = account_client.audited_account_arn report.resource_id = account_client.audited_account if len(contact_emails) != desired_alternative_contact_count: report.status = "FAIL" else: report.status = "PASS" report.status_extended = json.dumps( { "desired_alternative_contact_count": desired_alternative_contact_count, "contact_emails_count": len(contact_emails), "contact_emails": list(contact_emails), }, ) findings.append(report) return findings
${AWS_service_name}_${check_name}.metadata.json
Provider
aws
CheckID
ServiceName
Severity
low
medium
high
critical
{ "Provider": "aws", "CheckID": "account_alternative_contact_information_is_registered", "CheckTitle": "Account alternative contact information is registered", "CheckType": [ "IAM" ], "ServiceName": "account", "SubServiceName": "", "ResourceIdTemplate": "arn:partition:access-recorder:region:account-id:recorder/resource-id", "Severity": "medium", "ResourceType": "Other", "Description": "", "Risk": "", "RelatedUrl": "", "Remediation": { "Code": { "CLI": "", "NativeIaC": "", "Other": "", "Terraform": "" }, "Recommendation": { "Text": "", "Url": "" } }, "Categories": [ "custom-checks" ], "DependsOn": [], "RelatedTo": [], "Notes": "" }
cst_*.py
${parent_folder_name}/ ∟ ${AWS_service_name}_${check_name} ∟ __init__.py ∟ ${AWS_service_name}_${check_name}.py ∟ ${AWS_service_name}_${check_name}.metadata.json ∟ cst_${AWS_service_name}_client.py ∟ cst_${AWS_service_name}_service.py # Example file structure: Check if the Config delivery channel is enabled custom-checks-folder/ ∟ config_delivery_channel_enabled/ ∟ __init__.py ∟ config_delivery_channel_enabled.py ∟ config_delivery_channel_enabled.metadata.json ∟ cst_config_client.py ∟ cst_config_service.py
${AWS_service_name}_${check_name}/
sys.path.append
from prowler.lib.logger import logger from prowler.lib.check.models import Check, Check_Report_AWS import os import sys sys.path.append(os.path.join(os.path.dirname(__file__))) from cst_config_client import config_client # noqa: E402 class config_delivery_channel_enabled(Check): def execute(self): findings = [] report = Check_Report_AWS(self.metadata()) // Omitted
cst_config_client.py
cst_config_service.py
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from cst_config_service import Config config_client = Config(current_audit_info)
Config(AWSService)
__describe_delivery_channels__
regional_client.describe_delivery_channels()
DeliveryChannel
self.delivery_channels
DeliveryChannel(BaseModel)
Optional
None
from typing import Optional from pydantic import BaseModel from prowler.lib.logger import logger from prowler.lib.scan_filters.scan_filters import is_resource_filtered from prowler.providers.aws.lib.service.service import AWSService class Config(AWSService): def __init__(self, audit_info): super().__init__(__class__.__name__, audit_info) self.delivery_channels = [] self.__threading_call__(self.__describe_delivery_channels__) def __describe_delivery_channels__(self, regional_client): logger.info("Config - Listing Delivery Channels...") try: delivery_channel_count = 0 delivery_channels = regional_client.describe_delivery_channels()[ "DeliveryChannels" ] for delivery_channel in delivery_channels: if not self.audit_resources or ( is_resource_filtered(delivery_channel["name"], self.audit_resources) ): self.delivery_channels.append( DeliveryChannel( name=delivery_channel["name"], s3_bucket_name=delivery_channel["s3BucketName"], s3_key_prefix=delivery_channel["s3KeyPrefix"], sns_topic_arn=delivery_channel["snsTopicARN"], region=regional_client.region, ) ) delivery_channel_count += 1 # No delivery channels in region if delivery_channel_count == 0: self.delivery_channels.append( DeliveryChannel( name=self.audited_account, s3_bucket_name=None, s3_key_prefix=None, sns_topic_arn=None, region=regional_client.region, ) ) except Exception as error: logger.error( f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) class DeliveryChannel(BaseModel): name: str s3_bucket_name: Optional[str] s3_key_prefix: Optional[str] sns_topic_arn: Optional[str] region: str
--checks
--log-level
prowler aws \ --checks-folder ./custom-checks-folder/ \ --checks \ account_alternative_contact_information_is_registered \ --log-level ERROR \ -M csv \ --output-directory ./output
prowler aws \ --checks-folder ./custom-checks-folder/ \ --checks \ account_alternative_contact_information_is_registered \ --log-level DEBUG \ -M csv \ --output-directory ./output > debug.log 2>&1
prowler()
prowler/__main__.py
-x
--checks-folder
checks_folder
def prowler(): # Parse Arguments parser = ProwlerArgumentParser() args = parser.parse() # Save Arguments provider = args.provider checks = args.checks excluded_checks = args.excluded_checks excluded_services = args.excluded_services services = args.services categories = args.categories checks_file = args.checks_file checks_folder = args.checks_folder severities = args.severity compliance_framework = args.compliance custom_checks_metadata_file = args.custom_checks_metadata_file
parse_checks_from_folder(audit_info, checks_folder, provider)
# Import custom checks from folder if checks_folder: parse_checks_from_folder(audit_info, checks_folder, provider)
checks_to_execute
# Exclude checks if -e/--excluded-checks if excluded_checks: checks_to_execute = exclude_checks_to_run(checks_to_execute, excluded_checks) # Exclude services if --excluded-services if excluded_services: checks_to_execute = exclude_services_to_run( checks_to_execute, excluded_services, provider ) # Once the audit_info is set and we have the eventual checks based on the resource identifier, # it is time to check what Prowler's checks are going to be executed if audit_info.audit_resources: checks_from_resources = set_provider_execution_parameters(provider, audit_info) checks_to_execute = checks_to_execute.intersection(checks_from_resources) # Sort final check list checks_to_execute = sorted(checks_to_execute)
execute_checks(checks_to_execute,provider,audit_info,audit_output_options,custom_checks_metadata)
findings
# Execute checks findings = [] if len(checks_to_execute): findings = execute_checks( checks_to_execute, provider, audit_info, audit_output_options, custom_checks_metadata, ) else: logger.error( "There are no checks to execute. Please, check your input arguments" )
remove_custom_checks_module(checks_folder, provider)
# If custom checks were passed, remove the modules if checks_folder: remove_custom_checks_module(checks_folder, provider)
prowler/lib/check/check.py
{prowler_dir[0]}/providers/{provider}/services/{check_service}/{check.name}
custom-checks-folder/account_alternative_contact_information_is_registered/
{prowler_dir[0]}/providers/aws/services/account/account_alternative_contact_information_is_registered/
check_service = check.name.split("_")[0]
{prowler_dir[0]}/providers/{provider}/services/{check_service}/
def parse_checks_from_folder(audit_info, input_folder: str, provider: str) -> int: try: imported_checks = 0 # Check if input folder is a S3 URI if provider == "aws" and re.search( "^s3://([^/]+)/(.*?([^/]+))/$", input_folder ): bucket = input_folder.split("/")[2] key = ("/").join(input_folder.split("/")[3:]) s3_resource = audit_info.audit_session.resource("s3") bucket = s3_resource.Bucket(bucket) for obj in bucket.objects.filter(Prefix=key): if not os.path.exists(os.path.dirname(obj.key)): os.makedirs(os.path.dirname(obj.key)) if obj.key[-1] == "/": continue bucket.download_file(obj.key, obj.key) input_folder = key # Import custom checks by moving the checks folders to the corresponding services with os.scandir(input_folder) as checks: for check in checks: if check.is_dir(): check_module = input_folder + "/" + check.name # Copy checks to specific provider/service folder check_service = check.name.split("_")[0] prowler_dir = prowler.__path__ prowler_module = f"{prowler_dir[0]}/providers/{provider}/services/{check_service}/{check.name}" if os.path.exists(prowler_module): shutil.rmtree(prowler_module) shutil.copytree(check_module, prowler_module) imported_checks += 1 return imported_checks except Exception as error: logger.critical( f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}" ) sys.exit(1)
--only-logs
all_findings
importlib.import_module
execute
ModuleNotFoundError
logger.error
# Default execution checks_num = len(checks_to_execute) plural_string = "checks" singular_string = "check" check_noun = plural_string if checks_num > 1 else singular_string print( f"{Style.BRIGHT}Executing {checks_num} {check_noun}, please wait...{Style.RESET_ALL}\n" ) with alive_bar( total=len(checks_to_execute), ctrl_c=False, bar="blocks", spinner="classic", stats=False, enrich_print=False, ) as bar: for check_name in checks_to_execute: # Recover service from check name service = check_name.split("_")[0] bar.title = ( f"-> Scanning {orange_color}{service}{Style.RESET_ALL} service" ) try: check_findings = execute( service, check_name, provider, audit_output_options, audit_info, services_executed, checks_executed, custom_checks_metadata, ) all_findings.extend(check_findings) # If check does not exists in the provider or is from another provider except ModuleNotFoundError: logger.error( f"Check '{check_name}' was not found for the {provider.upper()} provider" ) except Exception as error: logger.error( f"{check_name} - {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) bar() bar.title = f"-> {Fore.GREEN}Scan completed!{Style.RESET_ALL}" return all_findings