1.시스템&인프라/Apache Kafka

20.Kafka Connect 이해 및 서버 설정하기 : REST API 명령어

쿼드큐브 2025. 4. 8. 15:47
728x90

Kafka Connect는 Kafka와 외부 시스템 간 실시간 데이터 연동을 손쉽게 구현할 수 있는 강력한 도구입니다. Kafka Connect의 개념, 아키텍처 구성, 그리고 서버 측 설정 방법을 단계별로 자세히 소개합니다

 

Kafka Connect 이해 및 서버 설정하기 : REST API 명령어

 

목차

1. Kafka Connect 개념

2. Kafka 아키텍처

3. Standalone vs Distributed

4. Kafka Connect 설치 및 실행

5. Kafka 서버 설정 하기

6. REST API를 이용한 Connector 관리

관련 글 링크

 

 

1. Kafka Connect 개념

Kafka Connect는 Apache Kafka®의 구성 요소 중 하나로, Kafka와 데이터베이스, 클라우드 서비스, 검색 인덱스, 파일 시스템, 키-값 저장소 등 다양한 시스템 간의 스트리밍 통합을 수행하는 데 사용됩니다.

Kafka Connect는 다양한 소스(Source)에서 Kafka로 데이터를 스트리밍하거나, Kafka에서 다양한 타겟(Sink)으로 데이터를 스트리밍하는 작업을 쉽게 만들어줍니다.

 

특히 많이 사용되는 커넥터는 다음과 같습니다:

  • 관계형 데이터베이스(RDBMS): Oracle, SQL Server, Db2, Postgres, MySQL
  • 클라우드 객체 저장소: Amazon S3, Azure Blob Storage, Google Cloud Storage
  • 메시지 큐: ActiveMQ, IBM MQ, RabbitMQ
  • NoSQL 및 문서 기반 저장소: Elasticsearch, MongoDB, Cassandra
  • 클라우드 데이터 웨어하우스: Snowflake, Google BigQuery, Amazon Redshift

apache kafka 개념
출처: https://developer.confluent.io/courses/kafka-connect/intro/

 

Kafka Connect의 가장 큰 장점은 프로그래밍 없이 사용할 수 있다는 점입니다.

완전히 설정 기반(configuration-based)으로 동작하기 때문에, 개발자뿐 아니라 다양한 사용자들이 손쉽게 사용할 수 있습니다.

또한, 데이터의 수집(Ingest) 및 전송(Egress)뿐 아니라, Kafka Connect는 데이터가 흐르는 중간에 가벼운 변환 작업(Transformation)도 수행할 수 있습니다.

 

2. Kafka 아키텍처

Kafka Connect는 크게 다음과 같은 구성 요소로 동작합니다:

apache kafka connect architecture
출처: https://www.instaclustr.com/blog/apache-kafka-connect-architecture-overview/

 

◆ Connectors

  • 외부 시스템과 Kafka 간 데이터 이동을 정의하는 논리적 단위
  • Source Connector: 외부 시스템(예: PostgreSQL, MySQL, MongoDB 등)에서 Kafka로 데이터를 수집
  • Sink Connector: Kafka에서 데이터를 받아 외부 시스템(예: Elasticsearch, S3, HDFS 등)으로 전송
  • 데이터는 직접 처리하지 않고, 실제 작업은 Task가 수행 합니다.

◆  Tasks

  • Source Task: 외부 시스템에서 데이터를 pull()하여 Kafka로 보냅니다.
  • Sink Task: Kafka에서 데이터를 읽고 put()을 통해 외부 시스템으로 전송합니다.
  • 각 Task는 별도의 스레드로 실행되며, Sink Task는 동일한 consumer group으로 묶입니다.
  • Task 수는 Connector 설정에서 지정할 수 있으며, 시스템 자원과 처리 성능을 고려해 조정합니다.

◆  Workers

  • 실제로 Connectors와 Tasks를 실행하는 프로세스입니다.
  • REST API를 통해 커넥터 정의 및 설정을 받고 실행합니다.
  • java 애플리케이션이며, connect-distributed.sh 또는 connect-standalone.sh로 실행됨
  • 분산 모드에서는 하나의 Worker가 죽으면 나머지 Worker가 작업을 이어받습니다.

◆  Connect Cluster

  • 여러 개의 Worker 인스턴스가 함께 동작하는 분산 환경 (Distributed Mode)
  • Worker 간에 작업을 자동 분산 (분산 로드 밸런싱)
  • Worker 중 하나가 다운되면 나머지 Worker가 Task를 자동 재할당 → 내결함성 제공
  • Kafka의 내부 토픽(connect-configs, connect-offsets, connect-status)을 통해 작업 상태와 설정을 공유
Kafka Connect Cluster는 단순히 "Kafka 클러스터"와는 다릅니다.
Kafka Connect 클러스터는 Connect Worker 인스턴스들의 집합이며,
Kafka 클러스터는 데이터를 저장하는 서버 집합 입니다.

 

 

3. Standalone vs Distributed

모드 설명 사용 예시
Standalone 단일 프로세스에서 실행되며 테스트/개발용 로컬 테스트, POC
Distributed  여러 노드에 Worker 분산 실행 운영 환경, 고가용성 필요 시

운영 환경에서는 반드시 Distributed Mode 사용을 권장합니다.
이는 작업의 부하 분산과 장애 복구 측면에서 매우 중요합니다.

 

 

4. Kafka Connect 설치 및 실행

afka Connect는 Apache Kafka 배포판에 기본 포함되어 있습니다.

Kafka Connect는 Kafka에 의존적인 서비스이므로 반드시 Kafka 클러스터가 먼저 실행 중이어야 합니다.

Kafka Connect는 다양한 Source/Sink 플러그인을 사용할 수 있습니다.

  • 커넥터는 .jar 형태로 제공됨
  • 기본 경로는 libs/ 또는 plugin.path 설정 경로
  • 커넥터 다운로드는 Confluent Hub 이용 가능
kafka/
├── libs/
│   ├── kafka-connect-jdbc-10.7.4.jar
│   ├── postgresql-42.6.0.jar

 

운영 환경에서는 반드시 Distributed Mode를 사용합니다.

 

1. Kafka Connect 실행

# Kafka 설치 디렉토리로 이동 후 실행
cd /home/kafka/kafka_2.13-3.9.0

# Distributed Mode 실행
bin/connect-distributed.sh config/connect-distributed.properties

 

이 명령어는 다음과 같은 작업을 수행합니다.

단계  내용
1 connect-distributed.sh 스크립트 실행
2 설정 파일(connect-distributed.properties) 로드
3 Kafka와 통신하며 Connector 및 Task 실행 환경 구성
4 REST API 서버 실행 (localhost:8083 기본)

 

REST API 서버는 실행 후 자동으로 활성화되며, HTTP 요청을 통해 Connector 관리가 가능합니다.

 

2. Kafka Connect가 정상적으로 실행되면 다음 메시지가 콘솔에 출력됩니다:

INFO Worker started (org.apache.kafka.connect.runtime.Connect)
INFO Herder started (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO REST server listening at http://localhost:8083
 
3. 이제 http://localhost:8083 으로 접속하거나, curl 명령으로 상태를 확인할 수 있습니다:
 
curl http://localhost:8083/connectors
 

 

5. Kafka 서버 설정 하기

Kafka Connect의 핵심 설정 파일은 connect-distributed.properties입니다. 주요 설정 항목은 아래와 같습니다:

항목 역할 예시
bootstrap.servers Kafka 브로커 주소 localhost:9092
group.id Connect 클러스터 ID connect-cluster
key/value.converter Kafka 메시지 포맷 JsonConverter, AvroConverter 등
offset.storage.topic 오프셋 저장 토픽 connect-offsets
config.storage.topic 커넥터 설정 저장 토픽 connect-configs
status.storage.topic 커넥터 상태 저장 토픽 connect-status
plugin.path 커넥터 플러그인 디렉터리 /opt/kafka/plugins
listeners REST API 서버 포트 HTTP://:8083

 

설정 예

# ----------------------------------------
# Kafka Connect Distributed Mode 설정 파일
# ----------------------------------------

#############################
# Kafka 클러스터 연결 설정
#############################
# Kafka 브로커 목록 (쉼표로 구분)
bootstrap.servers=192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092

#############################
# Connect 클러스터 그룹 ID
#############################
# Connect 클러스터를 식별하기 위한 고유 ID (중복 불가)
group.id=connect-cluster

###################################
# Kafka 메시지 포맷 변환 설정
###################################
# Kafka 메시지의 key/value 변환 포맷
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# JSON 메시지에 스키마 포함 여부 (Sink에서 schema 필요 시 true)
key.converter.schemas.enable=true
value.converter.schemas.enable=true

###################################
# 오프셋, 설정, 상태 저장용 토픽
###################################
# Source 커넥터 오프셋 저장용 토픽
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25

# 커넥터 구성 저장용 토픽
config.storage.topic=connect-configs
config.storage.replication.factor=3

# 커넥터 및 태스크 상태 저장용 토픽
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=10

###################################
# 오프셋 Flush 주기 (밀리초)
###################################
offset.flush.interval.ms=10000

###################################
# 플러그인 경로 설정
###################################
# 커넥터 JAR 또는 디렉토리가 위치한 경로 (쉼표로 구분)
plugin.path=/home/kafka/connectors/debezium,/home/kafka/connectors/jdbc

###################################
# REST API 서버 설정
###################################
# REST API 바인딩 주소 및 포트
listeners=HTTP://0.0.0.0:8083

# 다른 Connect 워커 노드가 접근할 수 있는 외부 IP/포트
rest.advertised.host.name=192.168.56.111
rest.advertised.port=8083

 

 

6. REST API를 이용 Connector 관리

Kafka Connect의 REST API는 커넥터를 관리할 수 있는 핵심 인터페이스입니다.
Kafka Connect는 실행되면 자동으로 REST API 서버를 8083 포트 (기본값) 에서 실행하며, 이를 통해 다음과 같은 작업을 수행할 수 있습니다:

  • 커넥터 등록 (POST)
  • 커넥터 상태 조회 (GET)
  • 커넥터 설정 조회 및 변경 (GET / PUT)
  • 커넥터 삭제 (DELETE)
  • 커넥터 재시작 / 일시정지 / 재개 (POST)
  • 전체 커넥터 목록 조회

주요 REST API 명령어

작업 명령어 예시
커넥터 전체 목록 GET /connectors
커넥터 등록 POST /connectors
커넥터 설정 조회 GET /connectors/{name}/config
커넥터 설정 수정 PUT /connectors/{name}/config
커넥터 상태 확인 GET /connectors/{name}/status
커넥터 삭제 DELETE /connectors/{name}
커넥터 재시작 POST /connectors/{name}/restart
커넥터 일시정지/재개 PUT /pause, PUT /resume
Task 상태 확인 GET /connectors/{name}/tasks/{id}/status

 

1. 전체 커넥터 목록 조회

현재 Kafka Connect 클러스터에 등록된 모든 커넥터 이름 리스트를 반환합니다.

예시: 
curl http://localhost:8083/connectors

응답:
["my-postgres-source", "my-elasticsearch-sink"]

 

2. 커넥터 등록 (생성)

새로운 Source 또는 Sink Connector를 등록합니다.

요청 본문에는 name과 config 필드가 포함되어야 합니다.

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-jdbc-source",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url": "jdbc:mysql://db:3306/mydb",
      ...
    }
  }'

 

3. 커넥터 설정 조회

등록된 커넥터의 현재 설정을 확인합니다.

예시:
curl http://localhost:8083/connectors/my-jdbc-source/config

응답:
{
  "connector.class": "...",
  "connection.url": "...",
  ...
}

 

4. 커넥터 설정 수정

기존 커넥터의 설정을 변경합니다.

모든 설정을 한 번에 보내야 하며, 일부만 보내면 누락된 설정은 제거됩니다.

curl -X PUT http://localhost:8083/connectors/my-jdbc-source/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "...",
    "connection.url": "...",
    "mode": "timestamp",
    ...
  }'

 

5. 커넥터 상태 확인

커넥터 및 각 Task의 현재 실행 상태를 조회합니다.

예시:
curl http://localhost:8083/connectors/my-jdbc-source/status

응답:
{
  "name": "my-jdbc-source",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.56.111:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "192.168.56.111:8083"
    }
  ],
  "type": "source"
}

 

6. 커넥터 삭제

해당 이름을 가진 커넥터를 제거하고 Task도 함께 중지합니다.

curl -X DELETE http://localhost:8083/connectors/my-jdbc-source

 

7. 특정 Task 상태 확인

curl http://localhost:8083/connectors/my-jdbc-source/tasks/0/status

 

8. 커넥터 재시작 / 일시정지 / 재개

기능 메소드 URL
커넥터 재시작 POST /connectors/{name}/restart
일시정지 PUT /connectors/{name}/pause
재개 PUT /connectors/{name}/resume
특정 Task 재시작 POST /connectors/{name}/tasks/{task-id}/restart

 


Kafka Connect의 개념부터 아키텍처, 설치 및 실행, 서버 설정, REST API 활용법까지 전반적인 내용을 살펴보았습니다.
Kafka Connect는 설정 기반으로 손쉽게 운영 가능한 구조를 제공하며, 고가용성과 확장성까지 갖춘 매우 강력한 데이터 통합 플랫폼입니다.

 

 

 

관련 글 링크

https://www.instaclustr.com/blog/apache-kafka-connect-architecture-overview/

 

Apache Kafka® Connect Architecture Overview

An overview of main Kafka Connect components & their relationships. Source & Sink Connectors; Connectors, Plugins, Tasks & Workers; Clusters; & Converters.

www.instaclustr.com

https://developer.confluent.io/courses/kafka-connect/intro

 

Kafka Connect Tutorial: How Connectors, Sinks & Sources Work

Kafka Connect is a component of Apache Kafka® that’s used to perform streaming integration between Kafka and other systems such as databases, cloud services, and more.

developer.confluent.io

 

728x90