Kafka Streams는 Kafka에 저장된 데이터를 실시간으로 처리하고 분석할 수 있게 도와주는 클라이언트 라이브러리입니다.
Java 애플리케이션에 쉽게 통합할 수 있고, 외부 시스템 없이 Kafka만으로 고성능 스트림 처리가 가능합니다.
Kafka Stream 핵심 개념 이해하기
목차4. 스트림과 테이블의 이중성(Duality): 상호 변환
1. Kafka Stream란?
Kafka Streams는 Kafka에 저장된 데이터를 실시간으로 처리하고 분석하기 위한 Java 기반 라이브러리입니다.
따로 클러스터나 서버 없이 단순히 Java 애플리케이션에 포함해서 실행하면 됩니다.
- Kafka 외에 별도 의존 시스템 없음
- Java 애플리케이션에 내장 가능
- 정확히 한 번만 처리(Exactly-once) 보장
- 이벤트 시간 기반 처리 지원
2. 스트림 처리의 기본 구조
Kafka Streams는 데이터를 스트림(stream) 단위로 처리합니다.
- Stream(스트림): Kafka의 토픽에서 읽는 연속된 데이터 흐름이며, 모든 데이터는 Key-Value 쌍으로 구성됩니다
- Topology(토폴로지): 데이터가 어떤 순서로 어떤 처리를 거칠지를 정의합니다.
처리 구성 요소
- Source Processor: Kafka에서 데이터를 읽어오는 역할 (user-events 토픽 구독)
- Processor(Node): 데이터를 변환(가공/처리)하는 노드(필터, 변환, 집계 등)
- Sink Processor: 처리된 데이터를 Kafka로 다시 쓰거나 외부로 전달(output-토픽으로 결과 전달)
Kafka Streams 애플리케이션은 내부적으로 Topology를 구성한 후 이를 실행합니다.
Kafka Streams의 핵심은 데이터 흐름을 그래프로 설계하는 Topology입니다.
각 노드는 Source → Processor → Sink 형태로 구성되어 있으며,이를 통해 복잡한 비즈니스 로직을 단계별로 처리할 수 있습니다.
Kafka Streams Topology 구성도 예
Kafka Streams DSL(Domain-Specific Language)을 사용하면 Topology를 간단하게 정의할 수 있습니다.
Kafka Streams DSL은
- Kafka Streams에서 자주 사용되는 스트림 처리 연산(map, filter, join, groupBy 등)을
함수형 스타일로 쉽게 작성할 수 있게 도와주는 고수준 API 입니다.
- 주요 클래스(KStream, KTable, GlobalKTable)
3. 시간(Time)의 개념
스트림 처리에서 시간은 매우 중요합니다. Kafka Streams에서는 다음 세 가지 시간 개념을 사용합니다.
- 이벤트 시간(Event time): 실제 데이터가 발생한 시점 (예: GPS 센서가 좌표를 기록한 시간)
- 처리 시간(Processing time): 애플리케이션이 해당 데이터를 처리한 시간
- 수집 시간(Ingestion time): Kafka 브로커가 데이터를 수신한 시간
Kafka Streams는 이벤트 시간 기반으로 동작하도록 설정할 수 있으며, 이를 통해 정확한 시계열 처리와 윈도우 연산이 가능합니다.
4. 스트림과 테이블의 이중성(Duality) : 상호 변환
Kafka Streams는 스트림뿐만 아니라 테이블(Table) 개념도 함께 지원합니다.
- Stream : 이벤트의 연속된 흐름
- Table : 특정 시점의 상태를 나타내는 키-값 구조
◆ 스트림 → 테이블(Stream as Table):
- 변경 기록의 연속(스트림)을 재생하면 테이블을 만들 수 있음 (예: 페이지뷰 수를 집계)
- Kafka의 log-compacted 토픽이나 KTable은 스트림을 기반으로 최신 상태를 계속 반영합니다
◆ 테이블 → 스트림(Table as Stream):
- 테이블은 특정 시점의 상태이며, 이를 다시 스트림으로 변환 가능
- 반대로 테이블도 내부적으로는 변경 로그(changelog)를 기반으로 만들어졌기 때문에
변경된 내용을 다시 스트림으로 복원할 수 있습니다.
- 즉, 테이블은 특정 시점에서의 상태이고, 그 상태의 변화 이력은 스트림으로 표현 가능합니다.
Kafka Streams는 이 이중성을 다음과 같이 구현합니다:
인터페이스 | 설명 |
KStream<K, V> | 일반적인 스트림, 이벤트의 연속 |
KTable<K, V> | 변경 사항을 반영한 테이블 |
GlobalKTable<K, V> | 모든 인스턴스에 공유되는 전역 테이블 |
groupByKey().aggregate() 같은 연산은 KStream → KTable 변환입니다.
5. 집계(Aggregation) 이해하기
Kafka Streams에서 집계는 여러 개의 입력 데이터를 하나로 묶는 작업입니다.
예를 들어 사용자별 총 페이지뷰 수를 계산할 수 있습니다.
◆ 집계 대상
- Kafka Streams DSL에서는 KStream과 KTable 모두 집계할 수 있습니다.
구분 | 설명 | 집계 후 결과 |
KStream | 연속된 이벤트 스트림 | 항상 KTable |
KTable | 상태 변경 스트림 | 변경된 KTable |
- 모든 집계 결과는 KTable입니다. 왜냐하면 집계 결과는 상태(state)를 의미하기 때문입니다.
- 값은 키별로 합산되며, 나중에 도착한 데이터가 이전 값을 덮어씀
◆ 집계 처리 흐름
KStream
▼
groupByKey() 또는 groupBy()
▼
count(), reduce(), aggregate()
▼
KTable (집계 결과)
- groupByKey()는 기존 키를 기준으로 그룹화하고, groupBy()는 새 키로 재정의할 수 있게 해줍니다.
- 예제: 사용자별 클릭 수 계산
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> clickStream = builder.stream("click-events");
//사용자 ID를 키로 설정
//같은 사용자(키)끼리 그룹화
//개수를세어 집계 결과 생성
//결과를 Kafka 토픽에 출력
KTable<String, Long> userClickCounts = clickStream
.map((key, value) -> new KeyValue<>(value, "1")) // value = userId
.groupByKey()
.count();
userClickCounts.toStream().to("user-click-counts", Produced.with(Serdes.String(), Serdes.Long()));
6. 윈도우(Windowing) 개념
윈도우는 시간을 기준으로 데이터를 잘라서 묶는 단위입니다.
Kafka Streams에서는 같은 키에 대해 특정 시간 범위 내에 들어오는 데이터들을 하나의 윈도우로 간주하고,
이 범위 안에서만 집계/조인/분석이 이루어집니다.
◆ 윈도우 종류
종류 | 설명 | 사용예시 |
Tumbling Window | 고정된 크기, 겹치지 않음 | 매 5분마다 클릭 수 |
Hopping Window | 고정된 크기, 슬라이딩 가능 (겹침) | 매 1분 간격의 5분 통계 |
Sliding Window | 두 이벤트 간 시간차를 기준으로 처리 | A가 B 이후에 발생했을 때만 |
Session Window | 사용자 활동 간 간격이 일정 시간 이상 비면 새 윈도우 시작 | 사용자별 방문 세션 분석 |
◆ 윈도우 구성요소
구성요소 | 설명 | 예시 |
윈도우 크기(Window size) | 집계를 수행할 시간 간격 | 5분, 1시간 등 |
슬라이드 간격(Advance interval) | 다음 윈도우를 언제 시작할지 | 1분 단위 슬라이딩 |
유예 시간(Grace period) | 지연 도착 데이터를 언제까지 수용할지 | 1분 |
- 슬라이드 간격 < 윈도우 크기이면 겹치는 윈도우가 생성됩니다. (Hopping Window)
◆ 예제 코드
1. Tumbling Window
- 고정된 크기, 겹치지 않음
- 한 번에 한 윈도우만 유효
- 매 5분 단위 집계 같은 상황에 적합
KTable<Windowed<String>, Long> result = stream
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count();
[0~5분] [5~10분] [10~15분] ...
▲ ▲ ▲
집계1 집계2 집계3
2. Hopping Window
- 고정된 크기, 겹치는 윈도우 가능
- 윈도우가 일정 간격으로 슬라이딩
- 하나의 이벤트가 여러 윈도우에 포함될 수 있음
KTable<Windowed<String>, Long> result = stream
.groupByKey()
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))
.advanceBy(Duration.ofMinutes(1)))
.count();
윈도우 크기: 5분, 슬라이드: 1분
[0~5분]
[1~6분]
[2~7분]
...
3. Sliding Window
- A 이벤트와 B 이벤트가 일정 시간 이내에 발생하면 같은 윈도우로 묶임
- 두 이벤트 간의 시간차를 기준으로 윈도우 구성
SlidingWindows windows = SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(10));
KTable<Windowed<String>, Long> result = stream
.groupByKey()
.windowedBy(windows)
.count();
A: 10:00 발생
B: 10:07 발생 → A와 B는 같은 슬라이딩 윈도우
윈도우 범위: [10:00 ~ 10:10]
4. Session Window
- 사용자 활동 단위로 윈도우 구성
- 활동 간 비활동 기간(gap)이 기준보다 길면 새로운 윈도우 시작
- 비정기적/불규칙한 이벤트에 매우 유용
- 5분 이상 아무 이벤트가 없으면 세션이 종료되고, 새로운 세션을 시작한다.
SessionWindows sessionWindows = SessionWindows.with(Duration.ofMinutes(5));
KTable<Windowed<String>, Long> result = stream
.groupByKey()
.windowedBy(sessionWindows)
.count();
사용자 이벤트 시점: [10:00], [10:02], [10:10]
윈도우 1: [10:00 ~ 10:02]
윈도우 2: [10:10 ~ 10:10] (새로운 세션 시작)
7. 상태(State)와 상태 저장소
Kafka Streams는 상태 기반 처리도 지원합니다.
예를 들어, 특정 키의 총합을 계산할 때 상태 저장소(State Store)가 필요합니다.
◆ 상태:
- 상태란, 스트림 처리 중에 과거 데이터를 기억하고 있는 값을 말합니다.
- "사용자별 클릭 수"를 계산할 때, 이전까지의 클릭 수를 기억해야 함 → 이게 상태입니다.
◆ 상태 저장소
- Kafka Streams에서 상태를 저장하고 관리하기 위한 내부 저장 공간입니다.
- 기본적으로는 애플리케이션 로컬 디스크에 저장되며, Kafka 토픽을 통해 백업(changelog)도 됩니다.
- 모든 상태 기반 연산(집계, 윈도우, 조인)은 상태 저장소를 내부적으로 사용합니다.
◆ 상태 저장소 종류
유형 | 설명 | 용도 |
KeyValueStore | 키-값 구조의 저장소 | 가장 일반적, 집계/상태 저장 |
WindowStore | 시간 기반의 키-값 저장소 | 윈도우 집계, 시간 조건 조회 |
SessionStore | 세션 기반 저장소 | 세션 윈도우 처리 |
- Kafka Streams DSL을 사용할 때는 내부적으로 이 저장소들이 자동으로 생성됩니다.
◆ KeyValueStore 예제 : 사용자별 누적 카운
- DSL 사용 시 자동 생성되지만, Processor API에서는 직접 사용 가능합니다.
- 키: userId
- 값: 누적 클릭 수
public class ClickCountProcessor implements Processor<String, String, String, Long> {
private KeyValueStore<String, Long> store;
@Override
public void init(ProcessorContext<String, Long> context) {
store = context.getStateStore("click-store");
}
@Override
public void process(Record<String, String> record) {
String user = record.key();
Long count = store.get(user);
count = (count == null) ? 1L : count + 1;
store.put(user, count);
// 누적 결과 전달
context().forward(new Record<>(user, count, record.timestamp()));
}
}
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("click-store"),
Serdes.String(),
Serdes.Long()
);
builder.addStateStore(storeBuilder);
◆ KeyValueStore 예제 : 시간별 클릭 수 집계(5분 단위)
- 키: userId
- 값: 시간별 클릭 수
- 윈도우: 5분 고정 창
public class WindowClickCountProcessor implements Processor<String, String, String, Long> {
private WindowStore<String, Long> windowStore;
@Override
public void init(ProcessorContext<String, Long> context) {
windowStore = context.getStateStore("click-window-store");
}
@Override
public void process(Record<String, String> record) {
long now = record.timestamp();
String user = record.key();
Long count = windowStore.fetch(user, now);
count = (count == null) ? 1L : count + 1;
windowStore.put(user, count, now);
context().forward(new Record<>(user, count, now));
}
}
StoreBuilder<WindowStore<String, Long>> windowStoreBuilder =
Stores.windowStoreBuilder(
Stores.persistentWindowStore("click-window-store",
Duration.ofMinutes(10), // retention 기간
Duration.ofMinutes(5), // 윈도우 크기
false
),
Serdes.String(),
Serdes.Long()
);
builder.addStateStore(windowStoreBuilder);
◆ SessionStore 예제 : 사용자 세션 클릭 수 집계
- 키: userId
- 값: 세션 단위 클릭 수
- 세션 간 간격이 3분 이상이면 새 세션 시작
public class SessionClickCountProcessor implements Processor<String, String, String, Long> {
private SessionStore<String, Long> sessionStore;
@Override
public void init(ProcessorContext<String, Long> context) {
sessionStore = context.getStateStore("click-session-store");
}
@Override
public void process(Record<String, String> record) {
long timestamp = record.timestamp();
String user = record.key();
// 세션 병합 정책: 가장 최근 값 기준
sessionStore.put(user, 1L, timestamp); // 기본값 1 클릭
}
}
StoreBuilder<SessionStore<String, Long>> sessionStoreBuilder =
Stores.sessionStoreBuilder(
Stores.persistentSessionStore("click-session-store",
Duration.ofMinutes(10) // retention
),
Serdes.String(),
Serdes.Long()
);
builder.addStateStore(sessionStoreBuilder);
8. 처리 보장 (Exactly-Once)
처리 보장(Processing Guarantees)이란, Kafka Streams가 스트림 데이터를 처리하는 과정에서 중복 없이 정확히 한 번만 처리하거나, 최소한 한 번은 처리되도록 보장하는 능력을 의미합니다.
실시간 데이터 분석, 결제 처리, 이벤트 감지 등에서는 중복 처리나 데이터 손실이 큰 문제가 될 수 있습니다.
구현 방식
- Kafka 트랜잭션 기능 사용
- 입력 오프셋 커밋, 상태 저장, 출력 메시지를 하나의 트랜잭션으로 처리
- processing.guarantee = exactly_once_v2 설정 필요 (Kafka 2.5 이상)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
처리 보장 수준 | 특징 | 설명 |
at_least_once | 빠르지만 중복 가능성 있음 | 로그 수집, 중복 허용 |
exactly_once_v2 | 느리지만 정확한 처리 보장 | 집계, 결제, 비즈니스 핵심 처리 |
9. 순서가 뒤바뀐 데이터 처리(Out-of-Order Handling)
Kafka Streams에서는 실시간 데이터 처리 환경에서 자주 발생하는 문제 중 하나인 순서가 뒤바뀐 데이터 처리(Out-of-Order Handling)를 효과적으로 다룰 수 있는 기능들을 제공합니다.
이 개념을 잘 이해하면, 정확한 시계열 분석, 집계, 윈도우 처리 등을 할 수 있게 됩니다.
◆ 순서 뒤바뀜 발생 예
1) 단일 파티션에서
- 이전 offset이 나중 타임스탬프일 수 있음
2) 여러 파티션 간 병렬 처리 시
- Streams는 파티션별로 독립적으로 처리하기 때문에 타임스탬프가 과거인 데이터가 뒤늦게 도착할 수 있음
Kafka Streams는 윈도우 기반 연산을 할 때 Grace Period를 통해 뒤늦게 도착한 데이터를 처리할 수 있습니다.
TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1))
Kafka Streams는 실시간 스트림 환경에서 발생하는 Out-of-Order 이벤트를 유연하게 처리할 수 있는 구조를 제공하며,
타임스탬프 기반 처리, 유예 시간(grace), 세션 윈도우, 상태 저장소 등과 함께 사용할 때 더욱 강력해집니다.
10. 예제 코드
◆ 5분 단위로 데이터 갯수 집계
- 입력 토픽: topic-basic
- 출력 토픽: event-counts-per-5min
package kafka.exam.stream;
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
public class CountStreamExample {
String bootstrapServers = "192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092";
public void testStream() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "value-count-by-window-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
StreamsBuilder builder = new StreamsBuilder();
// 입력 스트림 (key 없음, value만 있음)
KStream<String, String> inputStream = builder.stream("topic-basic");
// step 1: 모든 메시지를 동일한 key "fixed"로 설정 → 하나의 그룹으로 묶음
KGroupedStream<String, String> groupedStream = inputStream
.map((key, value) -> new KeyValue<>("fixed", value)) // key를 고정하여 그룹핑
.groupByKey(); // key("fixed") 기준으로 그룹화'
// step 2: 5분 단위 윈도우로 개수 세기
TimeWindows windowSize = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)); // 5분 고정 윈도우
KTable<Windowed<String>, Long> countedStream = groupedStream
.windowedBy(windowSize)
.count(); // 메시지 개수 집계
// step 3: 집계 결과를 문자열로 변환하여 출력 토픽에 전송
countedStream
.toStream()
.map((windowedKey, count) -> {
// 윈도우 시간 범위 표시
String timeWindow = windowedKey.window().startTime() + " ~ " + windowedKey.window().endTime();
String result = "Window: " + timeWindow + ", Count: " + count;
return new KeyValue<String, String>(null, result); // key 없이 전송
}).to("event-counts-per-5min", Produced.with(Serdes.String(), Serdes.String())); // 출력 토픽
// Kafka Streams 애플리케이션 시작
KafkaStreams streams = new KafkaStreams(builder.build(), props);
// 종료 시 정상적으로 정리되도록 종료 훅 등록
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
// 스트림 실행
streams.start();
System.out.println("Kafka Streams 실행됨: 5분 단위 메시지 수를 집계 중...");
}
public static void main(String args[]) {
CountStreamExample streamExam = new CountStreamExample();
streamExam.testStream();
}
}
◆ 사용자 ID 기준 클릭수 누적 집계
- 입력 토픽: click-events
- 출력 토픽: user-click-counts
- 상태 저장소: click-store (KeyValueStore 사용)
- 처리 내용: 사용자 ID 기준 클릭 수 누적 집계
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.processor.*;
import org.apache.kafka.streams.state.*;
import org.apache.kafka.streams.state.Stores;
import java.time.Duration;
import java.util.Properties;
public class ClickCountStreamApp {
public static void main(String[] args) {
// 1. Kafka Streams 설정
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "click-count-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 2. Topology 정의
StreamsBuilder builder = new StreamsBuilder();
// 3. 상태 저장소 정의 및 등록
StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("click-store"),
Serdes.String(),
Serdes.Long()
);
builder.addStateStore(storeBuilder);
// 4. 토픽에서 데이터 수신 후 Processor 등록
builder.stream("click-events")
.process(() -> new ClickCountProcessor(), "click-store");
// 5. Kafka Streams 시작
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// 6. 종료 처리
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
// 7. Processor 정의
public static class ClickCountProcessor implements Processor<String, String, Void, Void> {
private ProcessorContext<Void, Void> context;
private KeyValueStore<String, Long> stateStore;
@Override
public void init(ProcessorContext<Void, Void> context) {
this.context = context;
this.stateStore = context.getStateStore("click-store");
}
@Override
public void process(Record<String, String> record) {
String userId = record.key(); // user ID 기준
Long count = stateStore.get(userId);
count = (count == null) ? 1L : count + 1;
stateStore.put(userId, count);
// 결과 출력용 로그
System.out.printf("User [%s] click count = %d\n", userId, count);
// 출력 토픽으로 전송
context.forward(new Record<>(userId, count.toString(), record.timestamp()));
}
@Override
public void close() {
// 리소스 정리 필요 시 구현
}
}
}
Kafka Streams는 Kafka에 저장된 데이터를 실시간으로 처리하고 분석할 수 있게 해주는 경량 스트림 처리 라이브러리입니다.
스트림과 테이블의 개념, 윈도우 및 집계, 상태 관리, 정확한 처리 보장 등 강력한 기능을 Java 애플리케이션 내에서 간단히 활용할 수 있다는 점이 큰 장점입니다.
관련 글 링크
13.Kafka 명령어 예제 정리: Topic, Producer, Consumer
2. Kafka 단일 노드 동작 원리: 파티션 분배부터 Consumer 전략까지
16. Kafka AdminClient API 예제: Topic,Cluster,ConsumerGroups
17.Kafka Producer API 사용법(비동기 전송, Transaction, 예제)
18.Kafka Consumer API 사용법(subscribe vs assign, 수동 커밋)
'1.시스템&인프라 > Apache Kafka' 카테고리의 다른 글
21.Kafka Connect Source, Sink 실습: PostgreSQL 연동 (0) | 2025.04.10 |
---|---|
20.Kafka Connect 이해 및 서버 설정하기 : REST API 명령어 (0) | 2025.04.08 |
18.Kafka Consumer API 사용법(subscribe vs assign, 수동 커밋) (0) | 2025.04.04 |
17.Kafka Producer API 사용법(비동기 전송, Transaction, 예제) (0) | 2025.04.04 |
16. Kafka AdminClient API 예제: Topic,Cluster,ConsumerGroups (0) | 2025.04.03 |