Converting CSV to Parquet with AWS Lambda Trigger
Create an S3 bucket and an IAM role with a user-defined policy. Create a Lambda layer and a Lambda function, and add the layer to the function. Add an S3 trigger for automatic transformation from CSV to Parquet, and query the result through the Glue catalog.
Process flow
Create IAM role and policy
First, create a trust policy and save it as role-trust.json:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
Once the trust policy that AWS Lambda will use to assume the role is ready, we use the AWS CLI to create the role:
aws iam create-role --role-name {role_name} --assume-role-policy-document file://role-trust.json
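The create-role output includes the new role's ARN, which we will need again if we create the Lambda function from the CLI. One way to capture it into a shell variable (the variable name is just an example):
ROLE_ARN=$(aws iam get-role --role-name {role_name} --query 'Role.Arn' --output text)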
Next comes the permissions policy, saved as lambda-etl-policy.json, which grants the function access to CloudWatch Logs, the S3 bucket, and Glue:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:PutLogEvents",
                "logs:CreateLogGroup",
                "logs:CreateLogStream"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:*"
            ],
            "Resource": [
                "arn:aws:s3:::wysde2-test/*",
                "arn:aws:s3:::wysde2-test"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:*"
            ],
            "Resource": "*"
        }
    ]
}
We also create this policy using the AWS CLI:
aws iam create-policy --policy-name {policy_name} --policy-document file://lambda-etl-policy.json
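The create-policy output contains the policy's Arn, which is what the {policy_arn} placeholder below refers to. If you did not note it down, one way to look it up afterwards (the variable name is just an example):
POLICY_ARN=$(aws iam list-policies --scope Local \
  --query "Policies[?PolicyName=='{policy_name}'].Arn" --output text)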
Now we are going to attach the policy to the role:
aws iam attach-role-policy --policy-arn {policy_arn} --role-name {role_name}
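To confirm that the policy is attached, you can list the policies on the role:
aws iam list-attached-role-policies --role-name {role_name}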
Set up the Lambda layer
We will use the AWS Data Wrangler Python library, which is not included in the AWS Lambda runtime, so we have to package it as a Lambda layer and attach that layer to our Lambda function later:
wget -q --show-progress https://github.com/awslabs/aws-data-wrangler/releases/download/2.10.0/awswrangler-layer-2.10.0-py3.8.zip
aws lambda publish-layer-version --layer-name {layer_name} \
--description "this enables the usage of aws data wrangler library in lambda" \
--zip-file fileb://awswrangler-layer-2.10.0-py3.8.zip \
--compatible-runtimes python3.8 \
--cli-connect-timeout 6000
This layer matches the Python 3.8 runtime we will use for the Lambda function. Feel free to pick a different Python version for the function and the layer; just make sure the two versions are compatible.
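The publish-layer-version output includes a LayerVersionArn, which is needed if you later attach the layer from the CLI instead of the console. One way to look up the ARN of the newest published version (the variable name is just an example):
LAYER_ARN=$(aws lambda list-layer-versions --layer-name {layer_name} \
  --query 'max_by(LayerVersions, &Version).LayerVersionArn' --output text)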
Develop the Python code
import awswrangler as wr
from urllib.parse import unquote_plus

def lambda_handler(event, context):
    # Get the source bucket and object name as passed to the Lambda function
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = unquote_plus(record['s3']['object']['key'])

        # We will set the DB and table name based on the last two elements of
        # the path prior to the file name. If key = 'dms/sakila/film/LOAD01.csv',
        # then the following lines will set db_name to 'sakila' and table_name to 'film'
        key_list = key.split("/")
        print(f'key_list: {key_list}')
        db_name = key_list[-3]
        table_name = key_list[-2]

        print(f'Bucket: {bucket}')
        print(f'Key: {key}')
        print(f'DB Name: {db_name}')
        print(f'Table Name: {table_name}')

        input_path = f"s3://{bucket}/{key}"
        print(f'Input_Path: {input_path}')
        output_path = f"s3://wysde2-test/{db_name}/cleaned/{table_name}"
        print(f'Output_Path: {output_path}')

        # Read the CSV file into a DataFrame
        input_df = wr.s3.read_csv([input_path])

        # Create the Glue database if it does not exist yet
        current_databases = wr.catalog.databases()
        if db_name not in current_databases.values:
            print(f'- Database {db_name} does not exist ... creating')
            wr.catalog.create_database(db_name)
        else:
            print(f'- Database {db_name} already exists')

        # Write the data as Parquet and register/update the table in the Glue catalog
        result = wr.s3.to_parquet(
            df=input_df,
            path=output_path,
            dataset=True,
            database=db_name,
            table=table_name,
            mode="append")

        print("RESULT: ")
        print(f'{result}')
        return result
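If you plan to deploy the function from the CLI rather than pasting the code into the console, save the handler as lambda_function.py and zip it; both names are just a convention, but the handler setting on the function must match them:
zip function.zip lambda_function.py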
Set up the AWS Lambda function
Go to the AWS Lambda console and create a Lambda function; you can name it anything. Here is the checklist you need to follow to configure the function (a CLI sketch covering the same steps follows the checklist):
- Select the role that we created.
- Also select the layer that we created.
- Copy the code and deploy it.
- Add an S3 trigger with a .csv suffix filter, and select the S3 bucket path.
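If you would rather script these steps than click through the console, here is a rough sketch of the same checklist using the AWS CLI. The function name, timeout, and memory size are example values, ROLE_ARN and LAYER_ARN are the variables captured earlier, and wysde2-test is assumed to be the source bucket; adjust everything to your setup.
# create the function with the role, the layer, and the zipped code
aws lambda create-function \
  --function-name csv-to-parquet-etl \
  --runtime python3.8 \
  --handler lambda_function.lambda_handler \
  --role "$ROLE_ARN" \
  --layers "$LAYER_ARN" \
  --timeout 300 \
  --memory-size 512 \
  --zip-file fileb://function.zip

# allow S3 to invoke the function
aws lambda add-permission \
  --function-name csv-to-parquet-etl \
  --statement-id s3-invoke \
  --action lambda:InvokeFunction \
  --principal s3.amazonaws.com \
  --source-arn arn:aws:s3:::wysde2-test

# register a trigger on the bucket for new .csv objects
FUNC_ARN=$(aws lambda get-function --function-name csv-to-parquet-etl \
  --query 'Configuration.FunctionArn' --output text)
cat > notification.json <<EOF
{
  "LambdaFunctionConfigurations": [{
    "LambdaFunctionArn": "$FUNC_ARN",
    "Events": ["s3:ObjectCreated:*"],
    "Filter": {"Key": {"FilterRules": [{"Name": "suffix", "Value": ".csv"}]}}
  }]
}
EOF
aws s3api put-bucket-notification-configuration \
  --bucket wysde2-test \
  --notification-configuration file://notification.json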
Run the pipeline
Now upload any CSV file into the S3 bucket that the Lambda function is listening on. The function will be triggered, write the converted Parquet file to the destination path, and update the Glue catalog.
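For example, assuming a local file named film.csv and the dms/sakila/film path convention from the code comments:
aws s3 cp film.csv s3://wysde2-test/dms/sakila/film/LOAD01.csv

# after the function has run, the Glue database and table should exist
aws glue get-tables --database-name sakila
# and the Parquet output should be under the cleaned prefix
aws s3 ls s3://wysde2-test/sakila/cleaned/film/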