Slack Webhook
Slack에서 외부 앱이 봇처럼 활용될 수 있도록 특정 워크스페이스에 열어둔 HTTP URL이다. 워크스페이스의 고유한 주소로, 이 주소로 정보를 보내면 채팅으로 올라온다. 다양한 활용 방법이 있는데,
- 외부 서비스 이벤트 알림
- 데이터 파이프라인 알림
- 서버 알림
등으로 활용할 수 있다.
- 발급 방법
- https://api.slack.com/apps 접속
- “Create New App” 클릭
- “From scratch” 선택 → 이름과 워크스페이스 설정
- 좌측 메뉴에서 “Incoming Webhooks” 선택
- “Activate Incoming Webhooks” → ON
- 아래로 스크롤 → “Add New Webhook to Workspace” 클릭
- 채널 선택 후 “Allow” 클릭
- 발급된 Webhook URL을 복사 (예: https://hooks.slack.com/services/T0000/B0000/XXXXXXXX)
DAG 예제
Slack Webhook을 다룰 파이프라인 코드 예제이다.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import random
import requests
import os
import logging
WORDS_FILE_PATH = "***/opt/airflow/dags/resources/words.txt"
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/**********"
Python 코드를 작동시키기 때문에 PythonOperator와 Webhook에 HTTP Requests를 보내기 때문에 Request를 라이브러리로 가져온다. 그리고 파이프라인의 각 동작을 로그로 검사하기에 logging 라이브러리도 가져온다.
def choose_random_word(**context):
logging.info(f"📘 Reading words from: {WORDS_FILE_PATH}")
if not os.path.exists(WORDS_FILE_PATH):
logging.error("❌ words.txt 파일이 존재하지 않습니다.")
raise FileNotFoundError(f"{WORDS_FILE_PATH} not found.")
with open(WORDS_FILE_PATH, "r") as f:
words = [line.strip() for line in f if line.strip()]
if not words:
logging.warning("⚠️ words.txt에 유효한 단어가 없습니다.")
raise ValueError("No valid words found in file.")
chosen = random.choice(words)
logging.info(f"✅ 선택된 단어: {chosen}")
context['ti'].xcom_push(key='selected_word', value=chosen)
logging.info("📤 XCom에 단어 푸시 완료")
파이프라인 중 첫 번째 단계인 choose_random_word다. airflow의 특정 폴더 안에 있는 words.txt의 랜덤 문구를 하나 고르는 함수이다. Airflow에서는 파이프라인의 단계를 이런 함수로 표현한다. 각 과정에는 로그를 살펴볼 수 있는 코드가 추가되어 있다. 마지막에 랜덤으로 선택한 문자열을 xcom으로 보낸다. xcom은 task간의 공유하는 데이터를 저장할 때 사용하는 메타데이터 저장소이다. 데이터베이스에 저장되기에 대용량 데이터로는 적합하지 않다.
def send_to_slack(**context):
word = context['ti'].xcom_pull(task_ids='choose_word', key='selected_word')
if not word:
logging.error("❌ XCom에서 단어를 가져오지 못했습니다.")
raise ValueError("No word retrieved from XCom.")
message = f"🎯 Random message of the run: *{word}*"
payload = {"text": message}
logging.info(f"📨 Slack에 전송할 메시지: {message}")
if not SLACK_WEBHOOK_URL:
logging.error("❌ SLACK_WEBHOOK_URL 환경 변수가 설정되어 있지 않습니다.")
raise EnvironmentError("SLACK_WEBHOOK_URL is not set.")
try:
response = requests.post(SLACK_WEBHOOK_URL, json=payload)
logging.info(f"Slack 응답 코드: {response.status_code}")
if response.status_code != 200:
logging.error(f"❌ Slack 메시지 전송 실패: {response.text}")
raise Exception(f"Slack notification failed: {response.text}")
else:
logging.info("✅ Slack 메시지 전송 성공")
except Exception as e:
logging.exception("Slack 전송 중 예외 발생")
raise e
xcom에 있는 문자열을 HTTP Request를 통해 Webhook으로 전송하는 task이다. 로그 때문에 복잡해보이지만 단순한 함수이다.
default_args = {
'owner': 'airflow',
}
with DAG(
dag_id='random_word_to_slack',
default_args=default_args,
schedule_interval=None,
start_date=days_ago(1),
catchup=False,
tags=['example', 'slack'],
) as dag:
t1 = PythonOperator(
task_id='choose_word',
python_callable=choose_random_word,
)
t2 = PythonOperator(
task_id='send_slack',
python_callable=send_to_slack,
)
t1 >> t2
Airflow에서 PythonOperator를 사용하여 이런식으로 DAG를 설정할 수 있다. task당 하나씩 Operator를 설정하고 어떤 task를 동작할 것인지 정할 수 있다. Operator를 통해 task를 정의하고 Executor가 어떤 Worker를 사용해 task를 실행할지 결정한다. Worker는 다양한 Operator를 통해 task를 실행하게 된다.
실습 과정
단계로 풀어보면 다음과 같다.
- Airflow 실행 및 DAG 인식
- choose_word task에서 words.txt 중 랜덤으로 문자열 추출
- slack_send task에서 xcom에 있는 랜덤 문자열을 받아 webhook에 전송(HTTP Request)
- slack이 전송 받은 문자열을 출력함.
결과

'Data Engineering' 카테고리의 다른 글
| [Kafka] Kafka에 대하여 (0) | 2025.10.13 |
|---|---|
| [데이터 파이프라인] 3. 데이터 수집 실습 (0) | 2025.07.10 |
| [Airflow] Slack을 이용한 Airflow 실습 - 2 (Airflow와 DAG) (0) | 2025.06.25 |
| [Airflow] Slack을 이용한 Airflow 실습 - 1 (Docker와 Airflow 연동) (0) | 2025.06.24 |
| [데이터 파이프라인] 2. 일반적인 데이터 파이프라인 (0) | 2025.06.17 |