
dbt Data Transformation

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy the command below and send it to your AI assistant to install this skill:

Install skill "dbt-data-transformation" with this command: npx skills add manutej/luxor-claude-marketplace/manutej-luxor-claude-marketplace-dbt-data-transformation


A comprehensive skill for mastering dbt (data build tool) for analytics engineering. This skill covers model development, testing strategies, documentation practices, incremental builds, Jinja templating, macro development, package management, and production deployment workflows.

When to Use This Skill

Use this skill when:

  • Building data transformation pipelines for analytics and business intelligence

  • Creating a data warehouse with modular, testable SQL transformations

  • Implementing ELT (Extract, Load, Transform) workflows

  • Developing dimensional models (facts, dimensions) for analytics

  • Managing complex SQL dependencies and data lineage

  • Creating reusable data transformation logic across projects

  • Testing data quality and implementing data contracts

  • Documenting data models and business logic

  • Building incremental models for large datasets

  • Orchestrating dbt with tools like Airflow, Dagster, or dbt Cloud

  • Migrating legacy ETL processes to modern ELT architecture

  • Implementing DataOps practices for analytics teams

Core Concepts

What is dbt?

dbt (data build tool) enables analytics engineers to transform data in their warehouse more effectively. It's a development framework that brings software engineering best practices to data transformation:

  • Version Control: SQL transformations as code in Git

  • Testing: Built-in data quality testing framework

  • Documentation: Auto-generated, searchable data dictionary

  • Modularity: Reusable SQL through refs and macros

  • Lineage: Automatic dependency resolution and visualization

  • Deployment: CI/CD for data transformations

The dbt Workflow

  1. Develop: Write SQL SELECT statements as models
  2. Test: Define data quality tests
  3. Document: Add descriptions and metadata
  4. Build: dbt run compiles and executes models
  5. Test: dbt test validates data quality
  6. Deploy: CI/CD pipelines deploy to production

Key dbt Entities

  • Models: SQL SELECT statements that define data transformations

  • Sources: Raw data tables in your warehouse

  • Seeds: CSV files loaded into your warehouse

  • Tests: Data quality assertions

  • Macros: Reusable Jinja-SQL functions

  • Snapshots: Type 2 slowly changing dimension captures

  • Exposures: Downstream uses of dbt models (dashboards, ML models)

  • Metrics: Business metric definitions

Model Development

Basic Model Structure

A dbt model is a SELECT statement saved as a .sql file:

```sql
-- models/staging/stg_orders.sql

with source as (
    select * from {{ source('jaffle_shop', 'orders') }}
),

renamed as (
    select
        id as order_id,
        user_id as customer_id,
        order_date,
        status,
        _etl_loaded_at
    from source
)

select * from renamed
```

Key Points:

  • Models are SELECT statements only (no DDL)

  • Use CTEs (Common Table Expressions) for readability

  • Reference sources with {{ source() }}

  • dbt handles CREATE/INSERT logic based on materialization

The ref() Function

Reference other models using {{ ref() }}:

```sql
-- models/marts/fct_orders.sql

with orders as (
    select * from {{ ref('stg_orders') }}
),

customers as (
    select * from {{ ref('stg_customers') }}
),

joined as (
    select
        orders.order_id,
        orders.order_date,
        customers.customer_name,
        orders.status
    from orders
    left join customers
        on orders.customer_id = customers.customer_id
)

select * from joined
```

Benefits of ref():

  • Builds dependency graph automatically

  • Resolves to correct schema/database

  • Enables testing in dev without affecting prod

  • Powers lineage visualization
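To make "builds the dependency graph automatically" concrete, here is a plain-Python sketch of the idea: scan each model's SQL for `ref()` calls and topologically sort the result. The `models` dict is hypothetical; dbt does this internally when it parses your project.

```python
import re
from graphlib import TopologicalSorter

# Hypothetical project: model name -> raw SQL (mirroring the examples above)
models = {
    "stg_orders": "select * from raw.orders",
    "stg_customers": "select * from raw.customers",
    "fct_orders": "select * from {{ ref('stg_orders') }} join {{ ref('stg_customers') }}",
}

# Extract each model's upstream dependencies from its ref() calls
deps = {
    name: set(re.findall(r"ref\('([^']+)'\)", sql))
    for name, sql in models.items()
}

# TopologicalSorter yields a valid build order: parents before children
build_order = list(TopologicalSorter(deps).static_order())
print(build_order)  # stg_* models come before fct_orders
```

This is why `dbt run` can always build staging models before the marts that reference them, with no manual ordering.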

The source() Function

Define and reference raw data sources:

```yaml
# models/staging/sources.yml
version: 2

sources:
  - name: jaffle_shop
    description: Raw data from the Jaffle Shop application
    database: raw
    schema: jaffle_shop
    tables:
      - name: orders
        description: One record per order
        columns:
          - name: id
            description: Primary key for orders
            tests:
              - unique
              - not_null
          - name: user_id
            description: Foreign key to customers
          - name: order_date
            description: Date order was placed
          - name: status
            description: Order status (completed, pending, cancelled)
```

```sql
-- Reference the source
select * from {{ source('jaffle_shop', 'orders') }}
```

Source Features:

  • Document raw data tables

  • Test source data quality

  • Track freshness with freshness config

  • Separate source definitions from transformations

Model Organization

Recommended project structure:

```text
models/
├── staging/                      # One-to-one with source tables
│   ├── jaffle_shop/
│   │   ├── _jaffle_shop__sources.yml
│   │   ├── _jaffle_shop__models.yml
│   │   ├── stg_jaffle_shop__orders.sql
│   │   └── stg_jaffle_shop__customers.sql
│   └── stripe/
│       ├── _stripe__sources.yml
│       ├── _stripe__models.yml
│       └── stg_stripe__payments.sql
├── intermediate/                 # Purpose-built transformations
│   └── int_orders_joined.sql
└── marts/                        # Business-defined entities
    ├── core/
    │   ├── _core__models.yml
    │   ├── dim_customers.sql
    │   └── fct_orders.sql
    └── marketing/
        └── fct_customer_sessions.sql
```

Naming Conventions:

  • stg_ : Staging models (one-to-one with sources)

  • int_ : Intermediate models (not exposed to end users)

  • fct_ : Fact tables

  • dim_ : Dimension tables

Materializations

Materializations determine how dbt builds models in your warehouse:

  1. View (Default)

```sql
{{ config(materialized='view') }}

select * from {{ ref('base_model') }}
```

Characteristics:

  • Lightweight, no data stored

  • Query runs each time view is accessed

  • Best for: Small datasets, models queried infrequently

  • Fast to build, slower to query

  2. Table

```sql
{{ config(materialized='table') }}

select * from {{ ref('base_model') }}
```

Characteristics:

  • Full table rebuild on each run

  • Data physically stored

  • Best for: Small to medium datasets, heavily queried models

  • Slower to build, faster to query

  3. Incremental

```sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    on_schema_change='fail'
) }}

select * from {{ source('jaffle_shop', 'orders') }}

{% if is_incremental() %}
    -- Only process new/updated records
    where order_date > (select max(order_date) from {{ this }})
{% endif %}
```

Characteristics:

  • Only processes new data on subsequent runs

  • First run builds full table

  • Best for: Large datasets, event/time-series data

  • Fast incremental builds, maintains historical data

Incremental Strategies:

```sql
-- Append (default): Add new rows only
{{ config(
    materialized='incremental',
    incremental_strategy='append'
) }}

-- Merge: Upsert based on unique_key
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge'
) }}

-- Delete+Insert: Delete matching records, insert new
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='delete+insert'
) }}
```

  4. Ephemeral

```sql
{{ config(materialized='ephemeral') }}

select * from {{ ref('base_model') }}
```

Characteristics:

  • Not built in warehouse

  • Interpolated as CTE in dependent models

  • Best for: Lightweight transformations, avoiding view proliferation

  • No storage, compiled into downstream models

Configuration Comparison

| Materialization | Build Speed | Query Speed | Storage | Use Case |
|-----------------|-------------|-------------|---------|----------|
| View            | Fast        | Slow        | None    | Small datasets, infrequent queries |
| Table           | Slow        | Fast        | High    | Medium datasets, frequent queries |
| Incremental     | Fast*       | Fast        | High    | Large datasets, time-series data |
| Ephemeral       | N/A         | Varies      | None    | Intermediate logic, CTEs |

*After initial full build
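As a rough rule of thumb (an illustrative sketch, not official dbt guidance — the thresholds below are arbitrary), the comparison table can be encoded as a chooser:

```python
def choose_materialization(row_count: int, queries_per_day: int,
                           is_event_stream: bool = False) -> str:
    """Rough heuristic mirroring the comparison table above."""
    if is_event_stream or row_count > 100_000_000:
        return "incremental"   # large or time-series data
    if queries_per_day > 10 and row_count > 1_000_000:
        return "table"         # heavily queried, medium size
    return "view"              # small or infrequently queried

print(choose_materialization(5_000, 1))             # view
print(choose_materialization(50_000_000, 200))      # table
print(choose_materialization(1_000_000_000, 50))    # incremental
```

In practice, teams usually start everything as a view and promote models to table or incremental only when build or query times demand it.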

Testing

Schema Tests

Built-in generic tests defined in YAML:

```yaml
# models/staging/stg_orders.yml
version: 2

models:
  - name: stg_orders
    description: Staged order data
    columns:
      - name: order_id
        description: Primary key
        tests:
          - unique
          - not_null
      - name: customer_id
        description: Foreign key to customers
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id
      - name: status
        description: Order status
        tests:
          - accepted_values:
              values: ['placed', 'shipped', 'completed', 'returned', 'cancelled']
      - name: order_total
        description: Total order amount
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
```

Built-in Tests:

  • unique : No duplicate values

  • not_null : No null values

  • accepted_values : Value in specified list

  • relationships : Foreign key validation

Custom Data Tests

Create custom tests in tests/ directory:

```sql
-- tests/assert_positive_order_totals.sql

select
    order_id,
    order_total
from {{ ref('fct_orders') }}
where order_total < 0
```

How it works:

  • Test fails if query returns any rows

  • Query should return failing records

  • Can use any SQL logic
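The pass/fail rule can be sketched in a few lines of Python (the rows are hypothetical, standing in for the test query's result set):

```python
def run_custom_test(failing_rows: list) -> bool:
    """A dbt data test passes iff its query returns zero rows."""
    return len(failing_rows) == 0

# Hypothetical result of `where order_total < 0`
orders = [
    {"order_id": 1, "order_total": 25.0},
    {"order_id": 2, "order_total": -5.0},   # bad record
]
failing = [o for o in orders if o["order_total"] < 0]

print(run_custom_test(failing))  # False: one failing row, so the test fails
```

Writing the query to *return* the bad records (rather than a boolean) means a failing test immediately shows you which rows violated the assertion.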

Advanced Testing Patterns

```sql
-- Test for data freshness
-- tests/assert_orders_are_fresh.sql

with latest_order as (
    select max(order_date) as max_date
    from {{ ref('fct_orders') }}
)

select max_date
from latest_order
where max_date < current_date - interval '1 day'
```

```sql
-- Test for referential integrity across time
-- tests/assert_no_orphaned_orders.sql

select
    o.order_id,
    o.customer_id
from {{ ref('fct_orders') }} o
left join {{ ref('dim_customers') }} c
    on o.customer_id = c.customer_id
where c.customer_id is null
```

Testing with dbt_utils

Requires the dbt_utils package:

```yaml
models:
  - name: stg_orders
    tests:
      # Test for uniqueness across multiple columns (model-level test)
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - order_id
            - order_date
    columns:
      - name: order_id
        tests:
          # Test for sequential values
          - dbt_utils.sequential_values:
              interval: 1
          # Test that at least 95% of values are not null
          - dbt_utils.not_null_proportion:
              at_least: 0.95
```

Test Severity Levels

```yaml
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique:
              severity: error  # Fail build (default)
          - not_null:
              severity: warn   # Warning only
```

Documentation

Model Documentation

```yaml
# models/marts/core/_core__models.yml
version: 2

models:
  - name: fct_orders
    description: |
      Order fact table containing one row per order with associated
      customer and payment information. This is the primary table
      for order analytics and reporting.

      Grain: One row per order
      Refresh: Incremental, updates daily at 2 AM UTC

      Notes:
      - Includes cancelled orders (filter with status column)
      - Payment info joined from Stripe data
      - Customer info joined from application database
    columns:
      - name: order_id
        description: Primary key for orders table
        tests:
          - unique
          - not_null
      - name: customer_id
        description: |
          Foreign key to dim_customers. Links to the customer who placed
          the order. Note: May be null for guest checkout orders.
      - name: order_date
        description: Date order was placed (UTC timezone)
      - name: status
        description: |
          Current order status. Possible values:
          - placed: Order received, not yet processed
          - shipped: Order shipped to customer
          - completed: Order delivered and confirmed
          - returned: Order returned by customer
          - cancelled: Order cancelled before shipment
      - name: order_total
        description: Total order amount in USD including tax and shipping
```

Documentation Blocks

Create reusable documentation:

```jinja
<!-- models/docs.md -->

{% docs order_status %}

Order status indicates the current state of an order in our fulfillment pipeline.

| Status    | Description               | Next Steps       |
|-----------|---------------------------|------------------|
| placed    | Order received            | Inventory check  |
| shipped   | En route to customer      | Track shipment   |
| completed | Delivered successfully    | Request feedback |
| returned  | Customer return initiated | Process refund   |
| cancelled | Order cancelled           | Update inventory |

{% enddocs %}

{% docs customer_id %}

Unique identifier for customers. This ID is:

- Generated by the application on account creation
- Persistent across orders
- Used to track customer lifetime value
- Note: NULL for guest checkouts

{% enddocs %}
```

Reference documentation blocks:

```yaml
models:
  - name: fct_orders
    columns:
      - name: status
        description: "{{ doc('order_status') }}"
      - name: customer_id
        description: "{{ doc('customer_id') }}"
```

Generating Documentation

```shell
# Generate documentation site
dbt docs generate

# Serve documentation locally
dbt docs serve --port 8001

# View in browser at http://localhost:8001
```

Documentation Features:

  • Interactive lineage graph (DAG visualization)

  • Searchable model catalog

  • Column-level documentation

  • Source freshness tracking

  • Test coverage visibility

  • Compiled SQL preview

Documentation Best Practices

  • Document at all levels: Project, models, columns, sources

  • Explain business logic: Why transformations exist

  • Define grain explicitly: One row represents...

  • Note refresh schedules: How often data updates

  • Document assumptions: Edge cases, known issues

  • Link to external resources: Confluence, wiki, dashboards

Incremental Models

Basic Incremental Pattern

```sql
{{ config(
    materialized='incremental',
    unique_key='event_id'
) }}

with source as (
    select
        event_id,
        user_id,
        event_timestamp,
        event_type,
        event_properties
    from {{ source('analytics', 'events') }}

    {% if is_incremental() %}
        -- Only process events newer than existing data
        where event_timestamp > (select max(event_timestamp) from {{ this }})
    {% endif %}
)

select * from source
```

Key Components:

  • is_incremental() : True after first run

  • {{ this }} : References current model's table

  • unique_key : Column(s) for deduplication

Incremental with Merge Strategy

```sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge',
    merge_update_columns=['status', 'updated_at'],
    merge_exclude_columns=['created_at']
) }}

with orders as (
    select
        order_id,
        customer_id,
        order_date,
        status,
        order_total,
        current_timestamp() as updated_at,
        case when status = 'placed' then current_timestamp() else null end as created_at
    from {{ source('ecommerce', 'orders') }}

    {% if is_incremental() %}
        -- Look back 3 days to catch late-arriving updates
        where order_date >= (select max(order_date) - interval '3 days' from {{ this }})
    {% endif %}
)

select * from orders
```

Merge Strategy Features:

  • Updates existing records based on unique_key

  • Inserts new records

  • Optional: Specify which columns to update/exclude

  • Best for: Slowly changing data, updates to historical records

Incremental with Delete+Insert

```sql
{{ config(
    materialized='incremental',
    unique_key=['date', 'customer_id'],
    incremental_strategy='delete+insert'
) }}

with daily_metrics as (
    select
        date_trunc('day', order_timestamp) as date,
        customer_id,
        count(*) as order_count,
        sum(order_total) as total_revenue
    from {{ ref('fct_orders') }}

    {% if is_incremental() %}
        where date_trunc('day', order_timestamp) >= (
            select max(date) - interval '7 days' from {{ this }}
        )
    {% endif %}

    group by 1, 2
)

select * from daily_metrics
```

Delete+Insert Strategy:

  • Deletes all rows matching unique_key

  • Inserts new rows

  • Best for: Aggregated data, full partition replacement

  • More efficient than merge for bulk updates
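To make the two strategies' semantics concrete, here is an illustrative simulation over plain dicts (warehouses implement this natively; the row data is hypothetical):

```python
def merge(target: dict, new_rows: list, key: str) -> dict:
    """Merge: update rows with a matching unique_key, insert the rest."""
    for row in new_rows:
        target[row[key]] = row          # upsert by key
    return target

def delete_insert(target: dict, new_rows: list, key: str) -> dict:
    """Delete+insert: drop every matching key first, then insert."""
    for row in new_rows:
        target.pop(row[key], None)      # delete phase
    for row in new_rows:
        target[row[key]] = row          # insert phase
    return target

existing = {1: {"order_id": 1, "status": "placed"}}
updates = [{"order_id": 1, "status": "shipped"},
           {"order_id": 2, "status": "placed"}]

result = merge(dict(existing), updates, "order_id")
print(result[1]["status"], len(result))  # shipped 2
```

With one row per key the two converge on the same result; the difference shows up when a key maps to several rows (e.g. replacing a whole daily partition), where delete+insert cleanly swaps the partition instead of matching row-by-row.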

Handling Late-Arriving Data

```sql
{{ config(
    materialized='incremental',
    unique_key='order_id'
) }}

select
    order_id,
    customer_id,
    order_date,
    status,
    _loaded_at
from {{ source('ecommerce', 'orders') }}

{% if is_incremental() %}
    -- Use _loaded_at instead of order_date to catch updates
    where _loaded_at > (select max(_loaded_at) from {{ this }})

    -- OR use a lookback window
    -- where order_date > (select max(order_date) - interval '3 days' from {{ this }})
{% endif %}
```

Incremental with Partitioning

```sql
{{ config(
    materialized='incremental',
    unique_key='event_id',
    partition_by={
        'field': 'event_date',
        'data_type': 'date',
        'granularity': 'day'
    },
    cluster_by=['user_id', 'event_type']
) }}

select
    event_id,
    user_id,
    event_type,
    event_timestamp,
    date(event_timestamp) as event_date
from {{ source('analytics', 'raw_events') }}

{% if is_incremental() %}
    where date(event_timestamp) > (select max(event_date) from {{ this }})
{% endif %}
```

Partition Benefits:

  • Improved query performance

  • Cost optimization (scan less data)

  • Efficient incremental processing

  • Better for time-series data

Full Refresh Capability

```shell
# Force full rebuild of incremental models
dbt run --full-refresh

# Full refresh a specific model
dbt run --select my_incremental_model --full-refresh
```

Macros & Jinja

Basic Macro Structure

```sql
-- macros/cents_to_dollars.sql

{% macro cents_to_dollars(column_name, precision=2) %}
    round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}
```

Usage:

```sql
select
    order_id,
    {{ cents_to_dollars('amount_cents') }} as amount_dollars
from {{ ref('stg_orders') }}
```
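The macro expands to plain SQL at compile time; its arithmetic is equivalent to this Python function (shown only to make the rounding explicit):

```python
def cents_to_dollars(amount_cents: int, precision: int = 2) -> float:
    """Mirror of the Jinja macro: round(column / 100.0, precision)."""
    return round(amount_cents / 100.0, precision)

print(cents_to_dollars(12345))   # 123.45
print(cents_to_dollars(999, 1))  # 10.0
```

Note that the actual rounding in a dbt model is performed by the warehouse's `round()` function, not by Jinja; the macro only emits text.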

Reusable Data Quality Macros

```sql
-- macros/test_not_negative.sql

{% macro test_not_negative(model, column_name) %}

select {{ column_name }}
from {{ model }}
where {{ column_name }} < 0

{% endmacro %}
```

Date Spine Macro

```sql
-- macros/date_spine.sql

{% macro date_spine(start_date, end_date) %}

with date_spine as (
    {{ dbt_utils.date_spine(
        datepart="day",
        start_date="cast('" ~ start_date ~ "' as date)",
        end_date="cast('" ~ end_date ~ "' as date)"
    ) }}
)

select date_day from date_spine

{% endmacro %}
```

Dynamic SQL Generation

```sql
-- macros/pivot_metric.sql

{% macro pivot_metric(metric_column, group_by_column, values) %}

select
    {{ group_by_column }},
    {% for value in values %}
    sum(case when status = '{{ value }}' then {{ metric_column }} else 0 end)
        as {{ value }}_{{ metric_column }}
    {% if not loop.last %},{% endif %}
    {% endfor %}
from {{ ref('fct_orders') }}
group by 1

{% endmacro %}
```

Usage:

```sql
{{ pivot_metric(
    metric_column='order_total',
    group_by_column='customer_id',
    values=['completed', 'pending', 'cancelled']
) }}
```

Grant Permissions Macro

```sql
-- macros/grant_select.sql

{% macro grant_select(schema, role) %}

{% set sql %}
    grant select on all tables in schema {{ schema }} to {{ role }};
{% endset %}

{% do run_query(sql) %}
{% do log("Granted select on " ~ schema ~ " to " ~ role, info=True) %}

{% endmacro %}
```

Usage in hooks:

```yaml
# dbt_project.yml
on-run-end:
  - "{{ grant_select(target.schema, 'analyst_role') }}"
```

Environment-Specific Logic

```sql
-- macros/generate_schema_name.sql

{% macro generate_schema_name(custom_schema_name, node) -%}

    {%- set default_schema = target.schema -%}

    {%- if target.name == 'prod' -%}
        {%- if custom_schema_name is not none -%}
            {{ custom_schema_name | trim }}
        {%- else -%}
            {{ default_schema }}
        {%- endif -%}
    {%- else -%}
        {{ default_schema }}_{{ custom_schema_name | trim }}
    {%- endif -%}

{%- endmacro %}
```

Audit Column Macro

```sql
-- macros/add_audit_columns.sql

{% macro add_audit_columns() %}
    current_timestamp() as dbt_updated_at,
    current_timestamp() as dbt_created_at,
    '{{ var("dbt_user") }}' as dbt_updated_by
{% endmacro %}
```

Usage:

```sql
select
    order_id,
    customer_id,
    order_total,
    {{ add_audit_columns() }}
from {{ ref('stg_orders') }}
```

Jinja Control Structures

```sql
-- Conditionals
{% if target.name == 'prod' %}
    from {{ source('production', 'orders') }}
{% else %}
    from {{ source('development', 'orders') }}
{% endif %}

-- Loops
{% for status in ['placed', 'shipped', 'completed'] %}
    sum(case when status = '{{ status }}' then 1 else 0 end) as {{ status }}_count
    {% if not loop.last %},{% endif %}
{% endfor %}

-- Set variables
{% set payment_methods = ['credit_card', 'paypal', 'bank_transfer'] %}

{% for method in payment_methods %}
    count(distinct case when payment_method = '{{ method }}' then customer_id end)
        as {{ method }}_customers
    {% if not loop.last %},{% endif %}
{% endfor %}
```
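To see what the payment-method loop compiles down to, here is a plain-Python sketch of the expansion (string building only; in dbt the real work is done by Jinja at compile time):

```python
payment_methods = ['credit_card', 'paypal', 'bank_transfer']

# Build the same comma-separated expressions the Jinja loop emits
lines = [
    f"count(distinct case when payment_method = '{m}' then customer_id end) "
    f"as {m}_customers"
    for m in payment_methods
]
compiled_sql = "select\n    " + ",\n    ".join(lines)

print(compiled_sql)
```

Inspecting the real compiled output with `dbt compile` (see `target/compiled/`) is the standard way to debug Jinja logic like this.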

Package Management

Installing Packages

```yaml
# packages.yml
packages:
  # dbt-utils: Essential utility macros
  - package: dbt-labs/dbt_utils
    version: 1.1.1

  # Audit helper: Compare datasets
  - package: dbt-labs/audit_helper
    version: 0.9.0

  # Codegen: Code generation utilities
  - package: dbt-labs/codegen
    version: 0.11.0

  # Custom packages can also be installed from Git

  # Local package
  - local: ../dbt-shared-macros
```

Install packages:

```shell
dbt deps
```

Using dbt_utils

```sql
-- Surrogate key generation
select
    {{ dbt_utils.generate_surrogate_key(['order_id', 'line_item_id']) }} as order_line_id,
    order_id,
    line_item_id
from {{ ref('stg_order_lines') }}

-- Union multiple tables
{{ dbt_utils.union_relations(
    relations=[
        ref('orders_2022'),
        ref('orders_2023'),
        ref('orders_2024')
    ]
) }}

-- Get column values as a list
{% set statuses = dbt_utils.get_column_values(
    table=ref('stg_orders'),
    column='status'
) %}

-- Pivot table
{{ dbt_utils.pivot(
    column='metric_name',
    values=dbt_utils.get_column_values(table=ref('metrics'), column='metric_name'),
    agg='sum',
    then_value='metric_value',
    else_value=0,
    prefix='',
    suffix='_total'
) }}
```

Creating Custom Packages

Project structure for a package:

```text
dbt-custom-package/
├── dbt_project.yml
├── macros/
│   ├── custom_test.sql
│   └── custom_macro.sql
├── models/
│   └── example_model.sql
└── README.md
```

```yaml
# dbt_project.yml for the custom package
name: 'custom_package'
version: '1.0.0'
config-version: 2

require-dbt-version: [">=1.0.0", "<2.0.0"]
```

Package Versioning

```yaml
packages:
  # Semantic version range: any 1.x release
  - package: dbt-labs/dbt_utils
    version: [">=1.0.0", "<2.0.0"]

  # Exact version
  - package: dbt-labs/dbt_utils
    version: 1.1.1

  # Git packages can also pin a branch or tag
```

Production Workflows

CI/CD Pipeline (GitHub Actions)

```yaml
# .github/workflows/dbt_ci.yml
name: dbt CI

on:
  pull_request:
    branches: [main]

jobs:
  dbt_run:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dbt
        run: |
          pip install dbt-core dbt-snowflake

      - name: Install dbt packages
        run: dbt deps

      - name: Run dbt debug
        run: dbt debug
        env:
          DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.DBT_SNOWFLAKE_ACCOUNT }}
          DBT_SNOWFLAKE_USER: ${{ secrets.DBT_SNOWFLAKE_USER }}
          DBT_SNOWFLAKE_PASSWORD: ${{ secrets.DBT_SNOWFLAKE_PASSWORD }}

      - name: Run dbt models (modified only)
        run: dbt run --select state:modified+ --state ./prod_manifest
        env:
          DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.DBT_SNOWFLAKE_ACCOUNT }}
          DBT_SNOWFLAKE_USER: ${{ secrets.DBT_SNOWFLAKE_USER }}
          DBT_SNOWFLAKE_PASSWORD: ${{ secrets.DBT_SNOWFLAKE_PASSWORD }}

      - name: Run dbt tests
        run: dbt test --select state:modified+ --state ./prod_manifest
        env:
          DBT_SNOWFLAKE_ACCOUNT: ${{ secrets.DBT_SNOWFLAKE_ACCOUNT }}
          DBT_SNOWFLAKE_USER: ${{ secrets.DBT_SNOWFLAKE_USER }}
          DBT_SNOWFLAKE_PASSWORD: ${{ secrets.DBT_SNOWFLAKE_PASSWORD }}
```

Slim CI (Test Changed Models Only)

```shell
# Store the production manifest
dbt compile --target prod
cp target/manifest.json ./prod_manifest/

# In CI: test only changed models and downstream dependencies
dbt test --select state:modified+ --state ./prod_manifest
```

Production Deployment

```yaml
# .github/workflows/dbt_prod.yml
name: dbt Production Deploy

on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install dbt
        run: pip install dbt-core dbt-snowflake

      - name: Install packages
        run: dbt deps

      - name: Run dbt seed
        run: dbt seed --target prod

      - name: Run dbt run
        run: dbt run --target prod

      - name: Run dbt test
        run: dbt test --target prod

      - name: Generate docs
        run: dbt docs generate --target prod

      - name: Upload docs to S3
        run: |
          aws s3 sync target/ s3://dbt-docs-bucket/
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
```

Orchestration with Airflow

```python
# dags/dbt_dag.py

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'analytics',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'dbt_production',
    default_args=default_args,
    description='Run dbt models in production',
    schedule_interval='0 2 * * *',  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['dbt', 'analytics'],
) as dag:

    dbt_deps = BashOperator(
        task_id='dbt_deps',
        bash_command='cd /opt/dbt && dbt deps',
    )

    dbt_seed = BashOperator(
        task_id='dbt_seed',
        bash_command='cd /opt/dbt && dbt seed --target prod',
    )

    dbt_run_staging = BashOperator(
        task_id='dbt_run_staging',
        bash_command='cd /opt/dbt && dbt run --select staging.* --target prod',
    )

    dbt_run_marts = BashOperator(
        task_id='dbt_run_marts',
        bash_command='cd /opt/dbt && dbt run --select marts.* --target prod',
    )

    dbt_test = BashOperator(
        task_id='dbt_test',
        bash_command='cd /opt/dbt && dbt test --target prod',
    )

    dbt_docs = BashOperator(
        task_id='dbt_docs',
        bash_command='cd /opt/dbt && dbt docs generate --target prod',
    )

    # Define task dependencies
    dbt_deps >> dbt_seed >> dbt_run_staging >> dbt_run_marts >> dbt_test >> dbt_docs
```

dbt Cloud Integration

```yaml
# dbt_cloud.yml

# Environment configuration
environments:
  - name: Production
    dbt_version: 1.7.latest
    type: deployment

  - name: Development
    dbt_version: 1.7.latest
    type: development

# Job configuration
jobs:
  - name: Production Run
    environment: Production
    triggers:
      schedule:
        cron: "0 2 * * *"  # 2 AM daily
    commands:
      - dbt deps
      - dbt seed
      - dbt run
      - dbt test

  - name: CI Check
    environment: Development
    triggers:
      github_webhook: true
    commands:
      - dbt deps
      - dbt run --select state:modified+
      - dbt test --select state:modified+
```

Monitoring & Alerting

```sql
-- macros/post_hook_monitoring.sql

{% macro monitor_row_count(threshold=0) %}

{% if execute %}
    {% set row_count_query %}
        select count(*) as row_count from {{ this }}
    {% endset %}

    {% set results = run_query(row_count_query) %}
    {% set row_count = results.columns[0].values()[0] %}

    {% if row_count < threshold %}
        {{ exceptions.raise_compiler_error(
            "Row count " ~ row_count ~ " below threshold " ~ threshold
        ) }}
    {% endif %}

    {{ log("Model " ~ this ~ " has " ~ row_count ~ " rows", info=True) }}
{% endif %}

{% endmacro %}
```

Usage:

```sql
{{ config(
    post_hook="{{ monitor_row_count(threshold=1000) }}"
) }}

select * from {{ ref('stg_orders') }}
```

Best Practices

Naming Conventions

Models:

```text
stg_[source]__[entity].sql   # Staging:      stg_stripe__payments.sql
int_[entity]_[verb].sql      # Intermediate: int_orders_joined.sql
fct_[entity].sql             # Fact:         fct_orders.sql
dim_[entity].sql             # Dimension:    dim_customers.sql
```

Tests:

```text
assert_[description].sql     # assert_positive_order_totals.sql
```

Macros:

```text
[verb]_[noun].sql            # generate_surrogate_key.sql
```
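Conventions like these are easy to enforce mechanically; here is a sketch of a naming check one might run in CI (the regexes are illustrative, not an official linter):

```python
import re

# Illustrative patterns for the conventions above
MODEL_PATTERNS = [
    r"^stg_[a-z0-9_]+__[a-z0-9_]+\.sql$",   # staging
    r"^int_[a-z0-9_]+\.sql$",               # intermediate
    r"^(fct|dim)_[a-z0-9_]+\.sql$",         # marts
]

def follows_convention(filename: str) -> bool:
    """Return True if the model filename matches any allowed pattern."""
    return any(re.match(p, filename) for p in MODEL_PATTERNS)

print(follows_convention("stg_stripe__payments.sql"))  # True
print(follows_convention("orders_final_v2.sql"))       # False
```

In practice many teams use an off-the-shelf linter for this instead of a hand-rolled script.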

SQL Style Guide

```sql
-- ✓ Good: Clear CTEs, proper formatting
with orders as (
    select
        order_id,
        customer_id,
        order_date,
        status
    from {{ ref('stg_orders') }}
    where status != 'cancelled'
),

customers as (
    select
        customer_id,
        customer_name,
        customer_email
    from {{ ref('dim_customers') }}
),

final as (
    select
        orders.order_id,
        orders.order_date,
        customers.customer_name,
        orders.status
    from orders
    left join customers
        on orders.customer_id = customers.customer_id
)

select * from final
```

```sql
-- ✗ Bad: Nested subqueries, poor formatting
select o.order_id, o.order_date, c.customer_name, o.status
from (select order_id, customer_id, order_date, status
      from {{ ref('stg_orders') }} where status != 'cancelled') o
left join (select customer_id, customer_name
           from {{ ref('dim_customers') }}) c
on o.customer_id = c.customer_id
```

Performance Optimization

  1. Use Incremental Models for Large Tables

```sql
-- Process only new data
{{ config(materialized='incremental') }}

select * from {{ source('events', 'page_views') }}
{% if is_incremental() %}
    where event_timestamp > (select max(event_timestamp) from {{ this }})
{% endif %}
```

  2. Leverage Clustering and Partitioning

```sql
{{ config(
    materialized='table',
    partition_by={'field': 'order_date', 'data_type': 'date'},
    cluster_by=['customer_id', 'status']
) }}
```

  3. Reduce Data Scanned

```sql
-- ✓ Good: Filter early
with source as (
    select * from {{ source('app', 'events') }}
    where event_date >= '2024-01-01'  -- Filter in the source CTE
)
```

```sql
-- ✗ Bad: Filter late
with source as (
    select * from {{ source('app', 'events') }}
)

select * from source
where event_date >= '2024-01-01'  -- Filtering after a full scan
```

  4. Use Ephemeral for Simple Transformations

```sql
-- Avoid creating unnecessary views
{{ config(materialized='ephemeral') }}

select
    order_id,
    lower(trim(status)) as status_clean
from {{ ref('stg_orders') }}
```

Project Structure Best Practices

  1. Layer Your Transformations

```text
Staging  →  Intermediate   →  Marts
  ↓             ↓               ↓
 1:1        Purpose-built   Business
Sources        Logic        Entities
```

  2. Modularize Complex Logic

```sql
-- Instead of one massive model, break it down:

-- intermediate/int_order_items_aggregated.sql
-- intermediate/int_customer_lifetime_value.sql
-- intermediate/int_payment_summaries.sql

-- marts/fct_orders.sql (combines the intermediate models)
```

  3. Use Consistent File Organization

```text
models/
├── staging/
│   └── [source]/
│       ├── _[source]__sources.yml
│       ├── _[source]__models.yml
│       └── stg_[source]__[table].sql
├── intermediate/
│   └── int_[purpose].sql
└── marts/
    └── [business_area]/
        ├── _[area]__models.yml
        └── [model_type]_[entity].sql
```

Testing Strategy

  1. Test at Multiple Levels

Source tests validate data quality at ingestion:

```yaml
sources:
  - name: raw_data
    tables:
      - name: orders
        columns:
          - name: id
            tests: [unique, not_null]
```

Model tests validate transformation logic:

```yaml
models:
  - name: fct_orders
    tests:
      - dbt_utils.expression_is_true:
          expression: "order_total >= 0"
    columns:
      - name: order_id
        tests: [unique, not_null]
```

Custom tests validate business logic:

```text
tests/assert_revenue_reconciliation.sql
```

  2. Use Appropriate Test Severity

Critical tests should use `error` (fail the build); nice-to-have tests should use `warn` (log but don't fail):

```yaml
tests:
  - unique:
      severity: error
  - dbt_utils.not_null_proportion:
      at_least: 0.95
      severity: warn
```

  3. Test Coverage Goals

  • 100% of primary keys: unique + not_null

  • 100% of foreign keys: relationships tests

  • All business logic: custom data tests

  • Critical calculations: expression tests
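A goal like "100% of primary keys tested" can be checked programmatically. A minimal sketch over a parsed schema file (the dict structure is hypothetical, mirroring the YAML above, and "first column is the PK" is an assumed convention):

```python
# Parsed schema.yml as plain dicts (hypothetical structure)
models = [
    {"name": "fct_orders",
     "columns": [{"name": "order_id", "tests": ["unique", "not_null"]},
                 {"name": "status", "tests": []}]},
    {"name": "dim_customers",
     "columns": [{"name": "customer_id", "tests": ["unique"]}]},
]

def untested_primary_keys(models: list) -> list:
    """Flag models whose primary key is missing unique + not_null tests."""
    missing = []
    for model in models:
        pk = model["columns"][0]   # assumption: first column is the PK
        if not {"unique", "not_null"} <= set(pk["tests"]):
            missing.append(f"{model['name']}.{pk['name']}")
    return missing

print(untested_primary_keys(models))  # ['dim_customers.customer_id']
```

A check like this can run in CI alongside `dbt test` to keep coverage from silently eroding as new models are added.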

Documentation Standards

  1. Document Every Model

```yaml
models:
  - name: fct_orders
    description: |
      Purpose: [Why this model exists]
      Grain: [One row represents...]
      Refresh: [When and how often]
      Consumers: [Who uses this]
```

  2. Document Complex Logic

```sql
-- Use comments for complex business rules
select
    order_id,
    -- Revenue recognition: only count completed orders older
    -- than 30 days (per finance policy 2024-03)
    case
        when status = 'completed'
            and datediff('day', order_date, current_date) > 30
        then order_total
        else 0
    end as recognized_revenue
from {{ ref('stg_orders') }}
```

  3. Keep Docs Updated

  • Update docs when logic changes

  • Review docs during code reviews

  • Generate docs regularly: dbt docs generate

20 Detailed Examples

Example 1: Basic Staging Model

```sql
-- models/staging/jaffle_shop/stg_jaffle_shop__customers.sql

with source as (
    select * from {{ source('jaffle_shop', 'customers') }}
),

renamed as (
    select
        id as customer_id,
        first_name,
        last_name,
        first_name || ' ' || last_name as customer_name,
        email,
        _loaded_at
    from source
)

select * from renamed
```

Example 2: Fact Table with Multiple Joins

```sql
-- models/marts/core/fct_orders.sql

{{ config(
    materialized='table',
    tags=['core', 'daily']
) }}

with orders as (
    select * from {{ ref('stg_jaffle_shop__orders') }}
),

customers as (
    select * from {{ ref('dim_customers') }}
),

payments as (
    select
        order_id,
        sum(amount) as total_payment_amount
    from {{ ref('stg_stripe__payments') }}
    where status = 'success'
    group by 1
),

final as (
    select
        orders.order_id,
        orders.customer_id,
        customers.customer_name,
        orders.order_date,
        orders.status,
        coalesce(payments.total_payment_amount, 0) as order_total,
        {{ add_audit_columns() }}
    from orders
    left join customers
        on orders.customer_id = customers.customer_id
    left join payments
        on orders.order_id = payments.order_id
)

select * from final
```

Example 3: Incremental Event Table

-- models/marts/analytics/fct_page_views.sql

{{ config(
    materialized='incremental',
    unique_key='page_view_id',
    partition_by={
        'field': 'event_date',
        'data_type': 'date',
        'granularity': 'day'
    },
    cluster_by=['user_id', 'page_path']
) }}

with events as (
    select
        event_id as page_view_id,
        user_id,
        session_id,
        event_timestamp,
        date(event_timestamp) as event_date,
        event_properties:page_path::string as page_path,
        event_properties:referrer::string as referrer,
        _loaded_at
    from {{ source('analytics', 'raw_events') }}
    where event_type = 'page_view'

{% if is_incremental() %}
    -- Use _loaded_at to catch late-arriving data
    and _loaded_at > (select max(_loaded_at) from {{ this }})
{% endif %}

),

enriched as (
    select
        page_view_id,
        user_id,
        session_id,
        event_timestamp,
        event_date,
        page_path,
        referrer,
        -- Parse URL components
        split_part(page_path, '?', 1) as page_path_clean,
        case
            when referrer like '%google%' then 'Google'
            when referrer like '%facebook%' then 'Facebook'
            when referrer is null then 'Direct'
            else 'Other'
        end as referrer_source,
        _loaded_at
    from events
)

select * from enriched

Example 4: Customer Dimension with SCD Type 2

-- models/marts/core/dim_customers.sql

{{ config(
    materialized='table',
    unique_key='customer_key'
) }}

with customers as (
    select * from {{ ref('stg_jaffle_shop__customers') }}
),

customer_orders as (
    select
        customer_id,
        min(order_date) as first_order_date,
        max(order_date) as most_recent_order_date,
        count(order_id) as total_orders
    from {{ ref('fct_orders') }}
    group by 1
),

final as (
    select
        {{ dbt_utils.generate_surrogate_key(['customers.customer_id', 'customers._loaded_at']) }} as customer_key,
        customers.customer_id,
        customers.customer_name,
        customers.email,
        customer_orders.first_order_date,
        customer_orders.most_recent_order_date,
        customer_orders.total_orders,
        case
            when customer_orders.total_orders >= 10 then 'VIP'
            when customer_orders.total_orders >= 5 then 'Regular'
            when customer_orders.total_orders >= 1 then 'New'
            else 'Prospect'
        end as customer_segment,
        customers._loaded_at as effective_from,
        null as effective_to,
        true as is_current
    from customers
    left join customer_orders
        on customers.customer_id = customer_orders.customer_id
)

select * from final

Example 5: Aggregated Metrics Table

-- models/marts/analytics/daily_order_metrics.sql

{{ config(
    materialized='incremental',
    unique_key=['metric_date', 'status'],
    incremental_strategy='delete+insert'
) }}

with orders as (
    select * from {{ ref('fct_orders') }}

{% if is_incremental() %}
    where order_date >= (select max(metric_date) - interval '7 days' from {{ this }})
{% endif %}

),

daily_metrics as (
    select
        date_trunc('day', order_date) as metric_date,
        status,
        count(distinct order_id) as order_count,
        count(distinct customer_id) as unique_customers,
        sum(order_total) as total_revenue,
        avg(order_total) as avg_order_value,
        min(order_total) as min_order_value,
        max(order_total) as max_order_value,
        percentile_cont(0.5) within group (order by order_total) as median_order_value
    from orders
    group by 1, 2
)

select * from daily_metrics

Example 6: Pivoted Metrics Using Macro

-- models/marts/analytics/customer_order_status_summary.sql

with orders as (
    select customer_id, status, order_total
    from {{ ref('fct_orders') }}
)

select
    customer_id,
    {% for status in ['placed', 'shipped', 'completed', 'returned', 'cancelled'] %}
    sum(case when status = '{{ status }}' then 1 else 0 end) as {{ status }}_count,
    sum(case when status = '{{ status }}' then order_total else 0 end) as {{ status }}_revenue
    {% if not loop.last %},{% endif %}
    {% endfor %}
from orders
group by 1

Example 7: Snapshot for SCD Type 2

-- snapshots/customers_snapshot.sql

{% snapshot customers_snapshot %}

{{ config(
    target_schema='snapshots',
    target_database='analytics',
    unique_key='customer_id',
    strategy='timestamp',
    updated_at='updated_at',
    invalidate_hard_deletes=True
) }}

select
    customer_id,
    customer_name,
    email,
    customer_segment,
    updated_at
from {{ source('jaffle_shop', 'customers') }}

{% endsnapshot %}

Example 8: Funnel Analysis Model

-- models/marts/analytics/conversion_funnel.sql

with page_views as (
    select
        user_id,
        session_id,
        min(event_timestamp) as session_start
    from {{ ref('fct_page_views') }}
    where event_date >= current_date - interval '30 days'
    group by 1, 2
),

product_views as (
    select distinct user_id, session_id
    from {{ ref('fct_page_views') }}
    where page_path like '/product/%'
      and event_date >= current_date - interval '30 days'
),

add_to_cart as (
    select distinct user_id, session_id
    from {{ ref('fct_events') }}
    where event_type = 'add_to_cart'
      and event_date >= current_date - interval '30 days'
),

checkout_started as (
    select distinct user_id, session_id
    from {{ ref('fct_events') }}
    where event_type = 'checkout_started'
      and event_date >= current_date - interval '30 days'
),

orders as (
    select distinct customer_id as user_id, session_id
    from {{ ref('fct_orders') }}
    where order_date >= current_date - interval '30 days'
      and status = 'completed'
),

funnel as (
    select
        count(distinct page_views.session_id) as sessions,
        count(distinct product_views.session_id) as product_views,
        count(distinct add_to_cart.session_id) as add_to_cart,
        count(distinct checkout_started.session_id) as checkout_started,
        count(distinct orders.session_id) as completed_orders
    from page_views
    left join product_views using (session_id)
    left join add_to_cart using (session_id)
    left join checkout_started using (session_id)
    left join orders using (session_id)
),

funnel_metrics as (
    select
        sessions,
        product_views,
        round(100.0 * product_views / nullif(sessions, 0), 2) as pct_product_views,
        add_to_cart,
        round(100.0 * add_to_cart / nullif(product_views, 0), 2) as pct_add_to_cart,
        checkout_started,
        round(100.0 * checkout_started / nullif(add_to_cart, 0), 2) as pct_checkout_started,
        completed_orders,
        round(100.0 * completed_orders / nullif(checkout_started, 0), 2) as pct_completed_orders,
        round(100.0 * completed_orders / nullif(sessions, 0), 2) as overall_conversion_rate
    from funnel
)

select * from funnel_metrics

Example 9: Cohort Retention Analysis

-- models/marts/analytics/cohort_retention.sql

with customer_orders as (
    select
        customer_id,
        date_trunc('month', order_date) as order_month
    from {{ ref('fct_orders') }}
    where status = 'completed'
),

first_order as (
    select
        customer_id,
        min(order_month) as cohort_month
    from customer_orders
    group by 1
),

cohort_data as (
    select
        f.cohort_month,
        c.order_month,
        datediff('month', f.cohort_month, c.order_month) as months_since_first_order,
        count(distinct c.customer_id) as customer_count
    from first_order f
    join customer_orders c
        on f.customer_id = c.customer_id
    group by 1, 2, 3
),

cohort_size as (
    select
        cohort_month,
        customer_count as cohort_size
    from cohort_data
    where months_since_first_order = 0
),

retention as (
    select
        cohort_data.cohort_month,
        cohort_data.months_since_first_order,
        cohort_data.customer_count,
        cohort_size.cohort_size,
        round(100.0 * cohort_data.customer_count / cohort_size.cohort_size, 2) as retention_pct
    from cohort_data
    join cohort_size
        on cohort_data.cohort_month = cohort_size.cohort_month
)

select * from retention
order by cohort_month, months_since_first_order

Example 10: Revenue Attribution Model

-- models/marts/analytics/revenue_attribution.sql

with touchpoints as (
    select
        user_id,
        session_id,
        event_timestamp,
        case
            when referrer like '%google%' then 'Google'
            when referrer like '%facebook%' then 'Facebook'
            when referrer like '%email%' then 'Email'
            when referrer is null then 'Direct'
            else 'Other'
        end as channel
    from {{ ref('fct_page_views') }}
),

customer_journeys as (
    select
        t.user_id,
        o.order_id,
        o.order_total,
        t.channel,
        t.event_timestamp,
        o.order_date,
        row_number() over (
            partition by o.order_id
            order by t.event_timestamp
        ) as touchpoint_number,
        count(*) over (partition by o.order_id) as total_touchpoints
    from touchpoints t
    join {{ ref('fct_orders') }} o
        on t.user_id = o.customer_id
        and t.event_timestamp <= o.order_date
        and t.event_timestamp >= dateadd('day', -30, o.order_date)
),

attributed_revenue as (
    select
        order_id,
        channel,
        order_total,
        -- First touch attribution
        case when touchpoint_number = 1 then order_total else 0 end as first_touch_revenue,
        -- Last touch attribution
        case when touchpoint_number = total_touchpoints then order_total else 0 end as last_touch_revenue,
        -- Linear attribution
        order_total / total_touchpoints as linear_revenue,
        -- Time decay (more recent touchpoints get more credit)
        order_total * (power(2, touchpoint_number - 1) / (power(2, total_touchpoints) - 1)) as time_decay_revenue
    from customer_journeys
)

select
    channel,
    count(distinct order_id) as orders,
    sum(first_touch_revenue) as first_touch_revenue,
    sum(last_touch_revenue) as last_touch_revenue,
    sum(linear_revenue) as linear_revenue,
    sum(time_decay_revenue) as time_decay_revenue
from attributed_revenue
group by 1

Example 11: Data Quality Test Suite

-- tests/assert_fct_orders_quality.sql

-- Test multiple data quality rules in one test

with order_quality_checks as (
    select
        order_id,
        customer_id,
        order_date,
        order_total,
        status,

    -- Check 1: Order total should be positive
    case when order_total < 0
        then 'Negative order total' end as check_1,

    -- Check 2: Order date should not be in future
    case when order_date > current_date
        then 'Future order date' end as check_2,

    -- Check 3: Customer ID should exist
    case when customer_id is null
        then 'Missing customer ID' end as check_3,

    -- Check 4: Status should be valid
    case when status not in ('placed', 'shipped', 'completed', 'returned', 'cancelled')
        then 'Invalid status' end as check_4

from {{ ref('fct_orders') }}

),

failed_checks as (
    select order_id, check_1, check_2, check_3, check_4
    from order_quality_checks
    where check_1 is not null
       or check_2 is not null
       or check_3 is not null
       or check_4 is not null
)

select * from failed_checks
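A singular test like this fails the run when any rows are returned. If these checks should warn rather than hard-fail the build, dbt's test severity configs can be set at the top of the test file itself — a sketch (the thresholds are illustrative):

```sql
-- at the top of tests/assert_fct_orders_quality.sql
{{ config(severity='warn', warn_if='>0', error_if='>100') }}
```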

Example 12: Slowly Changing Dimension Merge

-- models/marts/core/dim_products_scd.sql

{{ config(
    materialized='incremental',
    unique_key='product_key',
    merge_update_columns=['product_name', 'category', 'price', 'effective_to', 'is_current']
) }}

with source_data as (
    select
        product_id,
        product_name,
        category,
        price,
        updated_at
    from {{ source('ecommerce', 'products') }}

{% if is_incremental() %}
    where updated_at > (select max(updated_at) from {{ this }} where is_current = true)
{% endif %}

)

{% if is_incremental() %}

, existing_records as (
    select * from {{ this }}
    where is_current = true
),

changed_records as (
    select
        s.product_id,
        s.product_name,
        s.category,
        s.price,
        s.updated_at
    from source_data s
    join existing_records e
        on s.product_id = e.product_id
        and (
            s.product_name != e.product_name
            or s.category != e.category
            or s.price != e.price
        )
),

expire_old_records as (
    select
        e.product_key,
        e.product_id,
        e.product_name,
        e.category,
        e.price,
        e.effective_from,
        c.updated_at as effective_to,
        false as is_current,
        e.updated_at
    from existing_records e
    join changed_records c
        on e.product_id = c.product_id
),

new_versions as (
    select
        {{ dbt_utils.generate_surrogate_key(['c.product_id', 'c.updated_at']) }} as product_key,
        c.product_id,
        c.product_name,
        c.category,
        c.price,
        c.updated_at as effective_from,
        null::timestamp as effective_to,
        true as is_current,
        c.updated_at
    from changed_records c
),

combined as (
    select * from expire_old_records
    union all
    select * from new_versions
)

select * from combined

{% else %}

-- First load: all records are current
select
    {{ dbt_utils.generate_surrogate_key(['product_id', 'updated_at']) }} as product_key,
    product_id,
    product_name,
    category,
    price,
    updated_at as effective_from,
    null::timestamp as effective_to,
    true as is_current,
    updated_at
from source_data

{% endif %}

Example 13: Window Functions for Rankings

-- models/marts/analytics/customer_rfm_score.sql

with customer_metrics as (
    select
        customer_id,
        max(order_date) as last_order_date,
        count(order_id) as total_orders,
        sum(order_total) as total_revenue
    from {{ ref('fct_orders') }}
    where status = 'completed'
    group by 1
),

rfm_calculations as (
    select
        customer_id,
        -- Recency: days since last order
        datediff('day', last_order_date, current_date) as recency_days,
        -- Frequency: total orders
        total_orders as frequency,
        -- Monetary: total revenue
        total_revenue as monetary,

    -- Recency score (1-5, lower days = higher score)
    ntile(5) over (order by datediff('day', last_order_date, current_date) desc) as recency_score,
    -- Frequency score (1-5, more orders = higher score)
    ntile(5) over (order by total_orders) as frequency_score,
    -- Monetary score (1-5, more revenue = higher score)
    ntile(5) over (order by total_revenue) as monetary_score
from customer_metrics

),

rfm_segments as (
    select
        customer_id,
        recency_days,
        frequency,
        monetary,
        recency_score,
        frequency_score,
        monetary_score,
        recency_score * 100 + frequency_score * 10 + monetary_score as rfm_score,
        case
            when recency_score >= 4 and frequency_score >= 4 and monetary_score >= 4 then 'Champions'
            when recency_score >= 3 and frequency_score >= 3 and monetary_score >= 3 then 'Loyal Customers'
            when recency_score >= 4 and frequency_score <= 2 and monetary_score <= 2 then 'Promising'
            when recency_score >= 3 and frequency_score <= 2 and monetary_score <= 2 then 'Potential Loyalists'
            when recency_score <= 2 and frequency_score >= 3 and monetary_score >= 3 then 'At Risk'
            when recency_score <= 2 and frequency_score <= 2 and monetary_score <= 2 then 'Hibernating'
            when recency_score <= 1 then 'Lost'
            else 'Need Attention'
        end as customer_segment
    from rfm_calculations
)

select * from rfm_segments

Example 14: Union Multiple Sources

-- models/staging/stg_all_events.sql

{{ config( materialized='view' ) }}

-- Union events from multiple sources using dbt_utils

-- Exclude source-specific columns, and add an event_source column
-- to track which relation each row came from
{{ dbt_utils.union_relations(
    relations=[
        ref('stg_web_events'),
        ref('stg_mobile_events'),
        ref('stg_api_events')
    ],
    exclude=['_loaded_at'],
    source_column_name='event_source'
) }}

Example 15: Surrogate Key Generation

-- models/marts/core/fct_order_lines.sql

with order_lines as (
    select
        order_id,
        line_number,
        product_id,
        quantity,
        unit_price,
        quantity * unit_price as line_total
    from {{ source('ecommerce', 'order_lines') }}
)

select
    {{ dbt_utils.generate_surrogate_key(['order_id', 'line_number']) }} as order_line_key,
    {{ dbt_utils.generate_surrogate_key(['order_id']) }} as order_key,
    {{ dbt_utils.generate_surrogate_key(['product_id']) }} as product_key,
    order_id,
    line_number,
    product_id,
    quantity,
    unit_price,
    line_total
from order_lines

Example 16: Date Spine for Time Series

-- models/marts/analytics/daily_revenue_complete.sql

-- Generate complete date spine to ensure no missing dates

with date_spine as (
    {{ dbt_utils.date_spine(
        datepart="day",
        start_date="cast('2020-01-01' as date)",
        end_date="cast(current_date as date)"
    ) }}
),

daily_revenue as (
    select
        date_trunc('day', order_date) as order_date,
        sum(order_total) as revenue
    from {{ ref('fct_orders') }}
    where status = 'completed'
    group by 1
),

complete_series as (
    select
        date_spine.date_day,
        coalesce(daily_revenue.revenue, 0) as revenue,
        -- 7-day moving average
        avg(coalesce(daily_revenue.revenue, 0)) over (
            order by date_spine.date_day
            rows between 6 preceding and current row
        ) as revenue_7d_ma,
        -- Month-to-date revenue
        sum(coalesce(daily_revenue.revenue, 0)) over (
            partition by date_trunc('month', date_spine.date_day)
            order by date_spine.date_day
        ) as revenue_mtd
    from date_spine
    left join daily_revenue
        on date_spine.date_day = daily_revenue.order_date
)

select * from complete_series

Example 17: Custom Schema Macro Override

-- macros/generate_schema_name.sql

{% macro generate_schema_name(custom_schema_name, node) -%}

{%- set default_schema = target.schema -%}

{%- if target.name == 'prod' -%}
    -- Production: Use custom schema names directly
    {%- if custom_schema_name is not none -%}
        {{ custom_schema_name | trim }}
    {%- else -%}
        {{ default_schema }}
    {%- endif -%}

{%- elif target.name == 'dev' -%}
    -- Development: Prefix with dev_username
    {%- if custom_schema_name is not none -%}
        dev_{{ env_var('DBT_USER', 'unknown') }}_{{ custom_schema_name | trim }}
    {%- else -%}
        dev_{{ env_var('DBT_USER', 'unknown') }}
    {%- endif -%}

{%- else -%}
    -- Default: Concatenate target schema with custom schema
    {{ default_schema }}_{{ custom_schema_name | trim }}
{%- endif -%}

{%- endmacro %}
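This macro branches on `target.name` and the `DBT_USER` environment variable, both of which come from the active profile. A minimal `profiles.yml` supplying them might look like this (project name and connection details are illustrative):

```yaml
# ~/.dbt/profiles.yml (illustrative)
my_project:
  target: dev            # must match the target names the macro checks
  outputs:
    dev:
      type: postgres
      host: localhost
      port: 5432
      user: "{{ env_var('DBT_USER') }}"
      password: "{{ env_var('DBT_PASSWORD') }}"
      dbname: analytics
      schema: analytics
      threads: 4
```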

Example 18: Cross-Database Query Macro

-- macros/cross_db_concat.sql

-- Handle database-specific concat syntax

{% macro concat(fields) -%}
    {{ return(adapter.dispatch('concat', 'dbt_utils')(fields)) }}
{%- endmacro %}

{% macro default__concat(fields) -%}
    concat({{ fields | join(', ') }})
{%- endmacro %}

{% macro snowflake__concat(fields) -%}
    {{ fields | join(' || ') }}
{%- endmacro %}

{% macro bigquery__concat(fields) -%}
    concat({{ fields | join(', ') }})
{%- endmacro %}

{% macro redshift__concat(fields) -%}
    {{ fields | join(' || ') }}
{%- endmacro %}

Usage:

select
    {{ concat(['first_name', "' '", 'last_name']) }} as full_name
from {{ ref('stg_customers') }}

Example 19: Pre-Hook and Post-Hook Configuration

-- models/marts/core/fct_orders.sql

{{ config(
    materialized='incremental',
    unique_key='order_id',
    pre_hook=[
        "delete from {{ this }} where order_date < dateadd('year', -3, current_date)",
        "{{ log('Starting incremental load for fct_orders', info=True) }}"
    ],
    post_hook=[
        "create index if not exists idx_fct_orders_customer_id on {{ this }}(customer_id)",
        "create index if not exists idx_fct_orders_order_date on {{ this }}(order_date)",
        "{{ grant_select(this, 'analyst_role') }}",
        "{{ log('Completed incremental load for fct_orders', info=True) }}"
    ],
    tags=['core', 'incremental']
) }}

select * from {{ ref('stg_orders') }}

{% if is_incremental() %}
    where order_date > (select max(order_date) from {{ this }})
{% endif %}

Example 20: Exposure Definition

# models/exposures.yml

version: 2

exposures:
  - name: customer_dashboard
    description: |
      Executive dashboard showing customer metrics including:
    type: dashboard
    owner:
      name: Analytics Team
      email: analytics@company.com
    depends_on:
      - ref('fct_orders')
      - ref('dim_customers')
      - ref('customer_rfm_score')
      - ref('cohort_retention')
    tags: ['executive', 'customer-analytics']

  - name: revenue_forecast_model
    description: |
      Machine learning model for revenue forecasting.
      Uses historical order data to predict future revenue.
    type: ml
    maturity: medium
    url: https://mlflow.company.com/models/revenue-forecast
    owner:
      name: Data Science Team
      email: datascience@company.com
    depends_on:
      - ref('fct_orders')
      - ref('daily_revenue_complete')
    tags: ['ml', 'forecasting']

Quick Reference Commands

Essential dbt Commands

Install dependencies

dbt deps
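`dbt deps` installs the packages declared in `packages.yml` at the project root — for example (the version range is illustrative):

```yaml
# packages.yml
packages:
  - package: dbt-labs/dbt_utils
    version: [">=1.0.0", "<2.0.0"]
```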

Compile project (check for errors)

dbt compile

Run all models

dbt run

Run specific model

dbt run --select fct_orders

Run model and downstream dependencies

dbt run --select fct_orders+

Run model and upstream dependencies

dbt run --select +fct_orders

Run model and all dependencies

dbt run --select +fct_orders+

Run all models in a directory

dbt run --select staging.*

Run models with specific tag

dbt run --select tag:daily

Run models, exclude specific ones

dbt run --exclude staging.*

Run with full refresh (incremental models)

dbt run --full-refresh

Test all models

dbt test

Test specific model

dbt test --select fct_orders

Generate documentation

dbt docs generate

Serve documentation

dbt docs serve

Debug connection

dbt debug

Clean compiled files

dbt clean

Seed CSV files

dbt seed

Snapshot models

dbt snapshot

List resources

dbt ls --select staging.*

Show compiled SQL

dbt show --select fct_orders

Parse project

dbt parse

Model Selection Syntax

By name

--select model_name

By path

--select staging.jaffle_shop.*

By tag

--select tag:daily

By resource type

--select resource_type:model

By package

--select package:dbt_utils

By status (modified, new)

--select state:modified+ --state ./prod_manifest

Combinations (union)

--select model_a model_b

Intersections

--select tag:daily,staging.*

Graph operators

--select +model_name     # model plus upstream dependencies
--select model_name+     # model plus downstream dependencies
--select +model_name+    # model plus upstream and downstream dependencies
--select @model_name     # model, its descendants, and the ancestors of those descendants

Resources

Skill Version: 1.0.0
Last Updated: October 2025
Skill Category: Data Engineering, Analytics Engineering, Data Transformation
Compatible With: dbt Core 1.0+, dbt Cloud, Snowflake, BigQuery, Redshift, Postgres, Databricks
