Building a Data Orchestrator with Dagster and dbt

12 min read
Data Engineering, Dagster, dbt, DuckDB

Modern data pipelines require more than just moving data from A to B. They need orchestration, transformation, testing, and observability. In this post, I walk through building a production-grade weather data pipeline using Dagster, dbt, and DuckDB.

What We Are Building

A complete ELT (Extract-Load-Transform) pipeline that fetches weather data from the Open-Meteo API, loads it into DuckDB, and transforms it through staging, intermediate, and marts layers using dbt.

text
+===========================================================================+
|                         DATA ORCHESTRATOR                                 |
+===========================================================================+
|                                                                           |
|   +-------------+     +-------------+     +------------------+            |
|   |   EXTRACT   |     |    LOAD     |     |    TRANSFORM     |            |
|   | (Dagster)   | --> |  (Dagster)  | --> |      (dbt)       |            |
|   +-------------+     +-------------+     +------------------+            |
|         |                   |                     |                       |
|         v                   v                     v                       |
|   Open-Meteo API       DuckDB Raw           Staging/Int/Marts             |
|                                                                           |
+===========================================================================+

Key Features:

  • Asset-based orchestration with Dagster (not task-based DAGs)
  • SQL transformations with dbt and 19 automated data quality tests
  • Zero infrastructure - DuckDB runs as an embedded database
  • No API keys required - uses free Open-Meteo weather API
  • Fully containerized with Docker Compose

Technology Stack

The stack was chosen for simplicity and production-readiness:

  • Dagster 1.9+ - Asset-based orchestration with lineage tracking
  • dbt 1.9+ - SQL transformations with testing and documentation
  • DuckDB 1.1+ - Embedded analytics database (single file)
  • Python 3.11+ - With Pydantic for type-safe configuration
  • UV - Fast, deterministic dependency management
  • Docker - Multi-container deployment

Project Structure

text
dataOrchestrator/
|-- config/
|   +-- cities.yml              # Configurable city coordinates
|
|-- data/
|   |-- raw/                    # JSON extraction files
|   +-- warehouse/              # DuckDB database file
|
|-- dagster_project/
|   |-- definitions.py          # Main entry point
|   |-- assets/
|   |   |-- extract.py          # Weather API extraction
|   |   |-- load.py             # DuckDB loading
|   |   +-- transform.py        # dbt integration
|   |-- resources/
|   |   |-- weather_api.py      # HTTP client resource
|   |   +-- duckdb.py           # Database resource
|   +-- schedules/
|       +-- daily.py            # 6 AM UTC daily schedule
|
+-- dbt_project/
    |-- models/
    |   |-- staging/            # Clean and rename
    |   |-- intermediate/       # Add business logic
    |   +-- marts/              # Analytics-ready tables
    +-- profiles.yml            # DuckDB adapter config

Asset-Based Orchestration

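Unlike traditional task-based orchestrators such as Airflow, where the unit of work is a task, Dagster uses assets as its primary abstraction. Each asset represents a data artifact (a file, a table, a set of dbt models) that can be materialized, meaning computed and persisted.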

text
+----------------+      +------------------+      +-------------------+
| raw_weather    |      | staged_weather   |      | dbt_weather       |
| _data          | ---> | _data            | ---> | _models           |
| (JSON files)   |      | (DuckDB tables)  |      | (dbt models)      |
+----------------+      +------------------+      +-------------------+

Why assets over tasks?

  • Data lineage is automatic - see what depends on what
  • Re-run specific assets without running the entire pipeline
  • Metadata tracking shows row counts, file sizes, timestamps
  • The UI visualizes the asset graph
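
To make the lineage and selective re-run points concrete, here is a toy model of the asset graph from the diagram. It is not how Dagster is implemented; it is a minimal sketch using the standard library's graphlib, and the helper names (materialization_order, downstream_of) are hypothetical.

```python
from graphlib import TopologicalSorter

# Upstream dependencies per asset, mirroring the diagram above.
# Keys are assets; values are the assets they depend on.
ASSET_DEPS = {
    "raw_weather_data": set(),
    "staged_weather_data": {"raw_weather_data"},
    "dbt_weather_models": {"staged_weather_data"},
}


def materialization_order(deps):
    """Return assets in an order where every upstream comes first."""
    return list(TopologicalSorter(deps).static_order())


def downstream_of(asset, deps):
    """Return the asset plus everything that transitively depends on it."""
    selected = {asset}
    changed = True
    while changed:
        changed = False
        for name, upstreams in deps.items():
            if name not in selected and upstreams & selected:
                selected.add(name)
                changed = True
    return selected


print(materialization_order(ASSET_DEPS))
print(sorted(downstream_of("staged_weather_data", ASSET_DEPS)))
```

Selecting staged_weather_data and its downstream assets leaves raw_weather_data untouched, which is the kind of partial re-run the asset model makes cheap.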

Extract: Fetching Weather Data

The extraction asset fetches weather data from Open-Meteo for a configurable list of cities. Each request asks for 7 days of history plus a 7-day forecast (past_days=7, forecast_days=7).

python
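import json
from datetime import datetime, timezone

from dagster import AssetExecutionContext, MaterializeResult, MetadataValue, asset

# get_settings, RAW_DATA_PATH, and WeatherAPIClient come from elsewhere in the project.


@asset(
    description="Extract weather data from Open-Meteo API",
    group_name="extract",
)
def raw_weather_data(
    context: AssetExecutionContext,
    weather_api: WeatherAPIClient,
) -> MaterializeResult:
    settings = get_settings()
    # Assumed timestamp format; any sortable, filename-safe string works
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    all_data = {"cities": {}, "extracted_at": timestamp}
    total_records = 0

    for city_name, city_config in settings.cities.items():
        response = weather_api.fetch_weather(
            latitude=city_config.lat,
            longitude=city_config.lon,
            past_days=7,
            forecast_days=7,
        )
        all_data["cities"][city_name] = response
        # Hourly values arrive as parallel arrays keyed by "time"
        total_records += len(response["hourly"]["time"])

    # Save raw JSON
    output_file = RAW_DATA_PATH / f"weather_{timestamp}.json"
    output_file.write_text(json.dumps(all_data, indent=2))

    # Metadata shows up on the asset in the Dagster UI
    return MaterializeResult(
        metadata={
            "cities_extracted": len(settings.cities),
            "total_hourly_records": total_records,
            "output_file": MetadataValue.path(str(output_file)),
        }
    )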
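
The asset relies on get_settings(), which this post does not show. One possible shape for it is sketched below, using stdlib dataclasses in place of Pydantic so the example has no dependencies. The CityConfig and Settings names, the settings_from_mapping helper, and the sample coordinates are all assumptions for illustration.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class CityConfig:
    lat: float
    lon: float

    def __post_init__(self):
        # Reject coordinates that cannot be valid before any API call is made
        if not -90.0 <= self.lat <= 90.0:
            raise ValueError(f"latitude out of range: {self.lat}")
        if not -180.0 <= self.lon <= 180.0:
            raise ValueError(f"longitude out of range: {self.lon}")


@dataclass(frozen=True)
class Settings:
    cities: dict = field(default_factory=dict)


def settings_from_mapping(raw):
    """Build Settings from a mapping shaped like a parsed cities.yml (assumed layout)."""
    cities = {
        name: CityConfig(lat=float(cfg["lat"]), lon=float(cfg["lon"]))
        for name, cfg in raw["cities"].items()
    }
    return Settings(cities=cities)


# Sample input; the real file layout may differ
raw = {"cities": {"London": {"lat": 51.5074, "lon": -0.1278}}}
settings = settings_from_mapping(raw)
print(settings.cities["London"])
```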

The WeatherAPIClient is a Dagster resource, allowing for dependency injection and easy testing:

python
import httpx
from dagster import ConfigurableResource


class WeatherAPIClient(ConfigurableResource):
    base_url: str = "https://api.open-meteo.com/v1/forecast"
    timeout: int = 30  # seconds

    def fetch_weather(
        self,
        latitude: float,
        longitude: float,
        past_days: int = 7,
        forecast_days: int = 7,
    ) -> dict:
        # Variables to request; each comes back as an array aligned with "time"
        params = {
            "latitude": latitude,
            "longitude": longitude,
            "hourly": "temperature_2m,relative_humidity_2m,precipitation",
            "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum",
            "past_days": past_days,
            "forecast_days": forecast_days,
        }

        with httpx.Client(timeout=self.timeout) as client:
            response = client.get(self.base_url, params=params)
            # Raise on 4xx/5xx instead of parsing an error body as data
            response.raise_for_status()
            return response.json()
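
To illustrate the testing benefit of injecting the client, here is a minimal sketch of a fake that exposes the same fetch_weather signature and returns a canned payload. FakeWeatherAPIClient and extract_all are hypothetical names, and the payload size is invented for the test rather than taken from the real API.

```python
class FakeWeatherAPIClient:
    """Hypothetical stand-in with the same fetch_weather signature as the real client."""

    def __init__(self):
        self.calls = []

    def fetch_weather(self, latitude, longitude, past_days=7, forecast_days=7):
        self.calls.append((latitude, longitude, past_days, forecast_days))
        # Canned payload: one entry per hour for the requested window
        hours = (past_days + forecast_days) * 24
        return {
            "latitude": latitude,
            "longitude": longitude,
            "hourly": {
                "time": [f"hour-{i}" for i in range(hours)],
                "temperature_2m": [10.0] * hours,
            },
        }


def extract_all(cities, weather_api):
    """Core loop of the extraction asset, lifted out so it runs without Dagster."""
    data = {}
    total = 0
    for name, (lat, lon) in cities.items():
        response = weather_api.fetch_weather(latitude=lat, longitude=lon)
        data[name] = response
        total += len(response["hourly"]["time"])
    return data, total


fake = FakeWeatherAPIClient()
data, total_records = extract_all(
    {"London": (51.5, -0.13), "Dubai": (25.2, 55.3)},
    fake,
)
print(total_records, len(fake.calls))
```

Because the asset only depends on the fetch_weather method, no network access is needed to exercise the extraction logic.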

Load: DuckDB Staging Tables

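The load asset reads the latest JSON file and upserts the data into DuckDB staging tables. Because the hourly table declares a primary key on (city, timestamp), INSERT OR REPLACE overwrites an existing row instead of adding a duplicate, so the load is idempotent and safe to re-run.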

python
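import json

from dagster import AssetExecutionContext, MaterializeResult, asset

# raw_weather_data, get_latest_raw_file, and DuckDBResource come from elsewhere in the project.


@asset(
    deps=[raw_weather_data],
    description="Load weather data into DuckDB staging tables",
    group_name="load",
)
def staged_weather_data(
    context: AssetExecutionContext,
    duckdb_resource: DuckDBResource,
) -> MaterializeResult:
    raw_file = get_latest_raw_file()
    data = json.loads(raw_file.read_text())
    hourly_count = 0
    daily_count = 0

    with duckdb_resource.get_connection() as conn:
        for city_name, city_data in data["cities"].items():
            hourly = city_data["hourly"]
            # Upsert hourly data (column list abbreviated with "..." in this excerpt)
            for i, timestamp in enumerate(hourly["time"]):
                temp = hourly["temperature_2m"][i]
                humidity = hourly["relative_humidity_2m"][i]
                conn.execute("""
                    INSERT OR REPLACE INTO raw_hourly_weather
                    (city, timestamp, temperature_c, humidity_pct, ...)
                    VALUES (?, ?, ?, ?, ...)
                """, [city_name, timestamp, temp, humidity, ...])
                hourly_count += 1

            # Daily data is upserted the same way (omitted here), incrementing daily_count

    return MaterializeResult(
        metadata={
            "hourly_records_loaded": hourly_count,
            "daily_records_loaded": daily_count,
        }
    )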
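
The idempotency claim is easy to check in isolation. The sketch below uses the standard library's sqlite3 as a stand-in for DuckDB (an assumption made so the example runs without extra packages; SQLite accepts the same INSERT OR REPLACE statement against a primary key). The flatten_hourly and upsert_hourly helpers and the sample values are hypothetical.

```python
import sqlite3


def flatten_hourly(city, hourly):
    """Turn the parallel hourly arrays into one tuple per timestamp."""
    return [
        (city, ts, temp, hum, precip)
        for ts, temp, hum, precip in zip(
            hourly["time"],
            hourly["temperature_2m"],
            hourly["relative_humidity_2m"],
            hourly["precipitation"],
        )
    ]


def upsert_hourly(conn, rows):
    """Insert rows, replacing any row with the same (city, timestamp) key."""
    conn.executemany(
        """
        INSERT OR REPLACE INTO raw_hourly_weather
        (city, timestamp, temperature_c, humidity_pct, precipitation_mm)
        VALUES (?, ?, ?, ?, ?)
        """,
        rows,
    )


conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE raw_hourly_weather (
        city TEXT,
        timestamp TEXT,
        temperature_c REAL,
        humidity_pct REAL,
        precipitation_mm REAL,
        PRIMARY KEY (city, timestamp)
    )
    """
)

# Invented sample payload in the same shape as the API's "hourly" object
hourly = {
    "time": ["2025-01-01T00:00", "2025-01-01T01:00"],
    "temperature_2m": [3.1, 2.8],
    "relative_humidity_2m": [81, 83],
    "precipitation": [0.0, 0.2],
}
rows = flatten_hourly("London", hourly)

upsert_hourly(conn, rows)
upsert_hourly(conn, rows)  # second run replaces the same keys
row_count = conn.execute("SELECT count(*) FROM raw_hourly_weather").fetchone()[0]
print(row_count)
```

Running the load twice leaves the row count unchanged, which is the property that makes re-materializing the asset safe.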

DuckDB Resource Pattern

The database resource uses a context manager for safe connection handling:

python
from contextlib import contextmanager

import duckdb
from dagster import ConfigurableResource


class DuckDBResource(ConfigurableResource):
    database_path: str = "data/warehouse/weather.duckdb"

    @contextmanager
    def get_connection(self):
        conn = duckdb.connect(self.database_path)
        try:
            yield conn
        finally:
            # Always release the file handle, even if the caller raised
            conn.close()

    def init_schema(self):
        with self.get_connection() as conn:
            # IF NOT EXISTS keeps schema setup safe to call on every run
            conn.execute("""
                CREATE TABLE IF NOT EXISTS raw_hourly_weather (
                    city VARCHAR,
                    timestamp TIMESTAMP,
                    temperature_c DOUBLE,
                    humidity_pct DOUBLE,
                    precipitation_mm DOUBLE,
                    PRIMARY KEY (city, timestamp)
                )
            """)

Transform: dbt Medallion Architecture

The transformation layer uses dbt with three model layers (staging, intermediate, marts), a layering that maps loosely onto the bronze/silver/gold tiers of a medallion architecture:

text
+==============================================================================+
|                            TRANSFORMATION LAYERS                             |
+==============================================================================+
|                                                                              |
|  +-----------------+    +-------------------+    +------------------+        |
|  |    STAGING      |    |   INTERMEDIATE    |    |      MARTS       |        |
|  |    (views)      | -> |     (views)       | -> |    (tables)      |        |
|  +-----------------+    +-------------------+    +------------------+        |
|                                                                              |
|   - Clean columns       - Add categories        - Aggregations               |
|   - Rename fields       - Business logic        - Rankings                   |
|   - Extract dates       - Derived fields        - Analytics-ready            |
|                                                                              |
+==============================================================================+

Staging Layer

Staging models clean and standardize raw data:

sql
-- models/staging/stg_hourly_weather.sql
select
    city,
    timestamp as observation_timestamp,
    date_trunc('day', timestamp) as observation_date,
    extract(hour from timestamp) as hour_of_day,
    temperature_c,
    humidity_pct as relative_humidity,
    precipitation_mm,
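    -- Assumes the raw table also carries these two columns, which the
    -- schema excerpt above does not list
    wind_speed_kmh,
    weather_code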
from {{ source('raw', 'raw_hourly_weather') }}

Intermediate Layer

Intermediate models add business logic and categorizations:

sql
-- models/intermediate/int_hourly_enriched.sql
select
    *,
    case
        when temperature_c < 0 then 'freezing'
        when temperature_c < 10 then 'cold'
        when temperature_c < 20 then 'mild'
        when temperature_c < 30 then 'warm'
        else 'hot'
    end as temp_category,

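    case
        -- Without this guard a missing value would fall through to 'heavy'
        when precipitation_mm is null then null
        when precipitation_mm = 0 then 'dry'
        when precipitation_mm < 2.5 then 'light'
        when precipitation_mm < 7.5 then 'moderate'
        else 'heavy'
    end as precip_category,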

    case
        when hour_of_day between 6 and 11 then 'morning'
        when hour_of_day between 12 and 17 then 'afternoon'
        when hour_of_day between 18 and 21 then 'evening'
        else 'night'
    end as time_of_day

from {{ ref('stg_hourly_weather') }}
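
The bucket boundaries are the part most worth pinning down. Here is a small Python mirror of the CASE expressions that can be used to check edge values. The function names are hypothetical, and the sketch assumes non-null inputs.

```python
def temp_category(temperature_c):
    """Mirror of the temp_category CASE; upper bounds are exclusive."""
    if temperature_c < 0:
        return "freezing"
    if temperature_c < 10:
        return "cold"
    if temperature_c < 20:
        return "mild"
    if temperature_c < 30:
        return "warm"
    return "hot"


def precip_category(precipitation_mm):
    """Mirror of the precip_category CASE for non-null values."""
    if precipitation_mm == 0:
        return "dry"
    if precipitation_mm < 2.5:
        return "light"
    if precipitation_mm < 7.5:
        return "moderate"
    return "heavy"


def time_of_day(hour_of_day):
    """Mirror of the time_of_day CASE; BETWEEN is inclusive on both ends."""
    if 6 <= hour_of_day <= 11:
        return "morning"
    if 12 <= hour_of_day <= 17:
        return "afternoon"
    if 18 <= hour_of_day <= 21:
        return "evening"
    return "night"


for value in (-0.1, 0, 10, 30):
    print(value, temp_category(value))
```

Exactly 0 °C counts as cold rather than freezing, and hours 22 through 5 all land in night.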

Marts Layer

Mart models are materialized as tables for fast analytics queries:

sql
-- models/marts/fct_city_comparison.sql
{{ config(materialized='table') }}

with daily_stats as (
    select
        city,
        avg(temp_avg_c) as avg_temperature,
        sum(precipitation_sum_mm) as total_precipitation,
        avg(avg_humidity) as avg_humidity,
        count(*) as total_days
    from {{ ref('fct_daily_weather') }}
    group by city
)

select
    *,
    rank() over (order by avg_temperature desc) as warmest_rank,
    rank() over (order by total_precipitation desc) as wettest_rank,
    rank() over (order by avg_humidity desc) as humidity_rank
from daily_stats
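
One detail of rank() worth knowing: tied values share a rank and the next rank skips ahead. A minimal Python sketch of that behavior follows, with a hypothetical sql_rank helper and made-up temperatures.

```python
def sql_rank(values, descending=True):
    """Mimic SQL rank(): ties share a rank and the following rank is skipped."""
    ordered = sorted(values, reverse=descending)
    # index() returns the first position of a value in sorted order,
    # so every tied value gets the same rank
    return [ordered.index(v) + 1 for v in values]


# Made-up average temperatures for four cities, two of them tied
avg_temperature = [21.0, 15.0, 15.0, 3.0]
warmest_rank = sql_rank(avg_temperature)
print(warmest_rank)
```

If gaps after ties are unwanted, dense_rank() is the SQL alternative.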

Data Quality Testing

The pipeline includes 19 automated dbt tests that run on every execution:

yaml
# models/staging/_schema.yml
version: 2

models:
  - name: stg_hourly_weather
    description: "Cleaned hourly weather observations"
    columns:
      - name: city
        tests:
          - not_null
      - name: observation_timestamp
        tests:
          - not_null
      - name: temperature_c
        tests:
          - not_null

  - name: fct_city_comparison
    columns:
      - name: city
        tests:
          - not_null
          - unique

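Tests run as part of dbt build, which executes each model's tests right after building it and skips downstream models when a test fails. A failing staging test therefore blocks bad data from reaching the marts layer.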
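
Conceptually, a generic dbt test is a query that returns failing rows, and the test passes when there are none. The sketch below reproduces the not_null and unique checks by hand, using sqlite3 as a stand-in database. The helper names and the deliberately bad sample rows are assumptions for illustration, not dbt's actual compiled SQL.

```python
import sqlite3


def not_null_failures(conn, table, column):
    """Count rows where the column is NULL."""
    query = f"SELECT count(*) FROM {table} WHERE {column} IS NULL"
    return conn.execute(query).fetchone()[0]


def unique_failures(conn, table, column):
    """Count distinct non-null values that appear more than once."""
    query = f"""
        SELECT count(*) FROM (
            SELECT {column}
            FROM {table}
            WHERE {column} IS NOT NULL
            GROUP BY {column}
            HAVING count(*) > 1
        )
    """
    return conn.execute(query).fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fct_city_comparison (city TEXT, avg_temperature REAL)")
# Deliberately bad data: one duplicated city and one missing city
conn.executemany(
    "INSERT INTO fct_city_comparison VALUES (?, ?)",
    [("London", 3.1), ("Dubai", 21.3), ("Dubai", 21.4), (None, 0.0)],
)

null_count = not_null_failures(conn, "fct_city_comparison", "city")
duplicate_count = unique_failures(conn, "fct_city_comparison", "city")
print(null_count, duplicate_count)
```

Both checks report one failure here, so both tests would fail on this table.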

Dagster + dbt Integration

Dagster provides first-class dbt integration. Each dbt model becomes a Dagster asset with automatic dependency tracking:

python
# dagster_project/assets/transform.py
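from pathlib import Path

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, DbtProject, dbt_assets

# CustomDagsterDbtTranslator is a project-defined subclass of
# dagster_dbt.DagsterDbtTranslator (not shown here).

dbt_project = DbtProject(
    project_dir=Path(__file__).parent.parent.parent / "dbt_project",
)

@dbt_assets(
    manifest=dbt_project.manifest_path,
    dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def dbt_weather_models(
    context: AssetExecutionContext,
    dbt: DbtCliResource,
):
    # "build" runs models and their tests together, streaming events back to Dagster
    yield from dbt.cli(["build"], context=context).stream()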

This creates assets for each dbt model (stg_hourly_weather, int_hourly_enriched, fct_daily_weather, etc.) that are visible in the Dagster UI.
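
The dependency tracking works because dbt writes every model's upstream references into manifest.json. The sketch below pulls model-to-model dependencies out of a manifest-shaped dictionary. It is a rough illustration of the idea, not what dagster-dbt does internally; the model_dependencies helper and the "weather" project name in the node IDs are assumptions.

```python
def model_dependencies(manifest):
    """Map each dbt model name to the names of the models it depends on."""
    nodes = manifest["nodes"]
    deps = {}
    for node in nodes.values():
        if node["resource_type"] != "model":
            continue
        # Keep only upstream models; sources live outside "nodes" and are skipped
        upstream = [
            nodes[dep]["name"]
            for dep in node["depends_on"]["nodes"]
            if dep in nodes and nodes[dep]["resource_type"] == "model"
        ]
        deps[node["name"]] = sorted(upstream)
    return deps


# Hand-written fragment shaped like a dbt manifest
manifest = {
    "nodes": {
        "model.weather.stg_hourly_weather": {
            "name": "stg_hourly_weather",
            "resource_type": "model",
            "depends_on": {"nodes": ["source.weather.raw.raw_hourly_weather"]},
        },
        "model.weather.int_hourly_enriched": {
            "name": "int_hourly_enriched",
            "resource_type": "model",
            "depends_on": {"nodes": ["model.weather.stg_hourly_weather"]},
        },
    }
}

deps = model_dependencies(manifest)
print(deps)
```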

Scheduling

The pipeline runs daily at 6 AM UTC:

python
# dagster_project/schedules/daily.py
from dagster import AssetSelection, DefaultScheduleStatus, ScheduleDefinition

daily_weather_schedule = ScheduleDefinition(
    name="daily_weather_pipeline",
    cron_schedule="0 6 * * *",  # 6 AM UTC daily
    target=AssetSelection.all(),
    default_status=DefaultScheduleStatus.STOPPED,
)

Enable the schedule from the Dagster UI when ready for production.
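
For readers less familiar with cron syntax, "0 6 * * *" means minute 0 of hour 6, every day. A small sketch of when the next tick falls, using only the standard library, follows. The next_daily_run helper is hypothetical and handles only this daily pattern, not general cron expressions.

```python
from datetime import datetime, timedelta, timezone


def next_daily_run(now, hour=6):
    """Next tick of a daily `0 <hour> * * *` schedule in UTC.

    Expects a timezone-aware datetime.
    """
    now = now.astimezone(timezone.utc)
    candidate = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    # If today's tick has already passed (or is right now), use tomorrow's
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


before = datetime(2025, 1, 1, 5, 30, tzinfo=timezone.utc)
after = datetime(2025, 1, 1, 7, 0, tzinfo=timezone.utc)
print(next_daily_run(before).isoformat())
print(next_daily_run(after).isoformat())
```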

Running the Pipeline

Option 1: Docker Compose

bash
# Start the containers
docker-compose up -d

# Access Dagster UI (open is macOS-specific; on Linux use xdg-open or a browser)
open http://localhost:3333

# View logs
docker-compose logs -f

Option 2: Local Development

bash
# Install dependencies
uv sync

# Start Dagster development server
uv run dagster dev

# Access UI at http://localhost:3000

Materialize Assets

From the Dagster UI, click Materialize All to run the complete pipeline. You will see:

  • Asset graph visualization with dependencies
  • Real-time execution logs
  • Metadata for each asset (row counts, file paths, timestamps)
  • Run history and timing information

Sample Results

After running the pipeline, query the marts tables:

sql
-- Query city comparison rankings
SELECT
    city,
    round(avg_temperature, 1) as avg_temp_c,
    round(total_precipitation, 1) as total_precip_mm,
    warmest_rank,
    wettest_rank
FROM fct_city_comparison
ORDER BY warmest_rank;

-- Results:
-- | city     | avg_temp_c | total_precip_mm | warmest_rank | wettest_rank |
-- |----------|------------|-----------------|--------------|--------------|
-- | Dubai    | 21.3       | 0.1             | 1            | 3            |
-- | Riyadh   | 15.1       | 0.0             | 2            | 4            |
-- | London   | 3.1        | 2.4             | 3            | 2            |
-- | New York | -0.8       | 19.6            | 4            | 1            |

Key Takeaways

This project demonstrates several modern data engineering practices:

  • Asset-based orchestration provides better lineage and observability than task-based DAGs
  • Separation of concerns - Dagster handles orchestration, dbt handles transformations
  • Idempotent operations with INSERT OR REPLACE allow safe re-runs
  • Dependency injection via Dagster resources enables testing
  • Data quality testing catches issues before they reach production tables
  • Zero infrastructure - DuckDB eliminates database setup

The complete source code is available on GitHub.