본 글은 Claude Sonnet 4로 작성했습니다.
Kafka란?
Apache Kafka는 분산 스트리밍 플랫폼이다. 실시간으로 대량의 데이터를 처리하고 저장할 수 있는 시스템으로, 높은 처리량과 내결함성을 제공한다.
핵심 개념
Producer와 Consumer
- Producer: 데이터를 Kafka에 전송하는 애플리케이션
- Consumer: Kafka에서 데이터를 읽어오는 애플리케이션
Topic과 Partition
- Topic: 데이터를 분류하는 카테고리. 메시지가 저장되는 논리적 단위
- Partition: Topic을 물리적으로 분할한 단위. 병렬 처리와 확장성을 위해 사용
Broker와 Cluster
- Broker: Kafka 서버 인스턴스. 메시지를 저장하고 처리
- Cluster: 여러 Broker로 구성된 Kafka 시스템
Consumer Group
- 같은 Topic을 읽는 Consumer들의 집합
- 각 Partition은 Consumer Group 내에서 하나의 Consumer만 읽을 수 있음
Kafka의 장점
높은 처리량
- 초당 수백만 건의 메시지 처리 가능
- 수평 확장을 통한 성능 향상
내결함성
- 데이터 복제를 통한 안정성 보장
- Broker 장애 시에도 서비스 지속
확장성
- Partition 추가를 통한 수평 확장
- 실시간 확장 가능
실습 예제
환경 설정
먼저 Kafka를 Docker로 실행한다:
# docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.4.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Python Producer
# producer.py
from kafka import KafkaProducer
import json
import time
from datetime import datetime
# Kafka Producer 설정
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'], # Kafka 서버 주소
value_serializer=lambda v: json.dumps(v).encode('utf-8') # JSON 직렬화
)
def send_user_activity():
"""사용자 활동 데이터를 Kafka로 전송"""
# 샘플 데이터 생성
user_activities = [
{"user_id": 1, "action": "login", "timestamp": datetime.now().isoformat()},
{"user_id": 2, "action": "purchase", "timestamp": datetime.now().isoformat()},
{"user_id": 3, "action": "logout", "timestamp": datetime.now().isoformat()}
]
for activity in user_activities:
# 메시지를 'user-activity' 토픽으로 전송
future = producer.send('user-activity', value=activity)
# 전송 결과 확인
try:
record_metadata = future.get(timeout=10)
print(f"메시지 전송 성공: {activity}")
print(f"토픽: {record_metadata.topic}, 파티션: {record_metadata.partition}")
except Exception as e:
print(f"메시지 전송 실패: {e}")
time.sleep(1) # 1초 간격으로 전송
if __name__ == "__main__":
send_user_activity()
producer.close() # Producer 종료
Python Consumer
# consumer.py
from kafka import KafkaConsumer
import json
# Kafka Consumer 설정
consumer = KafkaConsumer(
'user-activity', # 구독할 토픽
bootstrap_servers=['localhost:9092'], # Kafka 서버 주소
group_id='activity-processor', # Consumer Group ID
value_deserializer=lambda m: json.loads(m.decode('utf-8')), # JSON 역직렬화
auto_offset_reset='earliest' # 처음부터 메시지 읽기
)
def process_user_activity():
"""사용자 활동 데이터를 처리"""
print("사용자 활동 모니터링 시작...")
for message in consumer:
# 메시지 처리
activity = message.value
print(f"처리 중인 활동: {activity}")
# 활동 유형에 따른 처리 로직
if activity['action'] == 'login':
print(f"사용자 {activity['user_id']}가 로그인했습니다.")
elif activity['action'] == 'purchase':
print(f"사용자 {activity['user_id']}가 구매했습니다.")
elif activity['action'] == 'logout':
print(f"사용자 {activity['user_id']}가 로그아웃했습니다.")
print("---")
if __name__ == "__main__":
try:
process_user_activity()
except KeyboardInterrupt:
print("Consumer 종료")
finally:
consumer.close()
Go Producer
// producer.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/segmentio/kafka-go"
)
// UserActivity 사용자 활동 구조체
type UserActivity struct {
UserID int `json:"user_id"`
Action string `json:"action"`
Timestamp string `json:"timestamp"`
}
func main() {
// Kafka Writer 설정
writer := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"), // Kafka 서버 주소
Topic: "user-activity", // 토픽 이름
Balancer: &kafka.LeastBytes{}, // 파티션 분배 전략
}
defer writer.Close()
// 샘플 데이터 생성
activities := []UserActivity{
{UserID: 1, Action: "login", Timestamp: time.Now().Format(time.RFC3339)},
{UserID: 2, Action: "purchase", Timestamp: time.Now().Format(time.RFC3339)},
{UserID: 3, Action: "logout", Timestamp: time.Now().Format(time.RFC3339)},
}
// 메시지 전송
for _, activity := range activities {
// JSON 직렬화
data, err := json.Marshal(activity)
if err != nil {
log.Printf("JSON 직렬화 실패: %v", err)
continue
}
// 메시지 전송
err = writer.WriteMessages(context.Background(),
kafka.Message{
Key: []byte(fmt.Sprintf("user-%d", activity.UserID)), // 파티션 키
Value: data, // 메시지 내용
},
)
if err != nil {
log.Printf("메시지 전송 실패: %v", err)
} else {
fmt.Printf("메시지 전송 성공: %+v\n", activity)
}
time.Sleep(1 * time.Second) // 1초 간격
}
}
Go Consumer
// consumer.go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/segmentio/kafka-go"
)
// UserActivity 사용자 활동 구조체
type UserActivity struct {
UserID int `json:"user_id"`
Action string `json:"action"`
Timestamp string `json:"timestamp"`
}
func main() {
// Kafka Reader 설정
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"}, // Kafka 서버 주소
Topic: "user-activity", // 토픽 이름
GroupID: "activity-processor-go", // Consumer Group ID
MinBytes: 10e3, // 최소 배치 크기 (10KB)
MaxBytes: 10e6, // 최대 배치 크기 (10MB)
})
defer reader.Close()
fmt.Println("사용자 활동 모니터링 시작...")
// 메시지 읽기
for {
message, err := reader.ReadMessage(context.Background())
if err != nil {
log.Printf("메시지 읽기 실패: %v", err)
continue
}
// JSON 역직렬화
var activity UserActivity
if err := json.Unmarshal(message.Value, &activity); err != nil {
log.Printf("JSON 역직렬화 실패: %v", err)
continue
}
// 활동 처리
fmt.Printf("처리 중인 활동: %+v\n", activity)
// 활동 유형에 따른 처리 로직
switch activity.Action {
case "login":
fmt.Printf("사용자 %d가 로그인했습니다.\n", activity.UserID)
case "purchase":
fmt.Printf("사용자 %d가 구매했습니다.\n", activity.UserID)
case "logout":
fmt.Printf("사용자 %d가 로그아웃했습니다.\n", activity.UserID)
}
fmt.Println("---")
}
}
실행 방법
1. Kafka 실행
# Docker Compose로 Kafka 실행
docker-compose up -d
# 토픽 생성
docker exec -it kafka_kafka_1 kafka-topics --create --topic user-activity --bootstrap-server localhost:9092
2. Python 실행
# 의존성 설치
pip install kafka-python
# Producer 실행
python producer.py
# Consumer 실행 (다른 터미널에서)
python consumer.py
3. Go 실행
# 의존성 설치
go mod init kafka-example
go get github.com/segmentio/kafka-go
# Producer 실행
go run producer.go
# Consumer 실행 (다른 터미널에서)
go run consumer.go
실무 활용 사례
로그 수집 시스템
- 마이크로서비스에서 생성되는 로그를 중앙 집중식으로 수집
- ELK Stack과 연동하여 로그 분석
실시간 데이터 파이프라인
- 사용자 행동 데이터를 실시간으로 처리
- 추천 시스템이나 개인화 서비스에 활용
이벤트 소싱
- 시스템의 모든 변경사항을 이벤트로 저장
- 데이터 복구와 감사(Audit) 용도로 활용
Kafka는 현대적인 분산 시스템에서 필수적인 컴포넌트이다. 높은 처리량과 안정성을 바탕으로 다양한 실시간 데이터 처리 시나리오에 활용할 수 있다.