Kafka를 사용할 때 가장 많이 사용하는 topic, producer, consumer-group, consumer 명령어를 정리했습니다.
Kafka 명령어 예제 정리: Topic, Producer, Consumer
목차1. Kafka Topic 명령어 예제: kafka-topics.sh
2. Kafka Producer 명령어 예제: kafka-console-producer.sh
3. Kafka Consumer Group 명령어 예제: kafka-consumer-groups.sh
1. Kafka Topic 명령어 예제: kafka-topics.sh
Kafka에서 토픽은 메시지를 전달하는 기본 단위입니다.
kafka-topics.sh 명령어는 토픽을 생성, 삭제, 조회하는 데 사용됩니다.
옵션 | 설명 |
--bootstrap-server <호스트:포트> | Kafka 서버에 접속할 주소를 지정하는 필수 옵션입니다. |
--create | 새로운 토픽을 생성합니다. |
--delete | 기존에 존재하는 토픽을 삭제합니다. |
--list | Kafka 클러스터에 존재하는 모든 토픽 목록을 출력합니다. |
--describe | 특정 토픽의 파티션, 복제본, 설정 등을 상세히 출력합니다. |
--partitions <숫자> | 토픽 생성 시 파티션 개수를 지정합니다. |
--replication-factor <숫자> | 토픽 생성 시 복제본의 수를 설정합니다. |
--config <설정=값> | 토픽 생성 시 보존 기간, 압축 방식 등 추가 설정을 적용합니다. |
1. 기본 토픽 생성
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic my-topic \
--partitions 3 --replication-factor 2
2. 보존 기간 7일 설정
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic log-topic \
--partitions 3 --replication-factor 2 \
--config retention.ms=604800000
3. 토픽 목록 조회
kafka-topics.sh --bootstrap-server localhost:9092 --list
4.토픽 상세 정보 조회
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic my-topic
5.파티션 개수 증가
kafka-topics.sh --bootstrap-server localhost:9092 \
--alter --topic my-topic --partitions 5
6. 토픽 삭제
kafka-topics.sh --bootstrap-server localhost:9092 \
--delete --topic my-topic
2. Kafka Producer 명령어 예제: kafka-console-producer.sh
Kafka Producer는 메시지를 Kafka로 전송하는 역할을 합니다.
kafka-console-producer.sh 명령어로 테스트 메시지를 손쉽게 보낼 수 있습니다.
주요 옵션 | 설명 |
--bootstrap-server | Kafka 브로커 주소를 지정하는 필수 옵션입니다. |
--topic | 메시지를 전송할 대상 토픽을 지정합니다. |
--property parse.key=true | 메시지에 키를 포함할 수 있도록 설정합니다. |
--property key.separator=: | 메시지 키와 값을 구분하는 구분자를 설정합니다. |
--compression-codec gzip | 메시지를 압축하여 전송합니다. (예: gzip, snappy 등) |
--batch-size | 배치로 보낼 메시지 크기를 설정합니다. |
--timeout | 메시지를 묶어서 전송할 때 최대 대기 시간을 설정합니다. |
--request-required-acks | 메시지 수신 확인 방식(acks)을 설정합니다. |
1. 기본 메시지 전송
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic test-topic
2.메시지에 Key 포함
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic test-topic \
--property parse.key=true \
--property key.separator=:
#입력 예시: user1:{"name":"Alice"}
3. 압축 및 배치 크기 설정
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic test-topic \
--compression-codec gzip \
--batch-size 32768 \
--timeout 100
4. acks 옵션 설정 (데이터 수신 확인)
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic test-topic \
--request-required-acks all
3. Kafka Consumer Group 명령어 예제: kafka-consumer-groups.sh
Kafka Consumer Group은 메시지를 병렬로 처리하는 데 사용됩니다.
kafka-consumer-groups.sh 명령어는 오프셋 확인, 리셋, 그룹 상태 조회에 활용됩니다.
옵션 | 설명 |
--bootstrap-server | Kafka 브로커 주소를 지정하는 필수 옵션입니다. |
--list | Kafka 클러스터에 존재하는 모든 컨슈머 그룹 목록을 출력합니다. |
--describe | 특정 컨슈머 그룹의 파티션, 오프셋, lag 정보를 출력합니다. |
--group | 작업할 대상 컨슈머 그룹 이름을 지정합니다. |
--reset-offsets | 오프셋을 초기화하거나 이동시킬 때 사용합니다. |
--to-earliest | 가장 오래된 오프셋부터 메시지를 소비하도록 설정합니다. |
--to-latest | 가장 최신 오프셋부터 메시지를 소비하도록 설정합니다. |
--to-offset <숫자> | 지정한 오프셋 번호로 이동합니다. |
--execute | 오프셋 리셋 명령을 실제로 실행합니다. |
--dry-run | 오프셋 리셋이 어떻게 될지를 미리 확인합니다. (실제 변경 없음) |
◆ Consumer Group 목록 조회
./kafka-consumer-groups.sh --bootstrap-server 192.168.56.101:9092 --list
◆ Consumer Group 정보 조회
1. 명령문
./kafka-consumer-groups.sh --bootstrap-server 192.168.56.103:9092 \
--describe \
--group my-consumer-group
2. 출력예시
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-consumer-group test-topic 0 14 14 0 console-consumer-3cfb1f47 /192.168.56.103 console-consumer
my-consumer-group test-topic 1 0 0 0 console-consumer-3cfb1f47 /192.168.56.103 console-consumer
my-consumer-group test-topic 2 263 263 0 console-consumer-7bd04e5b /192.168.56.102 console-consumer
3. 설명
- 컨슈머 그룹: my-consumer-group
- 토픽: test-topic
- 3개의 파티션 (0, 1, 2)
- 2개의 컨슈머가 실행 중 (console-consumer)
- 컨슈머 분배 상태:
- console-consumer-3cfb1f47 (192.168.56.103): 파티션 0, 1 담당
- console-consumer-7bd04e5b (192.168.56.102): 파티션 2 담당
- LAG 값이 0 → 모든 메시지를 실시간으로 처리 중
◆ Consumer Group 구성원 조회
1.명령문
./kafka-consumer-groups.sh --bootstrap-server 192.168.56.103:9092 \
--describe \
--group my-consumer-group \
--members
2. 실행결과
GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS
my-consumer-group console-consumer-3cfb1f47 /192.168.56.103 console-consumer 2
my-consumer-group console-consumer-7bd04e5b /192.168.56.102 console-consumer 1
◆ Consumer Group 삭제
1. 명령문
./kafka-consumer-groups.sh --bootstrap-server 192.168.56.102:9092 \
--delete --group my-consumer-group
2. 결과예시
Deletion of requested consumer groups ('my-consumer-group') was successful.
◆ Consumer Group offset reset 테스트
1. 명령문
./kafka-consumer-groups.sh --bootstrap-server 192.168.56.102:9092 \
--group my-consumer-group \
--reset-offsets --to-earliest --dry-run
실제 실행 전에 시뮬레이션(--dry-run)을 통해 어떤 변화가 일어나는지 확인할 수 있습니다.
◆ Consumer Group offset reset
- Kafka에서 Consumer Group의 오프셋(Offset) 리셋 종류는 여러 가지 방식으로 수행할 수 있습니다.
- 오프셋을 리셋하는 주요 목적은 특정 시점에서 다시 데이터를 소비하거나, 특정 조건에 따라 소비 위치를 조정하는 것입니다.
리셋 종류(방법) | 옵션 | 설명 |
최신 오프셋으로 리셋 | --to-latest | 가장 최신(끝) 오프셋으로 이동하여 새로운 메시지만 소비 |
가장 오래된(초기) 오프셋으로 리셋 | --to-earliest | 가장 오래된 오프셋부터 다시 소비 시작 |
특정 날짜/시간 기준 리셋 | --to-datetime <YYYY-MM-DDTHH:mm:ss.sss> | 지정한 시간대의 오프셋으로 이동 |
현재 오프셋 유지 | --to-current | 현재 오프셋을 그대로 유지 (테스트용) |
특정 오프셋 번호로 리셋 | --to-offset <숫자> | 특정 오프셋 번호를 직접 지정하여 이동 |
현재 오프셋을 일정 숫자만큼 이동 | --shift-by <숫자> | 현재 오프셋에서 지정한 숫자만큼 이동 (양수: 앞으로, 음수: 뒤로) |
현재 시간에서 일정 기간만큼 전/후로 이동 | --by-duration <PnDTnHnMnS> | 현재 시간에서 지정한 기간만큼 전/후로 이동 |
CSV 파일 기반으로 리셋 | --from-file <파일 경로> | 미리 정의된 CSV 파일을 기반으로 특정 오프셋을 적용 |
오프셋 리셋 오류:
Error: Assignments can only be reset if the group 'my-consumer-group' is inactive, but the current state is Stable.
- Kafka에서는 컨슈머 그룹이 활성화된 상태(Stable) 일 경우, --reset-offsets 명령을 실행할 수 없습니다.
- Stable 상태는 하나 이상의 컨슈머가 현재 그룹에 참여하여 데이터를 소비 중이라는 의미입니다.
- Kafka는 데이터 손실이나 불안정한 상태를 방지하기 위해 활성 상태(Stable)에서 오프셋을 리셋하는 것을 차단합니다.
4. Kafka Consumer 명령어 예제: kafka-console-consumer.sh
Kafka Consumer는 메시지를 Kafka에서 읽어오는 역할을 합니다.
kafka-console-consumer.sh 명령어로 메시지를 콘솔에서 바로 확인할 수 있습니다.
옵션 | 설명 |
--bootstrap-server | Kafka 브로커 주소를 지정하는 필수 옵션입니다. |
--topic | 소비할 Kafka 토픽을 지정합니다. |
--from-beginning | 토픽의 처음부터 모든 메시지를 소비합니다. |
--group | 컨슈머가 속할 그룹 ID를 지정합니다. |
--partition <숫자> | 특정 파티션에서만 메시지를 소비합니다. |
--offset <위치> | 메시지를 읽기 시작할 위치를 지정합니다. (earliest, latest, 숫자) |
--max-messages <개수> | 지정한 개수만큼 메시지를 소비한 후 종료합니다. |
--timeout-ms <밀리초> | 지정된 시간 동안 메시지가 없으면 자동 종료합니다. |
--property print.key=true | 메시지 키를 출력하도록 설정합니다. |
--property print.timestamp=true | 메시지의 생성 시간(timestamp)을 함께 출력합니다. |
◆ 기본 메시지 소비
1. test-topic의 최신 메시지부터 소비합니다.
./kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092 \
--topic test-topic
2. my-consumer-group에 속한 소비자로 test-topic의 메시지를 소비합니다
./kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092 \
--topic test-topic --group my-consumer-group
◆ 토픽의 처음부터 메시지 소비
./kafka-console-consumer.sh --bootstrap-server 192.168.56.103:9092 \
--topic test-topic --from-beginning
◆ 특정 파티션 메시지 소비
1. test-topic의 0번 파티션에서만 메시지를 소비합니다.
./kafka-console-consumer.sh --bootstrap-server 192.168.56.102:9092 \
--topic test-topic --partition 0
◆ 특정 개수의 메시지만 소비 후 종료
1. 5개의 메시지만 소비한 후 종료됩니다.
./kafka-console-consumer.sh --bootstrap-server 192.168.56.102:9092 \
--topic test-topic --max-messages 5
◆ 타임스태프와 키를 함께 출력
1. procedure 명령문
./kafka-console-producer.sh --broker-list 192.168.56.102:9092 \
--topic test-topic --property parse.key=true --property key.separator=:
#입력 예시
user1:{"id": 1, "name": "Alice", "age": 25, "city": "Seoul"}
user2:{"id": 2, "name": "Bob", "age": 30, "city": ">Busan"}
user1:{"id": 3, "name": "Charlie", "age": 28, "city": "Incheon"}
2. consumer 명령문 메시지 키와 값을 함께 출력합니다
./kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092 \
--topic test-topic \
--property print.key=true \
--property print.value=true \
--property print.timestamp=true
--from-beginning
#출력 예시
CreateTime:1742447818647 user1 {"id": 1, "name": "Alice", "age": 25, "city": "Seoul"}
CreateTime:1742447818674 user1 {"id": 3, "name": "Charlie", "age": 28, "city": "Incheon"}
CreateTime:1742447818674 user2 {"id": 2, "name": "Bob", "age": 30, "city": "Busan"}
◆ 메세지가 없을 경우 일정 시간 후 종료
1. 5초 동안 메시지가 없으면 자동 종료됩니다.
./kafka-console-consumer.sh --bootstrap-server 192.168.56.102:9092 \
--topic test-topic --timeout-ms 5000
◆ 정규식을 사용한 여러 토픽 소비
1. test로 시작하는 모든 토픽에서 메시지를 소비합니다.
./kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092 \
--include "test.*"
◆ offset 설정으로 메시지 소비
1. earliest: 토픽의 가장 처음부터 소비
#earliest를 설정하면 해당 파티션에 저장된 가장 오래된 메시지부터 소비가 시작됩니다.
#기존에 실행했던 소비자 그룹이 있어도 가장 처음부터 다시 메시지를 읽습니다.
./kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092 \
--topic test-topic --offset earliest --partition 0
2. latest (기본값): 새로운 메시지부터 소비
#latest를 설정하면 현재까지 존재하는 메시지를 무시하고, 새롭게 추가되는 메시지부터 소비합니다.
#기본 설정값이므로 명시적으로 --offset latest를 입력하지 않아도 동일한 동작을 합니다.
./kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092 \
--topic test-topic --offset latest --partition 0
3. 특정 오프셋 값 (0, 10 등)에서 시작
#지정한 오프셋 (10)부터 메시지를 소비합니다.
#0부터 시작하면 가장 처음 메시지부터 읽는 것과 동일합니다.
#특정 오프셋 이후의 메시지만 소비하기 때문에, 이전 메시지는 무시됩니다.
./kafka-console-consumer.sh --bootstrap-server 192.168.56.103:9092 \
--topic test-topic --partition 0 --offset 10
◆ 특정 키 메시지만 소비
- 키(Key)를 기반으로 특정 메시지만 소비하려면, --partition 옵션을 활용할 수 있습니다.
- Kafka는 같은 키를 가진 메시지를 같은 파티션으로 할당하므로, 특정 키를 가진 메시지만 필터링할 수 있습니다.
#특정 파티션을 지정하면 해당 파티션에 있는 메시지만 소비 가능.
#키가 어떤 파티션에 할당되었는지는 Kafka 내부 로직에 따라 결정됩니다
./kafka-console-consumer.sh --bootstrap-server 192.168.56.101:9092 \
--topic test-topic --partition 0 --from-beginning \
--property print.key=true --property print.value=true
Kafka에서 가장 많이 사용하는 topic, producer, consumer-group, consumer 명령어를 정리해 보았습니다.
각 명령어의 주요 옵션과 자주 사용하는 예제를 함께 익힐 수 있습니다.
관련 글 링크
7. Kafka Producer acks 설정 및 동작 이해하기: 데이터 유실 방지
'1.시스템&인프라 > Apache Kafka' 카테고리의 다른 글
15. Kafka UI 관리도구 AKHQ 설치 및 설정 하기: 사용자 인증 (0) | 2025.04.01 |
---|---|
14.Kafka KRaft 명령어 예제 정리: Cluster, Storage, Metadata (0) | 2025.03.31 |
12. Kafka KRaft 모드 장애복구 및 증설 테스트 (Controller 3, Broker 3) (0) | 2025.03.31 |
11. Kafka 3.9 노드 구성별 server.properties 예시: KRaft 모드 (0) | 2025.03.31 |
10. Kafka 3.9 KRaft 모드 설치 (JDK 17 + 단일 노드 구성) (0) | 2025.03.31 |