# Partitions vs config

In this example, we'll explore two different approaches to parameterizing Dagster pipelines. When you need to process data for different segments (such as customers, regions, or dates), you can choose between Dagster's partitions and run configuration. Each approach has distinct trade-offs in tracking, observability, and workflow.
## Problem: Processing data for multiple customers
Imagine you need to process data for multiple customers, where each customer's data should be processed independently. You want to be able to run the pipeline for specific customers and potentially reprocess historical data when needed.
The key question is: Should you use partitions to create a segment for each customer, or should you use config to pass the customer ID as a parameter?
## Solution 1: Using partitions
Partitions divide your data into discrete segments. Each customer becomes a partition, giving you full visibility into which customers have been processed and the ability to backfill specific customers.
```python
import dagster as dg

customer_partitions = dg.StaticPartitionsDefinition(
    ["customer_a", "customer_b", "customer_c"]
)


@dg.asset(partitions_def=customer_partitions)
def customer_orders(context: dg.AssetExecutionContext):
    """Fetch and process orders for a specific customer partition."""
    customer_id = context.partition_key
    context.log.info(f"Fetching orders for customer: {customer_id}")
    orders = [
        {"order_id": f"{customer_id}-001", "amount": 150.00},
        {"order_id": f"{customer_id}-002", "amount": 275.50},
        {"order_id": f"{customer_id}-003", "amount": 89.99},
    ]
    context.log.info(f"Processed {len(orders)} orders for {customer_id}")
    return orders


@dg.asset(partitions_def=customer_partitions, deps=[customer_orders])
def customer_summary(context: dg.AssetExecutionContext):
    """Generate a summary report for a specific customer partition."""
    customer_id = context.partition_key
    context.log.info(f"Generating summary for customer: {customer_id}")
    summary = {
        "customer_id": customer_id,
        "total_orders": 3,
        "total_revenue": 515.49,
    }
    context.log.info(f"Summary for {customer_id}: {summary}")
    return summary


@dg.schedule(
    cron_schedule="0 1 * * *",
    job=dg.define_asset_job(
        "all_customers_job",
        selection=[customer_orders, customer_summary],
        partitions_def=customer_partitions,
    ),
)
def daily_customer_schedule():
    """Trigger a run for every customer partition each night."""
    for partition_key in customer_partitions.get_partition_keys():
        yield dg.RunRequest(partition_key=partition_key)
```
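Partitioned assets can also be materialized one partition at a time from Python, which is handy for tests or one-off backfills. A minimal sketch, assuming the definitions above are importable (the partition key and assertion are illustrative):

```python
import dagster as dg

# Hedged sketch: materialize a single customer partition in-process,
# e.g. to spot-check logic or reprocess one customer ad hoc.
result = dg.materialize(
    [customer_orders, customer_summary],
    partition_key="customer_b",  # reprocess just this customer
)
assert result.success
```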
| Aspect | Partitions approach |
|---|---|
| Materialization tracking | Per-customer history visible in the UI |
| Backfilling | Built-in support for reprocessing specific customers |
| Scheduling | Native support for processing all partitions |
| UI experience | Partition status bar shows per-customer processing state |
| Setup complexity | Requires defining the partition set upfront |
## Solution 2: Using config

Run configuration passes the customer ID as a parameter at runtime. This approach is simpler to set up, but Dagster doesn't track which customers have been processed.
```python
import dagster as dg


class CustomerConfig(dg.Config):
    customer_id: str


@dg.asset
def customer_orders(context: dg.AssetExecutionContext, config: CustomerConfig):
    """Fetch and process orders for a specific customer."""
    customer_id = config.customer_id
    context.log.info(f"Fetching orders for customer: {customer_id}")
    orders = [
        {"order_id": f"{customer_id}-001", "amount": 150.00},
        {"order_id": f"{customer_id}-002", "amount": 275.50},
        {"order_id": f"{customer_id}-003", "amount": 89.99},
    ]
    context.log.info(f"Processed {len(orders)} orders for {customer_id}")
    return orders


@dg.asset(deps=[customer_orders])
def customer_summary(context: dg.AssetExecutionContext, config: CustomerConfig):
    """Generate a summary report for a specific customer."""
    customer_id = config.customer_id
    context.log.info(f"Generating summary for customer: {customer_id}")
    summary = {
        "customer_id": customer_id,
        "total_orders": 3,
        "total_revenue": 515.49,
    }
    context.log.info(f"Summary for {customer_id}: {summary}")
    return summary
```
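Because nothing ties a run to a specific customer, the customer ID must be supplied when the run is launched, either in the Launchpad or programmatically. A minimal sketch using `dg.materialize` with `dg.RunConfig` (the config is keyed by each asset's op name; `customer_a` is illustrative):

```python
import dagster as dg

# Hedged sketch: launch the config-based assets in-process with an
# explicit customer ID. Each asset receives its own config entry.
result = dg.materialize(
    [customer_orders, customer_summary],
    run_config=dg.RunConfig(
        ops={
            "customer_orders": CustomerConfig(customer_id="customer_a"),
            "customer_summary": CustomerConfig(customer_id="customer_a"),
        }
    ),
)
assert result.success
```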
| Aspect | Config approach |
|---|---|
| Materialization tracking | Single asset history (not per-customer) |
| Backfilling | Manual re-runs required |
| Scheduling | Requires custom logic to iterate customers (see the sketch below) |
| UI experience | Specify the customer in the Launchpad before each run |
| Setup complexity | Simple config class, no partition management |
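The custom scheduling logic noted above usually means a schedule that fans out one run per customer by attaching run config. A hedged sketch, assuming the config-based assets above and a hard-coded `CUSTOMER_IDS` list standing in for however you actually enumerate customers:

```python
import dagster as dg

# Hypothetical list standing in for your real customer source.
CUSTOMER_IDS = ["customer_a", "customer_b", "customer_c"]

customers_config_job = dg.define_asset_job(
    "customers_config_job",
    selection=[customer_orders, customer_summary],
)


@dg.schedule(cron_schedule="0 1 * * *", job=customers_config_job)
def daily_customers_by_config():
    """Fan out one run per customer, each with its own run config."""
    for customer_id in CUSTOMER_IDS:
        yield dg.RunRequest(
            run_config=dg.RunConfig(
                ops={
                    "customer_orders": CustomerConfig(customer_id=customer_id),
                    "customer_summary": CustomerConfig(customer_id=customer_id),
                }
            ),
        )
```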
## When to use each approach
The choice between partitions and config depends on your specific requirements:
**Use partitions when:**
- Your data naturally segments into discrete categories
- You need to track materialization status per segment
- Backfilling specific segments is a common operation
- You want to schedule processing for all segments automatically
- You need visibility into which segments are up-to-date vs stale
**Use config when:**
- Processing is infrequent or ad hoc
- Parameters are dynamic or come from an unbounded set
- You don't need per-parameter tracking
- A single materialization history is sufficient
- You want simple parameterization without partition overhead
## Hybrid approach
You can also combine both approaches: use partitions for the primary segmentation (e.g., by customer) and config for additional runtime parameters (e.g., processing options).
```python
import dagster as dg

customer_partitions = dg.StaticPartitionsDefinition(
    ["customer_a", "customer_b", "customer_c"]
)


class ProcessingConfig(dg.Config):
    include_archived: bool = False
    limit: int = 1000


@dg.asset(partitions_def=customer_partitions)
def customer_data(context: dg.AssetExecutionContext, config: ProcessingConfig):
    """Process one customer partition, tuned by runtime config."""
    customer_id = context.partition_key
    context.log.info(
        f"Processing {customer_id} with include_archived={config.include_archived}"
    )
```
This gives you the benefits of partition tracking while maintaining flexibility for runtime parameters.
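As a quick illustration, a hybrid launch picks the segment via the partition key and tunes behavior via run config in the same call. A minimal sketch with `dg.materialize` (the partition key and config values are illustrative):

```python
import dagster as dg

# Hedged sketch: one partition, with runtime options overridden.
result = dg.materialize(
    [customer_data],
    partition_key="customer_a",
    run_config=dg.RunConfig(
        ops={"customer_data": ProcessingConfig(include_archived=True, limit=500)}
    ),
)
assert result.success
```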