Kanji
・クラウドエンジニア / フリーランス ・1993年生まれ ・愛媛県出身 / 東京都渋谷区在住 ・AWS歴5年 プロフィールの詳細
目次
Prowler は、AWS、Azure、Google Cloud、Kubernetes のセキュリティのベスト プラクティスの評価、監査、インシデント対応、継続的な監視、強化とフォレンジックの準備、 さらに修復を実行するオープンソースのセキュリティ ツールです。 Prowler Open Source と呼ばれる Prowler CLI (コマンド ライン インターフェイス) と、 Prowler SaaSと呼ばれるその上のサービスがあります。 和訳元:GitHub - prowler-cloud/prowler
Prowler は、AWS、Azure、Google Cloud、Kubernetes のセキュリティのベスト プラクティスの評価、監査、インシデント対応、継続的な監視、強化とフォレンジックの準備、 さらに修復を実行するオープンソースのセキュリティ ツールです。 Prowler Open Source と呼ばれる Prowler CLI (コマンド ライン インターフェイス) と、 Prowler SaaSと呼ばれるその上のサービスがあります。
カスタム チェック フォルダーには、チェックごとに 1 つのサブフォルダーが含まれている必要があり、各サブフォルダーにはチェックの名前が付けられ、次のものが含まれている必要があります。 空の__init__.py: を指定すると、Python はこのチェック フォルダーをパッケージとして扱います。 check_name.pyチェックのロジックを含む。 check_name.metadata.jsonチェックのメタデータを含む。 チェック名はサービス名で始まり、その後にアンダースコアが続く必要があります (例: ec2_instance_public_ip)。 チェックの書き方の詳細については、「開発者ガイド」を参照してください。 和訳元:Miscellaneous - Prowler Documentation
カスタム チェック フォルダーには、チェックごとに 1 つのサブフォルダーが含まれている必要があり、各サブフォルダーにはチェックの名前が付けられ、次のものが含まれている必要があります。
空の__init__.py: を指定すると、Python はこのチェック フォルダーをパッケージとして扱います。 check_name.pyチェックのロジックを含む。 check_name.metadata.jsonチェックのメタデータを含む。 チェック名はサービス名で始まり、その後にアンダースコアが続く必要があります (例: ec2_instance_public_ip)。
チェックの書き方の詳細については、「開発者ガイド」を参照してください。
${親フォルダ名}/ ∟ ${AWSサービス名}_${チェック名} ∟ __init__.py ∟ ${AWSサービス名}_${チェック名}.py ∟ ${AWSサービス名}_${チェック名}.metadata.json # ファイル構成例:アカウントの代替連絡先情報が登録されているか確認するチェック custom-checks-folder/ ∟ account_alternative_contact_information_is_registered/ ∟ __init__.py ∟ account_alternative_contact_information_is_registered.py ∟ account_alternative_contact_information_is_registered.metadata.json
${親フォルダ名}
${AWSサービス名}_${チェック名}
${AWSサービス名}
${チェック名}
__init__.py
${AWSサービス名}_${チェック名}.py
Check_Report_AWS
region
resource_arn
resource_id
status
status_extended
resource_tags
account_client
prowler/providers/aws/services/${AWSサービス名}/{AWSサービス名}_client.py
prowler/providers/aws/services/{AWSサービス名}/{AWSサービス名}_service.py
{AWSサービス名}_service.py
import json from prowler.lib.logger import logger from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.account.account_client import account_client class account_alternative_contact_information_is_registered(Check): def execute(self): desired_alternative_contact_count = account_client.audit_config.get("desired_alternative_contact_count", 1) findings = [] report = Check_Report_AWS(self.metadata()) # For each Prowler check we MUST fill the following # Check_Report_AWS fields: # - region # - resource_id # - resource_arn # - status # - status_extended # - resource_tags (optional) logger.debug(f"account_client.contact_names: {account_client.contact_names}") logger.debug(f"account_client.contact_phone_numbers: {account_client.contact_phone_numbers}") logger.debug(f"account_client.contact_emails: {account_client.contact_emails}") contact_emails = account_client.contact_emails contact_emails.discard(None) report = Check_Report_AWS(self.metadata()) report.region = account_client.region report.resource_arn = account_client.audited_account_arn report.resource_id = account_client.audited_account if len(contact_emails) != desired_alternative_contact_count: report.status = "FAIL" else: report.status = "PASS" report.status_extended = json.dumps( { "desired_alternative_contact_count": desired_alternative_contact_count, "contact_emails_count": len(contact_emails), "contact_emails": list(contact_emails), }, ) findings.append(report) return findings
${AWSサービス名}_${チェック名}.metadata.json
Provider
aws
CheckID
ServiceName
Severity
low
medium
high
critical
{ "Provider": "aws", "CheckID": "account_alternative_contact_information_is_registered", "CheckTitle": "Account alternative contact information is registered", "CheckType": [ "IAM" ], "ServiceName": "account", "SubServiceName": "", "ResourceIdTemplate": "arn:partition:access-recorder:region:account-id:recorder/resource-id", "Severity": "medium", "ResourceType": "Other", "Description": "", "Risk": "", "RelatedUrl": "", "Remediation": { "Code": { "CLI": "", "NativeIaC": "", "Other": "", "Terraform": "" }, "Recommendation": { "Text": "", "Url": "" } }, "Categories": [ "custom-checks" ], "DependsOn": [], "RelatedTo": [], "Notes": "" }
cst_*.py
${親フォルダ名}/ ∟ ${AWSサービス名}_${チェック名} ∟ __init__.py ∟ ${AWSサービス名}_${チェック名}.py ∟ ${AWSサービス名}_${チェック名}.metadata.json ∟ cst_${AWSサービス名}_client.py ∟ cst_${AWSサービス名}_service.py # ファイル構成例:Configの配信チャネルが有効になっているか確認するチェック custom-checks-folder/ ∟ config_delivery_channel_enabled/ ∟ __init__.py ∟ config_delivery_channel_enabled.py ∟ config_delivery_channel_enabled.metadata.json ∟ cst_config_client.py ∟ cst_config_service.py
${AWSサービス名}_${チェック名}/
sys.path.append
from prowler.lib.logger import logger from prowler.lib.check.models import Check, Check_Report_AWS import os import sys sys.path.append(os.path.join(os.path.dirname(__file__))) from cst_config_client import config_client # noqa: E402 class config_delivery_channel_enabled(Check): def execute(self): findings = [] report = Check_Report_AWS(self.metadata()) // 省略
cst_config_client.py
cst_config_service.py
from prowler.providers.aws.lib.audit_info.audit_info import current_audit_info from cst_config_service import Config config_client = Config(current_audit_info)
Config(AWSService)
__describe_delivery_channels__
regional_client.describe_delivery_channels()
DeliveryChannel
self.delivery_channels
DeliveryChannel(BaseModel)
Optional
None
from typing import Optional from pydantic import BaseModel from prowler.lib.logger import logger from prowler.lib.scan_filters.scan_filters import is_resource_filtered from prowler.providers.aws.lib.service.service import AWSService class Config(AWSService): def __init__(self, audit_info): super().__init__(__class__.__name__, audit_info) self.delivery_channels = [] self.__threading_call__(self.__describe_delivery_channels__) def __describe_delivery_channels__(self, regional_client): logger.info("Config - Listing Delivery Channels...") try: delivery_channel_count = 0 delivery_channels = regional_client.describe_delivery_channels()[ "DeliveryChannels" ] for delivery_channel in delivery_channels: if not self.audit_resources or ( is_resource_filtered(delivery_channel["name"], self.audit_resources) ): self.delivery_channels.append( DeliveryChannel( name=delivery_channel["name"], s3_bucket_name=delivery_channel["s3BucketName"], s3_key_prefix=delivery_channel["s3KeyPrefix"], sns_topic_arn=delivery_channel["snsTopicARN"], region=regional_client.region, ) ) delivery_channel_count += 1 # No delivery channels in region if delivery_channel_count == 0: self.delivery_channels.append( DeliveryChannel( name=self.audited_account, s3_bucket_name=None, s3_key_prefix=None, sns_topic_arn=None, region=regional_client.region, ) ) except Exception as error: logger.error( f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) class DeliveryChannel(BaseModel): name: str s3_bucket_name: Optional[str] s3_key_prefix: Optional[str] sns_topic_arn: Optional[str] region: str
--checks
--log-level
prowler aws \ --checks-folder ./custom-checks-folder/ \ --checks \ account_alternative_contact_information_is_registered \ --log-level ERROR \ -M csv \ --output-directory ./output
prowler aws \ --checks-folder ./custom-checks-folder/ \ --checks \ account_alternative_contact_information_is_registered \ --log-level DEBUG \ -M csv \ --output-directory ./output > debug.log 2>&1
prowler/__main__.py
prowler()
-x
--checks-folder
checks_folder
def prowler(): # Parse Arguments parser = ProwlerArgumentParser() args = parser.parse() # Save Arguments provider = args.provider checks = args.checks excluded_checks = args.excluded_checks excluded_services = args.excluded_services services = args.services categories = args.categories checks_file = args.checks_file checks_folder = args.checks_folder severities = args.severity compliance_framework = args.compliance custom_checks_metadata_file = args.custom_checks_metadata_file
parse_checks_from_folder(audit_info, checks_folder, provider)
# Import custom checks from folder if checks_folder: parse_checks_from_folder(audit_info, checks_folder, provider)
checks_to_execute
# Exclude checks if -e/--excluded-checks if excluded_checks: checks_to_execute = exclude_checks_to_run(checks_to_execute, excluded_checks) # Exclude services if --excluded-services if excluded_services: checks_to_execute = exclude_services_to_run( checks_to_execute, excluded_services, provider ) # Once the audit_info is set and we have the eventual checks based on the resource identifier, # it is time to check what Prowler's checks are going to be executed if audit_info.audit_resources: checks_from_resources = set_provider_execution_parameters(provider, audit_info) checks_to_execute = checks_to_execute.intersection(checks_from_resources) # Sort final check list checks_to_execute = sorted(checks_to_execute)
execute_checks(checks_to_execute,provider,audit_info,audit_output_options,custom_checks_metadata)
findings
# Execute checks findings = [] if len(checks_to_execute): findings = execute_checks( checks_to_execute, provider, audit_info, audit_output_options, custom_checks_metadata, ) else: logger.error( "There are no checks to execute. Please, check your input arguments" )
remove_custom_checks_module(checks_folder, provider)
# If custom checks were passed, remove the modules if checks_folder: remove_custom_checks_module(checks_folder, provider)
prowler/lib/check/check.py
{prowler_dir[0]}/providers/{provider}/services/{check_service}/{check.name}
custom-checks-folder/ account_alternative_contact_information_is_registered/
custom-checks-folder/account_alternative_contact_information_is_registered/
{prowler_dir[0]}/providers/aws/services/account/account_alternative_contact_information_is_registered/
check_service = check.name.split("_")[0]
{prowler_dir[0]}/providers/{provider}/services/{check_service}/
def parse_checks_from_folder(audit_info, input_folder: str, provider: str) -> int: try: imported_checks = 0 # Check if input folder is a S3 URI if provider == "aws" and re.search( "^s3://([^/]+)/(.*?([^/]+))/$", input_folder ): bucket = input_folder.split("/")[2] key = ("/").join(input_folder.split("/")[3:]) s3_resource = audit_info.audit_session.resource("s3") bucket = s3_resource.Bucket(bucket) for obj in bucket.objects.filter(Prefix=key): if not os.path.exists(os.path.dirname(obj.key)): os.makedirs(os.path.dirname(obj.key)) if obj.key[-1] == "/": continue bucket.download_file(obj.key, obj.key) input_folder = key # Import custom checks by moving the checks folders to the corresponding services with os.scandir(input_folder) as checks: for check in checks: if check.is_dir(): check_module = input_folder + "/" + check.name # Copy checks to specific provider/service folder check_service = check.name.split("_")[0] prowler_dir = prowler.__path__ prowler_module = f"{prowler_dir[0]}/providers/{provider}/services/{check_service}/{check.name}" if os.path.exists(prowler_module): shutil.rmtree(prowler_module) shutil.copytree(check_module, prowler_module) imported_checks += 1 return imported_checks except Exception as error: logger.critical( f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}" ) sys.exit(1)
--only-logs
all_findings
execute
importlib.import_module
ModuleNotFoundError
logger.error
# Default execution checks_num = len(checks_to_execute) plural_string = "checks" singular_string = "check" check_noun = plural_string if checks_num > 1 else singular_string print( f"{Style.BRIGHT}Executing {checks_num} {check_noun}, please wait...{Style.RESET_ALL}\n" ) with alive_bar( total=len(checks_to_execute), ctrl_c=False, bar="blocks", spinner="classic", stats=False, enrich_print=False, ) as bar: for check_name in checks_to_execute: # Recover service from check name service = check_name.split("_")[0] bar.title = ( f"-> Scanning {orange_color}{service}{Style.RESET_ALL} service" ) try: check_findings = execute( service, check_name, provider, audit_output_options, audit_info, services_executed, checks_executed, custom_checks_metadata, ) all_findings.extend(check_findings) # If check does not exists in the provider or is from another provider except ModuleNotFoundError: logger.error( f"Check '{check_name}' was not found for the {provider.upper()} provider" ) except Exception as error: logger.error( f"{check_name} - {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) bar() bar.title = f"-> {Fore.GREEN}Scan completed!{Style.RESET_ALL}" return all_findings