Dagster For Data Orchestration

Software Development
Apr 23, 2025
3-4 min

Introduction to ETLs and Orchestration

In today’s data-driven world, businesses need a structured approach to managing and processing vast amounts of data. This is where ETL (Extract, Transform, Load) processes come into play. ETL helps organizations systematically collect, refine, and store data for further analysis and decision-making.

  • Extract: Data is retrieved from various sources, including relational databases, APIs, flat files, and cloud storage systems like AWS S3 or Google Cloud Storage.
  • Transform: The extracted raw data undergoes cleansing, validation, and formatting to meet the required standards. This step may include filtering unwanted data, aggregating information, or applying business rules.
  • Load: The transformed data is then loaded into a target data warehouse, data lake, or database for further analytics, reporting, or machine learning applications.

It’s important to understand that ETL is the process, while orchestration refers to the coordination and automation of that process. Orchestration tools like Dagster help ensure that the steps of an ETL workflow are:

  • Executed in the correct order with dependencies handled properly.
  • Automatically retried or flagged when failures occur.
  • Observable, testable, and easy to debug.

So while ETL defines what needs to happen with the data, orchestration defines how and when it happens, particularly at scale and across many interconnected steps. That’s why we chose Dagster for our ETL workflows: it provides robust orchestration capabilities tailored to data engineering.

Why Use Dagster?

Dagster is an open-source data orchestrator designed to simplify and automate ETL workflows. It provides a structured approach to defining data pipelines and managing dependencies between tasks. Here are some key reasons why Dagster stands out:

  • Automation: No more running scripts manually. Dagster automates the execution of each step in the workflow.
  • Observability: Built-in tools allow you to track the execution status of every step, visualize logs, and debug failures effectively.
  • Scalability: Dagster seamlessly integrates with cloud services like AWS, Google Cloud, and Snowflake, making it ideal for processing large datasets.
  • Reusability: Define assets as independent units that can be reused in multiple workflows. For example, the same transformation logic can be applied to multiple datasets without redundant coding.

Setting Up Dagster

1. Set up a virtual environment

  • python -m venv dagster-env
  • source dagster-env/bin/activate # On Windows, use `dagster-env\Scripts\activate`

2. Install Dagster

  • pip install dagster dagster-webserver pandas

3. Run Dagster

  • Start the Dagster UI by running dagster dev

The Example Pipeline

Our pipeline covers:

  1. Fetching a CSV file from AWS S3.
  2. Processing the raw data and generating a summary.
  3. Loading the data into Snowflake.

1. Setting Up Resources

Resources in Dagster represent external dependencies like databases or cloud storage. Below is the resource setup for AWS S3 and Snowflake.

python
from dagster import resource
from dagster_snowflake import snowflake_resource as snowflake_resource_def
import os

@resource
def s3_resource(_):
    import boto3

    return boto3.client(
        "s3",
        aws_access_key_id=os.getenv("YOUR_AWS_ACCESS_KEY"),
        aws_secret_access_key=os.getenv("YOUR_AWS_SECRET_KEY"),
        region_name="ap-south-1",
    )

# Configure the Snowflake resource shipped with dagster-snowflake;
# the import is aliased so our configured resource can keep the name
# "snowflake_resource" without shadowing the library's definition.
snowflake_resource = snowflake_resource_def.configured({
    "account": os.getenv("SNOWFLAKE_ACCOUNT"),
    "user": os.getenv("SNOWFLAKE_USER"),
    "password": os.getenv("SNOWFLAKE_PASSWORD"),
    "database": "FitnessApp",
    "warehouse": "COMPUTE_WH",
    "schema": "PUBLIC",
})

2. Fetching Data from S3

This step involves downloading a CSV file from an S3 bucket and loading it into a Pandas DataFrame for further processing. Metadata logging ensures visibility into the process.

python
from dagster import MetadataValue, asset

@asset(required_resource_keys={"s3"})
def fetch_csv_from_s3(context):
    import io

    import pandas as pd

    bucket = "s3-snow-lambda"
    key = "prices.csv"

    response = context.resources.s3.get_object(Bucket=bucket, Key=key)
    csv_content = response["Body"].read().decode("utf-8")

    data_frame = pd.read_csv(io.StringIO(csv_content))
    context.log.info(f"Fetched {len(data_frame)} rows from S3")
    context.add_output_metadata({
        "DataFrame Preview": MetadataValue.md(data_frame.head().to_markdown(index=False))
    })
    return data_frame

3. Processing the Data

After fetching the data, we add a new column called discounted_price by applying a 10% discount to an existing price column. This demonstrates simple data transformation.

python
from dagster import MetadataValue, asset

@asset
def process_data(context, fetch_csv_from_s3):
    # Apply a 10% discount; this assumes the source column is named "price",
    # as described above
    data_frame = fetch_csv_from_s3.copy()
    data_frame["discounted_price"] = data_frame["price"] * 0.9

    context.log.info(f"Processed {len(data_frame)} rows")
    context.add_output_metadata({
        "DataFrame Preview": MetadataValue.md(data_frame.head().to_markdown(index=False))
    })
    return data_frame

4. Loading Data into Snowflake

The processed data is loaded into a Snowflake table. Temporary stages are used to manage file uploads efficiently.

python
from dagster import asset
import os

@asset(required_resource_keys={"snowflake"})
def save_to_snowflake(context, process_data):
    df = process_data

    temp_file_path = "temp_data.csv"
    df.to_csv(temp_file_path, index=False, header=False)

    snowflake_stage = "TEMP_STAGE"
    snowflake_table = "PRICES"

    try:
        with context.resources.snowflake.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(f"CREATE OR REPLACE STAGE {snowflake_stage}")
            cursor.execute(f"PUT file://{temp_file_path} @{snowflake_stage}")
            cursor.execute(
                f"COPY INTO {snowflake_table} FROM @{snowflake_stage} "
                "FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='\"')"
            )
            cursor.execute(f"REMOVE @{snowflake_stage}")

        context.log.info("Data successfully loaded into Snowflake.")
    except Exception as e:
        context.log.error(f"Failed to load data into Snowflake: {e}")
        raise
    finally:
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)

5. Definitions

Finally, we register resources and assets in Dagster.

python
from dagster import Definitions

defs = Definitions(
    resources={"s3": s3_resource, "snowflake": snowflake_resource},
    assets=[fetch_csv_from_s3, process_data, save_to_snowflake],
)

Running Dagster Pipeline

Now we can see the global asset lineage in the Dagster UI, running locally on port 3000.

Dagster overview

We can materialize an asset by running its job.

Dagster overview

Our asset is now materialized, and we can preview the data it has produced.

Dagster overview

Next, let's materialize the process_data asset.

Dagster overview

We can also view its metadata to inspect the data it has produced.

Dagster overview

Now let's move to the final step: loading the transformed data into the Snowflake database by running our Snowflake asset.

Dagster overview

Finally, we can verify our data in the Snowflake cloud.

Dagster overview

As we can see, the transformation and saving of our data executed successfully.

Benefits of Using Dagster

  1. Each step in the pipeline is defined independently, making it reusable and testable.
  2. Dagster’s rich logging and metadata capabilities provide clear insights into pipeline execution.
  3. Seamlessly integrates with tools like AWS, Snowflake, and Pandas.
  4. Allows customization and configuration to meet specific pipeline requirements.

Conclusion

Dagster makes managing complex data pipelines intuitive and efficient. Whether you are working with cloud storage, databases, or data transformations, Dagster provides the tools and flexibility needed for modern data orchestration. Start building your pipelines today and experience the power of Dagster!
