
Data Transformation with dbt

Step 1: Install the libraries

requirements.txt
dbt-core==1.2.0
dbt-postgres==1.1.1
psycopg2-binary==2.9.3
ipython-sql==0.4.1
boto3==1.24.31

Use the following shell command to install the libraries:

pip install -r requirements.txt
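
To confirm that dbt and the Postgres adapter were installed correctly, check the version; the output also lists the installed adapter plugins:

dbt --version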

Extract and Load

In this part, we extract the data from the NYC Taxi website and load it into Postgres.

import pandas as pd

# dataset 1 - getting a sample of 1000 records for faster processing
yellow_tripdata_df = pd.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-05.parquet").sample(1000)

# select only few columns that we are interested in
yellow_tripdata_df = yellow_tripdata_df[['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'PULocationID', 'DOLocationID', 'fare_amount']]

# rename the columns
yellow_tripdata_df.columns = ['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger_count', 'pickup_location_id', 'dropoff_location_id', 'fare_amount']

# dataset 2
lookup_zone = pd.read_csv('https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv')

# rename the columns
lookup_zone.columns = ['locationid','borough','zone','service_zone']
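
Before loading, it is worth a quick sanity check of the two dataframes (a minimal sketch; the exact output depends on the sample drawn):

# quick sanity check of the extracted data
print(yellow_tripdata_df.shape)  # expect (1000, 7)
print(lookup_zone.shape)
yellow_tripdata_df.head()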

# Setup the credentials
import json
import boto3

# fetch database credentials from AWS Secrets Manager
def get_secret(secret_name):
    region_name = "us-east-1"
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name)
    get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    return json.loads(get_secret_value_response['SecretString'])

db_credentials = get_secret(secret_name='wysde')

USERNAME = db_credentials["RDS_POSTGRES_USERNAME"]
PASSWORD = db_credentials["RDS_POSTGRES_PASSWORD"]
HOST = "database-1.cy8ltogyfgas.us-east-1.rds.amazonaws.com"
PORT = 5432
DBNAME = "sparsh"
CONN = f"postgresql://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DBNAME}"
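
If you are not using AWS Secrets Manager, a minimal alternative is to read the credentials from environment variables instead (the variable names below are placeholders, not part of the original setup):

import os

# hypothetical environment variable names - adjust to match your own setup
USERNAME = os.environ["POSTGRES_USERNAME"]
PASSWORD = os.environ["POSTGRES_PASSWORD"]
CONN = f"postgresql://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DBNAME}"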

# load the data into our postgres database
from sqlalchemy import create_engine

alchemyEngine = create_engine(CONN, pool_recycle=3600)
postgreSQLConnection = alchemyEngine.connect()

DBT_SCHEMA = "dbt_taxi"

lookup_zone.to_sql('taxi_zone_lookup', postgreSQLConnection, if_exists='replace', schema=DBT_SCHEMA, index=False)
yellow_tripdata_df.to_sql('yellow_taxi_trips', postgreSQLConnection, if_exists='replace', schema=DBT_SCHEMA, index=False)

postgreSQLConnection.close()
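
Note that to_sql will not create the dbt_taxi schema for you. If it does not already exist in the database, create it before the to_sql calls above; a minimal sketch using the same engine:

from sqlalchemy import text

# create the target schema if it is missing (run before the to_sql calls)
with alchemyEngine.begin() as conn:
    conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {DBT_SCHEMA}"))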

Validate the load:

%reload_ext sql
%sql {CONN}
%sql select * from {DBT_SCHEMA}.yellow_taxi_trips limit 10;
%sql select count(*) from {DBT_SCHEMA}.yellow_taxi_trips;
%sql select * from {DBT_SCHEMA}.taxi_zone_lookup limit 10;
%sql select count(*) from {DBT_SCHEMA}.taxi_zone_lookup;

Step 2: Setup the dbt project

The following commands create a dbt project named nyctaxi and change into its directory:

dbt init nyctaxi
%cd nyctaxi
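
dbt init will prompt for connection details and write them to ~/.dbt/profiles.yml. For the Postgres setup used above, the profile could look roughly like this (host, database, and schema are the values from the load step; the env_var names for user and password are placeholders):

~/.dbt/profiles.yml
nyctaxi:
  target: dev
  outputs:
    dev:
      type: postgres
      host: database-1.cy8ltogyfgas.us-east-1.rds.amazonaws.com
      port: 5432
      user: "{{ env_var('POSTGRES_USERNAME') }}"      # placeholder env var
      password: "{{ env_var('POSTGRES_PASSWORD') }}"  # placeholder env var
      dbname: sparsh
      schema: dbt_taxi
      threads: 1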

Step 3: Design the models

Staging Schema

./models/staging/schema.yml
version: 2

sources:
  - name: source
    schema: dbt_taxi
    tables:
      - name: yellow_taxi_trips
      - name: taxi_zone_lookup

models:
  - name: taxi_zone_lookup_model
    description: "A list of all taxi zones with codes in NYC"
    columns:
      - name: locationid
        tests:
          - not_null
      - name: borough
        tests:
          - not_null
      - name: zone
        tests:
          - not_null
      - name: service_zone
        tests:
          - not_null
  - name: yellow_taxi_trips_models
    description: "A reduced version of yellow taxi trip data in NYC"
    columns:
      - name: vendor_id
        tests:
          - not_null
          - accepted_values:
              values: ['1', '2', '4']
      - name: pickup_datetime
        tests:
          - not_null
      - name: dropoff_datetime
        tests:
          - not_null
      - name: passenger_count
        tests:
          - not_null
      - name: pickup_location_id
        tests:
          - not_null
      - name: dropoff_location_id
        tests:
          - not_null
      - name: fare_amount
        tests:
          - not_null

Staging model 1

./models/staging/yellow_taxi_trips_models.sql
select 
vendor_id,
pickup_datetime,
dropoff_datetime,
passenger_count,
pickup_location_id,
dropoff_location_id,
fare_amount
from {{ source('source', 'yellow_taxi_trips') }}

Staging model 2

./models/staging/taxi_zone_lookup_model.sql
select 
locationid,
borough,
zone,
service_zone
from {{ source('source', 'taxi_zone_lookup') }}
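
At this point you can already build and test just the staging layer; the --select flag limits dbt to the named models:

dbt run --select taxi_zone_lookup_model yellow_taxi_trips_models
dbt test --select taxi_zone_lookup_model yellow_taxi_trips_models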

Serving schema

./models/schema.yml
version: 2

models:
  - name: trips_with_borough_name_model
    description: "Combines taxi rides with the borough names for pickup and dropoff locations."
    columns:
      - name: vendor_id
      - name: pickup_datetime
      - name: dropoff_datetime
      - name: pickup_borough
      - name: dropoff_borough
      - name: passenger_count
      - name: fare_amount

Serving model 1

We will now create another dbt model that combines data from the two staging models. We want to join the staging tables on the location ID fields and add the borough names for the pickup and dropoff locations of each taxi ride.

./models/trips_with_borough_name_model.sql
select
t.vendor_id,
t.pickup_datetime,
t.dropoff_datetime,
z1.borough as pickup_borough,
z2.borough as dropoff_borough,
t.passenger_count,
t.fare_amount
from {{ ref('yellow_taxi_trips_models') }} t
left join {{ ref('taxi_zone_lookup_model') }} z1
on t.pickup_location_id = z1.locationid
left join {{ ref('taxi_zone_lookup_model') }} z2
on t.dropoff_location_id = z2.locationid
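
By default, dbt materializes models as views. If you would rather build this serving model as a table, one option is to add a config block at the top of the model file (or set the materialization in dbt_project.yml):

{{ config(materialized='table') }}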

Step 4: Run the models

dbt run

The output will be like this:

02:52:30  Running with dbt=1.2.0
02:52:31 Found 5 models, 4 tests, 0 snapshots, 0 analyses, 245 macros, 0 operations, 0 seed files, 2 sources, 0 exposures, 0 metrics
02:52:31
02:52:40 Concurrency: 1 threads (target='dev')
02:52:40
02:52:40 1 of 5 START table model dbt_taxi.my_first_dbt_model ........................... [RUN]
02:52:44 1 of 5 OK created table model dbt_taxi.my_first_dbt_model ...................... [SELECT 2 in 4.12s]
02:52:44 2 of 5 START view model dbt_taxi.taxi_zone_lookup_model ........................ [RUN]
02:52:48 2 of 5 OK created view model dbt_taxi.taxi_zone_lookup_model ................... [CREATE VIEW in 3.71s]
02:52:48 3 of 5 START view model dbt_taxi.yellow_taxi_trips_models ...................... [RUN]
02:52:52 3 of 5 OK created view model dbt_taxi.yellow_taxi_trips_models ................. [CREATE VIEW in 3.95s]
02:52:52 4 of 5 START view model dbt_taxi.my_second_dbt_model ........................... [RUN]
02:52:55 4 of 5 OK created view model dbt_taxi.my_second_dbt_model ...................... [CREATE VIEW in 3.72s]
02:52:55 5 of 5 START view model dbt_taxi.trips_with_borough_name_model ................. [RUN]
02:52:59 5 of 5 OK created view model dbt_taxi.trips_with_borough_name_model ............ [CREATE VIEW in 3.51s]
02:53:02
02:53:02 Finished running 1 table model, 4 view models in 0 hours 0 minutes and 31.09 seconds (31.09s).
02:53:02
02:53:02 Completed successfully
02:53:02
02:53:02 Done. PASS=5 WARN=0 ERROR=0 SKIP=0 TOTAL=5
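
Since the staging schema defines not_null and accepted_values tests, run them as well:

dbt test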

Step 5: Generate and analyze the docs

dbt docs generate

dbt also provides the facility to serve the doc site:

dbt docs serve
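
By default the docs site is served on localhost:8080; pass --port if you need a different one:

dbt docs serve --port 8001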

In the doc site, you will also find a lineage graph that will look like this: