Kafka 기본 개념과 실습 예제

본 글은 Claude Sonnet 4로 작성했습니다.


Kafka란?

Apache Kafka는 분산 스트리밍 플랫폼이다. 실시간으로 대량의 데이터를 처리하고 저장할 수 있는 시스템으로, 높은 처리량과 내결함성을 제공한다.

핵심 개념

Producer와 Consumer

  • Producer: 데이터를 Kafka에 전송하는 애플리케이션
  • Consumer: Kafka에서 데이터를 읽어오는 애플리케이션

Topic과 Partition

  • Topic: 데이터를 분류하는 카테고리. 메시지가 저장되는 논리적 단위
  • Partition: Topic을 물리적으로 분할한 단위. 병렬 처리와 확장성을 위해 사용

Broker와 Cluster

  • Broker: Kafka 서버 인스턴스. 메시지를 저장하고 처리
  • Cluster: 여러 Broker로 구성된 Kafka 시스템

Consumer Group

  • 같은 Topic을 읽는 Consumer들의 집합
  • 각 Partition은 Consumer Group 내에서 하나의 Consumer만 읽을 수 있음

Kafka의 장점

높은 처리량

  • 초당 수백만 건의 메시지 처리 가능
  • 수평 확장을 통한 성능 향상

내결함성

  • 데이터 복제를 통한 안정성 보장
  • Broker 장애 시에도 서비스 지속

확장성

  • Partition 추가를 통한 수평 확장
  • 실시간 확장 가능

실습 예제

환경 설정

먼저 Kafka를 Docker로 실행한다:

# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Python Producer

# producer.py
from kafka import KafkaProducer
import json
import time
from datetime import datetime

# Kafka Producer 설정
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],  # Kafka 서버 주소
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # JSON 직렬화
)

def send_user_activity():
    """사용자 활동 데이터를 Kafka로 전송"""
    
    # 샘플 데이터 생성
    user_activities = [
        {"user_id": 1, "action": "login", "timestamp": datetime.now().isoformat()},
        {"user_id": 2, "action": "purchase", "timestamp": datetime.now().isoformat()},
        {"user_id": 3, "action": "logout", "timestamp": datetime.now().isoformat()}
    ]
    
    for activity in user_activities:
        # 메시지를 'user-activity' 토픽으로 전송
        future = producer.send('user-activity', value=activity)
        
        # 전송 결과 확인
        try:
            record_metadata = future.get(timeout=10)
            print(f"메시지 전송 성공: {activity}")
            print(f"토픽: {record_metadata.topic}, 파티션: {record_metadata.partition}")
        except Exception as e:
            print(f"메시지 전송 실패: {e}")
        
        time.sleep(1)  # 1초 간격으로 전송

if __name__ == "__main__":
    send_user_activity()
    producer.close()  # Producer 종료

Python Consumer

# consumer.py
from kafka import KafkaConsumer
import json

# Kafka Consumer 설정
consumer = KafkaConsumer(
    'user-activity',  # 구독할 토픽
    bootstrap_servers=['localhost:9092'],  # Kafka 서버 주소
    group_id='activity-processor',  # Consumer Group ID
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),  # JSON 역직렬화
    auto_offset_reset='earliest'  # 처음부터 메시지 읽기
)

def process_user_activity():
    """사용자 활동 데이터를 처리"""
    
    print("사용자 활동 모니터링 시작...")
    
    for message in consumer:
        # 메시지 처리
        activity = message.value
        
        print(f"처리 중인 활동: {activity}")
        
        # 활동 유형에 따른 처리 로직
        if activity['action'] == 'login':
            print(f"사용자 {activity['user_id']}가 로그인했습니다.")
        elif activity['action'] == 'purchase':
            print(f"사용자 {activity['user_id']}가 구매했습니다.")
        elif activity['action'] == 'logout':
            print(f"사용자 {activity['user_id']}가 로그아웃했습니다.")
        
        print("---")

if __name__ == "__main__":
    try:
        process_user_activity()
    except KeyboardInterrupt:
        print("Consumer 종료")
    finally:
        consumer.close()

Go Producer

// producer.go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

// UserActivity 사용자 활동 구조체
type UserActivity struct {
    UserID    int    `json:"user_id"`
    Action    string `json:"action"`
    Timestamp string `json:"timestamp"`
}

func main() {
    // Kafka Writer 설정
    writer := &kafka.Writer{
        Addr:     kafka.TCP("localhost:9092"), // Kafka 서버 주소
        Topic:    "user-activity",             // 토픽 이름
        Balancer: &kafka.LeastBytes{},         // 파티션 분배 전략
    }
    defer writer.Close()

    // 샘플 데이터 생성
    activities := []UserActivity{
        {UserID: 1, Action: "login", Timestamp: time.Now().Format(time.RFC3339)},
        {UserID: 2, Action: "purchase", Timestamp: time.Now().Format(time.RFC3339)},
        {UserID: 3, Action: "logout", Timestamp: time.Now().Format(time.RFC3339)},
    }

    // 메시지 전송
    for _, activity := range activities {
        // JSON 직렬화
        data, err := json.Marshal(activity)
        if err != nil {
            log.Printf("JSON 직렬화 실패: %v", err)
            continue
        }

        // 메시지 전송
        err = writer.WriteMessages(context.Background(),
            kafka.Message{
                Key:   []byte(fmt.Sprintf("user-%d", activity.UserID)), // 파티션 키
                Value: data,                                             // 메시지 내용
            },
        )

        if err != nil {
            log.Printf("메시지 전송 실패: %v", err)
        } else {
            fmt.Printf("메시지 전송 성공: %+v\n", activity)
        }

        time.Sleep(1 * time.Second) // 1초 간격
    }
}

Go Consumer

// consumer.go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    "github.com/segmentio/kafka-go"
)

// UserActivity 사용자 활동 구조체
type UserActivity struct {
    UserID    int    `json:"user_id"`
    Action    string `json:"action"`
    Timestamp string `json:"timestamp"`
}

func main() {
    // Kafka Reader 설정
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  []string{"localhost:9092"}, // Kafka 서버 주소
        Topic:    "user-activity",            // 토픽 이름
        GroupID:  "activity-processor-go",    // Consumer Group ID
        MinBytes: 10e3,                       // 최소 배치 크기 (10KB)
        MaxBytes: 10e6,                       // 최대 배치 크기 (10MB)
    })
    defer reader.Close()

    fmt.Println("사용자 활동 모니터링 시작...")

    // 메시지 읽기
    for {
        message, err := reader.ReadMessage(context.Background())
        if err != nil {
            log.Printf("메시지 읽기 실패: %v", err)
            continue
        }

        // JSON 역직렬화
        var activity UserActivity
        if err := json.Unmarshal(message.Value, &activity); err != nil {
            log.Printf("JSON 역직렬화 실패: %v", err)
            continue
        }

        // 활동 처리
        fmt.Printf("처리 중인 활동: %+v\n", activity)

        // 활동 유형에 따른 처리 로직
        switch activity.Action {
        case "login":
            fmt.Printf("사용자 %d가 로그인했습니다.\n", activity.UserID)
        case "purchase":
            fmt.Printf("사용자 %d가 구매했습니다.\n", activity.UserID)
        case "logout":
            fmt.Printf("사용자 %d가 로그아웃했습니다.\n", activity.UserID)
        }

        fmt.Println("---")
    }
}

실행 방법

1. Kafka 실행

# Docker Compose로 Kafka 실행
docker-compose up -d

# 토픽 생성
docker exec -it kafka_kafka_1 kafka-topics --create --topic user-activity --bootstrap-server localhost:9092

2. Python 실행

# 의존성 설치
pip install kafka-python

# Producer 실행
python producer.py

# Consumer 실행 (다른 터미널에서)
python consumer.py

3. Go 실행

# 의존성 설치
go mod init kafka-example
go get github.com/segmentio/kafka-go

# Producer 실행
go run producer.go

# Consumer 실행 (다른 터미널에서)
go run consumer.go

실무 활용 사례

로그 수집 시스템

  • 마이크로서비스에서 생성되는 로그를 중앙 집중식으로 수집
  • ELK Stack과 연동하여 로그 분석

실시간 데이터 파이프라인

  • 사용자 행동 데이터를 실시간으로 처리
  • 추천 시스템이나 개인화 서비스에 활용

이벤트 소싱

  • 시스템의 모든 변경사항을 이벤트로 저장
  • 데이터 복구와 감사(Audit) 용도로 활용

Kafka는 현대적인 분산 시스템에서 필수적인 컴포넌트이다. 높은 처리량과 안정성을 바탕으로 다양한 실시간 데이터 처리 시나리오에 활용할 수 있다.