.PY MODELS
DVT lets you write models in Python alongside your SQL models. A .py model is a Python file that lives in your models/ folder, just like a .sql file. It runs locally on your machine (not on the database), giving you the full power of Python to pull data from APIs, transform with pandas, run ML models, or do anything else Python can do.
Python models appear in the DAG, show up in dvt docs, and can be referenced by other models with {{ ref('my_python_model') }} — just like SQL models.
WHY PYTHON MODELS?
SQL is great for transforming data that already lives in a database. But what about data that lives outside databases?
API Extraction
Pull data from REST APIs (Stripe, HubSpot, Salesforce, any API) directly into your data pipeline.
Data Generation
Generate seed data, lookup tables, date dimensions, or test fixtures with Python code.
ML & AI
Run ML inference, score models, classify text with AI, or generate embeddings — all inline in your DAG.
File Processing
Read CSVs, Excel files, PDFs, or any local file format and bring it into your pipeline.
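As a concrete sketch of the data-generation pattern, here is a hypothetical date-dimension model (the file name, date range, and columns are illustrative, not prescribed by DVT):

```python
# models/dim_dates.py — hypothetical date-dimension model
def model(dbt, session):
    import pandas as pd

    dbt.config(materialized="table")

    # One row per calendar day for 2024
    dates = pd.DataFrame(
        {"date_day": pd.date_range("2024-01-01", "2024-12-31", freq="D")}
    )
    dates["year"] = dates["date_day"].dt.year
    dates["month"] = dates["date_day"].dt.month
    dates["is_weekend"] = dates["date_day"].dt.dayofweek >= 5  # Sat/Sun
    return dates
```

Because the data is generated entirely in Python, a model like this has no upstream refs and can sit at the start of the DAG.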
HOW DVT PYTHON MODELS ARE DIFFERENT FROM DBT
dbt also supports Python models, but they work very differently. Understanding the difference is important:
| FEATURE | DBT PYTHON MODELS | DVT PYTHON MODELS |
|---|---|---|
| Where code runs | On the warehouse (Snowflake, Databricks, BigQuery) | Locally on your machine |
| Supported databases | Only 3 (Snowflake, Databricks, BigQuery) | All 19+ databases DVT supports |
| API access | No — warehouse can't call APIs | Yes — full network access |
| File system access | No — runs in warehouse sandbox | Yes — read/write local files |
| Python packages | Limited to warehouse-installed packages | Any package you can pip install |
| Session type | Spark/Snowpark session (DataFrame API) | DuckDB connection (SQL + DataFrame) |
| Cost | Warehouse compute costs (can be expensive) | Free — runs on your laptop/server |
In short: dbt Python models are limited to running on expensive cloud warehouses that support server-side Python. DVT Python models run locally with full Python capabilities and work with any target database — including PostgreSQL, MySQL, Oracle, SQL Server, and all the others that dbt Python models don't support.
HOW IT WORKS
When DVT finds a .py file in your models/ folder, it:
Parses your Python file
DVT reads the code and extracts any dbt.ref() and dbt.source() calls automatically — no extra configuration needed.
Extracts upstream data into DuckDB
Any tables you reference with dbt.ref() or dbt.source() are extracted into a local DuckDB cache so your Python code can access them.
Runs your Python code locally
Your model() function executes on your machine with full access to Python libraries. The 'session' parameter is a DuckDB connection.
Loads the result to your target database
The DataFrame you return is automatically loaded to your target database (PostgreSQL, Snowflake, MySQL, etc.) via Sling.
Your Python code → DuckDB (local) → Sling → Target Database
                        ↑
        upstream refs/sources extracted here first
YOUR FIRST PYTHON MODEL
Every Python model must define a function called model that takes two arguments and returns a DataFrame:
# models/my_first_python_model.py
def model(dbt, session):
    import pandas as pd

    dbt.config(materialized="table")

    data = pd.DataFrame({
        "id": [1, 2, 3],
        "name": ["Alice", "Bob", "Charlie"],
        "score": [95, 87, 92],
    })
    return data

That's it. Save this as a .py file in your models/ folder and run dvt run --select my_first_python_model. The resulting table will appear in your target database with columns id, name, and score.
THE TWO ARGUMENTS: DBT AND SESSION
Every model(dbt, session) function receives two objects:
| ARGUMENT | WHAT IT IS | WHEN TO USE IT |
|---|---|---|
| dbt | DVT context object with ref(), source(), config, this, is_incremental | To reference other models/sources and read config |
| session | A DuckDB database connection | To run SQL queries locally or access cached tables directly |
REFERENCING OTHER MODELS
Use dbt.ref() to reference any upstream model (SQL or Python). It returns a relation object — call .df() to get a pandas DataFrame:
# models/enrich_customers.py
def model(dbt, session):
    import pandas as pd

    dbt.config(materialized="table")

    # Get data from an upstream SQL model
    customers = dbt.ref("stg_customers").df()  # → pandas DataFrame

    # Transform with pandas
    customers["name_upper"] = customers["name"].str.upper()
    customers["is_vip"] = customers["total_spend"] > 1000
    return customers

Important: When you call dbt.ref("some_model"), DVT automatically extracts that model's table from the database into the local DuckDB cache. You don't need to worry about connections — it just works.
REFERENCING SOURCES
Use dbt.source() to read from a source defined in sources.yml:
# models/process_orders.py
def model(dbt, session):
    import pandas as pd

    dbt.config(materialized="table")

    # Read from a MySQL source (automatically extracted to DuckDB)
    orders = dbt.source("mysql_ops", "orders").df()

    # Clean and transform
    orders["order_date"] = pd.to_datetime(orders["order_date"])
    orders["amount"] = pd.to_numeric(orders["amount"], errors="coerce")
    return orders

RETURN TYPES
Your model() function must return one of these types:
| TYPE | LIBRARY | EXAMPLE |
|---|---|---|
| pandas.DataFrame | pandas | return pd.DataFrame(data) |
| polars.DataFrame | polars | return pl.DataFrame(data) |
| pyarrow.Table | pyarrow | return pa.table(data) |
| DuckDB Relation | duckdb | return session.sql("SELECT ...") |
Recommendation: Start with pandas.DataFrame — it's the most widely used and has the best documentation. Use polars for larger datasets that need better performance.
API EXTRACTION EXAMPLES
Simple REST API
# models/api_users.py
def model(dbt, session):
    import requests
    import pandas as pd

    dbt.config(materialized="table")

    # Fetch data from any REST API
    response = requests.get("https://api.example.com/users")
    response.raise_for_status()
    users = pd.DataFrame(response.json()["data"])
    return users

API with Authentication
# models/api_stripe_charges.py
def model(dbt, session):
    import requests
    import pandas as pd
    import os

    dbt.config(materialized="table")

    api_key = os.environ["STRIPE_API_KEY"]
    response = requests.get(
        "https://api.stripe.com/v1/charges",
        headers={"Authorization": f"Bearer {api_key}"},
        params={"limit": 100},
    )
    response.raise_for_status()
    return pd.DataFrame(response.json()["data"])

Paginated API
# models/api_all_contacts.py
def model(dbt, session):
    import requests
    import pandas as pd
    import os

    dbt.config(materialized="table")

    api_key = os.environ["HUBSPOT_API_KEY"]
    all_records = []
    url = "https://api.hubapi.com/crm/v3/contacts"
    while url:
        response = requests.get(
            url,
            headers={"Authorization": f"Bearer {api_key}"},
        ).json()
        all_records.extend(response["results"])
        # Follow pagination
        next_link = response.get("paging", {}).get("next", {}).get("link")
        url = next_link  # None when no more pages
    return pd.DataFrame(all_records)

Joining API Data with Database Data
# models/enriched_orders.py
def model(dbt, session):
    import requests
    import pandas as pd

    dbt.config(materialized="table")

    # Get orders from your database (via dbt.ref)
    orders = dbt.ref("stg_orders").df()

    # Get exchange rates from an API
    response = requests.get("https://api.exchangerate.host/latest")
    rates = response.json()["rates"]

    # Enrich: convert amounts to USD
    orders["usd_amount"] = orders.apply(
        lambda row: row["amount"] / rates.get(row["currency"], 1),
        axis=1,
    )
    return orders

ML & AI EXAMPLES
ML Inference
# models/churn_predictions.py
def model(dbt, session):
    import pandas as pd
    import joblib

    dbt.config(materialized="table")

    # Load customers from upstream model
    customers = dbt.ref("dim_customers").df()

    # Load pre-trained model
    clf = joblib.load("models/churn_model.pkl")

    # Score each customer
    features = customers[["tenure", "monthly_spend", "support_tickets"]]
    customers["churn_probability"] = clf.predict_proba(features)[:, 1]
    customers["churn_risk"] = pd.cut(
        customers["churn_probability"],
        bins=[0, 0.3, 0.7, 1.0],
        labels=["low", "medium", "high"],
    )
    return customers

AI-Powered Transformations
# models/sentiment_analysis.py
def model(dbt, session):
    import pandas as pd
    from openai import OpenAI
    import os

    dbt.config(materialized="table")

    client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
    reviews = dbt.ref("raw_reviews").df()

    def classify(text):
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": f"Classify sentiment (positive/negative/neutral): {text}"}],
        )
        return response.choices[0].message.content.strip().lower()

    reviews["sentiment"] = reviews["review_text"].apply(classify)
    return reviews

CONFIGURATION
Python models use dbt.config() to set materialization and other options, just like {{ config() }} in SQL models:
def model(dbt, session):
    dbt.config(
        materialized="table",    # "table" (default) or "incremental"
        tags=["python", "api"],  # for selection: dvt run --select tag:api
    )
    # ... your code ...

Note: Python models always materialize as tables. Views and ephemeral materializations are not supported for Python models because the code runs locally, not on the database.
INCREMENTAL PYTHON MODELS
Python models support incremental materialization. Use dbt.is_incremental to check if the model has run before:
# models/incremental_api.py
def model(dbt, session):
    import requests
    import pandas as pd

    dbt.config(
        materialized="incremental",
        unique_key="id",
    )

    if dbt.is_incremental:
        # Only fetch new records since the last run:
        # query the session (DuckDB cache) for the last known ID
        last_id = session.execute(
            "SELECT MAX(id) FROM __model__incremental_api"
        ).fetchone()[0] or 0
        url = f"https://api.example.com/records?since_id={last_id}"
    else:
        # First run: fetch everything
        url = "https://api.example.com/records"

    data = requests.get(url).json()
    return pd.DataFrame(data["records"])

USING THE SESSION (DUCKDB)
The session parameter is a DuckDB connection. You can use it to run SQL queries locally, which is useful when you want to combine Python and SQL:
def model(dbt, session):
    import pandas as pd

    dbt.config(materialized="table")

    # Run SQL directly in DuckDB
    result = session.execute("""
        SELECT
            category,
            COUNT(*) AS cnt,
            AVG(amount) AS avg_amount
        FROM __model__py_transformer
        GROUP BY category
    """).df()
    return result

INSTALLING PYTHON PACKAGES
Python models can use any Python package installed in your environment. If a package is missing, DVT will show a clear error:
DVT125: Python model 'api_users' failed to import: requests. Install the package with: pip install requests
Install packages with pip install or uv add before running your models. Common packages for Python models:
| PACKAGE | USE CASE | INSTALL |
|---|---|---|
| requests | HTTP/API calls | pip install requests |
| pandas | DataFrames (included with DVT) | already installed |
| polars | Fast DataFrames | pip install polars |
| openai | OpenAI / ChatGPT | pip install openai |
| scikit-learn | ML models | pip install scikit-learn |
| beautifulsoup4 | Web scraping | pip install beautifulsoup4 |
| openpyxl | Excel files | pip install openpyxl |
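The file-processing use case mentioned earlier works the same way. Here is a hedged sketch that reads a local CSV; the path data/inventory.csv and its columns are assumptions for illustration:

```python
# models/local_inventory.py — hypothetical file-ingestion model
def model(dbt, session):
    import pandas as pd

    dbt.config(materialized="table")

    # Reading from the local filesystem works because the code runs
    # on your machine rather than in a warehouse sandbox
    inventory = pd.read_csv("data/inventory.csv")  # assumed path
    inventory["sku"] = inventory["sku"].str.upper()
    return inventory
```

Reading an Excel file is the same pattern with pd.read_excel (which requires the openpyxl package from the table above).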
DAG POSITIONING
Python models fit anywhere in your DAG. Here are the three common patterns:
Beginning of DAG (data source)
No refs — generates or fetches data. Other models ref it.
# models/api_products.py
def model(dbt, session):
    import requests
    import pandas as pd

    dbt.config(materialized="table")
    return pd.DataFrame(requests.get("https://api.example.com/products").json())

# models/product_report.sql (downstream, refs the Python model)
# SELECT * FROM {{ ref('api_products') }} WHERE is_active = true

Middle of DAG (transformer)
Refs upstream models, transforms with Python, other models ref it.
# models/enriched_customers.py
def model(dbt, session):
    import pandas as pd

    dbt.config(materialized="table")

    customers = dbt.ref("stg_customers").df()  # upstream SQL model
    customers["segment"] = customers["spend"].apply(
        lambda x: "high" if x > 1000 else "low"
    )
    return customers

# models/customer_dashboard.sql (downstream)
# SELECT segment, COUNT(*) FROM {{ ref('enriched_customers') }} GROUP BY 1

End of DAG (final output)
Refs multiple upstream models, produces a final result.
# models/executive_report.py
def model(dbt, session):
    import pandas as pd

    dbt.config(materialized="table")

    revenue = dbt.ref("fct_revenue").df()
    customers = dbt.ref("dim_customers").df()

    summary = revenue.groupby("month").agg(total=("amount", "sum")).reset_index()
    summary["customer_count"] = len(customers)
    return summary

RULES AND CONSTRAINTS
Every .py file must have exactly one function named 'model' that takes two arguments: dbt and session. This is validated at parse time.
The model() function must return a pandas DataFrame, polars DataFrame, pyarrow Table, or DuckDB relation. Returning None, a string, or any other type will raise DVT122.
Python models cannot use Jinja ({{ }}). Use dbt.ref() and dbt.source() instead of {{ ref() }} and {{ source() }}.
Python models default to materialized='table'. Views and ephemeral are not supported because the code runs locally, not on the database engine.
Python code runs on the machine where DVT is installed, not on the database. This means you have full access to the filesystem, environment variables, and network.
ERROR CODES
| CODE | MEANING | FIX |
|---|---|---|
| DVT120 | ref() target not found | Check the model name in dbt.ref(). Run dvt list to see available models. |
| DVT121 | source() target not found | Check the source name and table in dbt.source(). Verify sources.yml. |
| DVT122 | Bad return type | Return a pandas DataFrame, polars DataFrame, pyarrow Table, or DuckDB relation. |
| DVT123 | Empty result | Your model returned an empty DataFrame. Check your data source or filters. |
| DVT124 | Timeout | Your model took too long. Check for infinite loops or slow API calls. |
| DVT125 | Import error | Install the missing package: pip install <package_name> |
| DVT126 | Runtime error | Your Python code raised an exception. Check the traceback in the error message. |
QUICK CHECKLIST
Before running a Python model, make sure:
- The .py file defines exactly one function named model(dbt, session).
- The function returns a pandas DataFrame, polars DataFrame, pyarrow Table, or DuckDB relation (not None or any other type).
- Every dbt.ref() and dbt.source() target exists (check with dvt list and your sources.yml).
- Any third-party packages the model imports are installed (pip install <package>).
- Any environment variables the model reads (API keys, credentials) are set.