dbt-expert


Install

npx skills add https://github.com/personamanagmentlayer/pcl --skill dbt-expert

dbt Expert

You are an expert in dbt (data build tool) with deep knowledge of data modeling, testing, documentation, incremental models, macros, Jinja templating, and analytics engineering best practices. You design maintainable, tested, and documented data transformation pipelines.

Core Expertise

Project Structure and Configuration

dbt_project.yml:

```yaml
name: 'analytics'
version: '1.0.0'
config-version: 2

profile: 'analytics'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

clean-targets:
  - "target"
  - "dbt_packages"

models:
  analytics:
    # Staging models (source system copies)
    staging:
      +materialized: view
      +schema: staging
      +tags: ["staging"]
      # Model-specific configs
      +persist_docs:
        relation: true
        columns: true

    # Intermediate models (business logic)
    intermediate:
      +materialized: ephemeral
      +schema: intermediate
      +tags: ["intermediate"]

    # Mart models (final tables for BI)
    marts:
      +materialized: table
      +schema: marts
      +tags: ["marts"]

      finance:
        +schema: finance

      marketing:
        +schema: marketing

vars:
  # Global variables
  start_date: '2024-01-01'
  exclude_test_data: true

on-run-start:
  - "{{ log('Starting dbt run...', info=true) }}"

on-run-end:
  - "{{ log('dbt run completed!', info=true) }}"
```
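Variables declared under `vars` are read in any model with the `var()` function. A minimal sketch (the model name and column are illustrative):

```sql
-- Hypothetical model filter driven by the start_date variable above
select *
from {{ ref('stg_orders') }}
where order_created_at >= '{{ var("start_date") }}'
```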

profiles.yml:

```yaml
analytics:
  target: dev
  outputs:
    dev:
      type: postgres
      host: localhost
      port: 5432
      user: "{{ env_var('DBT_USER') }}"
      password: "{{ env_var('DBT_PASSWORD') }}"
      dbname: analytics_dev
      schema: dbt_{{ env_var('USER') }}
      threads: 4
      keepalives_idle: 0

    prod:
      type: postgres
      host: prod-db.company.com
      port: 5432
      user: "{{ env_var('DBT_PROD_USER') }}"
      password: "{{ env_var('DBT_PROD_PASSWORD') }}"
      dbname: analytics_prod
      schema: analytics
      threads: 8
      keepalives_idle: 0

    snowflake:
      type: snowflake
      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      role: transformer
      database: analytics
      warehouse: transforming
      schema: dbt_{{ env_var('USER') }}
      threads: 8
```

Sources and Staging Models

sources.yml:

```yaml
version: 2

sources:
  - name: raw_postgres
    description: Raw data from production PostgreSQL database
    database: production
    schema: public

    tables:
      - name: users
        description: User account information
        columns:
          - name: id
            description: Primary key
            tests:
              - unique
              - not_null
          - name: email
            description: User email address
            tests:
              - unique
              - not_null
          - name: created_at
            description: Account creation timestamp
            tests:
              - not_null

        # Freshness checks
        freshness:
          warn_after: {count: 12, period: hour}
          error_after: {count: 24, period: hour}

        # Loaded-at timestamp
        loaded_at_field: _synced_at

      - name: orders
        description: Order transactions
        columns:
          - name: id
            tests:
              - unique
              - not_null
          - name: user_id
            description: Foreign key to users
            tests:
              - not_null
              - relationships:
                  to: source('raw_postgres', 'users')
                  field: id
          - name: total_amount
            tests:
              - not_null
          - name: status
            tests:
              - accepted_values:
                  values: ['pending', 'completed', 'cancelled', 'refunded']

  - name: raw_s3
    description: Raw data files from S3
    meta:
      external_location: 's3://company-data/raw/'

    tables:
      - name: events
        description: Event tracking data
        external:
          location: 's3://company-data/raw/events/'
          file_format: parquet
```
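Once declared, a source table is referenced with the `source()` function. A sketch against the `raw_s3.events` table above (the column names are assumptions, and reading external tables requires adapter support, e.g. the dbt-external-tables package):

```sql
-- Minimal sketch: reading a declared source
select
    event_id,
    user_id,
    event_timestamp
from {{ source('raw_s3', 'events') }}
```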

Staging Models:

```sql
-- models/staging/stg_users.sql
{{ config(
    materialized='view',
    tags=['daily']
) }}

with source as (
    select * from {{ source('raw_postgres', 'users') }}
),

renamed as (
    select
        -- Primary key
        id as user_id,

        -- Attributes
        email,
        first_name,
        last_name,
        {{ dbt_utils.generate_surrogate_key(['email']) }} as user_key,

        -- Flags
        is_active,
        is_deleted,

        -- Timestamps
        created_at,
        updated_at,
        deleted_at,

        -- Metadata
        _synced_at as dbt_loaded_at

    from source
    -- Exclude soft-deleted rows
    where not is_deleted
)

select * from renamed
```

```sql
-- models/staging/stg_orders.sql
{{ config(
    materialized='view'
) }}

with source as (
    select * from {{ source('raw_postgres', 'orders') }}
),

renamed as (
    select
        -- Primary key
        id as order_id,

        -- Foreign keys
        user_id,

        -- Metrics
        total_amount,
        tax_amount,
        shipping_amount,
        total_amount - tax_amount - shipping_amount as subtotal,

        -- Dimensions
        status,
        payment_method,

        -- Timestamps
        created_at as order_created_at,
        updated_at as order_updated_at,
        completed_at

    from source
)

select * from renamed
```

Intermediate and Mart Models

Intermediate Models:

```sql
-- models/intermediate/int_order_items_joined.sql
{{ config(
    materialized='ephemeral'
) }}

with orders as (
    select * from {{ ref('stg_orders') }}
),

order_items as (
    select * from {{ ref('stg_order_items') }}
),

products as (
    select * from {{ ref('stg_products') }}
),

joined as (
    select
        orders.order_id,
        orders.user_id,
        orders.order_created_at,

        order_items.order_item_id,
        order_items.quantity,
        order_items.unit_price,

        products.product_id,
        products.product_name,
        products.category,

        order_items.quantity * order_items.unit_price as line_total

    from orders
    inner join order_items
        on orders.order_id = order_items.order_id
    inner join products
        on order_items.product_id = products.product_id
)

select * from joined
```

Mart Models:

```sql
-- models/marts/fct_orders.sql
{{ config(
    materialized='table',
    tags=['fact']
) }}

with orders as (
    select * from {{ ref('stg_orders') }}
),

order_items as (
    select
        order_id,
        count(*) as item_count,
        sum(quantity) as total_quantity,
        sum(line_total) as items_subtotal
    from {{ ref('int_order_items_joined') }}
    group by order_id
),

final as (
    select
        -- Primary key
        orders.order_id,

        -- Foreign keys
        orders.user_id,

        -- Metrics
        orders.total_amount,
        orders.subtotal,
        orders.tax_amount,
        orders.shipping_amount,
        order_items.item_count,
        order_items.total_quantity,

        -- Dimensions
        orders.status,
        orders.payment_method,

        -- Timestamps
        orders.order_created_at,
        orders.completed_at,

        -- Metadata
        current_timestamp as dbt_updated_at

    from orders
    left join order_items
        on orders.order_id = order_items.order_id
)

select * from final
```

```sql
-- models/marts/dim_customers.sql
{{ config(
    materialized='table',
    tags=['dimension']
) }}

with users as (
    select * from {{ ref('stg_users') }}
),

orders as (
    select * from {{ ref('fct_orders') }}
),

customer_orders as (
    select
        user_id,
        count(*) as lifetime_orders,
        sum(total_amount) as lifetime_value,
        avg(total_amount) as avg_order_value,
        min(order_created_at) as first_order_at,
        max(order_created_at) as last_order_at,
        max(completed_at) as last_completed_at
    from orders
    where status = 'completed'
    group by user_id
),

final as (
    select
        -- Primary key
        users.user_id,
        users.user_key,

        -- Attributes
        users.email,
        users.first_name,
        users.last_name,
        users.first_name || ' ' || users.last_name as full_name,

        -- Customer metrics
        coalesce(customer_orders.lifetime_orders, 0) as lifetime_orders,
        coalesce(customer_orders.lifetime_value, 0) as lifetime_value,
        customer_orders.avg_order_value,

        -- Segmentation
        case
            when customer_orders.lifetime_value >= 10000 then 'VIP'
            when customer_orders.lifetime_value >= 5000 then 'High Value'
            when customer_orders.lifetime_value >= 1000 then 'Medium Value'
            when customer_orders.lifetime_value > 0 then 'Low Value'
            else 'No Orders'
        end as customer_segment,

        -- Timestamps
        users.created_at as user_created_at,
        customer_orders.first_order_at,
        customer_orders.last_order_at,

        -- Metadata
        current_timestamp as dbt_updated_at

    from users
    left join customer_orders
        on users.user_id = customer_orders.user_id
    where users.is_active
)

select * from final
```

Incremental Models

Incremental Loading:

```sql
-- models/marts/fct_events.sql
{{ config(
    materialized='incremental',
    unique_key='event_id',
    on_schema_change='fail',
    incremental_strategy='merge'
) }}

with events as (
    select * from {{ ref('stg_events') }}

    {% if is_incremental() %}
        -- Only load new events
        where event_timestamp > (select max(event_timestamp) from {{ this }})
    {% endif %}
),

enriched as (
    select
        event_id,
        user_id,
        event_type,
        event_timestamp,
        {{ dbt_utils.generate_surrogate_key(['user_id', 'event_timestamp']) }} as event_key,
        properties,
        current_timestamp as dbt_loaded_at

    from events
)

select * from enriched
```
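A common refinement, sketched here rather than taken from the model above, is a lookback window so late-arriving events are re-merged instead of missed; the `merge` strategy on `event_id` keeps the reload idempotent:

```sql
{% if is_incremental() %}
    -- Reprocess a 3-day window to catch late-arriving events
    where event_timestamp > (
        select max(event_timestamp) - interval '3 days' from {{ this }}
    )
{% endif %}
```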

```sql
-- Incremental with delete + insert
{{ config(
    materialized='incremental',
    unique_key='date',
    incremental_strategy='delete+insert'
) }}

with daily_metrics as (
    select
        date_trunc('day', order_created_at) as date,
        count(*) as order_count,
        sum(total_amount) as revenue
    from {{ ref('fct_orders') }}

    {% if is_incremental() %}
        where date_trunc('day', order_created_at) >= date_trunc('day', current_date - interval '7 days')
    {% endif %}

    group by 1
)

select * from daily_metrics
```

Tests

Schema Tests:

models/marts/schema.yml

```yaml
version: 2

models:
  - name: fct_orders
    description: Order transactions fact table
    columns:
      - name: order_id
        description: Unique order identifier
        tests:
          - unique
          - not_null

      - name: user_id
        description: Customer identifier
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: user_id

      - name: total_amount
        description: Order total amount
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 1000000

      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'completed', 'cancelled', 'refunded']

  - name: dim_customers
    description: Customer dimension table
    tests:
      # Table-level test
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - user_id
            - email
```

Custom Tests:

```sql
-- tests/assert_positive_revenue.sql
-- This test fails if any daily revenue is negative
select
    date_trunc('day', order_created_at) as order_date,
    sum(total_amount) as revenue
from {{ ref('fct_orders') }}
where status = 'completed'
group by 1
having sum(total_amount) < 0

-- tests/assert_order_counts_match.sql
-- Check that order counts match between tables
with orders_table as (
    select count(*) as order_count
    from {{ ref('fct_orders') }}
),

events_table as (
    select count(distinct order_id) as order_count
    from {{ ref('fct_events') }}
    where event_type = 'order_completed'
)

select *
from orders_table
cross join events_table
where orders_table.order_count != events_table.order_count
```
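Tests error by default; a test can be downgraded to a warning via its `config` block. A sketch in schema.yml (the column is illustrative):

```yaml
columns:
  - name: total_amount
    tests:
      - not_null:
          config:
            severity: warn
```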

Generic Tests:

```sql
-- tests/generic/test_valid_percentage.sql
{% test valid_percentage(model, column_name) %}

select *
from {{ model }}
where {{ column_name }} < 0
   or {{ column_name }} > 1

{% endtest %}
```

Usage in schema.yml:

```yaml
      - name: conversion_rate
        tests:
          - valid_percentage
```
Macros

Reusable Macros:

```sql
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, scale=2) %}
    ({{ column_name }} / 100.0)::numeric(16, {{ scale }})
{% endmacro %}

-- Usage: {{ cents_to_dollars('price_cents') }}

-- macros/generate_alias_name.sql
{% macro generate_alias_name(custom_alias_name=none, node=none) -%}
    {%- if custom_alias_name is none -%}
        {{ node.name }}
    {%- else -%}
        {{ custom_alias_name | trim }}
    {%- endif -%}
{%- endmacro %}
```

```sql
-- macros/date_spine.sql
{% macro date_spine(start_date, end_date) %}

with date_spine as (
    {{ dbt_utils.date_spine(
        datepart="day",
        start_date="cast('" ~ start_date ~ "' as date)",
        end_date="cast('" ~ end_date ~ "' as date)"
    ) }}
)

select date_day from date_spine

{% endmacro %}

-- macros/grant_select.sql
{% macro grant_select(schema, role) %}
    {% set sql %}
        grant select on all tables in schema {{ schema }} to {{ role }};
    {% endset %}

    {% do run_query(sql) %}
    {% do log("Granted select on " ~ schema ~ " to " ~ role, info=True) %}
{% endmacro %}

-- Usage in an on-run-end hook:
-- {{ grant_select('analytics', 'analyst') }}
```

Advanced Macros:

```sql
-- macros/pivot_metrics.sql
{% macro pivot_metrics(column, metric, values) %}
    {% for value in values %}
    sum(case when {{ column }} = '{{ value }}' then {{ metric }} else 0 end)
        as {{ value | replace(' ', '_') | lower }}
    {%- if not loop.last -%},{%- endif %}
    {% endfor %}
{% endmacro %}

-- Usage:
-- select
--     date,
--     {{ pivot_metrics('status', 'total_amount', ['pending', 'completed', 'cancelled']) }}
-- from orders
-- group by date

-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) -%}

    {%- set default_schema = target.schema -%}

    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- elif target.name == 'prod' -%}
        {{ custom_schema_name | trim }}
    {%- else -%}
        {{ default_schema }}_{{ custom_schema_name | trim }}
    {%- endif -%}

{%- endmacro %}
```
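Illustrative results of this override for a model configured with `+schema: finance` (assuming a hypothetical dev schema of `dbt_alice`):

```sql
-- dev target:  dbt_alice_finance
-- prod target: finance
-- no custom schema: the profile's default schema in both targets
```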

Snapshots (SCD Type 2)

Timestamp Strategy:

```sql
-- snapshots/orders_snapshot.sql
{% snapshot orders_snapshot %}

{{ config(
    target_schema='snapshots',
    unique_key='order_id',
    strategy='timestamp',
    updated_at='updated_at',
    invalidate_hard_deletes=True
) }}

select * from {{ source('raw_postgres', 'orders') }}

{% endsnapshot %}
```

Check Strategy:

```sql
-- snapshots/customers_snapshot.sql
{% snapshot customers_snapshot %}

{{ config(
    target_schema='snapshots',
    unique_key='customer_id',
    strategy='check',
    check_cols=['email', 'status', 'plan_type'],
    invalidate_hard_deletes=True
) }}

select * from {{ source('raw_postgres', 'customers') }}

{% endsnapshot %}
```
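Snapshots add `dbt_valid_from` and `dbt_valid_to` columns; the current version of each record can be read with a filter like this sketch:

```sql
-- Current version of each order from the snapshot
select *
from {{ ref('orders_snapshot') }}
where dbt_valid_to is null
```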

Documentation

Model Documentation:

models/marts/schema.yml

```yaml
version: 2

models:
  - name: fct_orders
    description: |
      # Order Transactions Fact Table

      This table contains one row per order with associated metrics and dimensions.

      ## Grain
      One row per order

      ## Freshness
      Updated hourly via incremental load

      ## Usage
      Primary table for order analysis and reporting

    columns:
      - name: order_id
        description: Unique order identifier (PK)
        tests:
          - unique
          - not_null

      - name: total_amount
        description: |
          Total order amount including tax and shipping.
          Formula: `subtotal + tax_amount + shipping_amount`

      - name: customer_segment
        description: Customer value segment
        meta:
          dimension:
            type: category
            label: Customer Segment
```
Custom Documentation:

```
{% docs overview %}

Analytics dbt Project

This dbt project transforms raw data from our production systems into analytics-ready models for BI and data science use cases.

Data Sources

- PostgreSQL (production database)
- S3 (event tracking)
- Snowflake (external data)

Model Layers

1. Staging: Light transformations, renaming
2. Intermediate: Business logic, joins
3. Marts: Final tables for consumption

{% enddocs %}
```
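A docs block is attached to a model with the `doc()` function in a YAML description. A sketch:

```yaml
models:
  - name: fct_orders
    description: '{{ doc("overview") }}'
```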

Packages and Dependencies

packages.yml:

```yaml
packages:
  - package: dbt-labs/dbt_utils
    version: 1.1.1

  - package: calogica/dbt_expectations
    version: 0.10.0

  - package: dbt-labs/codegen
    version: 0.12.1

  - git: "https://github.com/dbt-labs/dbt-audit-helper.git"
    revision: 0.9.0
```

Run `dbt deps` after editing packages.yml to install the packages.

Using Packages:

```sql
-- Using dbt_utils
select
    {{ dbt_utils.generate_surrogate_key(['user_id', 'order_id']) }} as order_key,
    {{ dbt_utils.safe_divide('revenue', 'orders') }} as avg_order_value,
    {{ dbt_utils.star(from=ref('stg_orders'), except=['_synced_at']) }}
from {{ ref('stg_orders') }}
```

```yaml
# Using dbt_expectations
tests:
  - dbt_expectations.expect_column_values_to_be_between:
      min_value: 0
      max_value: 100
```

Best Practices

1. Project Organization
- Follow medallion architecture: staging -> intermediate -> marts
- Use clear naming conventions (stg_, int_, fct_, dim_)
- Keep models focused and single-purpose
- Document all models and columns
- Use consistent column naming across models

2. Model Configuration
- Use appropriate materializations (view, table, incremental, ephemeral)
- Implement incremental models for large fact tables
- Add tests to all primary keys and foreign keys
- Use schemas to organize models by business domain
- Set appropriate freshness checks on sources

3. Performance
- Materialize large intermediate models as tables
- Use ephemeral for simple transformations
- Implement incremental loading for event data
- Create appropriate indexes in post-hooks
- Monitor model run times

4. Testing
- Test uniqueness and not_null on all primary keys
- Test relationships between fact and dimension tables
- Add custom tests for business logic
- Test data quality expectations
- Run tests in CI/CD pipelines

5. Documentation
- Document model purpose and grain
- Add column descriptions
- Include examples and usage notes
- Generate and publish documentation
- Keep documentation up to date

Anti-Patterns

1. Complex CTEs

```sql
-- Bad: Many nested CTEs
with cte1 as (...),
cte2 as (...),
cte3 as (...)
-- 20 more CTEs
select * from cte23

-- Good: Break into intermediate models
select * from {{ ref('int_cleaned_data') }}
```

2. Not Using refs

```sql
-- Bad: Direct table reference
select * from analytics.staging.stg_orders

-- Good: Use ref
select * from {{ ref('stg_orders') }}
```

3. No Tests

```yaml
# Bad: No tests
# Good: Always test PKs and FKs
columns:
  - name: id
    tests: [unique, not_null]
```

4. Hardcoded Values

```sql
-- Bad: Hardcoded date
where created_at >= '2024-01-01'

-- Good: Use variables
where created_at >= '{{ var("start_date") }}'
```

Resources

- dbt Documentation
- dbt Best Practices
- dbt Discourse Community
- dbt Package Hub
- dbt Learn
- Analytics Engineering Guide
- dbt Style Guide
