"""
dbt + Collate Integration DAG (Azure Blob Method)
This DAG:
1. Runs dbt models
2. Runs dbt tests
3. Generates dbt documentation (catalog.json)
4. Uploads all artifacts to Azure Blob Storage
Perfect for AKS, Azure VMs, or Container Instances.
"""
import os
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
# =============================================================================
# CONFIGURATION
# =============================================================================
# dbt Configuration
DBT_PROJECT_DIR = os.getenv("DBT_PROJECT_DIR", "/opt/airflow/dbt/my_project")
DBT_PROFILES_DIR = os.getenv("DBT_PROFILES_DIR", "/opt/airflow/dbt")
# Azure Blob Storage Configuration
AZURE_STORAGE_ACCOUNT = os.getenv("AZURE_STORAGE_ACCOUNT", "dbtartifacts12345")
AZURE_CONTAINER_NAME = os.getenv("AZURE_CONTAINER_NAME", "dbt-artifacts")
AZURE_STORAGE_KEY = os.getenv("AZURE_STORAGE_KEY", "")
AZURE_CONNECTION_STRING = os.getenv("AZURE_STORAGE_CONNECTION_STRING", "")
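
# The values above can also be sourced from Airflow Variables or a secrets
# backend instead of plain environment variables. Illustrative sketch only
# (the Variable names below are assumptions, not part of this project):
#
#   from airflow.models import Variable
#   AZURE_STORAGE_ACCOUNT = Variable.get("azure_storage_account", default_var="")
#   AZURE_STORAGE_KEY = Variable.get("azure_storage_key", default_var="")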
# =============================================================================
# DAG DEFAULT ARGUMENTS
# =============================================================================
default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email": ["data-team@yourcompany.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(hours=2),
}
# =============================================================================
# PYTHON FUNCTIONS
# =============================================================================
def upload_artifacts_to_azure(**context):
    """
    Upload dbt artifacts to Azure Blob Storage.
    Uses the azure-storage-blob library.
    Install with: pip install azure-storage-blob
    """
    from azure.storage.blob import BlobServiceClient

    target_dir = os.path.join(DBT_PROJECT_DIR, "target")

    # Initialize Azure Blob Service Client
    if AZURE_CONNECTION_STRING:
        blob_service_client = BlobServiceClient.from_connection_string(
            AZURE_CONNECTION_STRING
        )
    else:
        account_url = f"https://{AZURE_STORAGE_ACCOUNT}.blob.core.windows.net"
        blob_service_client = BlobServiceClient(
            account_url=account_url,
            credential=AZURE_STORAGE_KEY,
        )

    container_client = blob_service_client.get_container_client(AZURE_CONTAINER_NAME)
    # Artifacts to upload: (filename, required)
    artifacts = [
        ("manifest.json", True),      # Required
        ("catalog.json", False),      # Optional but recommended
        ("run_results.json", False),  # Optional
        ("sources.json", False),      # Optional
    ]

    uploaded = []
    failed = []
    skipped = []

    for filename, required in artifacts:
        local_path = os.path.join(target_dir, filename)
        if os.path.exists(local_path):
            try:
                blob_client = container_client.get_blob_client(filename)
                with open(local_path, "rb") as data:
                    blob_client.upload_blob(data, overwrite=True)
                uploaded.append(filename)
                print(f"✓ Uploaded {filename} to Azure Blob Storage")
            except Exception as e:
                error_msg = f"✗ Failed to upload {filename}: {e}"
                print(error_msg)
                if required:
                    raise RuntimeError(error_msg) from e
                failed.append(filename)
        elif required:
            raise FileNotFoundError(
                f"Required artifact not found: {local_path}\n"
                f"Make sure 'dbt run' completed successfully."
            )
        else:
            print(f"⊘ Skipping {filename} (not found - optional)")
            skipped.append(filename)

    # Log summary
    print(f"\n{'=' * 50}")
    print("Upload Summary:")
    print(f"  Uploaded: {', '.join(uploaded) or 'None'}")
    print(f"  Failed:   {', '.join(failed) or 'None'}")
    print(f"  Skipped:  {', '.join(skipped) or 'None'}")
    print(f"  Azure Location: {AZURE_STORAGE_ACCOUNT}/{AZURE_CONTAINER_NAME}/")
    print(f"{'=' * 50}")

    return {
        "uploaded": uploaded,
        "storage_account": AZURE_STORAGE_ACCOUNT,
        "container": AZURE_CONTAINER_NAME,
    }
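

# Optional alternative to key/connection-string auth: a sketch of how the same
# client could be built with Azure AD credentials (e.g. a managed identity on
# AKS). This helper is illustrative and not wired into the DAG; it assumes the
# azure-identity package is installed and the identity has the
# "Storage Blob Data Contributor" role on the storage account.
def get_blob_service_client_via_azure_ad():
    from azure.identity import DefaultAzureCredential
    from azure.storage.blob import BlobServiceClient

    account_url = f"https://{AZURE_STORAGE_ACCOUNT}.blob.core.windows.net"
    return BlobServiceClient(
        account_url=account_url,
        credential=DefaultAzureCredential(),
    )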
# =============================================================================
# DAG DEFINITION
# =============================================================================
with DAG(
    dag_id="dbt_with_azure",
    default_args=default_args,
    description="Run dbt models and sync metadata to Collate via Azure Blob",
    schedule_interval="0 6 * * *",  # Daily at 6 AM UTC
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=["dbt", "collate", "azure", "data-pipeline"],
) as dag:

    # Task Group: dbt Execution
    with TaskGroup(group_id="dbt_execution") as dbt_tasks:

        dbt_run = BashOperator(
            task_id="dbt_run",
            bash_command=f"""
                cd {DBT_PROJECT_DIR} && \
                dbt run --profiles-dir {DBT_PROFILES_DIR}
            """,
        )

        dbt_test = BashOperator(
            task_id="dbt_test",
            bash_command=f"""
                cd {DBT_PROJECT_DIR} && \
                dbt test --profiles-dir {DBT_PROFILES_DIR}
            """,
            trigger_rule="all_done",  # run tests even if dbt_run failed
        )

        dbt_docs = BashOperator(
            task_id="dbt_docs_generate",
            bash_command=f"""
                cd {DBT_PROJECT_DIR} && \
                dbt docs generate --profiles-dir {DBT_PROFILES_DIR}
            """,
        )

        dbt_run >> dbt_test >> dbt_docs
    # Upload to Azure Blob
    upload_to_azure = PythonOperator(
        task_id="upload_artifacts_to_azure",
        python_callable=upload_artifacts_to_azure,
    )

    # DAG Dependencies
    dbt_tasks >> upload_to_azure
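
# To spot-check an upload after a run, one option is the Azure CLI, e.g.:
#   az storage blob list \
#       --account-name <storage-account> \
#       --container-name dbt-artifacts \
#       --output table
# Collate's dbt ingestion can then be pointed at this container to read
# manifest.json / catalog.json / run_results.json.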