1.시스템&인프라/Apache Kafka

19.Kafka Stream 핵심 개념 이해하기

쿼드큐브 2025. 4. 7. 12:54
728x90
반응형

Kafka Streams는 Kafka에 저장된 데이터를 실시간으로 처리하고 분석할 수 있게 도와주는 클라이언트 라이브러리입니다.
Java 애플리케이션에 쉽게 통합할 수 있고, 외부 시스템 없이 Kafka만으로 고성능 스트림 처리가 가능합니다.

 

Kafka Stream 핵심 개념 이해하기

 

목차

1. Kafka Stream란?

2. 스트림 처리의 기본 구조

3. 시간(Time)의 개념

4. 스트림과 테이블의 이중성(Duality): 상호 변환

5. 집계(Aggregation) 이해하기

6. 윈도우(Windowing) 개념

7. 상태(State)와 상태 저장소

8. 처리 보장 (Exactly-Once)

9. 순서가 뒤바뀐 데이터 처리(Out-of-Order Handling)

10. 예제 코드

관련 글 링크

 

 

1. Kafka Stream란?

Kafka Streams는 Kafka에 저장된 데이터를 실시간으로 처리하고 분석하기 위한 Java 기반 라이브러리입니다.
따로 클러스터나 서버 없이 단순히 Java 애플리케이션에 포함해서 실행하면 됩니다.

  • Kafka 외에 별도 의존 시스템 없음
  • Java 애플리케이션에 내장 가능
  • 정확히 한 번만 처리(Exactly-once) 보장
  • 이벤트 시간 기반 처리 지원

 

2. 스트림 처리의 기본 구조

Kafka Streams는 데이터를 스트림(stream) 단위로 처리합니다.

  • Stream(스트림): Kafka의 토픽에서 읽는 연속된 데이터 흐름이며, 모든 데이터는 Key-Value 쌍으로 구성됩니다
  • Topology(토폴로지):  데이터가 어떤 순서로 어떤 처리를 거칠지를 정의합니다.

처리 구성 요소

  • Source Processor: Kafka에서 데이터를 읽어오는 역할 (user-events 토픽 구독)
  • Processor(Node): 데이터를 변환(가공/처리)하는 노드(필터, 변환, 집계 등)
  • Sink Processor: 처리된 데이터를 Kafka로 다시 쓰거나 외부로 전달(output-토픽으로 결과 전달)

Kafka Streams 애플리케이션은 내부적으로 Topology를 구성한 후 이를 실행합니다.

Kafka Streams의 핵심은 데이터 흐름을 그래프로 설계하는 Topology입니다.
각 노드는 Source → Processor → Sink 형태로 구성되어 있으며,이를 통해 복잡한 비즈니스 로직을 단계별로 처리할 수 있습니다.

 

Kafka Streams Topology 구성도 예

Kafka Streams Topology 구성도

 

Kafka Streams DSL(Domain-Specific Language)을 사용하면 Topology를 간단하게 정의할 수 있습니다.

Kafka Streams DSL은
 - Kafka Streams에서 자주 사용되는 스트림 처리 연산(map, filter, join, groupBy 등)을
   함수형 스타일로 쉽게 작성할 수 있게 도와주는 고수준 API 입니다. 
- 주요 클래스(KStream, KTable, GlobalKTable)

 

3. 시간(Time)의 개념

스트림 처리에서 시간은 매우 중요합니다. Kafka Streams에서는 다음 세 가지 시간 개념을 사용합니다.

  • 이벤트 시간(Event time): 실제 데이터가 발생한 시점 (예: GPS 센서가 좌표를 기록한 시간)
  • 처리 시간(Processing time): 애플리케이션이 해당 데이터를 처리한 시간
  • 수집 시간(Ingestion time): Kafka 브로커가 데이터를 수신한 시간

Kafka Streams는 이벤트 시간 기반으로 동작하도록 설정할 수 있으며, 이를 통해 정확한 시계열 처리와 윈도우 연산이 가능합니다.

 

4. 스트림과 테이블의 이중성(Duality) : 상호 변환

Kafka Streams는 스트림뿐만 아니라 테이블(Table) 개념도 함께 지원합니다.

  • Stream : 이벤트의 연속된 흐름
  • Table : 특정 시점의 상태를 나타내는 키-값 구조

◆ 스트림 → 테이블(Stream as Table):

  • 변경 기록의 연속(스트림)을 재생하면 테이블을 만들 수 있음 (예: 페이지뷰 수를 집계)
  • Kafka의 log-compacted 토픽이나 KTable은 스트림을 기반으로 최신 상태를 계속 반영합니다

스트림 -> 테이블

 

테이블 → 스트림(Table as Stream):

  • 테이블은 특정 시점의 상태이며, 이를 다시 스트림으로 변환 가능
  • 반대로 테이블도 내부적으로는 변경 로그(changelog)를 기반으로 만들어졌기 때문에
    변경된 내용을 다시 스트림으로 복원할 수 있습니다.

 

  • 즉, 테이블은 특정 시점에서의 상태이고, 그 상태의 변화 이력은 스트림으로 표현 가능합니다.

Kafka Streams는 이 이중성을 다음과 같이 구현합니다:

인터페이스 설명
KStream<K, V> 일반적인 스트림, 이벤트의 연속
KTable<K, V> 변경 사항을 반영한 테이블
GlobalKTable<K, V> 모든 인스턴스에 공유되는 전역 테이블

groupByKey().aggregate() 같은 연산은 KStream → KTable 변환입니다.

 

 

5. 집계(Aggregation) 이해하기

Kafka Streams에서 집계는 여러 개의 입력 데이터를 하나로 묶는 작업입니다.
예를 들어 사용자별 총 페이지뷰 수를 계산할 수 있습니다.

◆ 집계 대상

  • Kafka Streams DSL에서는 KStreamKTable 모두 집계할 수 있습니다.
구분 설명 집계 후 결과
KStream 연속된 이벤트 스트림 항상 KTable
KTable 상태 변경 스트림 변경된 KTable
  • 모든 집계 결과는 KTable입니다. 왜냐하면 집계 결과는 상태(state)를 의미하기 때문입니다.
  • 값은 키별로 합산되며, 나중에 도착한 데이터가 이전 값을 덮어씀

 집계 처리 흐름

KStream
  ▼
groupByKey() 또는 groupBy()
  ▼
count(), reduce(), aggregate()
  ▼
KTable (집계 결과)
  • groupByKey()는 기존 키를 기준으로 그룹화하고, groupBy()는 새 키로 재정의할 수 있게 해줍니다.
  • 예제: 사용자별 클릭 수 계산
StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> clickStream = builder.stream("click-events");

//사용자 ID를 키로 설정
//같은 사용자(키)끼리 그룹화
//개수를세어 집계 결과 생성
//결과를 Kafka 토픽에 출력
KTable<String, Long> userClickCounts = clickStream
    .map((key, value) -> new KeyValue<>(value, "1"))  // value = userId
    .groupByKey()
    .count();

userClickCounts.toStream().to("user-click-counts", Produced.with(Serdes.String(), Serdes.Long()));

 

 

6. 윈도우(Windowing) 개념 

윈도우는 시간을 기준으로 데이터를 잘라서 묶는 단위입니다.
Kafka Streams에서는 같은 키에 대해 특정 시간 범위 내에 들어오는 데이터들을 하나의 윈도우로 간주하고,
이 범위 안에서만 집계/조인/분석이 이루어집니다.

 

윈도우 종류

종류 설명 사용예시
Tumbling Window 고정된 크기, 겹치지 않음 매 5분마다 클릭 수
Hopping Window 고정된 크기, 슬라이딩 가능 (겹침) 매 1분 간격의 5분 통계
Sliding Window 두 이벤트 간 시간차를 기준으로 처리 A가 B 이후에 발생했을 때만
Session Window 사용자 활동 간 간격이 일정 시간 이상 비면 새 윈도우 시작 사용자별 방문 세션 분석

 

 윈도우 구성요소

구성요소 설명 예시
윈도우 크기(Window size) 집계를 수행할 시간 간격 5분, 1시간 등
슬라이드 간격(Advance interval) 다음 윈도우를 언제 시작할지 1분 단위 슬라이딩
유예 시간(Grace period) 지연 도착 데이터를 언제까지 수용할지 1분
  • 슬라이드 간격 < 윈도우 크기이면 겹치는 윈도우가 생성됩니다. (Hopping Window)

예제 코드

1. Tumbling Window
 - 고정된 크기, 겹치지 않음
 - 한 번에 한 윈도우만 유효
 - 매 5분 단위 집계 같은 상황에 적합

KTable<Windowed<String>, Long> result = stream
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count();

[0~5분]   [5~10분]   [10~15분] ...
   ▲          ▲          ▲
 집계1      집계2      집계3
2. Hopping Window
 - 고정된 크기, 겹치는 윈도우 가능
 - 윈도우가 일정 간격으로 슬라이딩
 - 하나의 이벤트가 여러 윈도우에 포함될 수 있음

KTable<Windowed<String>, Long> result = stream
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))
        .advanceBy(Duration.ofMinutes(1)))
    .count();

윈도우 크기: 5분, 슬라이드: 1분
[0~5분]
   [1~6분]
      [2~7분]
         ...
3. Sliding Window 
 - A 이벤트와 B 이벤트가 일정 시간 이내에 발생하면 같은 윈도우로 묶임
 - 두 이벤트 간의 시간차를 기준으로 윈도우 구성
 
SlidingWindows windows = SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10));

KTable<Windowed<String>, Long> result = stream
    .groupByKey()
    .windowedBy(windows)
    .count();

A: 10:00 발생
B: 10:07 발생 → A와 B는 같은 슬라이딩 윈도우

윈도우 범위: [10:00 ~ 10:10]
4. Session Window
 - 사용자 활동 단위로 윈도우 구성
 - 활동 간 비활동 기간(gap)이 기준보다 길면 새로운 윈도우 시작
 - 비정기적/불규칙한 이벤트에 매우 유용
 - 5분 이상 아무 이벤트가 없으면 세션이 종료되고, 새로운 세션을 시작한다.

SessionWindows sessionWindows = SessionWindows.with(Duration.ofMinutes(5));

KTable<Windowed<String>, Long> result = stream
    .groupByKey()
    .windowedBy(sessionWindows)
    .count();

사용자 이벤트 시점: [10:00], [10:02], [10:10]

윈도우 1: [10:00 ~ 10:02]  
윈도우 2: [10:10 ~ 10:10] (새로운 세션 시작)

 

 

7. 상태(State)와 상태 저장소

Kafka Streams는 상태 기반 처리도 지원합니다.
예를 들어, 특정 키의 총합을 계산할 때 상태 저장소(State Store)가 필요합니다.

상태:

  • 상태란, 스트림 처리 중에 과거 데이터를 기억하고 있는 값을 말합니다.
  • "사용자별 클릭 수"를 계산할 때, 이전까지의 클릭 수를 기억해야 함 → 이게 상태입니다.

 상태 저장소

  • Kafka Streams에서 상태를 저장하고 관리하기 위한 내부 저장 공간입니다.
  • 기본적으로는 애플리케이션 로컬 디스크에 저장되며, Kafka 토픽을 통해 백업(changelog)도 됩니다.
  • 모든 상태 기반 연산(집계, 윈도우, 조인)은 상태 저장소를 내부적으로 사용합니다.

 상태 저장소 종류

유형 설명 용도
KeyValueStore 키-값 구조의 저장소 가장 일반적, 집계/상태 저장
WindowStore 시간 기반의 키-값 저장소 윈도우 집계, 시간 조건 조회
SessionStore 세션 기반 저장소 세션 윈도우 처리
  •  Kafka Streams DSL을 사용할 때는 내부적으로 이 저장소들이 자동으로 생성됩니다.

KeyValueStore 예제 : 사용자별 누적 카운

  • DSL 사용 시 자동 생성되지만, Processor API에서는 직접 사용 가능합니다.
  • 키: userId
  • 값: 누적 클릭 수
public class ClickCountProcessor implements Processor<String, String, String, Long> {
    private KeyValueStore<String, Long> store;

    @Override
    public void init(ProcessorContext<String, Long> context) {
        store = context.getStateStore("click-store");
    }

    @Override
    public void process(Record<String, String> record) {
        String user = record.key();
        Long count = store.get(user);
        count = (count == null) ? 1L : count + 1;
        store.put(user, count);

        // 누적 결과 전달
        context().forward(new Record<>(user, count, record.timestamp()));
    }
}
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("click-store"),
        Serdes.String(),
        Serdes.Long()
    );
builder.addStateStore(storeBuilder);

 

 KeyValueStore 예제 : 시간별 클릭 수 집계(5분 단위)

  • 키: userId
  • 값: 시간별 클릭 수
  • 윈도우: 5분 고정 창
public class WindowClickCountProcessor implements Processor<String, String, String, Long> {
    private WindowStore<String, Long> windowStore;

    @Override
    public void init(ProcessorContext<String, Long> context) {
        windowStore = context.getStateStore("click-window-store");
    }

    @Override
    public void process(Record<String, String> record) {
        long now = record.timestamp();
        String user = record.key();
        Long count = windowStore.fetch(user, now);
        count = (count == null) ? 1L : count + 1;
        windowStore.put(user, count, now);

        context().forward(new Record<>(user, count, now));
    }
}
StoreBuilder<WindowStore<String, Long>> windowStoreBuilder =
    Stores.windowStoreBuilder(
        Stores.persistentWindowStore("click-window-store",
            Duration.ofMinutes(10),  // retention 기간
            Duration.ofMinutes(5),   // 윈도우 크기
            false
        ),
        Serdes.String(),
        Serdes.Long()
    );
builder.addStateStore(windowStoreBuilder);

 

SessionStore 예제 : 사용자 세션 클릭 수 집계

  • 키: userId
  • 값: 세션 단위 클릭 수
  • 세션 간 간격이 3분 이상이면 새 세션 시작
public class SessionClickCountProcessor implements Processor<String, String, String, Long> {
    private SessionStore<String, Long> sessionStore;

    @Override
    public void init(ProcessorContext<String, Long> context) {
        sessionStore = context.getStateStore("click-session-store");
    }

    @Override
    public void process(Record<String, String> record) {
        long timestamp = record.timestamp();
        String user = record.key();

        // 세션 병합 정책: 가장 최근 값 기준
        sessionStore.put(user, 1L, timestamp);  // 기본값 1 클릭
    }
}
StoreBuilder<SessionStore<String, Long>> sessionStoreBuilder =
    Stores.sessionStoreBuilder(
        Stores.persistentSessionStore("click-session-store",
            Duration.ofMinutes(10) // retention
        ),
        Serdes.String(),
        Serdes.Long()
    );
builder.addStateStore(sessionStoreBuilder);

 

 

8. 처리 보장 (Exactly-Once)

처리 보장(Processing Guarantees)이란, Kafka Streams가 스트림 데이터를 처리하는 과정에서 중복 없이 정확히 한 번만 처리하거나, 최소한 한 번은 처리되도록 보장하는 능력을 의미합니다.

실시간 데이터 분석, 결제 처리, 이벤트 감지 등에서는 중복 처리나 데이터 손실이 큰 문제가 될 수 있습니다.

 

구현 방식

  • Kafka 트랜잭션 기능 사용
  • 입력 오프셋 커밋, 상태 저장, 출력 메시지를 하나의 트랜잭션으로 처리
  • processing.guarantee = exactly_once_v2 설정 필요 (Kafka 2.5 이상)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
처리 보장 수준 특징 설명
at_least_once 빠르지만 중복 가능성 있음 로그 수집, 중복 허용
exactly_once_v2 느리지만 정확한 처리 보장 집계, 결제, 비즈니스 핵심 처리

 

 

9. 순서가 뒤바뀐 데이터 처리(Out-of-Order Handling)

Kafka Streams에서는 실시간 데이터 처리 환경에서 자주 발생하는 문제 중 하나인 순서가 뒤바뀐 데이터 처리(Out-of-Order Handling)를 효과적으로 다룰 수 있는 기능들을 제공합니다.

이 개념을 잘 이해하면, 정확한 시계열 분석, 집계, 윈도우 처리 등을 할 수 있게 됩니다.

 

순서 뒤바뀜 발생 예

   1) 단일 파티션에서

       - 이전 offset이 나중 타임스탬프일 수 있음

   2) 여러 파티션 간 병렬 처리 시

       - Streams는 파티션별로 독립적으로 처리하기 때문에 타임스탬프가 과거인 데이터가 뒤늦게 도착할 수 있음

 

Kafka Streams는 윈도우 기반 연산을 할 때 Grace Period를 통해 뒤늦게 도착한 데이터를 처리할 수 있습니다.

TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1))

 

Kafka Streams는 실시간 스트림 환경에서 발생하는 Out-of-Order 이벤트를 유연하게 처리할 수 있는 구조를 제공하며,
타임스탬프 기반 처리, 유예 시간(grace), 세션 윈도우, 상태 저장소 등과 함께 사용할 때 더욱 강력해집니다.

 

10. 예제 코드

 

5분 단위로 데이터 갯수 집계

  • 입력 토픽: topic-basic
  • 출력 토픽: event-counts-per-5min
package kafka.exam.stream;

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class CountStreamExample {
	String bootstrapServers = "192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092";

	public void testStream() {
		Properties props = new Properties();
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, "value-count-by-window-app");
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
		props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

		StreamsBuilder builder = new StreamsBuilder();

		// 입력 스트림 (key 없음, value만 있음)
		KStream<String, String> inputStream  = builder.stream("topic-basic");

        // step 1: 모든 메시지를 동일한 key "fixed"로 설정 → 하나의 그룹으로 묶음
        KGroupedStream<String, String> groupedStream = inputStream
                .map((key, value) -> new KeyValue<>("fixed", value)) // key를 고정하여 그룹핑
                .groupByKey(); // key("fixed") 기준으로 그룹화'
        
     // step 2: 5분 단위 윈도우로 개수 세기
        TimeWindows windowSize = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)); // 5분 고정 윈도우
        KTable<Windowed<String>, Long> countedStream = groupedStream
                .windowedBy(windowSize)
                .count(); // 메시지 개수 집계

        // step 3: 집계 결과를 문자열로 변환하여 출력 토픽에 전송
        countedStream
                .toStream()
                .map((windowedKey, count) -> {
                    // 윈도우 시간 범위 표시
                    String timeWindow = windowedKey.window().startTime() + " ~ " + windowedKey.window().endTime();
                    String result = "Window: " + timeWindow + ", Count: " + count;

                    return new KeyValue<String, String>(null, result); // key 없이 전송
                }).to("event-counts-per-5min", Produced.with(Serdes.String(), Serdes.String())); // 출력 토픽

        // Kafka Streams 애플리케이션 시작
        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // 종료 시 정상적으로 정리되도록 종료 훅 등록
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        // 스트림 실행
        streams.start();
        System.out.println("Kafka Streams 실행됨: 5분 단위 메시지 수를 집계 중...");
	}

	public static void main(String args[]) {

		 CountStreamExample streamExam = new CountStreamExample();
		 streamExam.testStream();
		
	}
}

 

사용자 ID 기준 클릭수 누적 집계

  • 입력 토픽: click-events
  • 출력 토픽: user-click-counts
  • 상태 저장소: click-store (KeyValueStore 사용)
  • 처리 내용: 사용자 ID 기준 클릭 수 누적 집계
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.processor.*;
import org.apache.kafka.streams.state.*;
import org.apache.kafka.streams.state.Stores;

import java.time.Duration;
import java.util.Properties;

public class ClickCountStreamApp {

    public static void main(String[] args) {
        // 1. Kafka Streams 설정
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "click-count-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // 2. Topology 정의
        StreamsBuilder builder = new StreamsBuilder();

        // 3. 상태 저장소 정의 및 등록
        StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("click-store"),
                Serdes.String(),
                Serdes.Long()
            );
        builder.addStateStore(storeBuilder);

        // 4. 토픽에서 데이터 수신 후 Processor 등록
        builder.stream("click-events")
            .process(() -> new ClickCountProcessor(), "click-store");

        // 5. Kafka Streams 시작
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // 6. 종료 처리
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    // 7. Processor 정의
    public static class ClickCountProcessor implements Processor<String, String, Void, Void> {
        private ProcessorContext<Void, Void> context;
        private KeyValueStore<String, Long> stateStore;

        @Override
        public void init(ProcessorContext<Void, Void> context) {
            this.context = context;
            this.stateStore = context.getStateStore("click-store");
        }

        @Override
        public void process(Record<String, String> record) {
            String userId = record.key(); // user ID 기준
            Long count = stateStore.get(userId);
            count = (count == null) ? 1L : count + 1;
            stateStore.put(userId, count);

            // 결과 출력용 로그
            System.out.printf("User [%s] click count = %d\n", userId, count);

            // 출력 토픽으로 전송
            context.forward(new Record<>(userId, count.toString(), record.timestamp()));
        }

        @Override
        public void close() {
            // 리소스 정리 필요 시 구현
        }
    }
}

 


Kafka Streams는 Kafka에 저장된 데이터를 실시간으로 처리하고 분석할 수 있게 해주는 경량 스트림 처리 라이브러리입니다.
스트림과 테이블의 개념, 윈도우 및 집계, 상태 관리, 정확한 처리 보장 등 강력한 기능을 Java 애플리케이션 내에서 간단히 활용할 수 있다는 점이 큰 장점입니다.

 

 

관련 글 링크

13.Kafka 명령어 예제 정리: Topic, Producer, Consumer

2. Kafka 단일 노드 동작 원리: 파티션 분배부터 Consumer 전략까지

16. Kafka AdminClient API 예제: Topic,Cluster,ConsumerGroups

17.Kafka Producer API 사용법(비동기 전송, Transaction, 예제)

18.Kafka Consumer API 사용법(subscribe vs assign, 수동 커밋)

 

728x90
반응형