1.시스템&인프라/Apache Kafka

18.Kafka Consumer API 사용법(subscribe vs assign, 수동 커밋)

쿼드큐브 2025. 4. 4. 13:57
728x90
반응형

Kafka Consumer API 사용법을 정리했습니다. 기본 개념부터 주요 메서드 설명, 설정 옵션, 자바 예제 코드까지 포함되어 있어 Kafka 메시지 소비 클라이언트를 구현할 때 큰 도움이 됩니다.

 

Kafka Consumer API 사용법(subscribe vs assign, 수동 커밋)

 

목차

1. Kafka Consumer API 함수 정리

2. Kafka Consumer 기본 사용법

3. Kafka Consumer Offset 수동 커밋

4. Kafka Offset 직접 지정: Seek

5. Kafka Consumer 리밸런싱: Rebalancing

6. Kafka Consumer subscribe() vs assign() 차이

관련 글 링크

 

 

1. Kafka Consumer API 함수 정리

◆ 기본 함수

함수 설명
poll(Duration timeout) 브로커로부터 메시지를 가져옴 (pull 방식)
subscribe(Collection<String> topics) 토픽 목록 구독
subscribe(Pattern pattern) 정규표현식 기반 구독
assign(Collection<TopicPartition> partitions) 명시적으로 파티션 할당
unsubscribe() 구독 중인 토픽 해제
listTopics() 토픽 및 파티션 정보 조회
partitionsFor(String topic) 특정 토픽의 파티션 정보 조회
metrics() 소비자 측정 지표 조회
close(), wakeup(), enforceRebalance() 종료, 인터럽트 등 처리, 강제 리밸런싱(일반적으로 자동 수행_

 

◆ 오프셋 처리

함수 설명
commitSync() 동기식 수동 커밋
commitSync(Map<TopicPartition, OffsetAndMetadata>) 특정 오프셋을 동기 커밋
commitAsync() 비동기식 수동 커밋
commitAsync(OffsetCommitCallback) 콜백 포함 비동기 커밋
seek(TopicPartition, long offset) 특정 파티션의 오프셋 설정
seekToBeginning(Collection<TopicPartition>) 가장 처음부터 읽기
seekToEnd(Collection<TopicPartition>) 가장 마지막 오프셋부터 읽기
position(TopicPartition) 현재 오프셋 위치 조회
committed(TopicPartition) 커밋된 오프셋 정보 조회
offsetsForTimes(Map<TopicPartition, Long>) 타임스탬프 기준 오프셋 조회
beginningOffsets(...), endOffsets(...) 시작/끝 오프셋 조회

 

◆ 컨트롤 및 상태

함수 설명
pause(Collection<TopicPartition>) 일시적으로 소비 중단
resume(Collection<TopicPartition>) 일시 중단된 소비 재개
paused() 일시 중지된 파티션 목록 조회
assignment() 현재 할당된 파티션 목록 조회
subscription() 현재 구독 중인 토픽 목록

 

 

2. Kafka Consumer 기본 사용법

Kafka Consumer는 하나 이상의 토픽으로부터 메시지를 읽어오는 역할을 합니다. 기본적인 흐름은 다음과 같습니다:

  1. Consumer 설정
  2. KafkaConsumer 객체 생성
  3. 토픽 구독 (subscribe)
  4. 메시지 폴링 (poll)
  5. 처리 후 커밋 (auto 또는 manual commit)
  6. 종료 시 close()
// 1. Consumer 설정
Properties props = new Properties();
// 브로커 주소
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 컨슈머 그룹 ID
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
// 키 역직렬화
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 
// 값 역직렬화
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); 
// 없을 경우 처음부터 읽기
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
// 자동 커밋
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

// 2. Consumer 객체 생성
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
// 3. 토픽 구독
    consumer.subscribe(Collections.singletonList("my-topic"));

// 4. 메시지 지속적으로 폴링
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

// 5. 레코드 처리
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("토픽: %s, 파티션: %d, 오프셋: %d, 키: %s, 값: %s%n",
                    record.topic(), record.partition(), 
                    record.offset(), record.key(), record.value());
        }

// 6. 자동 커밋이 활성화되어 있으면 커밋은 자동 수행됨
// 수동 커밋의 경우: consumer.commitSync();
    }
}

 

◆ 자동 커밋 vs 수동 커밋

  • enable.auto.commit=true: 단순한 처리에 적합.
  • false 설정 후 commitSync() 사용: 정확한 메시지 처리 후 커밋해야 하는 경우 적합.

poll() 호출 주기 주의

  • poll()을 5초 이상 호출하지 않으면 리밸런싱이 발생해 파티션이 다른 컨슈머로 넘어갈 수 있습니다.

Consumer는 close() 호출로 자원 해제 필수

 

 

3. Kafka Consumer Offset 수동 커밋

Kafka는 기본적으로 메시지를 소비한 후 오프셋을 자동 커밋하거나, 개발자가 직접 오프셋을 수동 커밋하도록 설정할 수 있습니다.
수동 커밋(Manual Commit)은 다음과 같은 상황에서 유용합니다:

  • 메시지를 정확히 한 번 처리해야 하는 경우
  • 메시지 처리 로직이 복잡하거나 실패 가능성이 있는 경우
  • 중복처리 방지 또는 정확한 체크포인트가 필요한 경우
// 1. Consumer 설정
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 자동 커밋 끄기
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// 2. Consumer 생성 및 구독
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("my-topic"));

    while (true) {
        // 3. 메시지 폴링
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        // 4. 레코드 처리
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("Consumed message: offset=%d, key=%s, value=%s%n",
                    record.offset(), record.key(), record.value());

            //실제 로직: DB 저장, 처리 성공 여부 판단 등 수행
        }

        // 5. 수동 커밋 (처리가 끝난 후)
        try {
            consumer.commitSync(); // 현재까지 처리한 오프셋 커밋
            System.out.println("Offsets committed successfully");
        } catch (CommitFailedException e) {
            System.err.println("Commit failed: " + e.getMessage());
        }
    }
}

 

 커밋 실패시 발생 가능한 문제

  • Kafka 브로커 장애, 네트워크 문제, 컨슈머 리밸런싱 등으로 커밋이 실패할 수 있음
  • 오프셋이 커밋되지 않으면, 컨슈머 재시작 시 동일 메시지를 재처리하게 됨 (중복 가능성)
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("important-topic"));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        boolean allProcessed = true;

// 1. 메시지 처리
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("처리 중: offset=%d, key=%s, value=%s%n",
                    record.offset(), record.key(), record.value());
            try {
                // 메시지 처리 로직 예: DB 저장, 외부 시스템 호출 등
                // processMessage(record.value());

            } catch (Exception e) {
                allProcessed = false;
                System.err.println("처리 실패, 커밋하지 않음: " + e.getMessage());
                break; // 하나라도 실패하면 커밋 생략
            }
        }

// 2. 모든 메시지 처리가 성공했다면 커밋
        if (allProcessed) {
            try {
                consumer.commitSync(); // 동기 커밋
                System.out.println("오프셋 커밋 성공");
            } catch (CommitFailedException e) {
                System.err.println("커밋 실패: " + e.getMessage());
                e.printStackTrace();
                // 대응: 재시도 또는 로그에 백업
                // 예: 실패 오프셋을 파일/DB에 기록
                // backupOffsets(records);
            }
        } else {
            System.out.println("일부 메시지 처리 실패 → 커밋 생략");
        }
    }
}

 

 

4. Kafka Offset 직접 지정: Seek

seek()은 Kafka Consumer가 특정 파티션의 읽기 오프셋을 직접 지정할 수 있게 해주는 메서드입니다.
기본적으로 Kafka는 자동으로 오프셋을 관리하지만, 다음과 같은 상황에서 seek()을 사용하면 고급 기능이 가능합니다.

상황 설명
메시지를 재처리해야 할 때 예: 특정 오프셋 이후 데이터에 오류가 있었던 경우
특정 시점의 데이터만 처리 예: 오늘 오전 9시 이후 데이터만 보고 싶을 때
수동 오프셋 관리 Redis/DB 등에 저장된 오프셋으로 복구
특정 파티션만 테스트 파티션 별로 수동 탐색 및 필터링

assign() 없이 seek()만 쓰면 안 되며,  seek()은 먼저 assign()으로 파티션을 등록한 뒤에만 유효합니다.

수동 오프셋 제어를 하려면 enable.auto.commit=false 설정을 권장합니다.

seek()은 단순히 위치만 지정하는 것으로, 실제 메시지는 poll()로 가져와야 합니다.

 

 기본 흐름

  • assign()을 통해 수동으로 파티션 할당을 설정합니다. (subscribe()를 쓰지 않음)
  • seek()을 사용해서 지정한 오프셋으로 읽기 위치를 조정합니다.
  • 이후 poll()을 호출하면, offset 100 이후부터 데이터가 수신됩니다.
TopicPartition tp = new TopicPartition("my-topic", 0);
consumer.assign(List.of(tp));        // 파티션 직접 할당
consumer.seek(tp, 100);              // 오프셋 100부터 읽기

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
    System.out.printf("📨 offset=%d, key=%s, value=%s%n",
        record.offset(), record.key(), record.value());
}

 

메시지 재처리 예시

  • 특정 오프셋에서 메시지 재처리
TopicPartition tp = new TopicPartition("my-topic", 0);
consumer.assign(List.of(tp));

// 오프셋 200부터 다시 읽기
consumer.seek(tp, 200);

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    System.out.printf("재처리: offset=%d, value=%s%n", record.offset(), record.value());
}
  • 가장 처음(earliest)부터 재처리
consumer.assign(List.of(new TopicPartition("my-topic", 0)));
consumer.seekToBeginning(consumer.assignment()); // 모든 할당 파티션 처음부터

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
    System.out.println("처음부터 처리: " + record.offset());
}
  • 가장 마지막(최신)부터 읽기
consumer.assign(List.of(new TopicPartition("my-topic", 0)));
consumer.seekToEnd(consumer.assignment()); // 가장 최근 오프셋 위치로 이동

// poll 하면 새로 들어온 메시지만 읽게 됨
  • 타임스탬프 기반 오프셋 탐색 (예: 특정 시간 이후)
TopicPartition tp = new TopicPartition("my-topic", 0);
consumer.assign(List.of(tp));

// 1시간 전의 타임스탬프를 기준으로 오프셋 검색
//Kafka가 자동으로 타임스탬프를 부여하기 때문에, 
//기본 설정으로도 잘 동작합니다.
long targetTime = System.currentTimeMillis() - (60 * 60 * 1000);
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(Map.of(tp, targetTime));

if (result.get(tp) != null) {
    long offset = result.get(tp).offset();
    consumer.seek(tp, offset); //특정 시간 이후의 첫 오프셋으로 이동
    System.out.println("시간 기반 seek → offset: " + offset);
}

 

 

5. Kafka Consumer 리밸런싱: Rebalancing

Kafka 3.0.0 버전부터 추가된 메서드이며, 현재 컨슈머가 리밸런싱을 강제로 유도하도록 요청하는 기능입니다.

Kafka Consumer는 컨슈머 그룹 내에서 토픽의 파티션을 자동으로 나누어 처리합니다.
이때 파티션 할당은 리밸런싱(rebalancing) 과정에서 결정됩니다.

보통 리밸런싱은 다음 상황에서 자동 발생합니다:

  • 새로운 컨슈머가 그룹에 참여
  • 기존 컨슈머가 종료되거나 실패
  • subscribe()한 토픽이 바뀜
  • heartbeat 실패

그런데

  • 설정 변경,
  • 외부 조건에 따라 할당 재조정 필요 시

와 같은 경우 명시적으로 enforceRebalance() 함수 호출로 리밸런싱을 요청할 수 있습니다.

ssign() 방식에서는 사용 불가하며,  subscribe() 방식에서만 의미 있음 (그룹 기반 컨슈머일 때만)

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("my-topic"));

// 특정 조건에서 리밸런싱을 강제로 유도
if (shouldTriggerRebalance()) {
    consumer.enforceRebalance();
}
  • enforceRebalance()를 호출하면, 내부적으로는 컨슈머 리밸런싱을 트리거합니다.
  • 다음 poll() 호출 시, 현재 컨슈머는 파티션 할당이 해제되고 다시 리밸런싱을 수행합니다.
  • 이 메서드는 비동기적이고, 즉시 리밸런싱이 일어나진 않지만, 다음 poll() 시점에서 반영됩니다.

 

6. Kafka Consumer subscribe() vs assign() 차이

 

subscribe()란?

consumer.subscribe(List.of("my-topic"));
  • 컨슈머 그룹을 기반으로 자동 파티션 분배를 해주는 방식
  • Kafka 내부의 Group Coordinator가 각 컨슈머에 파티션을 나눠줌
  • 컨슈머가 늘거나 줄면 자동 리밸런싱이 발생
  • 가장 일반적이고 많이 쓰이는 방식 : 운영 서비스에서는 거의 항상 이 방식 사용
  • enable.auto.commit=true로 설정하면 오프셋도 자동 커밋 가능
Properties props = ...
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("logs", "metrics"));

while (true) {
    var records = consumer.poll(Duration.ofMillis(100));
    for (var record : records) {
        System.out.println("Received: " + record.value());
    }
}

 

assign()이란?

consumer.assign(List.of(new TopicPartition("my-topic", 0)));
  • 개발자가 직접 파티션을 지정해서 소비하는 방식
  • 컨슈머 그룹 기능을 사용하지 않음, 그룹 리밸런싱 없음
  • 자동 오프셋 커밋, 자동 할당 없음 → 개발자가 수동 처리해야 함
  • 특정 파티션 대상 테스트, 장애 복구, 재처리, 실험적 제어 할 때 주로 사용
  • seek()을 통해 오프셋을 직접 제어할 수 있음
  • 수동 오프셋 저장/복구 로직을 직접 구현해야 함

subscribe() vs assign()

항목 subscribe() assign()
주 사용 목적 일반 서비스 처리 테스트 / 장애 복구 / 실험
파티션 제어 자동 수동
오프셋 관리 자동 가능 직접 관리
리밸런싱 자동 발생 없음
seek() 활용 제한적 (리밸런싱 중 무효화) 자유롭게 사용 가능
사용 예 로그 수집기, 알림 처리기 Redis 기반 오프셋 복구, 특정 메시지 재처리

 

상황 subscribe() assign()
seek() 사용하고 싶다면? 리밸런싱 시 무효화됨 안정적
여러 파티션 중 일부만 처리하고 싶다면? 전체 토픽 구독됨 원하는 파티션만
컨슈머가 늘었다 줄었다 한다면? 자동 조절 수동 수정 필요
오프셋을 외부(Redis/DB)에 저장한다면? 복잡함 추천 방식

 


Kafka Consumer API는 Kafka 메시지를 효과적으로 소비하고 처리 흐름을 제어하는 핵심 컴포넌트입니다. 다양한 설정 옵션과 메서드를 이해하면 안정적인 메시지 소비 애플리케이션을 만들 수 있습니다.

 

 

관련 글 링크

16. Kafka AdminClient API 예제: Topic,Cluster,ConsumerGroups

17.Kafka Producer API 사용법(비동기 전송, Transaction, 예제)

13.Kafka 명령어 예제 정리: Topic, Producer, Consumer

5. Kafka 리더 장애 발생 시 Failover를 위한 Producer와 Consumer 설정

 

728x90
반응형