1.시스템&인프라/Apache Kafka

16. Kafka AdminClient API 예제: Topic,Cluster,ConsumerGroups

쿼드큐브 2025. 4. 3. 17:12
728x90
반응형

Admin API는 Kafka 클러스터의 리소스(토픽, 브로커, ACL, 컨슈머 그룹 등)를 프로그래밍 방식으로 관리할 수 있도록 제공되는 API입니다. org.apache.kafka.clients.admin.AdminClient 클래스를 통해 사용하며, Kafka 클라이언트 라이브러리 안에 포함되어 있습니다.

 

Kafka AdminClient API 예제 코드: Java 

 

목차

1. Kafka AdminClient 개요

2. 토픽 관리

3. 클러스터 정보 조회

4. 컨슈머 그룹 관리

5. 컨슈머 그룹 Offset 관리

6. 토픽 파티션 재 할당

관련 글 링크

 

 

1. Kafka AdminClient 개요

AdminClient는 Kafka 클러스터의 메타데이터를 조회하고, 토픽이나 컨슈머 그룹을 제어할 수 있도록 도와주는 Kafka 클라이언트 API입니다.
운영자나 개발자가 Kafka 관리 작업을 자동화할 때 자주 사용되며, 대표적으로 다음 기능을 제공합니다:

  • 토픽 생성, 삭제, 설정 변경
  • 클러스터 정보 및 브로커 목록 조회
  • 컨슈머 그룹 목록, 상세 정보, 삭제
  • 파티션 수동 재할당 등

AdminClient 기본 구조 및 설정하기

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;

public class AdminClientSetupExample {

    public static void main(String[] args) {
        String bootstrapServers = "192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092";

        Properties props = new Properties();
        
        // Kafka 브로커 주소
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
       
        // 클라이언트 ID (Kafka 로그 및 모니터링 시 유용)
        props.put(AdminClientConfig.CLIENT_ID_CONFIG, "my-admin-client");

        // 요청 타임아웃 (각 API 요청이 기다리는 최대 시간, ms 단위)
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000"); // 10초

        // 실패 시 재시도 횟수
        props.put(AdminClientConfig.RETRIES_CONFIG, "3");

        // 재시도 간 대기 시간 (ms 단위)
        props.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "2000"); // 2초
              
        // AdminClient 생성
        try (AdminClient admin = AdminClient.create(props)) {
            System.out.println("AdminClient 연결 성공!");
            // 여기서 admin 기능 호출 가능
        } catch (ExecutionException | InterruptedException e) {
            System.out.println(e.getMessage());
        }
    }
}

 

AdminClient 설정에 따른 동작 시나리오

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092");
props.put(AdminClientConfig.CLIENT_ID_CONFIG, "admin-client-test");
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");    // 5초
props.put(AdminClientConfig.RETRIES_CONFIG, "2");                  // 최대 2회 재시도
props.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "1000");      // 1초 간격으로 재시도

1. AdminClient는 broker1에 연결을 시도
  - 요청 시간 제한: 5초
  - 만약 broker1이 다운 상태거나 응답이 지연되면 → 요청 실패

2. 첫 번째 재시도 시작
  - RETRY_BACKOFF_MS_CONFIG에 따라 1초 대기 후 재시도
  - AdminClient는 이번에는 broker2 또는 broker3 중 응답 가능한 브로커로 요청 전송
  - 브로커가 정상적으로 응답하면 성공, 아니면 실패

3.두 번째 재시도 진행
  - 다시 1초 대기 후 나머지 브로커에 요청 전송
  - 여기서도 실패하면 최종적으로 예외 발생
  • AdminClient는 bootstrap.servers 리스트에서 최초 연결 가능한 브로커를 통해 전체 클러스터 메타데이터를 수집합니다.
  • 연결 실패 시, 자동으로 다른 브로커에 fallback되며, 이 때 request.timeout.ms, retries, retry.backoff.ms 설정이 중요한 역할을 합니다.
  • AdminClient 내부는 비동기 처리 기반의 Future 패턴이므로, .get() 호출 시까지 실제 예외는 발생하지 않습니다.

 

2. 토픽관리 예제 코드

◆ 토픽 생성

// topic-basic 생성
// - 파티션 수: 1
// - 복제 계수: 1 (리더 + 팔로워 포함, short 타입으로 지정)
NewTopic topic1 = new NewTopic("topic-basic", 1, (short) 1);

// topic-auto-replica 생성
// - 파티션 수: 3
// - 복제 계수: 명시하지 않음 → Kafka 브로커 설정의 default.replication.factor 사용
NewTopic topic2 = new NewTopic("topic-auto-replica", Optional.of(3), Optional.empty());

// topic-config 생성
// - 파티션 수: 3
// - 복제 계수: 1
// - 추가 설정:
// - 메시지 보관 시간: 1시간 (retention.ms = 3600000ms)
// - 로그 압축 방식: compact (cleanup.policy = compact)
Map<String, String> configs = Map.of("retention.ms", "3600000", // 메시지 보관 시간 설정 (1시간)
        "cleanup.policy", "compact" // 로그 정리 방식 설정 (압축)
);
NewTopic topic3 = new NewTopic("topic-config", 3, (short) 1).configs(configs);

// AdminClient를 통해 토픽 생성 요청
CreateTopicsResult result = admin.createTopics(List.of(topic1, topic2, topic3));

// 모든 토픽 생성 요청 완료까지 대기 (예외 발생 시 전체 실패로 간주됨)
result.all().get();

// result.all().get()에서 오류 발생히 호출 안됨
System.out.println(">> 토픽 생성 결과...");
Map<String, KafkaFuture<Void>> futures = result.values();
for (Map.Entry<String, KafkaFuture<Void>> entry : futures.entrySet()) {
    String topicName = entry.getKey();
    try {
        // 토픽 생성 성공 시
        entry.getValue().get();
        System.out.printf("토픽 생성 성공: %s%n", topicName);
    } catch (ExecutionException e) {
        // 토픽 생성 실패 시 예외 메시지 출력
        System.out.printf("토픽 생성 실패: %s - %s%n", topicName, e.getCause().getMessage());
    }
}

== 출력 예시 ==
...setup...
[TEST] 토픽 생성 테스트...
>> 토픽 생성 결과...
토픽 생성 성공: topic-auto-replica
토픽 생성 성공: topic-config
토픽 생성 성공: topic-basic
...tearDown...
  • 파티션 수, 복제 계수 설정 가능
  • configs()로 토픽 단위 설정도 가능 (retention.ms 등)
  • 주의사항: 존재하는 토픽을 다시 생성하면 TopicExistsException 예외 발생

◆ 토픽 목록 조회

ListTopicsResult listTopicsResult = admin.listTopics();
// 2. Future에서 토픽 이름 Set 추출
Set<String> topicNames = listTopicsResult.names().get();

topicNames.forEach(t -> System.out.println("토픽: " + t));

== 출력 예시 ==
...setup...
[TEST] 토픽 목록 조회 테스트...
토픽: topic-auto-replica
토픽: topic-config
토픽: topic-basic
...tearDown...
  • .listTopics(new ListTopicsOptions().listInternal(true)): internal topic 포함 여부 설정

◆ 토픽 상세 정보

List<String> topicNames = List.of("topic-basic", "topic-auto-replica", "topic-config");

// 토픽 상세 정보 조회 요청
DescribeTopicsResult result = admin.describeTopics(topicNames);
// 토픽별 Future 결과 맵 반환 (String → KafkaFuture<TopicDescription>)
Map<String, KafkaFuture<TopicDescription>> topicMap = result.topicNameValues();

// 결과 출력
for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : topicMap.entrySet()) {
    String topicName = entry.getKey();
    try {
        TopicDescription desc = entry.getValue().get(); // 결과 받기
        System.out.printf("토픽명: %s, 파티션 수: %d, 복제본 수: %d%n", 
          topicName, 
          desc.partitions().size(),
          desc.partitions().get(0).replicas().size() // 복제본 수 확인 (첫 파티션 기준)
        );
    } catch (Exception e) {
        System.out.printf("토픽 정보 조회 실패: %s - %s%n", topicName, e.getMessage());
    }
}

=== 출력 예시 ===
...setup...
[TEST] 토픽 정보 조회 테스트...
토픽명: topic-auto-replica, 파티션 수: 3, 복제본 수: 1
토픽명: topic-basic, 파티션 수: 1, 복제본 수: 1
토픽명: topic-config, 파티션 수: 3, 복제본 수: 1
...tearDown...

 

◆ 토픽 설정 조회

String topicName = "topic-basic";
String configKey = null;
		
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
Config originalConfig = admin.describeConfigs(Collections.singleton(resource)).all().get().get(resource);
for (ConfigEntry entry : originalConfig.entries()) {
	if (configKey == null) {
		System.out.printf("- %s = %s%n", entry.name(), entry.value());
	} else if (entry.name().equals(configKey)) {
		System.out.printf("- %s = %s%n", entry.name(), entry.value());
	}
}

=== 출력 예시 ===
...setup...
[TEST] 토픽 설정 조회 테스트...
- compression.type = producer
- remote.log.delete.on.disable = false
- leader.replication.throttled.replicas = 
이하 생략
...tearDown...

 

◆ 토픽 설정 변경

String topicName = "topic-basic";
String configKey = "retention.ms";
String configValue = "500000";
		
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
// 2. 설정 변경 요청
// SET 값을 새로 설정 또는 변경
// DELETE 해당 설정 키를 제거
// APPEND 값에 항목 추가 (쉼표 구분 설정에 유용)
// SUBTRACT 값에서 항목 제거
AlterConfigOp op = new AlterConfigOp(new ConfigEntry(configKey, configValue), // 5분
		AlterConfigOp.OpType.SET // SET, DELETE, APPEND, SUBTRACT 중 하나
);

Map<ConfigResource, Collection<AlterConfigOp>> updates = Map.of(resource, List.of(op));

AlterConfigsResult result = admin.incrementalAlterConfigs(updates);
result.all().get(); // (예외 발생 시 전체 실패로 간주됨)
  • SET: 값 설정 또는 변경
  • DELETE: 설정 제거
  • APPEND: 쉼표 기반 설정에 값 추가
  • SUBTRACT: 설정에서 값 제거

위 코드 실행 후 "토픽 설정 조회" 로 설정을 조회하면 "- retention.ms = 500000" 으로 설정값이 변경된 것을 확인 할 수 있습니다.

 

◆ 토픽 삭제

List<String> topicNames = List.of("topic-basic");

DeleteTopicsResult result = admin.deleteTopics(topicNames);
result.all().get(); // 개별 예외 발생 시 전체 실패로 간주됨
  • 토픽 삭제후 토픽 목록 조회 결과
토픽: topic-auto-replica
토픽: topic-config

 

 

3. 클러스터 정보 조회

DescribeClusterResult cluster = admin.describeCluster();

System.out.println("클러스터 ID: " + cluster.clusterId().get());
System.out.println("컨트롤러: " + cluster.controller().get());
System.out.println("브로커 목록:");
// cluster.nodes().get().forEach(System.out::println);
Collection<Node> nodes = cluster.nodes().get();
for (Node node : nodes) {
	System.out.println(node);
}

== 출력 예시 ==
...setup...
[TEST] 클러스터 정보 조회 테스트...
클러스터 ID: NOgQ3G41T96rqFL6y4FZJA
컨트롤러: 192.168.56.102:9092 (id: 2 rack: null)
브로커 목록:
192.168.56.101:9092 (id: 1 rack: null)
192.168.56.102:9092 (id: 2 rack: null)
192.168.56.103:9092 (id: 3 rack: null)
...tearDown...

 

 

4. 컨슈머 그룹 관리

◆ 컨슈머 그룹 목록 조회

Collection<ConsumerGroupListing> groups = admin.listConsumerGroups().all().get();

for (ConsumerGroupListing group : groups) {
	System.out.println("그룹 ID: " + group.groupId());
}

== 출력 예시 ==
...setup...
[TEST] 컨슈머 그룹 목록 조회 테스트...
그룹 ID: my-consumer-group
...tearDown...

 

◆ 컨슈머 그룹 상세 조회

List<String> groupIds = List.of("my-consumer-group");

admin.describeConsumerGroups(groupIds).all().get().forEach((groupId, desc) -> {
	System.out.printf("▶ 그룹 ID: %s, 상태: %s, 멤버 수: %d%n", 
        groupId, desc.state(), desc.members().size());

	desc.members().forEach(member -> {
		System.out.printf("  - 멤버: %s, 호스트: %s%n", member.consumerId(), member.host());
		System.out.printf("    파티션 할당: %s%n", member.assignment().topicPartitions());
	});
});

== 결과 예시 ==
...setup...
[TEST] 컨슈머 그룹 정보 조회 테스트...
▶ 그룹 ID: my-consumer-group, 상태: Stable, 멤버 수: 1
  - 멤버: console-consumer-06156951-395a-47fa-a18a-f13147f23046, 호스트: /192.168.56.101
    파티션 할당: [topic-basic-2, topic-basic-1, topic-basic-0]
...tearDown...

 

◆ 컨슈머 그룹 삭제

List<String> groupIds = List.of("my-consumer-group");

// admin.deleteConsumerGroups(groupIdsToDelete).all().get();
// 삭제 요청 결과를 result 변수에 저장
DeleteConsumerGroupsResult result = admin.deleteConsumerGroups(groupIds);
// 삭제 완료 대기
result.all().get();

== 출력 예시 ==
...setup...
[TEST] 컨슈머 그룹 삭제 테스트...
org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
...tearDown...
  • GroupNotEmptyException은 다음 두 가지 상황 중 하나일 수 있습니다:
원인 설명
활성 컨슈머가 있음 컨슈머 그룹에 소속된 클라이언트가 현재 Kafka에 연결 중
오프셋 데이터가 존재 컨슈머 그룹의 할당된 오프셋 정보가 여전히 브로커에 존재 (비활성 상태일 수 있음)
  • 비활성 상태지만 오프셋 정보가 남아있다면, Kafka는 그룹 삭제를 거부합니다
  • Kafka에서 해당 그룹의 오프셋을 먼저 삭제한 후 그룹을 삭제해야 합니다.
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --delete \
  --group my-group

 

 

5. 컨슈머 그룹 Offset 관리

 

◆ 컨슈머 Offset 조회

컨슈머 그룹이 어떤 토픽의 어떤 파티션을 소비하고 있고, 각각 어디까지 읽었는지(offset) 정보를 확인하는 코드입니다.

String groupIds = "my-consumer-group";

// 1. 오프셋 정보 요청 → Future로 비동기 반환됨
ListConsumerGroupOffsetsResult offsetResult = admin.listConsumerGroupOffsets(groupIds);

// 2. Future에서 오프셋 결과 Map<TopicPartition, OffsetAndMetadata> 추출
Map<TopicPartition, OffsetAndMetadata> offsets 
   = offsetResult.partitionsToOffsetAndMetadata().get(); 

// 3. 결과 출력
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
	TopicPartition tp = entry.getKey(); // 토픽명 + 파티션 번호
	OffsetAndMetadata metadata = entry.getValue(); // 오프셋 정보
	System.out.printf("▶ 토픽: %s, 파티션: %d, offset: %d%n", 
    tp.topic(), tp.partition(), metadata.offset());
}

== 결과 예시 ==
...setup...
[TEST] 컨슈머 그룹 Offset 조회 테스트...
▶ 토픽: topic-basic, 파티션: 2, offset: 0
▶ 토픽: topic-basic, 파티션: 1, offset: 0
▶ 토픽: topic-basic, 파티션: 0, offset: 7
...tearDown...

 

◆ 컨슈머 Offset 설정

  • 활용 예시: 특정 그룹의 오프셋을 0부터 다시 읽게 하고 싶을 때 사용(삭제는 불가능)
  • 컨슈머가 실행 중인 상태에서 오프셋을 변경하면, 변경 사항이 반영되지 않을 수 있습니다.
    → 보통 컨슈머를 종료한 뒤 오프셋을 변경하고 다시 실행합니다.
String consumerGroupId = "my-consumer-group";
// 1. 오프셋을 변경할 대상 토픽과 파티션 지정
TopicPartition targetPartition = new TopicPartition("topic-basic", 0);

// 2. 새 오프셋 값 설정 (여기서는 0부터 다시 읽도록 설정)
OffsetAndMetadata newOffset = new OffsetAndMetadata(0L); // 0: 처음부터

// 3. 오프셋 변경 요청용 Map 구성
Map<TopicPartition, OffsetAndMetadata> newOffsets = Map.of(targetPartition, newOffset);

// 4. 오프셋 변경 요청 전송
admin.alterConsumerGroupOffsets(consumerGroupId, newOffsets).all().get();

System.out.println("오프셋 변경 완료: group = " + consumerGroupId);

== 결과 예시 ==
...setup...
[TEST] 컨슈머 그룹 Offset 설정 테스트...
오프셋 변경 완료: group = my-consumer-group
...tearDown...

 

 

6. 토픽 파티션 재 할당

// 1. 대상 토픽과 파티션 설정
// "topic-basic"의 0번 파티션을 대상으로 재할당 진행
TopicPartition targetPartition = new TopicPartition("topic-basic", 0);

// 2. 새롭게 할당할 replica 브로커 목록 정의
// 이 예제에서는 브로커 ID 1, 2, 3을 새 replica로 지정
List<Integer> newReplicas = List.of(1, 2, 3);

// 3. 재할당 요청을 위한 Map 구성
// Map<파티션, 재할당 정보> 형태로 구성
Map<TopicPartition, Optional<NewPartitionReassignment>> reassignment = Map.of(targetPartition,
		Optional.of(new NewPartitionReassignment(newReplicas)));

// 4. AdminClient를 통해 재할당 요청 전송
admin.alterPartitionReassignments(reassignment).all().get();

System.out.println("파티션 replica 재할당 요청 완료!");

== 실행 예시 ==
...setup...
[TEST] 토픽 파티션 수동 재 할당  테스트...
파티션 replica 재할당 요청 완료!
...tearDown...

 

재할당 시나리오별 예시

◆ 복제본을 3개로 늘리기 :  replication factor 1 → 3

// 기존: [1]
// 재할당: [1, 2, 3] → replication factor 3

 

  브로커 2가 다운되어, 해당 브로커가 포함된 partition을 재할당하는 경우

기존: [1, 2, 3]
변경: [1, 3, 4]  → 브로커 2 제거, 브로커 4 추가

 

  브로커 추가 후 로드 밸런싱

  • 브로커 4, 5를 새로 추가했지만 파티션이 아직 브로커 1~3에 집중됨
  • 일부 파티션을 브로커 4, 5로 이동시켜 분산 처리 성능 향상
이전: 파티션 0 → [1,2,3]  
변경: 파티션 0 → [2,4,5]

 


AdminClient의 전체 구조부터 토픽 관리, 컨슈머 그룹 관리, 파티션 재할당까지 다양한 기능을 확인하기 위한 예제 코드를 정리해 보았습니다.

전체 소스 코스는 아래에서 확인 할 수 있습니다.

examples/apache-kafka-example at master · cericube/examples

 

examples/apache-kafka-example at master · cericube/examples

Contribute to cericube/examples development by creating an account on GitHub.

github.com

 

 

관련 글 링크

 

728x90
반응형