Data Transformation with dbt
Step 1: Install the libraries
requirements.txt
dbt-core==1.2.0
dbt-postgres==1.1.1
psycopg2-binary==2.9.3
ipython-sql==0.4.1
boto3==1.24.31
Use the following shell command to install the libraries:
pip install -r requirements.txt
Extract and Load
In this part, we extract the data from the NYC Taxi website and load into postgres.
# dataset 1 - getting a sample of 1000 records for faster processing
yellow_tripdata_df =pd.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-05.parquet").sample(1000)
# select only few columns that we are interested in
yellow_tripdata_df = yellow_tripdata_df[['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'PULocationID', 'DOLocationID', 'fare_amount']]
# rename the columns
yellow_tripdata_df.columns = ['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger_count', 'pickup_location_id', 'dropoff_location_id', 'fare_amount']
# dataset 2
lookup_zone = pd.read_csv('https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv')
# rename the columns
lookup_zone.columns = ['locationid','borough','zone','service_zone']
# Setup the credentials
def get_secret(secret_name):
    region_name = "us-east-1"
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name)
    get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    get_secret_value_response = json.loads(get_secret_value_response['SecretString'])
    return get_secret_value_response
db_credentials = get_secret(secret_name='wysde')
USERNAME = db_credentials["RDS_POSTGRES_USERNAME"]
PASSWORD = db_credentials["RDS_POSTGRES_PASSWORD"]
HOST = "database-1.cy8ltogyfgas.us-east-1.rds.amazonaws.com"
PORT = 5432
DBNAME = "sparsh"
CONN = f"postgresql://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DBNAME}"
# load the data into our postgres database
alchemyEngine = create_engine(CONN, pool_recycle=3600);
postgreSQLConnection = alchemyEngine.connect();
DBT_SCHEMA = "dbt_taxi"
lookup_zone.to_sql('taxi_zone_lookup', postgreSQLConnection, if_exists='replace', schema=DBT_SCHEMA, index=False)
yellow_tripdata_df.to_sql('yellow_taxi_trips', postgreSQLConnection, if_exists='replace', schema=DBT_SCHEMA, index=False);
postgreSQLConnection.close();
Validate the load:
%reload_ext sql
%sql {CONN}
%sql select * from {DBT_SCHEMA}.yellow_taxi_trips limit 10;
%sql select count(*) from {DBT_SCHEMA}.yellow_taxi_trips;
%sql select * from {DBT_SCHEMA}.taxi_zone_lookup limit 10;
%sql select count(*) from {DBT_SCHEMA}.taxi_zone_lookup;
Step 2: Setup the dbt project
This command will create a dbt project named nyctaxi:
dbt init nyctaxi
%cd nyctaxi
Step 3: Design the models
Staging Schema
./models/staging/schema.yml
version: 2
sources:
  - name: source
    schema: dbt_taxi
    tables:
      - name: yellow_taxi_trips
      - name: taxi_zone_lookup
models:
  - name: taxi_zone_lookup_model
    description: "A list of all taxi zones with codes in NYC"
    columns:
      - name: locationid
        tests:
          - not_null
      - name: borough
        tests:
          - not_null
      - name: zone
        tests:
          - not_null
      - name: service_zone
        tests:
          - not_null
  - name: yellow_taxi_trips_models
    description: "A reduced version of yellow taxi trip data in NYC"
    columns:
      - name: vendor_id
        tests:
          - not_null
          - accepted_values:
              values: ['1', '2', '4']
      - name: pickup_datetime
        tests:
          - not_null
      - name: dropoff_datetime
        tests:
          - not_null
      - name: passenger_count
        tests:
          - not_null
      - name: pickup_location_id
        tests:
          - not_null
      - name: dropoff_location_id
        tests:
          - not_null
      - name: fare_amount
        tests:
          - not_null
Staging model 1
./models/staging/yellow_taxi_trips_models.sql
select 
    vendor_id,
    pickup_datetime, 
    dropoff_datetime, 
    passenger_count, 
    pickup_location_id, 
    dropoff_location_id, 
    fare_amount
from {{ source('source', 'yellow_taxi_trips') }}
Staging model 2
./models/staging/taxi_zone_lookup_model.sql
select 
    locationid,
    borough,
    zone,
    service_zone
from {{ source('source', 'taxi_zone_lookup') }}
Serving schema
./models/schema.yml
version: 2
models:
  - name: trips_with_borough_name
    description: "Combines taxi rides with the borough names for pickup and dropoff locations."
    columns:
      - name: vendor_id
      - name: pickup_datetime
      - name: dropoff_datetime
      - name: pickup_borough
      - name: dropoff_borough
      - name: passenger_count
      - name: fare_amount
Serving model 1
We will now create another dbt model, which combines data from the two staging models. Let's assume we want to write a query to join the staging tables on the location ID fields and add the actual location names to the pickup and dropoff locations of the taxi ride data.
./models/trips_with_borough_name_model.sql
select
    t.vendor_id,
    t.pickup_datetime,
    t.dropoff_datetime,
    z1.borough as pickup_borough,
    z2.borough as dropoff_borough,
    t.passenger_count,
    t.fare_amount
from {{ ref('yellow_taxi_trips_models') }} t
left join {{ ref('taxi_zone_lookup_model') }} z1
on t.pickup_location_id = z1.locationid
left join {{ ref('taxi_zone_lookup_model') }} z2
on t.dropoff_location_id = z2.locationid
Step 4: Run the models
dbt run
The output will be like this:
[0m02:52:30  Running with dbt=1.2.0
[0m02:52:31  [[33mWARNING[0m]: Did not find matching node for patch with name 'trips_with_borough_name' in the 'models' section of file 'models/schema.yml'
[0m02:52:31  Found 5 models, 4 tests, 0 snapshots, 0 analyses, 245 macros, 0 operations, 0 seed files, 2 sources, 0 exposures, 0 metrics
[0m02:52:31  
[0m02:52:40  Concurrency: 1 threads (target='dev')
[0m02:52:40  
[0m02:52:40  1 of 5 START table model dbt_taxi.my_first_dbt_model ........................... [RUN]
[0m02:52:44  1 of 5 OK created table model dbt_taxi.my_first_dbt_model ...................... [[32mSELECT 2[0m in 4.12s]
[0m02:52:44  2 of 5 START view model dbt_taxi.taxi_zone_lookup_model ........................ [RUN]
[0m02:52:48  2 of 5 OK created view model dbt_taxi.taxi_zone_lookup_model ................... [[32mCREATE VIEW[0m in 3.71s]
[0m02:52:48  3 of 5 START view model dbt_taxi.yellow_taxi_trips_models ...................... [RUN]
[0m02:52:52  3 of 5 OK created view model dbt_taxi.yellow_taxi_trips_models ................. [[32mCREATE VIEW[0m in 3.95s]
[0m02:52:52  4 of 5 START view model dbt_taxi.my_second_dbt_model ........................... [RUN]
[0m02:52:55  4 of 5 OK created view model dbt_taxi.my_second_dbt_model ...................... [[32mCREATE VIEW[0m in 3.72s]
[0m02:52:55  5 of 5 START view model dbt_taxi.trips_with_borough_name_model ................. [RUN]
[0m02:52:59  5 of 5 OK created view model dbt_taxi.trips_with_borough_name_model ............ [[32mCREATE VIEW[0m in 3.51s]
[0m02:53:02  
[0m02:53:02  Finished running 1 table model, 4 view models in 0 hours 0 minutes and 31.09 seconds (31.09s).
[0m02:53:02  
[0m02:53:02  [32mCompleted successfully[0m
[0m02:53:02  
[0m02:53:02  Done. PASS=5 WARN=0 ERROR=0 SKIP=0 TOTAL=5
Step 5: Generate and analyze the docs
dbt docs generate
dbt also provides the facility to serve the doc site:
dbt docs serve
In the doc site, you will also find a lineage graph that will look like this:
