- Security Lake ETL Framework
- Prerequisites
- Assumptions/Limitations
- Environment Setup
- Solution Usage
- FAQs and Troubleshooting
This repository provides a sample approach to transforming security logs into OCSF format using a config-driven ETL framework, along with step-by-step deployment and usage instructions.
- A VPC is required for the solution. The user may provide one or one will be created for them.
- If the user is providing the VPC: VPC endpoints for Secrets Manager, S3, DynamoDB, and SNS must exist, and subnets of type `PRIVATE_WITH_EGRESS` must exist, as that is where Lambda/Glue/EMR resources are placed.
- If Enrichment is being used, the user must provide the VPC where it exists, along with the security groups and network configurations required for access during deployment.
- A security group with a self-referencing inbound rule must exist for the Enrichment database.
- A user-provided VPC should have enough Elastic IP capacity for EMR Serverless to be deployed in it.
- If Enrichment is not being used, the user can either provide an existing VPC with the required endpoints or one will be created.
- Currently the Security Lake ETL runs in the same account as the S3 bucket. Cross-account provisioning is not supported.
- In case of enrichment, the JDBC source in AWS should run in the same VPC as the ETL framework.
- The JDBC credentials, host, and port should be present in Secrets Manager. The secret ID will be passed during deployment.
- The Glue/EMR job should run in the same VPC as the JDBC source if enrichment is required.
- Historical source data should be partitioned based on year, month and day.
- The JDBC supported engines are POSTGRESQL, ORACLE, MYSQL and SQL_SERVER.
- Conditional mapping and one-to-many source-to-target mapping are currently not supported.
- The deployment is tested with Python >= 3.9.
- We recommend setting up Named profiles to manage AWS CLI credentials. The deployment instructions here assume you have done so.
- If you use the default profile instead of a named profile, omit the profile name shown in the commands below.
```shell
cd cdk
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```
- If CDK has never been deployed in the target environment, you may need to bootstrap the account:
```shell
cdk bootstrap aws://ACCOUNT-NUMBER-1/REGION-1 --profile NAMED_PROFILE
```
For your convenience, a CLI script is provided that will interactively set your solution's configuration as described above. Skip this part if you'd like to deploy with the default configuration: no Enrichment database, and a newly created VPC with 2 subnets.
- From the `cdk` directory, run `python create_config.py`.
- Answer the questions as per your environment needs. A configuration file is created reflecting your setup. For example, the default config:
The OCSF class names you indicate will determine what data is extracted from your logs as part of the solution. At the top of config.py, set the CLASS_NAMES variable to the values you want from https://schema.ocsf.io/1.0.0-rc.3/classes?extensions=. By default network_activity is used.
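For example, the relevant excerpt of config.py might look like the following (the variable name comes from the description above; the list values are class names from the OCSF site):

```python
# config.py (excerpt): OCSF classes to extract, chosen from
# https://schema.ocsf.io/1.0.0-rc.3/classes?extensions=
CLASS_NAMES = ["network_activity"]  # the default class used by the solution
```

Add more class names to the list to extract additional OCSF classes.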
- Run `cdk deploy SecureDatalakeStack --profile NAMED_PROFILE`
- SNS Configuration: once the CDK has been deployed, subscribe the desired email addresses to the SNS topics created.
This CDK deployment pre-configures the solution to convert VPC Flow Logs. Follow these steps for a quick execution:
- Execute the Reference Lambda (asl-etl-framework_update-reference-ddb), which extracts the schema definitions of the required classes from https://schema.ocsf.io/ and stores them in the Reference DynamoDB table.
- Upload test VPC Flow source data in Parquet format at the following path: Source-Bucket/vpc-flow-logs/2019/12/31/
- Download the metadata file from Artifact-Bucket/config/metadata/vpc-flow-log-metadata.csv and re-upload it to the same path to trigger the event notification that invokes the Lambda function to update the required DynamoDB tables.
- Go to Step Functions and run the state machine with the following input:
```json
{
  "source_log_type": "vpc-flow-log",
  "load_type": "historical",
  "full_load": "false",
  "execution_engine": "glue",
  "ddb_lookup_table": "asl-etl-framework-table-details",
  "ddb_mapping_table": "asl-etl-framework-ocsf-attribute-mapping",
  "ddb_metadata_table": "asl-etl-framework-source-ocsf-metadata",
  "ddb_reference_table": "asl-etl-framework-ocsf-reference",
  "asl_status_table": "asl-etl-framework-run-status",
  "asl_job_name": "asl-etl-framework-init-ocsf-conversion"
}
```
- Decide the target OCSF class for your use case and, if required, update the environment variable called "class_names" of the asl-etl-framework_update-reference-ddb Lambda function. This Lambda function extracts the required fields for the given class from https://schema.ocsf.io/ and stores them in the Reference DynamoDB table. Execute this Lambda function before proceeding further.
- Prepare a CSV file that stores the one-to-one mapping between source log and target OCSF attributes. Below are the details of the mapping file. A sample mapping file for VPC Flow Logs is pre-provisioned by the CDK deployment.
- Location : Artifact-Bucket/config/mapping
- Structure: This CSV should have 4 columns in the following order:
- src_log_type : Name of the source log. This should be consistent and the same across all inputs.
- src_column_name : Exact name of the column in the source log; keep the order exactly the same as in the actual source log files.
- tgt_column : This column stores the name and data type of the target column as per the OCSF schema for that specific class, in the format `column:data_type` (nested fields joined with `.`); if the target column is defined inside an OCSF object, use "object" as the data type. A few examples:
- "metadata.product.version" is written as `metadata:object.product:object.version:string`
- "start_time" is written as `time:timestamp`
- "bytes" is written as `traffic:object.bytes:bigint`
- default_values : This column is used to populate columns that have default values for all records. Provide the corresponding target column when entering a value in this column.
- An example of the S3 Access Log mapping CSV file is included with the solution.
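To make the four-column structure concrete, the snippet below builds a small illustrative mapping file. The rows are hypothetical examples for a VPC Flow-style source (based on the target-column examples above), not the shipped sample file:

```python
import csv
import io

# Illustrative mapping rows (hypothetical): four columns in the required order.
rows = [
    # (src_log_type, src_column_name, tgt_column, default_values)
    ("vpc-flow-log", "start_time", "time:timestamp", ""),
    ("vpc-flow-log", "bytes", "traffic:object.bytes:bigint", ""),
    ("vpc-flow-log", "version", "metadata:object.product:object.version:string", ""),
]

def write_mapping_csv(mapping_rows):
    """Serialize mapping rows into the expected 4-column CSV layout."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(["src_log_type", "src_column_name", "tgt_column", "default_values"])
    writer.writerows(mapping_rows)
    return buf.getvalue()

print(write_mapping_csv(rows))
```

The resulting file would then be uploaded to the mapping location in the Artifact bucket.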
- Once the mapping file is prepared, upload it to the Artifact S3 bucket.
- The next step is to prepare the metadata file in CSV format. A sample file is already present at the Artifact-Bucket/config/mapping location for your reference. The inputs and their descriptions follow.
- source_log_type : Name of the source. This should be consistent and the same across all inputs.
- load_type : This can be “historical” or “daily”
- batch_load_type : This determines the target partition type. This can be “daily”,“monthly”,“yearly”.
- daily_load_number_of_days : Applies when "load_type" is "daily". Determines how many days to go back; if set to "1" and the job runs today, it picks up data from the previous day's partition in the source bucket.
- default_values: If there are default static values which are mandatory columns in target but not present in source, this flag should be set to “true”. The values can be “true” or “false”
- delimiter : Delimiter of the source file (for example, | or " "). If the delimiter is whitespace, set it to " ".
- source_format : Format of the source file. This can be (json, parquet, csv, txt)
- header : “true” or “false”. This specifies if the source dataset has header/field names in it.
- historical_days_to_process : Number of days to process in the past starting today.
- historical_load_start_date : If the start and end dates are not relative to the current date, this column can be used to specify the start date (format: yyyy-MM-dd).
- historical_load_end_date : If the start and end dates are not relative to the current date, this column can be used to specify the end date (format: yyyy-MM-dd).
- batch_load_type: This specifies how the historical data needs to be loaded. This can be “monthly”,“yearly” or “daily”
- is_source_partitioned : This should be set to “true” if the source is partitioned based on year, month and day.
- source_log_s3_bucket : Bucket name of source files.
- source_log_s3_prefix : S3 prefix of the source files
- source_log_timestamp_format : If the source files have timestamp fields, the format should be specified, for example [dd/MMM/yyyy:HH:mm:ssZZZZ]. If the source column has the timestamp as epoch, then the value of this field should be "epoch".
- source_log_ts_and_delimiter_as_whitespaces : Set to "true" if the delimiter is whitespace and the timestamp columns contain whitespace.
- target_s3_bucket : Bucket name where the transformed OCSF parquet files will be ingested.
- target_s3_prefix : S3 prefix where the transformed OCSF parquet files will be ingested.
- mapping_file_s3_bucket : Bucket name where OCSF mapping file will be uploaded
- mapping_file_s3_prefix: Mapping file S3 prefix
- hive_style_partition : If the source is partitioned in the format (year=xxxx/month=xx/day=xx) then this should be set to “true”. If the source is partitioned in the format (xxxx/xx/xx) then this should be set to “false”.
- hour_level_partition : If the hour level partition is present in the source , this should be set to “true”.
- is_multiline : Should be set to “true” if the source data spans across multiple lines.
- is_enrichment_required : "true" or "false"
- enrichment_column : Column in the source which needs to be “enriched”. This is only required if “is_enrichment_required” is set to “true”.
- enrichment_col_db : Column in the JDBC data source which will be used for aligning with the enrichment column in the source. This is only required if "is_enrichment_required" is set to "true".
- enrichment_db : Database name of the JDBC source. This is only required if "is_enrichment_required" is set to "true".
- enrichment_table : Name of the source table for enrichment. This is only required if "is_enrichment_required" is set to "true".
- enrichment_identifier : Secrets Manager secret containing the username/password and JDBC details of the database.
- ocsf_class_id : OCSF class id of the mapping
- ocsf_class_name : OCSF class name of the mapping.
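As an illustration, a minimal metadata record for a daily load might look like the following. The field names are those listed above; the values are hypothetical, and only a subset of fields is shown:

```python
import json

# Hypothetical metadata record using the field names described above.
metadata = {
    "source_log_type": "vpc-flow-log",
    "load_type": "daily",
    "batch_load_type": "daily",
    "daily_load_number_of_days": "1",
    "default_values": "false",
    "delimiter": " ",
    "source_format": "parquet",
    "header": "true",
    "is_source_partitioned": "true",
    "hive_style_partition": "false",
    "is_enrichment_required": "false",
}

def validate_metadata(record):
    """Basic sanity checks mirroring the constraints described above."""
    assert record["load_type"] in ("historical", "daily")
    assert record["batch_load_type"] in ("daily", "monthly", "yearly")
    assert record["source_format"] in ("json", "parquet", "csv", "txt")
    return True

print(json.dumps(metadata, indent=2))
```

Each such record becomes one row of the metadata CSV file uploaded to the Artifact bucket.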
- After the metadata file is uploaded, an S3 event notification triggers the Insert Metadata Lambda, which updates the Metadata DynamoDB table.
- This Lambda then invokes the "Update Mapping" Lambda, which updates the corresponding Mapping DynamoDB table.
- If enrichment is required, the corresponding enrichment details (for example, JDBC table, database, etc.) should be specified in the metadata CSV file. Details are mentioned above.
- The credentials of the enrichment source should be stored in Secrets Manager, and the secret identifier should be specified in the metadata CSV file.
- The job will be triggered based on the CRON schedule. It can also be triggered manually from Step Functions.
In order to run the step function, it needs an input, which will be passed via an EventBridge schedule. Example input parameters:
```json
{
  "source_log_type": "s3-access-log",
  "load_type": "historical",
  "full_load": "false",
  "execution_engine": "glue",
  "ddb_lookup_table": "asl-etl-framework-table-details",
  "ddb_mapping_table": "asl-etl-framework-ocsf-attribute-mapping",
  "ddb_metadata_table": "asl-etl-framework-source-ocsf-metadata",
  "ddb_reference_table": "asl-etl-framework-ocsf-reference",
  "asl_status_table": "asl-etl-framework-run-status",
  "asl_job_name": "asl-etl-framework-init-ocsf-conversion"
}
```
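A small helper can assemble this input programmatically before starting an execution. This is a sketch; the table and job names mirror the defaults shown in the example above:

```python
import json

# Default framework resource names, as shown in the example input above.
DEFAULTS = {
    "ddb_lookup_table": "asl-etl-framework-table-details",
    "ddb_mapping_table": "asl-etl-framework-ocsf-attribute-mapping",
    "ddb_metadata_table": "asl-etl-framework-source-ocsf-metadata",
    "ddb_reference_table": "asl-etl-framework-ocsf-reference",
    "asl_status_table": "asl-etl-framework-run-status",
    "asl_job_name": "asl-etl-framework-init-ocsf-conversion",
}

def build_sfn_input(source_log_type, load_type="historical",
                    full_load="false", execution_engine="glue"):
    """Assemble the Step Functions input JSON for one source."""
    payload = {
        "source_log_type": source_log_type,
        "load_type": load_type,
        "full_load": full_load,
        "execution_engine": execution_engine,
        **DEFAULTS,
    }
    return json.dumps(payload)

# The returned string can be used as the execution input, for example as
# the static input of the EventBridge schedule target.
print(build_sfn_input("s3-access-log"))
```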
Description of each step
- Lambda checkpointing invoke : This step runs a preprocessor Lambda function and returns the partitions to process based on load_type. If load_type is "daily", it returns partitions at the daily level; if load_type is "historical", it returns partitions at the monthly level.
- Execution Engine choice state : This step routes to the respective map state for Glue or EMR Serverless, based on the execution_engine input. The valid values are "glue" or "emr-serverless".
- Glue Map - This step runs parallel Glue jobs for each partition (month) returned by the checkpointing Lambda step. If any of the Glue jobs fail, the errors are captured in the "Some Map Failed" step. This step waits for each Glue job to finish and returns whether the job run failed or succeeded.
- EMR-Serverless Map - This step invokes a parallel EMR Serverless job for each partition returned by the checkpointing Lambda step. This step does not wait for EMR Serverless to finish the job; the status of each run needs to be tracked from the EMR Serverless application.
- Gather Map Results - This step collects the results from the map step (Glue or EMR-Serverless). If the Glue map step is successful, it goes to the Pass step. If the EMR-Serverless map step is successful, it goes to the SNS notification step to send a successful job submission alert.
- SNS Job Submission Alert Step - This step sends an SNS notification to subscribers to alert them that "EMR Serverless Job has been submitted. Please track the status via EMR Serverless Console".
- Capture Errors Step - This step captures errors from the checkpointing Lambda, Glue map, or EMR Serverless map steps if any errors are captured during the execution of each step.
- Choice Step (Success or Failed) - This step determines if any of the above steps failed and captures the error.
- SNS Notification for Failure - If Choice state has captured any errors, it sends out SNS notification alert to subscribers with the step and reason for failure.
- Fail step - If there are any errors captured as per Choice step, it marks the step function execution as failed.
- Pass step - If there are no errors captured by the Choice state, it marks the step function execution as succeeded.
- End - This step completes the execution of step function.
- If the job fails, an alert is sent to the email addresses specified in the SNS configuration.
- If the job needs to be restarted, it has to be done manually from EventBridge.
- For a historical run, if a full reload from the start is required, "full_load" can be set to "true".
- If the job runs with "emr-serverless" as the execution engine, a job submission notification is also sent.
- asl-etl-framework-ddb-table-details - This is the lookup table which has all the Dynamo DB table details along with their respective keys.
- asl-etl-framework-ocsf-attribute-mapping - This table contains the OCSF mapping details.
- asl-etl-framework-ocsf-reference - This table contains reference data such as "enum" values or values to be set for a corresponding value in the source. The data for this table is extracted from the OCSF API by a Lambda job and ingested into this table.
- asl-etl-framework-ocsf-run-status - This is a checkpointing table which maintains the "run status" of the job. If the load is completed, the "load_completed" column is set to "true" by the Glue/EMR job.
- asl-etl-framework-source-ocsf-metadata - This table contains the metadata input by the user in a CSV file.
Historical load can be done based on the months to be processed. The user can provide the "load_start_date" and the "load_end_date" for processing, or provide "days_to_process", in which case the load_end_date is the current date and the load_start_date is calculated from the value of "days_to_process". Based on these dates, the framework calculates the months to be processed; each month of data is processed in parallel and the transformation to OCSF is done.
Below are some examples:
Scenario 1 : User provides "days_to_process" with "full_load" as false:
- User input in the EventBridge schedule:
```json
{
  "source_type": "s3-access-log",
  "load_type": "historical",
  "full_load": "false"
}
```
- Metadata CSV file entries:
  - "days_to_process": "35"
  - "historical_load_start_date": ""
  - "historical_load_end_date": ""
In the above scenario, "historical_load_start_date" and "historical_load_end_date" can be left empty. The framework calculates the start date by going back the number of days specified in "days_to_process"; the load_end_date is the current date. The framework then calculates the months between the two dates and divides the processing into parallel per-month jobs. For example, if 5 months of data fall between the start and end dates, 5 parallel Glue/EMR jobs are triggered.
Scenario 2 : User provides "days_to_process" with "full_load" as true:
This scenario is the same as Scenario 1 except for the "full_load" flag. The framework maintains a "checkpoint" of how many months have been processed; in case of a failure, only the months not yet processed are triggered again. If the load has to be done from the start irrespective of the months already processed, the "full_load" flag can be set to true in the EventBridge JSON payload. This flag overrides the checkpointing and reloads all months of data.
Scenario 3 : User provides "historical_load_start_date" and "historical_load_end_date" with "full_load" as false:
- User input in the EventBridge schedule:
```json
{
  "source_type": "s3-access-log",
  "load_type": "historical",
  "full_load": "false"
}
```
- Metadata CSV file entries:
  - "days_to_process": ""
  - "historical_load_start_date": "2023-05-22"
  - "historical_load_end_date": "2023-06-22"
In the above scenario, "days_to_process" can be left empty. The framework calculates the months between the two dates and divides the processing into parallel per-month jobs. In this example, the framework triggers two Glue/EMR jobs with inputs "2023-05" and "2023-06" respectively.
Scenario 4 : User provides "historical_load_start_date" and "historical_load_end_date" with "full_load" as true:
This scenario is the same as Scenario 3 except for the "full_load" flag. The framework maintains a "checkpoint" of how many months have been processed; in case of a failure, only the months not yet processed are triggered again. If the load has to be done from the start irrespective of the months already processed, the "full_load" flag can be set to true in the EventBridge JSON payload. This flag overrides the checkpointing and reloads all months of data.
Scenario 5 : User provides "days_to_process" as well as "historical_load_start_date" and "historical_load_end_date":
In this case, "historical_load_start_date" and "historical_load_end_date" are ignored and the processing happens as described in Scenario 1.
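The month calculation described in the scenarios above can be sketched as follows. This is an illustration of the logic, not the framework's actual code:

```python
from datetime import date, timedelta

def months_to_process(start_date=None, end_date=None, days_to_process=None):
    """Return the list of year-month partitions to run in parallel.

    Mirrors the precedence described above: if days_to_process is given,
    the explicit dates are ignored and the window ends at the current date.
    """
    if days_to_process:
        end = date.today()
        start = end - timedelta(days=int(days_to_process))
    else:
        start = date.fromisoformat(start_date)
        end = date.fromisoformat(end_date)
    months = []
    y, m = start.year, start.month
    while (y, m) <= (end.year, end.month):
        months.append(f"{y:04d}-{m:02d}")
        m += 1
        if m > 12:
            y, m = y + 1, 1
    return months

# Scenario 3: two parallel jobs, one per month.
print(months_to_process("2023-05-22", "2023-06-22"))  # ['2023-05', '2023-06']
```

One Glue/EMR job would then be launched per entry in the returned list.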
Enrichment is a process where data from a JDBC source (on-premises or AWS) is used to "enrich" the final OCSF Parquet dataset with an "enrichment" column. The user needs to provide the following values in the metadata CSV file:
- is_enrichment_required : This should be set to “true”.
- enrichment_column : Column in the source which needs to be “enriched”.
- enrichment_col_db : Column in the JDBC data source which will be used for aligning with the enrichment column in source.
- enrichment_db : Database name of the JDBC source.
- enrichment_table : Name of the source table for enrichment.
- enrichment_identifier : Secrets Manager secret containing the username/password and JDBC details of the database.
The VPC and security groups should be preconfigured, with one security group having a self-referencing inbound rule. The framework currently supports all resources running in the same VPC, so if the database resides in AWS within a VPC, the Glue or EMR job should run in the same VPC.
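For illustration, the secret referenced by enrichment_identifier could be stored as a JSON string along these lines. The exact key names are an assumption for this sketch; the source only requires that the credentials, host, and port be present in Secrets Manager:

```python
import json

# Illustrative enrichment-source secret payload (key names are assumptions).
secret_value = {
    "username": "etl_user",
    "password": "REPLACE_ME",
    "host": "mydb.example.internal",
    "port": "5432",
    "engine": "postgresql",  # one of the supported JDBC engines
}

# The secret is stored as a JSON string in AWS Secrets Manager; its
# identifier is what goes into the enrichment_identifier metadata column.
secret_string = json.dumps(secret_value)
print(secret_string)
```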
- For the "glue" execution engine, the CDK code can be changed to modify/add Glue parameters for Spark configurations.
- For "emr-serverless", Spark submit parameters are stored in `config.py` at the cdk root directory. They can be changed and the CDK redeployed to apply the changes.
- Based on the use case, the "Pre-initialized capacity" of the EMR Serverless application can be modified to optimize performance.
- In a typical ETL use case, a common best practice is to compact a large number of small files into a smaller number of larger files. We recommend using a custom compaction script for this purpose. You may refer to the compaction sample in aws-glue-blueprint-libs: https://github.com/awslabs/aws-glue-blueprint-libs/tree/master/samples/compaction
- For historical load, EMR is recommended for better performance.
- In case of EMR, the Spark configuration can be adjusted based on data volume and specific use cases for performance optimization; steps are mentioned in the "How to change Spark Submit configurations?" section.
- The framework currently supports a single OCSF class mapping for each source. If a source involves multiple OCSF class mappings, the data for each mapping can reside in different prefixes in the same bucket; the framework can then be run on each prefix as a different source.
- If logs are compressed in gzip format, parallel processing in Spark can be impacted, leading to slow performance. It is recommended to decompress the files before using this framework. You may customize this utility for that purpose: https://gitlab.aws.dev/prtkumar/gzip-decompression-utility

