.PY MODELS

DVT lets you write models in Python alongside your SQL models. A .py model is a Python file that lives in your models/ folder, just like a .sql file. It runs locally on your machine (not on the database), giving you the full power of Python to pull data from APIs, transform with pandas, run ML models, or do anything else Python can do.

Python models appear in the DAG, show up in dvt docs, and can be referenced by other models with {{ ref('my_python_model') }} — just like SQL models.

WHY PYTHON MODELS?

SQL is great for transforming data that already lives in a database. But what about data that lives outside databases?

API Extraction

Pull data from REST APIs (Stripe, HubSpot, Salesforce, any API) directly into your data pipeline.

Data Generation

Generate seed data, lookup tables, date dimensions, or test fixtures with Python code.

ML & AI

Run ML inference, score models, classify text with AI, or generate embeddings — all inline in your DAG.

File Processing

Read CSVs, Excel files, PDFs, or any local file format and bring it into your pipeline.

HOW DVT PYTHON MODELS ARE DIFFERENT FROM DBT

dbt also supports Python models, but they work very differently. Understanding the difference is important:

FEATURE             | DBT PYTHON MODELS                                  | DVT PYTHON MODELS
Where code runs     | On the warehouse (Snowflake, Databricks, BigQuery) | Locally on your machine
Supported databases | Only 3 (Snowflake, Databricks, BigQuery)           | All 19+ databases DVT supports
API access          | No — warehouse can't call APIs                     | Yes — full network access
File system access  | No — runs in warehouse sandbox                     | Yes — read/write local files
Python packages     | Limited to warehouse-installed packages            | Any package you can pip install
Session type        | Spark/Snowpark session (DataFrame API)             | DuckDB connection (SQL + DataFrame)
Cost                | Warehouse compute costs (can be expensive)         | Free — runs on your laptop/server

In short: dbt Python models are limited to running on expensive cloud warehouses that support server-side Python. DVT Python models run locally with full Python capabilities and work with any target database — including PostgreSQL, MySQL, Oracle, SQL Server, and all the others that dbt Python models don't support.

HOW IT WORKS

When DVT finds a .py file in your models/ folder, it:

1. Parses your Python file

DVT reads the code and extracts any dbt.ref() and dbt.source() calls automatically — no extra configuration needed.

2. Extracts upstream data into DuckDB

Any tables you reference with dbt.ref() or dbt.source() are extracted into a local DuckDB cache so your Python code can access them.

3. Runs your Python code locally

Your model() function executes on your machine with full access to Python libraries. The session parameter is a DuckDB connection.

4. Loads the result to your target database

The DataFrame you return is automatically loaded to your target database (PostgreSQL, Snowflake, MySQL, etc.) via Sling.

Your Python code  →  DuckDB (local)  →  Sling  →  Target Database
                         ↑
              upstream refs/sources
              extracted here first

YOUR FIRST PYTHON MODEL

Every Python model must define a function called model that takes two arguments and returns a DataFrame:

# models/my_first_python_model.py

def model(dbt, session):
    import pandas as pd

    dbt.config(materialized="table")

    data = pd.DataFrame({
        "id": [1, 2, 3],
        "name": ["Alice", "Bob", "Charlie"],
        "score": [95, 87, 92],
    })

    return data

That's it. Save this as a .py file in your models/ folder and run dvt run --select my_first_python_model. The resulting table will appear in your target database with columns id, name, and score.

THE TWO ARGUMENTS: DBT AND SESSION

Every model(dbt, session) function receives two objects:

ARGUMENT | WHAT IT IS                                                             | WHEN TO USE IT
dbt      | DVT context object with ref(), source(), config, this, is_incremental | To reference other models/sources and read config
session  | A DuckDB database connection                                           | To run SQL queries locally or access cached tables directly
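As a sketch of how the two objects divide the work — dbt supplies configuration and upstream data, session is there when you want local SQL — here is a minimal model (the upstream name stg_orders is hypothetical):

```python
import pandas as pd

def model(dbt, session):
    dbt.config(materialized="table")       # dbt: set configuration

    # dbt: pull an upstream model into a pandas DataFrame
    orders = dbt.ref("stg_orders").df()

    # session could equally run SQL over the local DuckDB cache here;
    # this sketch stays in pandas
    orders["amount_rank"] = orders["amount"].rank(ascending=False)
    return orders
```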

REFERENCING OTHER MODELS

Use dbt.ref() to reference any upstream model (SQL or Python). It returns a relation object — call .df() to get a pandas DataFrame:

# models/enrich_customers.py

def model(dbt, session):
    import pandas as pd

    dbt.config(materialized="table")

    # Get data from an upstream SQL model
    customers = dbt.ref("stg_customers").df()     # → pandas DataFrame

    # Transform with pandas
    customers["name_upper"] = customers["name"].str.upper()
    customers["is_vip"] = customers["total_spend"] > 1000

    return customers

Important: When you call dbt.ref("some_model"), DVT automatically extracts that model's table from the database into the local DuckDB cache. You don't need to worry about connections — it just works.
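Because every ref is cached locally, you can join several upstream models with plain pandas. A sketch (both model names and the customer_id key are hypothetical):

```python
import pandas as pd

def model(dbt, session):
    dbt.config(materialized="table")

    # Both refs are extracted into the local DuckDB cache automatically
    orders = dbt.ref("stg_orders").df()
    customers = dbt.ref("stg_customers").df()

    # Join locally with pandas; a left join keeps orders without a match
    enriched = orders.merge(customers, on="customer_id", how="left")
    return enriched
```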

REFERENCING SOURCES

Use dbt.source() to read from a source defined in sources.yml:

# models/process_orders.py

def model(dbt, session):
    import pandas as pd

    dbt.config(materialized="table")

    # Read from a MySQL source (automatically extracted to DuckDB)
    orders = dbt.source("mysql_ops", "orders").df()

    # Clean and transform
    orders["order_date"] = pd.to_datetime(orders["order_date"])
    orders["amount"] = pd.to_numeric(orders["amount"], errors="coerce")

    return orders

RETURN TYPES

Your model() function must return one of these types:

TYPE             | LIBRARY | EXAMPLE
pandas.DataFrame | pandas  | return pd.DataFrame(data)
polars.DataFrame | polars  | return pl.DataFrame(data)
pyarrow.Table    | pyarrow | return pa.table(data)
DuckDB Relation  | duckdb  | return session.sql("SELECT ...")

Recommendation: Start with pandas.DataFrame — it's the most widely used and has the best documentation. Use polars for larger datasets that need better performance.

API EXTRACTION EXAMPLES

Simple REST API

# models/api_users.py

def model(dbt, session):
    import requests
    import pandas as pd

    dbt.config(materialized="table")

    # Fetch data from any REST API
    response = requests.get("https://api.example.com/users")
    response.raise_for_status()

    users = pd.DataFrame(response.json()["data"])
    return users

API with Authentication

# models/api_stripe_charges.py

def model(dbt, session):
    import requests
    import pandas as pd
    import os

    dbt.config(materialized="table")

    api_key = os.environ["STRIPE_API_KEY"]

    response = requests.get(
        "https://api.stripe.com/v1/charges",
        headers={"Authorization": f"Bearer {api_key}"},
        params={"limit": 100},
    )
    response.raise_for_status()

    return pd.DataFrame(response.json()["data"])

Paginated API

# models/api_all_contacts.py

def model(dbt, session):
    import requests
    import pandas as pd
    import os

    dbt.config(materialized="table")

    api_key = os.environ["HUBSPOT_API_KEY"]
    all_records = []
    url = "https://api.hubapi.com/crm/v3/contacts"

    while url:
        response = requests.get(
            url,
            headers={"Authorization": f"Bearer {api_key}"},
        )
        response.raise_for_status()
        payload = response.json()

        all_records.extend(payload["results"])

        # Follow pagination; the link is None when there are no more pages
        url = payload.get("paging", {}).get("next", {}).get("link")

    return pd.DataFrame(all_records)
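API calls run on your machine and can fail transiently. A small retry helper — a sketch, not a DVT feature — can wrap any fetch call inside a model:

```python
import time

def get_with_retries(fetch, max_retries=3, backoff=1.0):
    """Call fetch() and retry on failure with exponential backoff.

    fetch is any zero-argument callable (e.g. a lambda wrapping
    requests.get); backoff doubles after each failed attempt.
    """
    for attempt in range(max_retries):
        try:
            return fetch()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of retries: surface the original error
            time.sleep(backoff * (2 ** attempt))
```

Inside a model you would wrap the call, e.g. response = get_with_retries(lambda: requests.get(url, timeout=30)).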

Joining API Data with Database Data

# models/enriched_orders.py

def model(dbt, session):
    import requests
    import pandas as pd

    dbt.config(materialized="table")

    # Get orders from your database (via dbt.ref)
    orders = dbt.ref("stg_orders").df()

    # Get exchange rates from an API
    response = requests.get("https://api.exchangerate.host/latest")
    rates = response.json()["rates"]

    # Enrich: convert amounts to USD
    orders["usd_amount"] = orders.apply(
        lambda row: row["amount"] / rates.get(row["currency"], 1),
        axis=1,
    )

    return orders

ML & AI EXAMPLES

ML Inference

# models/churn_predictions.py

def model(dbt, session):
    import pandas as pd
    import joblib

    dbt.config(materialized="table")

    # Load customers from upstream model
    customers = dbt.ref("dim_customers").df()

    # Load pre-trained model
    clf = joblib.load("models/churn_model.pkl")

    # Score each customer
    features = customers[["tenure", "monthly_spend", "support_tickets"]]
    customers["churn_probability"] = clf.predict_proba(features)[:, 1]
    customers["churn_risk"] = pd.cut(
        customers["churn_probability"],
        bins=[0, 0.3, 0.7, 1.0],
        labels=["low", "medium", "high"],
    )

    return customers

AI-Powered Transformations

# models/sentiment_analysis.py

def model(dbt, session):
    import pandas as pd
    from openai import OpenAI
    import os

    dbt.config(materialized="table")

    client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    reviews = dbt.ref("raw_reviews").df()

    def classify(text):
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": f"Classify sentiment (positive/negative/neutral): {text}"}],
        )
        return response.choices[0].message.content.strip().lower()

    reviews["sentiment"] = reviews["review_text"].apply(classify)
    return reviews
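Calling an LLM once per row gets expensive when texts repeat. One way to cut costs — a sketch, not a DVT feature — is to classify each distinct text once and map the labels back:

```python
import pandas as pd

def classify_unique(texts, classify):
    """Classify each distinct text once, then map results back.

    texts is a pandas Series; classify is any text -> label callable
    (for example, an OpenAI call). Duplicate rows cost a single API
    call instead of one per row.
    """
    labels = {t: classify(t) for t in texts.dropna().unique()}
    return texts.map(labels)
```

In the sentiment model above, the apply line would become reviews["sentiment"] = classify_unique(reviews["review_text"], classify).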

CONFIGURATION

Python models use dbt.config() to set materialization and other options, just like {{ config() }} in SQL models:

def model(dbt, session):
    dbt.config(
        materialized="table",       # "table" (default) or "incremental"
        tags=["python", "api"],     # for selection: dvt run --select tag:api
    )
    # ... your code ...

Note: Python models always materialize as table. Views and ephemeral materializations are not supported for Python models because the code runs locally, not on the database.

INCREMENTAL PYTHON MODELS

Python models support incremental materialization. Use dbt.is_incremental to check if the model has run before:

# models/incremental_api.py

def model(dbt, session):
    import requests
    import pandas as pd

    dbt.config(
        materialized="incremental",
        unique_key="id",
    )

    if dbt.is_incremental:
        # Only fetch new records since last run
        # Query the session (DuckDB cache) for the last known ID
        last_id = session.execute(
            "SELECT MAX(id) FROM __model__incremental_api"
        ).fetchone()[0] or 0

        url = f"https://api.example.com/records?since_id={last_id}"
    else:
        # First run: fetch everything
        url = "https://api.example.com/records"

    data = requests.get(url).json()
    return pd.DataFrame(data["records"])

USING THE SESSION (DUCKDB)

The session parameter is a DuckDB connection. You can use it to run SQL queries locally, which is useful when you want to combine Python and SQL:

def model(dbt, session):
    import pandas as pd

    dbt.config(materialized="table")

    # Run SQL directly in DuckDB
    result = session.execute("""
        SELECT
            category,
            COUNT(*) as cnt,
            AVG(amount) as avg_amount
        FROM __model__py_transformer
        GROUP BY category
    """).df()

    return result

INSTALLING PYTHON PACKAGES

Python models can use any Python package installed in your environment. If a package is missing, DVT will show a clear error:

DVT125: Python model 'api_users' failed to import: requests.
Install the package with: pip install requests

Install packages with pip install or uv add before running your models. Common packages for Python models:

PACKAGE        | USE CASE                       | INSTALL
requests       | HTTP/API calls                 | pip install requests
pandas         | DataFrames (included with DVT) | already installed
polars         | Fast DataFrames                | pip install polars
openai         | OpenAI / ChatGPT               | pip install openai
scikit-learn   | ML models                      | pip install scikit-learn
beautifulsoup4 | Web scraping                   | pip install beautifulsoup4
openpyxl       | Excel files                    | pip install openpyxl

DAG POSITIONING

Python models fit anywhere in your DAG. Here are the three common patterns:

Beginning of DAG (data source)

No refs — generates or fetches data. Other models ref it.

# models/api_products.py
def model(dbt, session):
    import requests, pandas as pd
    dbt.config(materialized="table")
    return pd.DataFrame(requests.get("https://api.example.com/products").json())

# models/product_report.sql  (downstream, refs the Python model)
# SELECT * FROM {{ ref('api_products') }} WHERE is_active = true

Middle of DAG (transformer)

Refs upstream models, transforms with Python, other models ref it.

# models/enriched_customers.py
def model(dbt, session):
    import pandas as pd
    dbt.config(materialized="table")
    customers = dbt.ref("stg_customers").df()    # upstream SQL model
    customers["segment"] = customers["spend"].apply(
        lambda x: "high" if x > 1000 else "low"
    )
    return customers

# models/customer_dashboard.sql  (downstream)
# SELECT segment, COUNT(*) FROM {{ ref('enriched_customers') }} GROUP BY 1

End of DAG (final output)

Refs multiple upstream models, produces a final result.

# models/executive_report.py
def model(dbt, session):
    import pandas as pd
    dbt.config(materialized="table")
    revenue = dbt.ref("fct_revenue").df()
    customers = dbt.ref("dim_customers").df()
    summary = revenue.groupby("month").agg(total=("amount", "sum")).reset_index()
    summary["customer_count"] = len(customers)
    return summary

RULES AND CONSTRAINTS

Must define model(dbt, session)

Every .py file must have exactly one function named 'model' that takes two arguments: dbt and session. This is validated at parse time.

Must return a DataFrame

The model() function must return a pandas DataFrame, polars DataFrame, pyarrow Table, or DuckDB relation. Returning None, a string, or any other type will raise DVT122.

No Jinja allowed

Python models cannot use Jinja ({{ }}). Use dbt.ref() and dbt.source() instead of {{ ref() }} and {{ source() }}.

Materialization is always table

Python models default to materialized='table'. Views and ephemeral are not supported because the code runs locally, not on the database engine.

Runs locally

Python code runs on the machine where DVT is installed, not on the database. This means you have full access to the filesystem, environment variables, and network.
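For example, the "File Processing" use case from earlier might look like this — a sketch that assumes a local data/exports/ folder of CSV files:

```python
import glob
import pandas as pd

def model(dbt, session):
    dbt.config(materialized="table")

    # Read every CSV in a local folder (the path is hypothetical)
    frames = []
    for path in sorted(glob.glob("data/exports/*.csv")):
        df = pd.read_csv(path)
        df["source_file"] = path  # keep lineage back to the file
        frames.append(df)

    if not frames:
        # No files found: return an empty frame rather than failing
        return pd.DataFrame(columns=["source_file"])
    return pd.concat(frames, ignore_index=True)
```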

ERROR CODES

CODE   | MEANING                  | FIX
DVT120 | ref() target not found   | Check the model name in dbt.ref(). Run dvt list to see available models.
DVT121 | source() target not found | Check the source name and table in dbt.source(). Verify sources.yml.
DVT122 | Bad return type          | Return a pandas DataFrame, polars DataFrame, pyarrow Table, or DuckDB relation.
DVT123 | Empty result             | Your model returned an empty DataFrame. Check your data source or filters.
DVT124 | Timeout                  | Your model took too long. Check for infinite loops or slow API calls.
DVT125 | Import error             | Install the missing package: pip install <package_name>
DVT126 | Runtime error            | Your Python code raised an exception. Check the traceback in the error message.

QUICK CHECKLIST

Before running a Python model, make sure:

File ends with .py and lives in models/ folder
Has def model(dbt, session): function
Returns a DataFrame (not None, not a string)
Required Python packages are installed (pip install ...)
API keys are set as environment variables (not hardcoded!)
No Jinja syntax (use dbt.ref() instead of {{ ref() }})