[Airflow] Slack을 이용한 Airflow 실습 - 3 (Slack Webhook)

2025. 7. 1. 13:07·Data Engineering

Slack Webhook

Slack에서 외부 앱이 봇처럼 활용될 수 있도록 특정 워크스페이스에 열어둔 HTTP URL이다. 워크스페이스의 고유한 주소로, 이 주소로 정보를 보내면 채팅으로 올라온다. 다양한 활용 방법이 있는데,

  • 외부 서비스 이벤트 알림
  • 데이터 파이프라인 알림
  • 서버 알림

 

등으로 활용할 수 있다.

 

  • 발급 방법
  1. https://api.slack.com/apps 접속
  2. “Create New App” 클릭
  3. “From scratch” 선택 → 이름과 워크스페이스 설정
  4. 좌측 메뉴에서 “Incoming Webhooks” 선택
  5. “Activate Incoming Webhooks” → ON
  6. 아래로 스크롤 → “Add New Webhook to Workspace” 클릭
  7. 채널 선택 후 “Allow” 클릭
  8. 발급된 Webhook URL을 복사 (예: https://hooks.slack.com/services/T0000/B0000/XXXXXXXX)

 

DAG 예제

Slack Webhook을 다룰 파이프라인 코드 예제이다.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import random
import requests
import os
import logging

WORDS_FILE_PATH = "***/opt/airflow/dags/resources/words.txt"
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/**********"

 

Python 코드를 작동시키기 때문에 PythonOperator와 Webhook에 HTTP Requests를 보내기 때문에 Request를 라이브러리로 가져온다. 그리고 파이프라인의 각 동작을 로그로 검사하기에 logging 라이브러리도 가져온다.

 

def choose_random_word(**context):
    logging.info(f"📘 Reading words from: {WORDS_FILE_PATH}")
    
    if not os.path.exists(WORDS_FILE_PATH):
        logging.error("❌ words.txt 파일이 존재하지 않습니다.")
        raise FileNotFoundError(f"{WORDS_FILE_PATH} not found.")
    
    with open(WORDS_FILE_PATH, "r") as f:
        words = [line.strip() for line in f if line.strip()]

    if not words:
        logging.warning("⚠️ words.txt에 유효한 단어가 없습니다.")
        raise ValueError("No valid words found in file.")

    chosen = random.choice(words)
    logging.info(f"✅ 선택된 단어: {chosen}")

    context['ti'].xcom_push(key='selected_word', value=chosen)
    logging.info("📤 XCom에 단어 푸시 완료")

 

파이프라인 중 첫 번째 단계인 choose_random_word다. airflow의 특정 폴더 안에 있는 words.txt의 랜덤 문구를 하나 고르는 함수이다. Airflow에서는 파이프라인의 단계를 이런 함수로 표현한다. 각 과정에는 로그를 살펴볼 수 있는 코드가 추가되어 있다. 마지막에 랜덤으로 선택한 문자열을 xcom으로 보낸다. xcom은 task간의 공유하는 데이터를 저장할 때 사용하는 메타데이터 저장소이다. 데이터베이스에 저장되기에 대용량 데이터로는 적합하지 않다.

 

def send_to_slack(**context):
    word = context['ti'].xcom_pull(task_ids='choose_word', key='selected_word')
    if not word:
        logging.error("❌ XCom에서 단어를 가져오지 못했습니다.")
        raise ValueError("No word retrieved from XCom.")

    message = f"🎯 Random message of the run: *{word}*"
    payload = {"text": message}

    logging.info(f"📨 Slack에 전송할 메시지: {message}")

    if not SLACK_WEBHOOK_URL:
        logging.error("❌ SLACK_WEBHOOK_URL 환경 변수가 설정되어 있지 않습니다.")
        raise EnvironmentError("SLACK_WEBHOOK_URL is not set.")

    try:
        response = requests.post(SLACK_WEBHOOK_URL, json=payload)
        logging.info(f"Slack 응답 코드: {response.status_code}")
        if response.status_code != 200:
            logging.error(f"❌ Slack 메시지 전송 실패: {response.text}")
            raise Exception(f"Slack notification failed: {response.text}")
        else:
            logging.info("✅ Slack 메시지 전송 성공")
    except Exception as e:
        logging.exception("Slack 전송 중 예외 발생")
        raise e

 

xcom에 있는 문자열을 HTTP Request를 통해 Webhook으로 전송하는 task이다. 로그 때문에 복잡해보이지만 단순한 함수이다.

 

default_args = {
    'owner': 'airflow',
}

with DAG(
    dag_id='random_word_to_slack',
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(1),
    catchup=False,
    tags=['example', 'slack'],
) as dag:

    t1 = PythonOperator(
        task_id='choose_word',
        python_callable=choose_random_word,
    )

    t2 = PythonOperator(
        task_id='send_slack',
        python_callable=send_to_slack,
    )

    t1 >> t2

 

Airflow에서 PythonOperator를 사용하여 이런식으로 DAG를 설정할 수 있다. task당 하나씩 Operator를 설정하고 어떤 task를 동작할 것인지 정할 수 있다. Operator를 통해 task를 정의하고 Executor가 어떤 Worker를 사용해 task를 실행할지 결정한다. Worker는 다양한 Operator를 통해 task를 실행하게 된다.

 

 

실습 과정

단계로 풀어보면 다음과 같다.

 

  1. Airflow 실행 및 DAG 인식
  2. choose_word task에서 words.txt 중 랜덤으로 문자열 추출
  3. slack_send task에서 xcom에 있는 랜덤 문자열을 받아 webhook에 전송(HTTP Request)
  4. slack이 전송 받은 문자열을 출력함.

 

결과

'Data Engineering' 카테고리의 다른 글

[Kafka] Kafka에 대하여  (0) 2025.10.13
[데이터 파이프라인] 3. 데이터 수집 실습  (0) 2025.07.10
[Airflow] Slack을 이용한 Airflow 실습 - 2 (Airflow와 DAG)  (0) 2025.06.25
[Airflow] Slack을 이용한 Airflow 실습 - 1 (Docker와 Airflow 연동)  (0) 2025.06.24
[데이터 파이프라인] 2. 일반적인 데이터 파이프라인  (0) 2025.06.17
'Data Engineering' 카테고리의 다른 글
  • [Kafka] Kafka에 대하여
  • [데이터 파이프라인] 3. 데이터 수집 실습
  • [Airflow] Slack을 이용한 Airflow 실습 - 2 (Airflow와 DAG)
  • [Airflow] Slack을 이용한 Airflow 실습 - 1 (Docker와 Airflow 연동)
BestTomaTo
BestTomaTo
  • BestTomaTo
    기록보관소
    BestTomaTo
  • 전체
    오늘
    어제
    • 분류 전체보기 (36) N
      • Algorithm (8)
      • Computer Science (3)
      • Backend (3)
      • DevOps (4)
        • Kubernetes (3)
        • Docker (0)
      • Data Engineering (8)
      • Cloud (2)
      • AI (1)
      • Security (3) N
        • SK Shieldus Rookies (3) N
      • Reference (2)
      • Project (1)
      • Experience (1)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

    langchain memory
    SQLD
    해커톤 후기
    3단계 모델링
    airlfow
    홈 서버
    동기 프로그래밍
    sql 개발자
    AWS
    langsmith
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.4
BestTomaTo
[Airflow] Slack을 이용한 Airflow 실습 - 3 (Slack Webhook)
상단으로

티스토리툴바