Kafka Connect는 Kafka와 외부 시스템 간 실시간 데이터 연동을 손쉽게 구현할 수 있는 강력한 도구입니다. Kafka Connect의 개념, 아키텍처 구성, 그리고 서버 측 설정 방법을 단계별로 자세히 소개합니다
Kafka Connect 이해 및 서버 설정하기 : REST API 명령어
목차
1. Kafka Connect 개념
Kafka Connect는 Apache Kafka®의 구성 요소 중 하나로, Kafka와 데이터베이스, 클라우드 서비스, 검색 인덱스, 파일 시스템, 키-값 저장소 등 다양한 시스템 간의 스트리밍 통합을 수행하는 데 사용됩니다.
Kafka Connect는 다양한 소스(Source)에서 Kafka로 데이터를 스트리밍하거나, Kafka에서 다양한 타겟(Sink)으로 데이터를 스트리밍하는 작업을 쉽게 만들어줍니다.
특히 많이 사용되는 커넥터는 다음과 같습니다:
- 관계형 데이터베이스(RDBMS): Oracle, SQL Server, Db2, Postgres, MySQL
- 클라우드 객체 저장소: Amazon S3, Azure Blob Storage, Google Cloud Storage
- 메시지 큐: ActiveMQ, IBM MQ, RabbitMQ
- NoSQL 및 문서 기반 저장소: Elasticsearch, MongoDB, Cassandra
- 클라우드 데이터 웨어하우스: Snowflake, Google BigQuery, Amazon Redshift
Kafka Connect의 가장 큰 장점은 프로그래밍 없이 사용할 수 있다는 점입니다.
완전히 설정 기반(configuration-based)으로 동작하기 때문에, 개발자뿐 아니라 다양한 사용자들이 손쉽게 사용할 수 있습니다.
또한, 데이터의 수집(Ingest) 및 전송(Egress)뿐 아니라, Kafka Connect는 데이터가 흐르는 중간에 가벼운 변환 작업(Transformation)도 수행할 수 있습니다.
2. Kafka 아키텍처
Kafka Connect는 크게 다음과 같은 구성 요소로 동작합니다:
◆ Connectors
- 외부 시스템과 Kafka 간 데이터 이동을 정의하는 논리적 단위
- Source Connector: 외부 시스템(예: PostgreSQL, MySQL, MongoDB 등)에서 Kafka로 데이터를 수집
- Sink Connector: Kafka에서 데이터를 받아 외부 시스템(예: Elasticsearch, S3, HDFS 등)으로 전송
- 데이터는 직접 처리하지 않고, 실제 작업은 Task가 수행 합니다.
◆ Tasks
- Source Task: 외부 시스템에서 데이터를 pull()하여 Kafka로 보냅니다.
- Sink Task: Kafka에서 데이터를 읽고 put()을 통해 외부 시스템으로 전송합니다.
- 각 Task는 별도의 스레드로 실행되며, Sink Task는 동일한 consumer group으로 묶입니다.
- Task 수는 Connector 설정에서 지정할 수 있으며, 시스템 자원과 처리 성능을 고려해 조정합니다.
◆ Workers
- 실제로 Connectors와 Tasks를 실행하는 프로세스입니다.
- REST API를 통해 커넥터 정의 및 설정을 받고 실행합니다.
- java 애플리케이션이며, connect-distributed.sh 또는 connect-standalone.sh로 실행됨
- 분산 모드에서는 하나의 Worker가 죽으면 나머지 Worker가 작업을 이어받습니다.
◆ Connect Cluster
- 여러 개의 Worker 인스턴스가 함께 동작하는 분산 환경 (Distributed Mode)
- Worker 간에 작업을 자동 분산 (분산 로드 밸런싱)
- Worker 중 하나가 다운되면 나머지 Worker가 Task를 자동 재할당 → 내결함성 제공
- Kafka의 내부 토픽(connect-configs, connect-offsets, connect-status)을 통해 작업 상태와 설정을 공유
Kafka Connect Cluster는 단순히 "Kafka 클러스터"와는 다릅니다.
Kafka Connect 클러스터는 Connect Worker 인스턴스들의 집합이며,
Kafka 클러스터는 데이터를 저장하는 서버 집합 입니다.
3. Standalone vs Distributed
모드 | 설명 | 사용 예시 |
Standalone | 단일 프로세스에서 실행되며 테스트/개발용 | 로컬 테스트, POC |
Distributed | 여러 노드에 Worker 분산 실행 | 운영 환경, 고가용성 필요 시 |
운영 환경에서는 반드시 Distributed Mode 사용을 권장합니다.
이는 작업의 부하 분산과 장애 복구 측면에서 매우 중요합니다.
4. Kafka Connect 설치 및 실행
afka Connect는 Apache Kafka 배포판에 기본 포함되어 있습니다.
Kafka Connect는 Kafka에 의존적인 서비스이므로 반드시 Kafka 클러스터가 먼저 실행 중이어야 합니다.
Kafka Connect는 다양한 Source/Sink 플러그인을 사용할 수 있습니다.
- 커넥터는 .jar 형태로 제공됨
- 기본 경로는 libs/ 또는 plugin.path 설정 경로
- 커넥터 다운로드는 Confluent Hub 이용 가능
kafka/
├── libs/
│ ├── kafka-connect-jdbc-10.7.4.jar
│ ├── postgresql-42.6.0.jar
운영 환경에서는 반드시 Distributed Mode를 사용합니다.
1. Kafka Connect 실행
# Kafka 설치 디렉토리로 이동 후 실행
cd /home/kafka/kafka_2.13-3.9.0
# Distributed Mode 실행
bin/connect-distributed.sh config/connect-distributed.properties
이 명령어는 다음과 같은 작업을 수행합니다.
단계 | 내용 |
1 | connect-distributed.sh 스크립트 실행 |
2 | 설정 파일(connect-distributed.properties) 로드 |
3 | Kafka와 통신하며 Connector 및 Task 실행 환경 구성 |
4 | REST API 서버 실행 (localhost:8083 기본) |
REST API 서버는 실행 후 자동으로 활성화되며, HTTP 요청을 통해 Connector 관리가 가능합니다.
2. Kafka Connect가 정상적으로 실행되면 다음 메시지가 콘솔에 출력됩니다:
INFO Worker started (org.apache.kafka.connect.runtime.Connect)
INFO Herder started (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
INFO REST server listening at http://localhost:8083
curl http://localhost:8083/connectors
5. Kafka 서버 설정 하기
Kafka Connect의 핵심 설정 파일은 connect-distributed.properties입니다. 주요 설정 항목은 아래와 같습니다:
항목 | 역할 | 예시 |
bootstrap.servers | Kafka 브로커 주소 | localhost:9092 |
group.id | Connect 클러스터 ID | connect-cluster |
key/value.converter | Kafka 메시지 포맷 | JsonConverter, AvroConverter 등 |
offset.storage.topic | 오프셋 저장 토픽 | connect-offsets |
config.storage.topic | 커넥터 설정 저장 토픽 | connect-configs |
status.storage.topic | 커넥터 상태 저장 토픽 | connect-status |
plugin.path | 커넥터 플러그인 디렉터리 | /opt/kafka/plugins |
listeners | REST API 서버 포트 | HTTP://:8083 |
설정 예
# ----------------------------------------
# Kafka Connect Distributed Mode 설정 파일
# ----------------------------------------
#############################
# Kafka 클러스터 연결 설정
#############################
# Kafka 브로커 목록 (쉼표로 구분)
bootstrap.servers=192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092
#############################
# Connect 클러스터 그룹 ID
#############################
# Connect 클러스터를 식별하기 위한 고유 ID (중복 불가)
group.id=connect-cluster
###################################
# Kafka 메시지 포맷 변환 설정
###################################
# Kafka 메시지의 key/value 변환 포맷
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# JSON 메시지에 스키마 포함 여부 (Sink에서 schema 필요 시 true)
key.converter.schemas.enable=true
value.converter.schemas.enable=true
###################################
# 오프셋, 설정, 상태 저장용 토픽
###################################
# Source 커넥터 오프셋 저장용 토픽
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
# 커넥터 구성 저장용 토픽
config.storage.topic=connect-configs
config.storage.replication.factor=3
# 커넥터 및 태스크 상태 저장용 토픽
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=10
###################################
# 오프셋 Flush 주기 (밀리초)
###################################
offset.flush.interval.ms=10000
###################################
# 플러그인 경로 설정
###################################
# 커넥터 JAR 또는 디렉토리가 위치한 경로 (쉼표로 구분)
plugin.path=/home/kafka/connectors/debezium,/home/kafka/connectors/jdbc
###################################
# REST API 서버 설정
###################################
# REST API 바인딩 주소 및 포트
listeners=HTTP://0.0.0.0:8083
# 다른 Connect 워커 노드가 접근할 수 있는 외부 IP/포트
rest.advertised.host.name=192.168.56.111
rest.advertised.port=8083
6. REST API를 이용 Connector 관리
Kafka Connect의 REST API는 커넥터를 관리할 수 있는 핵심 인터페이스입니다.
Kafka Connect는 실행되면 자동으로 REST API 서버를 8083 포트 (기본값) 에서 실행하며, 이를 통해 다음과 같은 작업을 수행할 수 있습니다:
- 커넥터 등록 (POST)
- 커넥터 상태 조회 (GET)
- 커넥터 설정 조회 및 변경 (GET / PUT)
- 커넥터 삭제 (DELETE)
- 커넥터 재시작 / 일시정지 / 재개 (POST)
- 전체 커넥터 목록 조회
주요 REST API 명령어
작업 | 명령어 예시 |
커넥터 전체 목록 | GET /connectors |
커넥터 등록 | POST /connectors |
커넥터 설정 조회 | GET /connectors/{name}/config |
커넥터 설정 수정 | PUT /connectors/{name}/config |
커넥터 상태 확인 | GET /connectors/{name}/status |
커넥터 삭제 | DELETE /connectors/{name} |
커넥터 재시작 | POST /connectors/{name}/restart |
커넥터 일시정지/재개 | PUT /pause, PUT /resume |
Task 상태 확인 | GET /connectors/{name}/tasks/{id}/status |
1. 전체 커넥터 목록 조회
현재 Kafka Connect 클러스터에 등록된 모든 커넥터 이름 리스트를 반환합니다.
예시:
curl http://localhost:8083/connectors
응답:
["my-postgres-source", "my-elasticsearch-sink"]
2. 커넥터 등록 (생성)
새로운 Source 또는 Sink Connector를 등록합니다.
요청 본문에는 name과 config 필드가 포함되어야 합니다.
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "my-jdbc-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://db:3306/mydb",
...
}
}'
3. 커넥터 설정 조회
등록된 커넥터의 현재 설정을 확인합니다.
예시:
curl http://localhost:8083/connectors/my-jdbc-source/config
응답:
{
"connector.class": "...",
"connection.url": "...",
...
}
4. 커넥터 설정 수정
기존 커넥터의 설정을 변경합니다.
모든 설정을 한 번에 보내야 하며, 일부만 보내면 누락된 설정은 제거됩니다.
curl -X PUT http://localhost:8083/connectors/my-jdbc-source/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "...",
"connection.url": "...",
"mode": "timestamp",
...
}'
5. 커넥터 상태 확인
커넥터 및 각 Task의 현재 실행 상태를 조회합니다.
예시:
curl http://localhost:8083/connectors/my-jdbc-source/status
응답:
{
"name": "my-jdbc-source",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.56.111:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "192.168.56.111:8083"
}
],
"type": "source"
}
6. 커넥터 삭제
해당 이름을 가진 커넥터를 제거하고 Task도 함께 중지합니다.
curl -X DELETE http://localhost:8083/connectors/my-jdbc-source
7. 특정 Task 상태 확인
curl http://localhost:8083/connectors/my-jdbc-source/tasks/0/status
8. 커넥터 재시작 / 일시정지 / 재개
기능 | 메소드 | URL |
커넥터 재시작 | POST | /connectors/{name}/restart |
일시정지 | PUT | /connectors/{name}/pause |
재개 | PUT | /connectors/{name}/resume |
특정 Task 재시작 | POST | /connectors/{name}/tasks/{task-id}/restart |
Kafka Connect의 개념부터 아키텍처, 설치 및 실행, 서버 설정, REST API 활용법까지 전반적인 내용을 살펴보았습니다.
Kafka Connect는 설정 기반으로 손쉽게 운영 가능한 구조를 제공하며, 고가용성과 확장성까지 갖춘 매우 강력한 데이터 통합 플랫폼입니다.
관련 글 링크
https://www.instaclustr.com/blog/apache-kafka-connect-architecture-overview/
Apache Kafka® Connect Architecture Overview
An overview of main Kafka Connect components & their relationships. Source & Sink Connectors; Connectors, Plugins, Tasks & Workers; Clusters; & Converters.
www.instaclustr.com
https://developer.confluent.io/courses/kafka-connect/intro
Kafka Connect Tutorial: How Connectors, Sinks & Sources Work
Kafka Connect is a component of Apache Kafka® that’s used to perform streaming integration between Kafka and other systems such as databases, cloud services, and more.
developer.confluent.io
'1.시스템&인프라 > Apache Kafka' 카테고리의 다른 글
21.Kafka Connect Source, Sink 실습: PostgreSQL 연동 (0) | 2025.04.10 |
---|---|
19.Kafka Stream 핵심 개념 이해하기 (0) | 2025.04.07 |
18.Kafka Consumer API 사용법(subscribe vs assign, 수동 커밋) (0) | 2025.04.04 |
17.Kafka Producer API 사용법(비동기 전송, Transaction, 예제) (0) | 2025.04.04 |
16. Kafka AdminClient API 예제: Topic,Cluster,ConsumerGroups (0) | 2025.04.03 |