1.시스템&인프라/Apache Kafka

17.Kafka Producer API 사용법(비동기 전송, Transaction, 예제)

쿼드큐브 2025. 4. 4. 09:27
728x90
반응형

KafkaProducer API는 Kafka 브로커에 메시지를 전송하는 핵심 도구입니다. 비동기/동기 전송, 트랜잭션 처리, 다양한 설정을 지원하며, 고성능 메시징 시스템을 구현할 수 있게 해줍니다. 이 글에서는 KafkaProducer의 구조와 설정 방법, 전송 방식, 트랜잭션 처리까지 단계별로 알아봅니다.

 

Kafka Producer API 사용법(비동기 전송, Transaction, 예제)

 

목차

1. Kafka Producer API 개요

2. KafkaProducer 클래스와 설정 방법

3. ProducerRecord 구조 및 생성 방법

4. 메시지 전송 방식: 비동기 vs 동기

5. Kafka 트랜잭션(Transaction) 처리

관련 글 링크

 

 

1. Kafka Producer API 개요

Kafka에서 Producer는 Kafka에 메시지를 보내는 주체입니다.

데이터를 생성하는 애플리케이션이 Producer 역할을 하며, 보낸 데이터는 Kafka 브로커에 저장되고, 이후 Consumer가 가져갑니다.

주요기능 설명
비동기 전송 메시지를 send()로 전송하면 백그라운드에서 전송됨
콜백 처리 메시지 전송 성공/실패 여부를 비동기로 확인 가능
자동 배치 메시지를 일정량 쌓아서 한 번에 전송 (성능 향상)
직렬화 지원 키와 값을 자동으로 Byte 배열로 변환
압축 가능 gzip, snappy 등 압축 방식 지정 가능
트랜잭션 지원 정확히 한 번 전송(Exactly Once)을 위한 트랜잭션 처리 가능

 

◆ KafkaProducer 클래스

  • Kafka에서 메시지를 보내기 위해 사용되는 핵심 객체입니다.
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
  • 내부적으로 Kafka 서버와 네트워크 연결을 맺고, 메시지를 전송할 준비를 합니다.
  • 스레드 안전(Thread-safe)하므로 한 번 생성해서 여러 스레드에서 공유 사용 가능.

◆ 메시지 전송 단위: ProducerRecord

  • KafkaProducer는 메시지를 ProducerRecord 객체로 포장해서 Kafka에 보냅니다.
  • key가 null이면 Kafka는 라운드로빈 방식으로 파티션을 순회하며 분산 전송합니다.
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
producer.send(record);
  • topic: 메시지를 보낼 대상, 이미 생성된 topic이 없으면 새로운 topic을 생성
  • key: 메시지 키 (같은 키는 같은 파티션에 전송됨,  partition = hash(key) % numPartitions )
  • value: 전송할 실제 메시지

◆ 동작 흐름 요약

1. 설정(Properties) 준비
2. KafkaProducer 생성
3. ProducerRecord 생성
4. send()로 메시지 전송 (비동기)
5. flush() 또는 close()로 전송 마무리

 

◆ 예시

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", "key", "value");
producer.send(record);
producer.close();

 

 

2. KafkaProducer 클래스와 설정 방법

KafkaProducer는 메시지를 Kafka에 전송하는 핵심 객체입니다.

스레드 안전(thread-safe)에 안전한 구조로 한번 생성하면 여러 스레드에서 재사용이 가능합니다.

 

KafkaProducer(Properties properties)

  • 가장 많이 쓰이는 기본 생성자입니다.
  • Properties 또는 Map<String, Object>에 모든 설정을 포함시켜 KafkaProducer를 초기화합니다.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

 

◆ KafkaProducer(Map<String, Object>, Serializer, Serializer)

  • 직렬화 인스턴스를 직접 주입하고 싶은 경우 사용합니다.
KafkaProducer<String, String> producer = new KafkaProducer<>(
    Map.of("bootstrap.servers", "localhost:9092"),
    new StringSerializer(),
    new StringSerializer()
);

 

◆ KafkaProducer  설정 항목

  • 필수 항목
설정 키  설명 예시값 비고
bootstrap.servers Kafka 브로커 주소 목록 localhost:9092 필수
key.serializer 메시지 키 직렬화 클래스 StringSerializer 필수
value.serializer 메시지 값 직렬화 클래스 StringSerializer 필수
  • 전송 신뢰성 설정
설정 키  설명 예시 비고
acks 브로커 확인 수준 (0, 1, all) all 고신뢰 필요 시 all
retries 실패 시 재시도 횟수 3 필수 아님 (권장)
enable.idempotence 중복 방지 설정 true Exactly Once 보장 시 필요
delivery.timeout.ms 전체 전송 타임아웃(ms) 120000 기본값 OK
request.timeout.ms 브로커 응답 대기시간(ms) 30000 기본값 OK
  • 성능 최적화 설정
설정 키 설명 예시 비고
batch.size 배치 전송 단위 크기 (byte) 16384 (16KB) 전송 효율 향상
linger.ms 배치 지연 시간(ms) 5, 10 메시지 더 모아서 보냄
buffer.memory 전체 버퍼 크기(byte) 33554432 (32MB) 메시지 대기 공간
compression.type 압축 방식 (none, gzip, snappy, lz4, zstd) gzip 네트워크 사용량 절감
  • 기타
설정 키 설명 예시 비고
client.id 클라이언트 식별자 (모니터링용) my-producer-01 로그와 모니터링에 표시됨
max.in.flight.requests.per.connection 브로커에 동시에 전송할 수 있는 메시지 수 5 순서 보장 필요 시 1 권장
connections.max.idle.ms 유휴 연결 유지 시간(ms) 540000 커넥션 재사용 최적화
  • 설정 예시
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

//신뢰성
props.put("acks", "all");                    // 안정성 강화
props.put("retries", 3);                     // 실패 시 재시도
props.put("enable.idempotence", "true");     // 중복 메시지 방지

//성능
props.put("batch.size", 16384);              // 배치 크기 설정
props.put("linger.ms", 10);                  // 전송 효율 향상
props.put("compression.type", "gzip");       // 네트워크 사용 최소화

props.put("client.id", "my-producer");       // 클라이언트 식별자

 

 

3. ProducerRecord 구조 및 생성 방법

KafkaProducer가 브로커에게 보낼 메시지를 캡슐화한 클래스입니다.

이 객체는 토픽과 메시지의 핵심 정보를 담고 있습니다.

 

◆  생성자 

// 최소 구성: topic + value
new ProducerRecord<>(String topic, V value);

// topic + key + value
new ProducerRecord<>(String topic, K key, V value);

// topic + partition + key + value
new ProducerRecord<>(String topic, Integer partition, K key, V value);

// topic + partition + timestamp + key + value
new ProducerRecord<>(String topic, Integer partition, Long timestamp, K key, V value);

// topic + partition + timestamp + key + value + headers
new ProducerRecord<>(String topic, Integer partition, Long timestamp, K key, V value, Headers headers);

 

◆  생성자 필드 설명

필드 설명
topic 메시지를 보낼 Kafka 토픽 이름입니다. (필수)
partition 보낼 파티션 번호. 생략 시 Kafka가 파티셔너 로직에 따라 자동 할당
timestamp 레코드 타임스탬프. 생략 시 시스템 시간 또는 log.append.time 사용
key 메시지 키. 같은 키는 같은 파티션으로 매핑되도록 보장됨 (파티셔닝에 영향)
value 실제 메시지 내용. Kafka에 전송될 주요 데이터
headers Kafka Header. Key-Value 형식으로 메타데이터 전달 가능 (선택)

 

◆  직렬화와 ProducerRecord

ProducerRecord의 key와 value는 KafkaProducer 설정의 key.serializer, value.serializer에 의해 직렬화됨

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

 

◆  사용 예시

// "my-topic"이라는 토픽에 "my-key"라는 키로 
//"Hello Kafka!"라는 내용을 Kafka 브로커에 전송하려고 생성한 것입니다.
ProducerRecord<String, String> record =
    new ProducerRecord<>("my-topic", "my-key", "Hello Kafka!");

//send()는 즉시 리턴되며, 메시지는 나중에 실제로 전송됩니다.
//실패/성공 여부를 확인하고 싶다면 콜백(callback) 을 붙이거나 
//Future.get() 으로 동기 방식으로 처리할 수 있습니다.

producer.send(record);

 

  • 파티션 지정 예:
ProducerRecord<String, String> record =
    new ProducerRecord<>("my-topic", 2, System.currentTimeMillis(), "my-key", "message");

producer.send(record);
항목  의미
토픽 "my-topic" Kafka에 보낼 대상 토픽, 없으면 신규 생성
파티션 2 직접 지정한 파티션 번호
타임스탬프 System.currentTimeMillis() 메시지 생성 시간
"my-key" 메시지 키 (파티셔닝 또는 정렬 기준), null 이면 임의로 지정
"message" 실제로 Kafka에 보내는 내용

 

  • Header 포함 예:
ProducerRecord<String, String> record =
    new ProducerRecord<>("my-topic", null, null, "my-key", "value",
        new RecordHeaders().add("trace-id", "1234".getBytes()));

producer.send(record);

//Header(헤더) 를 사용하는 이유는 
//"메시지 본문(payload)과는 별개로 
//부가적인 메타데이터를 함께 전달"하기 위해서입니다.

 

 

4. 메시지 전송 방식 : 비동기 vs 동기

실무에서는 주로 비동기 + 콜백을 사용하고, 테스트나 중요한 메시지 전송만 동기로 처리합니다.

◆  비동기 전송

  • producer.send()는 비동기 메시지 전송이며, Kafka는 send() 요청을 즉시 큐에 넣고, 백그라운드에서 전송합니다.
// Kafka에 보낼 메시지를 생성함
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key1", "Hello Kafka!");

// Kafka에 "이 메시지 좀 보내줘" 하고 요청만 함
// 실제 전송은 백그라운드 스레드가 알아서 처리함
// 이 줄은 기다리지 않고 즉시 리턴됨
producer.send(record);  // 전송 요청만 하고 즉시 리턴
[App] → producer.send() → [Kafka 내부 전송 큐] → [I/O 스레드] → [Kafka 브로커]

 

◆  비동기 + 콜백 처리

  • 메시지를 보낸 후, 성공했는지 실패했는지를 비동기적으로 나중에 처리하는 구조입니다
  • 콜백(callback): 전송이 나중에 끝났을 때, Kafka가 대신 알려주는 함수를 등록하는 것.
// 비동기 전송 + 콜백 함수 구현
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // 메시지 전송 중 예외가 발생한 경우
            exception.printStackTrace();
        } else {
            // 메시지 전송이 성공한 경우
            System.out.println("Success: " + metadata.topic() +
                "-" + metadata.partition() +
                "@" + metadata.offset());
        }
    }
});

//metadata.topic()         // 메시지가 저장된 토픽 이름
//metadata.partition()     // 저장된 파티션 번호
//metadata.offset()        // 해당 파티션 내 위치 (오프셋)

 

◆  동기 전송(Future.get)

  • 전송이 끝날 때까지 기다리는 방식입니다.
  • 즉, 메시지가 Kafka 브로커에 성공적으로 저장되거나 오류가 발생할 때까지 코드가 멈춰 있습니다.
// 전송이 완료될 때까지 블로킹
producer.send(record).get();
//주의: .get()은 성능 저하가 있으므로 반드시 필요한 경우만 사용하세요.
  • producer.send(record)는 Future<RecordMetadata>를 반환합니다.
  • 이는 나중에 메시지 전송 결과를 가져올 수 있다는 뜻입니다.
  • send(...).get()은 이 Future 객체를 즉시 블로킹(기다리게) 하여 전송이 끝나고 결과가 리턴될 때까지 기다립니다.

◆  flush()와 close()

// 메시지를 즉시 전송하도록 강제합니다.
producer.flush(); 

// 연결 종료 및 자원 해제 (필수)
producer.close();

 

◆  예제 코드

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        // Kafka 설정 객체 생성
        Properties props = new Properties();
        // Kafka 브로커 주소 설정 (여러 개일 경우 쉼표로 구분)
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 메시지 키 직렬화 클래스 설정 (문자열 기준)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 메시지 값 직렬화 클래스 설정 (문자열 기준)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 메시지 전송 신뢰성 설정 (all = 모든 복제본이 ack 해야 성공 처리)
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        // try-with-resources 문으로 KafkaProducer 자동 종료 처리
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // 전송할 Kafka 토픽 이름
            String topic = "test-topic";
            // 메시지 키 지정 (같은 키는 같은 파티션으로 전송됨)
            String key = "key1";
            // 전송할 메시지 본문
            String value = "Hello Kafka!";

            // ProducerRecord 객체 생성 (토픽, 키, 값 포함)
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

            // 비동기 전송 요청 + 콜백 등록 (성공/실패 결과 처리)
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // 전송 실패 시 예외 출력
                    if (exception != null) {
                        System.err.println("메시지 전송 실패:");
                        exception.printStackTrace();
                    } else {
                        // 전송 성공 시 토픽, 파티션, 오프셋 정보 출력
                        System.out.printf("메시지 전송 성공: %s-%d@%d%n",
                                metadata.topic(),
                                metadata.partition(),
                                metadata.offset());
                    }
                }
            });

            // 전송 대기 중인 메시지를 강제로 모두 전송(flush)
            producer.flush();

        } catch (Exception e) {
            // KafkaProducer 생성 또는 전송 중 예외 발생 시 처리
            System.err.println("Producer 동작 중 예외 발생:");
            e.printStackTrace();
        }
    }
}

 

 

5. Kafka 트랜잭션(Transaction) 처리

Kafka에서 트랜잭션은 하나 이상의 메시지를 하나의 단위로 묶어 처리하고, 전송 중 오류가 발생하면 모두 롤백하거나, 전송이 성공하면 한꺼번에 커밋하는 방식입니다.

 

◆ 트랜잭션(Transaction) 처리 흐름

initTransactions()         ← 최초 Producer 초기화 시 1회만 호출
   ↓
beginTransaction()         ← 트랜잭션 시작
   ↓
send(record)               ← 여러 메시지 전송
sendOffsetsToTransaction() ← (옵션) consumer offset 함께 커밋
   ↓
commitTransaction()        ← 성공 시 커밋
(또는)
abortTransaction()         ← 실패 시 롤백

 

◆ 트랜잭션(Transaction) 설정 항목 : Properties

props.put("enable.idempotence", "true");               // 중복 방지 (필수)
props.put("transactional.id", "tx-producer-01");       // 고유한 트랜잭션 ID (필수)
props.put("acks", "all");                              // 안정성 강화
props.put("retries", Integer.MAX_VALUE);               // 재시도 무제한
props.put("max.in.flight.requests.per.connection", 1); // 순서 보장
  • transactional.id는 반드시 유일한 값이어야 하며,Producer 인스턴스 재시작 시에도 동일해야 합니다.

◆ 트랜잭션(Transaction) 처리 예제 코드

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-producer-01");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {

    // 트랜잭션 준비 (초기 1회 호출)
    producer.initTransactions();

    try {
        // 트랜잭션 시작
        producer.beginTransaction();

        // 메시지 여러 개 전송
        producer.send(new ProducerRecord<>("tx-topic", "key1", "message-1"));
        producer.send(new ProducerRecord<>("tx-topic", "key2", "message-2"));

        // (선택) Consumer offset도 함께 트랜잭션에 포함 가능
        // producer.sendOffsetsToTransaction(offsets, groupMetadata);

        // 트랜잭션 커밋
        producer.commitTransaction();

        System.out.println("트랜잭션 커밋 완료");

    } catch (Exception e) {
        // 예외 발생 시 트랜잭션 롤백
        producer.abortTransaction();
        System.err.println("트랜잭션 롤백: " + e.getMessage());
    }

}

 


KafkaProducer API는 Kafka 클러스터에 데이터를 전송하는 핵심 인터페이스입니다.
비동기 전송, 콜백 처리, 트랜잭션 기능을 통해 고성능 데이터 파이프라인을 구축할 수 있습니다.

 

 

 

관련 글 링크

13.Kafka 명령어 예제 정리: Topic, Producer, Consumer

5. Kafka 리더 장애 발생 시 Failover를 위한 Producer와 Consumer 설정

16. Kafka AdminClient API 예제: Topic,Cluster,ConsumerGroups

 

728x90
반응형