1.시스템&인프라/Apache Kafka

13.Kafka 명령어 예제 정리: Topic, Producer, Consumer

쿼드큐브 2025. 3. 31. 22:24
728x90

Kafka를 사용할 때 가장 많이 사용하는 topic, producer, consumer-group, consumer 명령어를 정리했습니다.

 

Kafka 명령어 예제 정리: Topic, Producer, Consumer

 

목차

1. Kafka Topic 명령어 예제: kafka-topics.sh

2. Kafka Producer 명령어 예제: kafka-console-producer.sh  

3. Kafka Consumer Group 명령어 예제: kafka-consumer-groups.sh

4. Kafka Consumer 명령어 예제: kafka-console-consumer.sh

관련 글 링크

 

 

1. Kafka Topic 명령어 예제: kafka-topics.sh

Kafka에서 토픽은 메시지를 전달하는 기본 단위입니다.

kafka-topics.sh 명령어는 토픽을 생성, 삭제, 조회하는 데 사용됩니다.

옵션 설명
--bootstrap-server <호스트:포트> Kafka 서버에 접속할 주소를 지정하는 필수 옵션입니다.
--create 새로운 토픽을 생성합니다.
--delete 기존에 존재하는 토픽을 삭제합니다.
--list Kafka 클러스터에 존재하는 모든 토픽 목록을 출력합니다.
--describe 특정 토픽의 파티션, 복제본, 설정 등을 상세히 출력합니다.
--partitions <숫자> 토픽 생성 시 파티션 개수를 지정합니다.
--replication-factor <숫자> 토픽 생성 시 복제본의 수를 설정합니다.
--config <설정=값> 토픽 생성 시 보존 기간, 압축 방식 등 추가 설정을 적용합니다.

 

1. 기본 토픽 생성
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic my-topic \
  --partitions 3 --replication-factor 2

2. 보존 기간 7일 설정
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic log-topic \
  --partitions 3 --replication-factor 2 \
  --config retention.ms=604800000

3. 토픽 목록 조회
kafka-topics.sh --bootstrap-server localhost:9092 --list

4.토픽 상세 정보 조회
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic my-topic

5.파티션 개수 증가
kafka-topics.sh --bootstrap-server localhost:9092 \
  --alter --topic my-topic --partitions 5

6. 토픽 삭제
kafka-topics.sh --bootstrap-server localhost:9092 \
  --delete --topic my-topic

 

 

2. Kafka Producer 명령어 예제: kafka-console-producer.sh

Kafka Producer는 메시지를 Kafka로 전송하는 역할을 합니다.

kafka-console-producer.sh 명령어로 테스트 메시지를 손쉽게 보낼 수 있습니다.

주요 옵션 설명
--bootstrap-server Kafka 브로커 주소를 지정하는 필수 옵션입니다.
--topic 메시지를 전송할 대상 토픽을 지정합니다.
--property parse.key=true 메시지에 키를 포함할 수 있도록 설정합니다.
--property key.separator=: 메시지 키와 값을 구분하는 구분자를 설정합니다.
--compression-codec gzip 메시지를 압축하여 전송합니다. (예: gzip, snappy 등)
--batch-size 배치로 보낼 메시지 크기를 설정합니다.
--timeout 메시지를 묶어서 전송할 때 최대 대기 시간을 설정합니다.
--request-required-acks 메시지 수신 확인 방식(acks)을 설정합니다.
1. 기본 메시지 전송
kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic test-topic

2.메시지에 Key 포함
kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic test-topic \
  --property parse.key=true \
  --property key.separator=:

#입력 예시: user1:{"name":"Alice"}

3. 압축 및 배치 크기 설정
kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic test-topic \
  --compression-codec gzip \
  --batch-size 32768 \
  --timeout 100

4. acks 옵션 설정 (데이터 수신 확인)
kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic test-topic \
  --request-required-acks all

 

3. Kafka Consumer Group 명령어 예제: kafka-consumer-groups.sh

Kafka Consumer Group은 메시지를 병렬로 처리하는 데 사용됩니다.
kafka-consumer-groups.sh 명령어는 오프셋 확인, 리셋, 그룹 상태 조회에 활용됩니다.

옵션 설명
--bootstrap-server Kafka 브로커 주소를 지정하는 필수 옵션입니다.
--list Kafka 클러스터에 존재하는 모든 컨슈머 그룹 목록을 출력합니다.
--describe 특정 컨슈머 그룹의 파티션, 오프셋, lag 정보를 출력합니다.
--group 작업할 대상 컨슈머 그룹 이름을 지정합니다.
--reset-offsets 오프셋을 초기화하거나 이동시킬 때 사용합니다.
--to-earliest 가장 오래된 오프셋부터 메시지를 소비하도록 설정합니다.
--to-latest 가장 최신 오프셋부터 메시지를 소비하도록 설정합니다.
--to-offset <숫자> 지정한 오프셋 번호로 이동합니다.
--execute 오프셋 리셋 명령을 실제로 실행합니다.
--dry-run 오프셋 리셋이 어떻게 될지를 미리 확인합니다. (실제 변경 없음)

◆ Consumer Group 목록 조회

./kafka-consumer-groups.sh --bootstrap-server 192.168.56.101:9092 --list

 

Consumer Group 정보 조회

1. 명령문
./kafka-consumer-groups.sh --bootstrap-server 192.168.56.103:9092 \
   --describe \
   --group my-consumer-group
   
2. 출력예시
GROUP             TOPIC        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID                 HOST            CLIENT-ID
my-consumer-group test-topic   0          14              14              0     console-consumer-3cfb1f47   /192.168.56.103 console-consumer
my-consumer-group test-topic   1          0               0               0     console-consumer-3cfb1f47   /192.168.56.103 console-consumer
my-consumer-group test-topic   2          263             263             0     console-consumer-7bd04e5b   /192.168.56.102 console-consumer

3. 설명
- 컨슈머 그룹: my-consumer-group
- 토픽: test-topic
- 3개의 파티션 (0, 1, 2)
- 2개의 컨슈머가 실행 중 (console-consumer) 
- 컨슈머 분배 상태:
  - console-consumer-3cfb1f47 (192.168.56.103): 파티션 0, 1 담당
  - console-consumer-7bd04e5b (192.168.56.102): 파티션 2 담당
  - LAG 값이 0 → 모든 메시지를 실시간으로 처리 중

 

◆ Consumer Group 구성원 조회

1.명령문
./kafka-consumer-groups.sh --bootstrap-server 192.168.56.103:9092 \
  --describe \
  --group my-consumer-group \
  --members

2. 실행결과
GROUP             CONSUMER-ID                HOST            CLIENT-ID        #PARTITIONS
my-consumer-group console-consumer-3cfb1f47  /192.168.56.103 console-consumer 2
my-consumer-group console-consumer-7bd04e5b  /192.168.56.102 console-consumer 1

 

◆ Consumer Group 삭제

1. 명령문
./kafka-consumer-groups.sh --bootstrap-server 192.168.56.102:9092 \
   --delete --group my-consumer-group

2. 결과예시
Deletion of requested consumer groups ('my-consumer-group') was successful.

 

◆ Consumer Group offset reset 테스트

1. 명령문
./kafka-consumer-groups.sh --bootstrap-server 192.168.56.102:9092 \
  --group my-consumer-group \
  --reset-offsets --to-earliest --dry-run
  
실제 실행 전에 시뮬레이션(--dry-run)을 통해 어떤 변화가 일어나는지 확인할 수 있습니다.

 

◆ Consumer Group offset reset

  • Kafka에서 Consumer Group의 오프셋(Offset) 리셋 종류는 여러 가지 방식으로 수행할 수 있습니다.
  • 오프셋을 리셋하는 주요 목적은 특정 시점에서 다시 데이터를 소비하거나, 특정 조건에 따라 소비 위치를 조정하는 것입니다.
리셋 종류(방법) 옵션 설명
최신 오프셋으로 리셋 --to-latest 가장 최신(끝) 오프셋으로 이동하여 새로운 메시지만 소비
가장 오래된(초기) 오프셋으로 리셋 --to-earliest 가장 오래된 오프셋부터 다시 소비 시작
특정 날짜/시간 기준 리셋 --to-datetime <YYYY-MM-DDTHH:mm:ss.sss> 지정한 시간대의 오프셋으로 이동
현재 오프셋 유지 --to-current 현재 오프셋을 그대로 유지 (테스트용)
특정 오프셋 번호로 리셋 --to-offset <숫자> 특정 오프셋 번호를 직접 지정하여 이동
현재 오프셋을 일정 숫자만큼 이동 --shift-by <숫자> 현재 오프셋에서 지정한 숫자만큼 이동 (양수: 앞으로, 음수: 뒤로)
현재 시간에서 일정 기간만큼 전/후로 이동 --by-duration <PnDTnHnMnS> 현재 시간에서 지정한 기간만큼 전/후로 이동
CSV 파일 기반으로 리셋 --from-file <파일 경로> 미리 정의된 CSV 파일을 기반으로 특정 오프셋을 적용
오프셋 리셋 오류:
Error: Assignments can only be reset if the group 'my-consumer-group' is inactive, but the current state is Stable.
 - Kafka에서는 컨슈머 그룹이 활성화된 상태(Stable) 일 경우, --reset-offsets 명령을 실행할 수 없습니다.
 - Stable 상태는 하나 이상의 컨슈머가 현재 그룹에 참여하여 데이터를 소비 중이라는 의미입니다.
 - Kafka는 데이터 손실이나 불안정한 상태를 방지하기 위해 활성 상태(Stable)에서 오프셋을 리셋하는 것을 차단합니다.

 

 

4. Kafka Consumer 명령어 예제: kafka-console-consumer.sh

Kafka Consumer는 메시지를 Kafka에서 읽어오는 역할을 합니다.
kafka-console-consumer.sh 명령어로 메시지를 콘솔에서 바로 확인할 수 있습니다.

옵션 설명
--bootstrap-server Kafka 브로커 주소를 지정하는 필수 옵션입니다.
--topic 소비할 Kafka 토픽을 지정합니다.
--from-beginning 토픽의 처음부터 모든 메시지를 소비합니다.
--group 컨슈머가 속할 그룹 ID를 지정합니다.
--partition <숫자> 특정 파티션에서만 메시지를 소비합니다.
--offset <위치> 메시지를 읽기 시작할 위치를 지정합니다. (earliest, latest, 숫자)
--max-messages <개수> 지정한 개수만큼 메시지를 소비한 후 종료합니다.
--timeout-ms <밀리초> 지정된 시간 동안 메시지가 없으면 자동 종료합니다.
--property print.key=true 메시지 키를 출력하도록 설정합니다.
--property print.timestamp=true 메시지의 생성 시간(timestamp)을 함께 출력합니다.

◆ 기본 메시지 소비

1. test-topic의 최신 메시지부터 소비합니다.
./kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092 \
  --topic test-topic
  
2. my-consumer-group에 속한 소비자로 test-topic의 메시지를 소비합니다 
./kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092 \
  --topic test-topic --group my-consumer-group

 

◆ 토픽의 처음부터 메시지 소비

./kafka-console-consumer.sh --bootstrap-server 192.168.56.103:9092 \
   --topic test-topic --from-beginning

 

◆ 특정 파티션 메시지 소비

1. test-topic의 0번 파티션에서만 메시지를 소비합니다.
./kafka-console-consumer.sh --bootstrap-server 192.168.56.102:9092 \
  --topic test-topic --partition 0

 

특정 개수의 메시지만 소비 후 종료

1. 5개의 메시지만 소비한 후 종료됩니다.
./kafka-console-consumer.sh --bootstrap-server 192.168.56.102:9092 \
   --topic test-topic --max-messages 5

 

◆ 타임스태프와 키를 함께 출력

1. procedure 명령문 
./kafka-console-producer.sh --broker-list 192.168.56.102:9092 \
  --topic test-topic --property parse.key=true --property key.separator=:
#입력 예시
user1:{"id": 1, "name": "Alice", "age": 25, "city": "Seoul"}
user2:{"id": 2, "name": "Bob", "age": 30, "city": ">Busan"}
user1:{"id": 3, "name": "Charlie", "age": 28, "city": "Incheon"}

2. consumer 명령문 메시지 키와 값을 함께 출력합니다
./kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092 \
  --topic test-topic \
  --property print.key=true \
  --property print.value=true \
  --property print.timestamp=true
  --from-beginning
#출력 예시
CreateTime:1742447818647    user1   {"id": 1, "name": "Alice", "age": 25, "city": "Seoul"}
CreateTime:1742447818674    user1   {"id": 3, "name": "Charlie", "age": 28, "city": "Incheon"}
CreateTime:1742447818674    user2   {"id": 2, "name": "Bob", "age": 30, "city": "Busan"}

 

◆ 메세지가 없을 경우 일정 시간 후 종료

1. 5초 동안 메시지가 없으면 자동 종료됩니다.
./kafka-console-consumer.sh --bootstrap-server 192.168.56.102:9092 \
  --topic test-topic --timeout-ms 5000

 

정규식을 사용한 여러 토픽 소비

1. test로 시작하는 모든 토픽에서 메시지를 소비합니다.
./kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092 \
  --include "test.*"

 

◆ offset 설정으로 메시지 소비

1. earliest: 토픽의 가장 처음부터 소비
#earliest를 설정하면 해당 파티션에 저장된 가장 오래된 메시지부터 소비가 시작됩니다.
#기존에 실행했던 소비자 그룹이 있어도 가장 처음부터 다시 메시지를 읽습니다.

./kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092 \
  --topic test-topic --offset earliest --partition 0

2. latest (기본값): 새로운 메시지부터 소비
#latest를 설정하면 현재까지 존재하는 메시지를 무시하고, 새롭게 추가되는 메시지부터 소비합니다.
#기본 설정값이므로 명시적으로 --offset latest를 입력하지 않아도 동일한 동작을 합니다.

./kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092 \
  --topic test-topic --offset latest --partition 0

3. 특정 오프셋 값 (0, 10 등)에서 시작
#지정한 오프셋 (10)부터 메시지를 소비합니다.
#0부터 시작하면 가장 처음 메시지부터 읽는 것과 동일합니다.
#특정 오프셋 이후의 메시지만 소비하기 때문에, 이전 메시지는 무시됩니다.

./kafka-console-consumer.sh --bootstrap-server 192.168.56.103:9092 \
  --topic test-topic --partition 0 --offset 10

 

◆ 특정 키 메시지만 소비

  • 키(Key)를 기반으로 특정 메시지만 소비하려면, --partition 옵션을 활용할 수 있습니다.
  • Kafka는 같은 키를 가진 메시지를 같은 파티션으로 할당하므로, 특정 키를 가진 메시지만 필터링할 수 있습니다.
#특정 파티션을 지정하면 해당 파티션에 있는 메시지만 소비 가능.
#키가 어떤 파티션에 할당되었는지는 Kafka 내부 로직에 따라 결정됩니다

./kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092 \
  --topic test-topic --partition 0 --from-beginning \
  --property print.key=true --property print.value=true

 


Kafka에서 가장 많이 사용하는 topic, producer, consumer-group, consumer 명령어를 정리해 보았습니다.
각 명령어의 주요 옵션과 자주 사용하는 예제를 함께 익힐 수 있습니다.

 

 

관련 글 링크

7. Kafka Producer acks 설정 및 동작 이해하기: 데이터 유실 방지

8. Kafka Consumer 동작 이해하기: Polling, Offset, Commit

5. Kafka 리더 장애 발생 시 Failover를 위한 Producer와 Consumer 설정

728x90