"""
dbt + Collate Integration DAG (S3 Method)
This DAG:
1. Runs dbt models
2. Runs dbt tests
3. Generates dbt documentation (catalog.json)
4. Uploads all artifacts to S3
No Collate packages are installed in this Airflow environment.
Collate pulls the artifacts from S3 independently.
"""
import os
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
# =============================================================================
# CONFIGURATION
# =============================================================================
# dbt Configuration
DBT_PROJECT_DIR = os.getenv("DBT_PROJECT_DIR", "/opt/airflow/dbt/my_project")
DBT_PROFILES_DIR = os.getenv("DBT_PROFILES_DIR", "/opt/airflow/dbt")
# S3 Configuration
S3_BUCKET = os.getenv("S3_BUCKET", "your-company-dbt-artifacts")
S3_PREFIX = os.getenv("S3_PREFIX", "dbt-artifacts")
AWS_REGION = os.getenv("AWS_DEFAULT_REGION", "us-east-1")
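# NOTE: Collate is configured separately (outside this DAG) to pull the artifacts
# from s3://<S3_BUCKET>/<S3_PREFIX>/. The exact ingestion settings depend on your
# Collate deployment; this DAG only publishes the files.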
# =============================================================================
# DAG DEFAULT ARGUMENTS
# =============================================================================
default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email": ["data-team@yourcompany.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(hours=2),
}
# =============================================================================
# PYTHON FUNCTIONS
# =============================================================================
def upload_artifacts_to_s3(**context):
    """
    Upload dbt artifacts to S3.

    Uses boto3 (the AWS SDK for Python), which is typically available in
    Airflow images. If not: pip install boto3
    """
    import boto3
    from botocore.exceptions import ClientError

    # boto3 resolves credentials from the standard chain (env vars, instance
    # profile, etc.); no explicit keys are passed here.
    s3_client = boto3.client("s3", region_name=AWS_REGION)
    target_dir = os.path.join(DBT_PROJECT_DIR, "target")

    # Files to upload: (filename, required)
    artifacts = [
        ("manifest.json", True),      # Required
        ("catalog.json", False),      # Optional but recommended
        ("run_results.json", False),  # Optional
        ("sources.json", False),      # Optional
    ]

    uploaded = []
    failed = []
    skipped = []

    for filename, required in artifacts:
        local_path = os.path.join(target_dir, filename)
        s3_key = f"{S3_PREFIX}/{filename}"

        if os.path.exists(local_path):
            try:
                s3_client.upload_file(local_path, S3_BUCKET, s3_key)
                uploaded.append(filename)
                print(f"✓ Uploaded {filename} to s3://{S3_BUCKET}/{s3_key}")
            except ClientError as e:
                error_msg = f"✗ Failed to upload {filename}: {e}"
                print(error_msg)
                if required:
                    raise RuntimeError(error_msg) from e
                failed.append(filename)
        elif required:
            raise FileNotFoundError(
                f"Required artifact not found: {local_path}\n"
                f"Make sure 'dbt run' completed successfully."
            )
        else:
            skipped.append(filename)
            print(f"⊘ Skipping {filename} (not found - optional)")

    # Log summary
    print(f"\n{'=' * 50}")
    print("Upload Summary:")
    print(f"  Uploaded: {', '.join(uploaded) or 'None'}")
    print(f"  Failed:   {', '.join(failed) or 'None'}")
    print(f"  Skipped:  {', '.join(skipped) or 'None'}")
    print(f"  S3 Location: s3://{S3_BUCKET}/{S3_PREFIX}/")
    print(f"{'=' * 50}")

    return {"uploaded": uploaded, "bucket": S3_BUCKET, "prefix": S3_PREFIX}
# =============================================================================
# DAG DEFINITION
# =============================================================================
with DAG(
    dag_id="dbt_with_collate",
    default_args=default_args,
    description="Run dbt models and sync metadata to Collate via S3",
    schedule_interval="0 6 * * *",  # Daily at 6 AM UTC
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=["dbt", "collate", "data-pipeline"],
) as dag:
    # Task Group: dbt Execution
    with TaskGroup(group_id="dbt_execution") as dbt_tasks:
        dbt_run = BashOperator(
            task_id="dbt_run",
            bash_command=f"""
            cd {DBT_PROJECT_DIR} && \
            dbt run --profiles-dir {DBT_PROFILES_DIR}
            """,
        )
        dbt_test = BashOperator(
            task_id="dbt_test",
            bash_command=f"""
            cd {DBT_PROJECT_DIR} && \
            dbt test --profiles-dir {DBT_PROFILES_DIR}
            """,
            trigger_rule="all_done",  # Run even if dbt_run fails
        )
        dbt_docs = BashOperator(
            task_id="dbt_docs_generate",
            bash_command=f"""
            cd {DBT_PROJECT_DIR} && \
            dbt docs generate --profiles-dir {DBT_PROFILES_DIR}
            """,
        )
        dbt_run >> dbt_test >> dbt_docs
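    # Note on artifacts: `dbt run` writes manifest.json and run_results.json,
    # and `dbt docs generate` writes catalog.json, all under target/. These are
    # the files the upload task below ships to S3.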
    # Upload to S3
    upload_to_s3 = PythonOperator(
        task_id="upload_artifacts_to_s3",
        python_callable=upload_artifacts_to_s3,
        # Airflow 2 passes the task context to the callable automatically;
        # provide_context is deprecated and no longer needed.
    )
    # DAG Dependencies
    dbt_tasks >> upload_to_s3
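# Quick local check (assuming the Airflow CLI is available in this environment):
#   airflow dags test dbt_with_collate 2024-01-01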