Admin API는 Kafka 클러스터의 리소스(토픽, 브로커, ACL, 컨슈머 그룹 등)를 프로그래밍 방식으로 관리할 수 있도록 제공되는 API입니다. org.apache.kafka.clients.admin.AdminClient 클래스를 통해 사용하며, Kafka 클라이언트 라이브러리 안에 포함되어 있습니다.
Kafka AdminClient API 예제 코드: Java
목차
1. Kafka AdminClient 개요
AdminClient는 Kafka 클러스터의 메타데이터를 조회하고, 토픽이나 컨슈머 그룹을 제어할 수 있도록 도와주는 Kafka 클라이언트 API입니다.
운영자나 개발자가 Kafka 관리 작업을 자동화할 때 자주 사용되며, 대표적으로 다음 기능을 제공합니다:
- 토픽 생성, 삭제, 설정 변경
- 클러스터 정보 및 브로커 목록 조회
- 컨슈머 그룹 목록, 상세 정보, 삭제
- 파티션 수동 재할당 등
AdminClient 기본 구조 및 설정하기
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Properties;
public class AdminClientSetupExample {
public static void main(String[] args) {
String bootstrapServers = "192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092";
Properties props = new Properties();
// Kafka 브로커 주소
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 클라이언트 ID (Kafka 로그 및 모니터링 시 유용)
props.put(AdminClientConfig.CLIENT_ID_CONFIG, "my-admin-client");
// 요청 타임아웃 (각 API 요청이 기다리는 최대 시간, ms 단위)
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000"); // 10초
// 실패 시 재시도 횟수
props.put(AdminClientConfig.RETRIES_CONFIG, "3");
// 재시도 간 대기 시간 (ms 단위)
props.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "2000"); // 2초
// AdminClient 생성
try (AdminClient admin = AdminClient.create(props)) {
System.out.println("AdminClient 연결 성공!");
// 여기서 admin 기능 호출 가능
} catch (ExecutionException | InterruptedException e) {
System.out.println(e.getMessage());
}
}
}
AdminClient 설정에 따른 동작 시나리오
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092");
props.put(AdminClientConfig.CLIENT_ID_CONFIG, "admin-client-test");
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000"); // 5초
props.put(AdminClientConfig.RETRIES_CONFIG, "2"); // 최대 2회 재시도
props.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "1000"); // 1초 간격으로 재시도
1. AdminClient는 broker1에 연결을 시도
- 요청 시간 제한: 5초
- 만약 broker1이 다운 상태거나 응답이 지연되면 → 요청 실패
2. 첫 번째 재시도 시작
- RETRY_BACKOFF_MS_CONFIG에 따라 1초 대기 후 재시도
- AdminClient는 이번에는 broker2 또는 broker3 중 응답 가능한 브로커로 요청 전송
- 브로커가 정상적으로 응답하면 성공, 아니면 실패
3.두 번째 재시도 진행
- 다시 1초 대기 후 나머지 브로커에 요청 전송
- 여기서도 실패하면 최종적으로 예외 발생
- AdminClient는 bootstrap.servers 리스트에서 최초 연결 가능한 브로커를 통해 전체 클러스터 메타데이터를 수집합니다.
- 연결 실패 시, 자동으로 다른 브로커에 fallback되며, 이 때 request.timeout.ms, retries, retry.backoff.ms 설정이 중요한 역할을 합니다.
- AdminClient 내부는 비동기 처리 기반의 Future 패턴이므로, .get() 호출 시까지 실제 예외는 발생하지 않습니다.
2. 토픽관리 예제 코드
◆ 토픽 생성
// topic-basic 생성
// - 파티션 수: 1
// - 복제 계수: 1 (리더 + 팔로워 포함, short 타입으로 지정)
NewTopic topic1 = new NewTopic("topic-basic", 1, (short) 1);
// topic-auto-replica 생성
// - 파티션 수: 3
// - 복제 계수: 명시하지 않음 → Kafka 브로커 설정의 default.replication.factor 사용
NewTopic topic2 = new NewTopic("topic-auto-replica", Optional.of(3), Optional.empty());
// topic-config 생성
// - 파티션 수: 3
// - 복제 계수: 1
// - 추가 설정:
// - 메시지 보관 시간: 1시간 (retention.ms = 3600000ms)
// - 로그 압축 방식: compact (cleanup.policy = compact)
Map<String, String> configs = Map.of("retention.ms", "3600000", // 메시지 보관 시간 설정 (1시간)
"cleanup.policy", "compact" // 로그 정리 방식 설정 (압축)
);
NewTopic topic3 = new NewTopic("topic-config", 3, (short) 1).configs(configs);
// AdminClient를 통해 토픽 생성 요청
CreateTopicsResult result = admin.createTopics(List.of(topic1, topic2, topic3));
// 모든 토픽 생성 요청 완료까지 대기 (예외 발생 시 전체 실패로 간주됨)
result.all().get();
// result.all().get()에서 오류 발생히 호출 안됨
System.out.println(">> 토픽 생성 결과...");
Map<String, KafkaFuture<Void>> futures = result.values();
for (Map.Entry<String, KafkaFuture<Void>> entry : futures.entrySet()) {
String topicName = entry.getKey();
try {
// 토픽 생성 성공 시
entry.getValue().get();
System.out.printf("토픽 생성 성공: %s%n", topicName);
} catch (ExecutionException e) {
// 토픽 생성 실패 시 예외 메시지 출력
System.out.printf("토픽 생성 실패: %s - %s%n", topicName, e.getCause().getMessage());
}
}
== 출력 예시 ==
...setup...
[TEST] 토픽 생성 테스트...
>> 토픽 생성 결과...
토픽 생성 성공: topic-auto-replica
토픽 생성 성공: topic-config
토픽 생성 성공: topic-basic
...tearDown...
- 파티션 수, 복제 계수 설정 가능
- configs()로 토픽 단위 설정도 가능 (retention.ms 등)
- 주의사항: 존재하는 토픽을 다시 생성하면 TopicExistsException 예외 발생
◆ 토픽 목록 조회
ListTopicsResult listTopicsResult = admin.listTopics();
// 2. Future에서 토픽 이름 Set 추출
Set<String> topicNames = listTopicsResult.names().get();
topicNames.forEach(t -> System.out.println("토픽: " + t));
== 출력 예시 ==
...setup...
[TEST] 토픽 목록 조회 테스트...
토픽: topic-auto-replica
토픽: topic-config
토픽: topic-basic
...tearDown...
- .listTopics(new ListTopicsOptions().listInternal(true)): internal topic 포함 여부 설정
◆ 토픽 상세 정보
List<String> topicNames = List.of("topic-basic", "topic-auto-replica", "topic-config");
// 토픽 상세 정보 조회 요청
DescribeTopicsResult result = admin.describeTopics(topicNames);
// 토픽별 Future 결과 맵 반환 (String → KafkaFuture<TopicDescription>)
Map<String, KafkaFuture<TopicDescription>> topicMap = result.topicNameValues();
// 결과 출력
for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : topicMap.entrySet()) {
String topicName = entry.getKey();
try {
TopicDescription desc = entry.getValue().get(); // 결과 받기
System.out.printf("토픽명: %s, 파티션 수: %d, 복제본 수: %d%n",
topicName,
desc.partitions().size(),
desc.partitions().get(0).replicas().size() // 복제본 수 확인 (첫 파티션 기준)
);
} catch (Exception e) {
System.out.printf("토픽 정보 조회 실패: %s - %s%n", topicName, e.getMessage());
}
}
=== 출력 예시 ===
...setup...
[TEST] 토픽 정보 조회 테스트...
토픽명: topic-auto-replica, 파티션 수: 3, 복제본 수: 1
토픽명: topic-basic, 파티션 수: 1, 복제본 수: 1
토픽명: topic-config, 파티션 수: 3, 복제본 수: 1
...tearDown...
◆ 토픽 설정 조회
String topicName = "topic-basic";
String configKey = null;
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
Config originalConfig = admin.describeConfigs(Collections.singleton(resource)).all().get().get(resource);
for (ConfigEntry entry : originalConfig.entries()) {
if (configKey == null) {
System.out.printf("- %s = %s%n", entry.name(), entry.value());
} else if (entry.name().equals(configKey)) {
System.out.printf("- %s = %s%n", entry.name(), entry.value());
}
}
=== 출력 예시 ===
...setup...
[TEST] 토픽 설정 조회 테스트...
- compression.type = producer
- remote.log.delete.on.disable = false
- leader.replication.throttled.replicas =
이하 생략
...tearDown...
◆ 토픽 설정 변경
String topicName = "topic-basic";
String configKey = "retention.ms";
String configValue = "500000";
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
// 2. 설정 변경 요청
// SET 값을 새로 설정 또는 변경
// DELETE 해당 설정 키를 제거
// APPEND 값에 항목 추가 (쉼표 구분 설정에 유용)
// SUBTRACT 값에서 항목 제거
AlterConfigOp op = new AlterConfigOp(new ConfigEntry(configKey, configValue), // 5분
AlterConfigOp.OpType.SET // SET, DELETE, APPEND, SUBTRACT 중 하나
);
Map<ConfigResource, Collection<AlterConfigOp>> updates = Map.of(resource, List.of(op));
AlterConfigsResult result = admin.incrementalAlterConfigs(updates);
result.all().get(); // (예외 발생 시 전체 실패로 간주됨)
- SET: 값 설정 또는 변경
- DELETE: 설정 제거
- APPEND: 쉼표 기반 설정에 값 추가
- SUBTRACT: 설정에서 값 제거
위 코드 실행 후 "토픽 설정 조회" 로 설정을 조회하면 "- retention.ms = 500000" 으로 설정값이 변경된 것을 확인 할 수 있습니다.
◆ 토픽 삭제
List<String> topicNames = List.of("topic-basic");
DeleteTopicsResult result = admin.deleteTopics(topicNames);
result.all().get(); // 개별 예외 발생 시 전체 실패로 간주됨
- 토픽 삭제후 토픽 목록 조회 결과
토픽: topic-auto-replica
토픽: topic-config
3. 클러스터 정보 조회
DescribeClusterResult cluster = admin.describeCluster();
System.out.println("클러스터 ID: " + cluster.clusterId().get());
System.out.println("컨트롤러: " + cluster.controller().get());
System.out.println("브로커 목록:");
// cluster.nodes().get().forEach(System.out::println);
Collection<Node> nodes = cluster.nodes().get();
for (Node node : nodes) {
System.out.println(node);
}
== 출력 예시 ==
...setup...
[TEST] 클러스터 정보 조회 테스트...
클러스터 ID: NOgQ3G41T96rqFL6y4FZJA
컨트롤러: 192.168.56.102:9092 (id: 2 rack: null)
브로커 목록:
192.168.56.101:9092 (id: 1 rack: null)
192.168.56.102:9092 (id: 2 rack: null)
192.168.56.103:9092 (id: 3 rack: null)
...tearDown...
4. 컨슈머 그룹 관리
◆ 컨슈머 그룹 목록 조회
Collection<ConsumerGroupListing> groups = admin.listConsumerGroups().all().get();
for (ConsumerGroupListing group : groups) {
System.out.println("그룹 ID: " + group.groupId());
}
== 출력 예시 ==
...setup...
[TEST] 컨슈머 그룹 목록 조회 테스트...
그룹 ID: my-consumer-group
...tearDown...
◆ 컨슈머 그룹 상세 조회
List<String> groupIds = List.of("my-consumer-group");
admin.describeConsumerGroups(groupIds).all().get().forEach((groupId, desc) -> {
System.out.printf("▶ 그룹 ID: %s, 상태: %s, 멤버 수: %d%n",
groupId, desc.state(), desc.members().size());
desc.members().forEach(member -> {
System.out.printf(" - 멤버: %s, 호스트: %s%n", member.consumerId(), member.host());
System.out.printf(" 파티션 할당: %s%n", member.assignment().topicPartitions());
});
});
== 결과 예시 ==
...setup...
[TEST] 컨슈머 그룹 정보 조회 테스트...
▶ 그룹 ID: my-consumer-group, 상태: Stable, 멤버 수: 1
- 멤버: console-consumer-06156951-395a-47fa-a18a-f13147f23046, 호스트: /192.168.56.101
파티션 할당: [topic-basic-2, topic-basic-1, topic-basic-0]
...tearDown...
◆ 컨슈머 그룹 삭제
List<String> groupIds = List.of("my-consumer-group");
// admin.deleteConsumerGroups(groupIdsToDelete).all().get();
// 삭제 요청 결과를 result 변수에 저장
DeleteConsumerGroupsResult result = admin.deleteConsumerGroups(groupIds);
// 삭제 완료 대기
result.all().get();
== 출력 예시 ==
...setup...
[TEST] 컨슈머 그룹 삭제 테스트...
org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
...tearDown...
- GroupNotEmptyException은 다음 두 가지 상황 중 하나일 수 있습니다:
원인 | 설명 |
활성 컨슈머가 있음 | 컨슈머 그룹에 소속된 클라이언트가 현재 Kafka에 연결 중 |
오프셋 데이터가 존재 | 컨슈머 그룹의 할당된 오프셋 정보가 여전히 브로커에 존재 (비활성 상태일 수 있음) |
- 비활성 상태지만 오프셋 정보가 남아있다면, Kafka는 그룹 삭제를 거부합니다
- Kafka에서 해당 그룹의 오프셋을 먼저 삭제한 후 그룹을 삭제해야 합니다.
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--delete \
--group my-group
5. 컨슈머 그룹 Offset 관리
◆ 컨슈머 Offset 조회
컨슈머 그룹이 어떤 토픽의 어떤 파티션을 소비하고 있고, 각각 어디까지 읽었는지(offset) 정보를 확인하는 코드입니다.
String groupIds = "my-consumer-group";
// 1. 오프셋 정보 요청 → Future로 비동기 반환됨
ListConsumerGroupOffsetsResult offsetResult = admin.listConsumerGroupOffsets(groupIds);
// 2. Future에서 오프셋 결과 Map<TopicPartition, OffsetAndMetadata> 추출
Map<TopicPartition, OffsetAndMetadata> offsets
= offsetResult.partitionsToOffsetAndMetadata().get();
// 3. 결과 출력
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
TopicPartition tp = entry.getKey(); // 토픽명 + 파티션 번호
OffsetAndMetadata metadata = entry.getValue(); // 오프셋 정보
System.out.printf("▶ 토픽: %s, 파티션: %d, offset: %d%n",
tp.topic(), tp.partition(), metadata.offset());
}
== 결과 예시 ==
...setup...
[TEST] 컨슈머 그룹 Offset 조회 테스트...
▶ 토픽: topic-basic, 파티션: 2, offset: 0
▶ 토픽: topic-basic, 파티션: 1, offset: 0
▶ 토픽: topic-basic, 파티션: 0, offset: 7
...tearDown...
◆ 컨슈머 Offset 설정
- 활용 예시: 특정 그룹의 오프셋을 0부터 다시 읽게 하고 싶을 때 사용(삭제는 불가능)
- 컨슈머가 실행 중인 상태에서 오프셋을 변경하면, 변경 사항이 반영되지 않을 수 있습니다.
→ 보통 컨슈머를 종료한 뒤 오프셋을 변경하고 다시 실행합니다.
String consumerGroupId = "my-consumer-group";
// 1. 오프셋을 변경할 대상 토픽과 파티션 지정
TopicPartition targetPartition = new TopicPartition("topic-basic", 0);
// 2. 새 오프셋 값 설정 (여기서는 0부터 다시 읽도록 설정)
OffsetAndMetadata newOffset = new OffsetAndMetadata(0L); // 0: 처음부터
// 3. 오프셋 변경 요청용 Map 구성
Map<TopicPartition, OffsetAndMetadata> newOffsets = Map.of(targetPartition, newOffset);
// 4. 오프셋 변경 요청 전송
admin.alterConsumerGroupOffsets(consumerGroupId, newOffsets).all().get();
System.out.println("오프셋 변경 완료: group = " + consumerGroupId);
== 결과 예시 ==
...setup...
[TEST] 컨슈머 그룹 Offset 설정 테스트...
오프셋 변경 완료: group = my-consumer-group
...tearDown...
6. 토픽 파티션 재 할당
// 1. 대상 토픽과 파티션 설정
// "topic-basic"의 0번 파티션을 대상으로 재할당 진행
TopicPartition targetPartition = new TopicPartition("topic-basic", 0);
// 2. 새롭게 할당할 replica 브로커 목록 정의
// 이 예제에서는 브로커 ID 1, 2, 3을 새 replica로 지정
List<Integer> newReplicas = List.of(1, 2, 3);
// 3. 재할당 요청을 위한 Map 구성
// Map<파티션, 재할당 정보> 형태로 구성
Map<TopicPartition, Optional<NewPartitionReassignment>> reassignment = Map.of(targetPartition,
Optional.of(new NewPartitionReassignment(newReplicas)));
// 4. AdminClient를 통해 재할당 요청 전송
admin.alterPartitionReassignments(reassignment).all().get();
System.out.println("파티션 replica 재할당 요청 완료!");
== 실행 예시 ==
...setup...
[TEST] 토픽 파티션 수동 재 할당 테스트...
파티션 replica 재할당 요청 완료!
...tearDown...
재할당 시나리오별 예시
◆ 복제본을 3개로 늘리기 : replication factor 1 → 3
// 기존: [1]
// 재할당: [1, 2, 3] → replication factor 3
◆ 브로커 2가 다운되어, 해당 브로커가 포함된 partition을 재할당하는 경우
기존: [1, 2, 3]
변경: [1, 3, 4] → 브로커 2 제거, 브로커 4 추가
◆ 브로커 추가 후 로드 밸런싱
- 브로커 4, 5를 새로 추가했지만 파티션이 아직 브로커 1~3에 집중됨
- 일부 파티션을 브로커 4, 5로 이동시켜 분산 처리 성능 향상
이전: 파티션 0 → [1,2,3]
변경: 파티션 0 → [2,4,5]
AdminClient의 전체 구조부터 토픽 관리, 컨슈머 그룹 관리, 파티션 재할당까지 다양한 기능을 확인하기 위한 예제 코드를 정리해 보았습니다.
전체 소스 코스는 아래에서 확인 할 수 있습니다.
examples/apache-kafka-example at master · cericube/examples
examples/apache-kafka-example at master · cericube/examples
Contribute to cericube/examples development by creating an account on GitHub.
github.com
관련 글 링크
- https://kafka.apache.org/documentation/#api
- JDK + Eclipse 포터블(Portable) 개발환경 구축 방법
- 13.Kafka 명령어 예제 정리: Topic, Producer, Consumer
'1.시스템&인프라 > Apache Kafka' 카테고리의 다른 글
18.Kafka Consumer API 사용법(subscribe vs assign, 수동 커밋) (0) | 2025.04.04 |
---|---|
17.Kafka Producer API 사용법(비동기 전송, Transaction, 예제) (0) | 2025.04.04 |
15. Kafka UI 관리도구 AKHQ 설치 및 설정 하기: 사용자 인증 (0) | 2025.04.01 |
14.Kafka KRaft 명령어 예제 정리: Cluster, Storage, Metadata (0) | 2025.03.31 |
13.Kafka 명령어 예제 정리: Topic, Producer, Consumer (0) | 2025.03.31 |