1.시스템&인프라/Apache Kafka

21.Kafka Connect Source, Sink 실습: PostgreSQL 연동

쿼드큐브 2025. 4. 10. 13:26
728x90
반응형

Kafka Connect를 활용해 PostgreSQL 데이터 연동 예제를 소개합니다.
Source 테이블의 변경 사항을 Kafka로 수집하고, 이를 Sink 테이블에 저장하는 전체 과정을 다룹니다.
Kafka Connect 설정, JDBC 드라이버 설치, 커넥터 구성 및 실행까지 단계별로 실습할 수 있습니다.

 

Kafka Connect Source, Sink 실습:  PostgreSQL 연동

 

목차

1. 환경 준비 및 구성

2. Kafka Connect Plugin 설치

3. Kafka Connect 실행

4. PostgreSQL Source Connector 설정

5. PostgreSQL Sink Connector 설정

6. 테스트 및 데이터 흐름 확인

관련 글 링크

 

 

1. 환경 준비 및 구성

  • Kafka 3.9.0 
  • PostgreSQL (예: 12.x)
  • Kafka Connect (분산 모드)
  • 커넥터 플러그인: Debezium PostgreSQL Source, Confluent JDBC Sink
VM 역할 IP주소
host_56_101 Controller, Broker 192.168.56.101
host_56_102 Controller, Broker 192.168.56.102
host_56_103 Controller, Broker, Kafka Connect 192.168.56.103

 

◆ Connector 디렉토리 구조

/home/kafka/connectors/
└── plugins/
    ├── debezium-connector-postgres/...
    └── confluentinc-kafka-connect-jdbc-10.8.0/...

#참고
├── confluentinc-kafka-connect-jdbc-10.8.0
│   ├── assets
│   ├── doc
│   │   └── licenses
│   ├── etc
│   └── lib
└── debezium-connector-postgres

 

◆ PostgreSQL 설치

sudo apt update
sudo apt install postgresql postgresql-contrib

 

◆ Source 테이블과 Sink 테이블 생성

-- Source 테이블 (연속 업데이트할 테이블)
CREATE TABLE public.source_table (
    id SERIAL PRIMARY KEY,
    name TEXT,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Sink 테이블 (결과 저장용)
CREATE TABLE public.sink_table (
    id INTEGER PRIMARY KEY,
    name TEXT,
    updated_at TIMESTAMP
);

 

 postgresql.conf 수정:

Debezium은 WAL(Logical decoding)을 사용하므로 PostgreSQL 설정이 필요합니다.

wal_level = logical                     # WAL 로그를 logical 형식으로 기록
max_wal_senders = 10                    # WAL 송신자 프로세스 수 (복제용)
max_replication_slots = 10             # 복제 슬롯 개수 설정 (Debezium 1개 사용)
  • 설정 후 PostgreSQL을 반드시 재시작해야 적용됩니다.

 pg_hba.conf에 replication 권한 허용:

  • Kafka Connect가 PostgreSQL에 접속해 replication을 수행하려면, 해당 유저의 접속을 허용해야 합니다.
  • 설정 후 PostgreSQL 재시작 or pg_ctl reload 필요
# 모든 IP에서 replication 유저 접속 허용 (테스트용)
host    replication     replicator      0.0.0.0/0              md5
또는
# Kafka Connect 서버 IP만 허용 (예: 192.168.56.111)
host    replication     replicator      192.168.56.111/32      md5

#replication : logical replication을 허용하는 용도
#replicator  : replication 권한이 있는 사용자 이름
#0.0.0.0/0   : 모든 IP 허용 (보안상 운영환경에선 IP 제한 필수)

 

 Replication 유저 생성 및 권한 부여:

  • Debezium이 WAL 로그를 읽을 수 있는 권한이 필요합니다.
  • 일반 계정으로는 replication을 수행할 수 없습니다.
  • Source Connector, Sink Connector 설정에 따라 Table에 대한 권한을 추가로 부여해야 할 필요가 있습니다.
1. 계정 생성
#user : replicator
#passwd: replpass
CREATE ROLE replicator WITH REPLICATION LOGIN PASSWORD 'replpass';

2.DB 접속 권한
GRANT CONNECT ON DATABASE postgres TO replicator;
-- public 스키마 사용 허용
GRANT USAGE ON SCHEMA public TO replicator;

3.source_table 에 접근권한 부여
-- source_table에 대한 SELECT 권한
GRANT SELECT ON public.source_table TO replicator;
-- 앞으로 생성될 테이블에도 SELECT 자동 부여 (선택)
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replicator;

4.sink_table 에 권한 부여
-- 접속 권한
GRANT INSERT, UPDATE, DELETE ON public.sink_table TO replicator;
-- 소유권(ALTER 명령으로 칼럼 추가하기 위해)
ALTER TABLE public.sink_table OWNER TO replicator;

 

 Replication 생성:

  • 어떤 테이블의 변경 사항을 Kafka로 보낼지 PostgreSQL에게 알려주는 설정입니다.
  • dbz_publication 이라는 이름의 publication(발행)을 생성하며, public.source_table 테이블의 변경 사항을 포함한다.
CREATE PUBLICATION dbz_publication FOR TABLE public.source_table;

#FOR TABLE에 지정된 테이블만 Kafka로 전달됨
#여러 테이블도 가능: FOR TABLE table1, table2

 

 

2. Kakfa Connect Plugin 설치

Debezium PostgreSQL Source 설치:

https://debezium.io/documentation/reference/stable/install.html에서 Postgres Connector plugin archive를 다운받습니다.

# Kafka Connect 플러그인 디렉토리로 이동
cd /home/kafka/connectors
mkdir plugins
cd plugins

# Debezium PostgreSQL Connector 다운로드
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/3.1.0.Final/debezium-connector-postgres-3.1.0.Final-plugin.tar.gz

tar -xvzf debezium-connector-postgres-3.1.0.Final-plugin.tar.gz

# 참고
└── plugins
    └── debezium-connector-postgres

 

Confluent JDBC Sink 설치:

 

  • 또는 아래 예시 코드에서 wget 명령으로 파일을 다운로드 받을 수 있습니다.
cd /home/kafka/connectors
mkdir plugins
cd plugins

wget https://hub-downloads.confluent.io/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.8.2/confluentinc-kafka-connect-jdbc-10.8.2.zip

unzip confluentinc-kafka-connect-jdbc-10.8.2.zip

참고
.
├── confluentinc-kafka-connect-jdbc-10.8.2
│   ├── assets
│   ├── doc
│   │   └── licenses
│   ├── etc
│   └── lib
└── debezium-connector-postgres

 

 

3. Kafka Connect 실행

connect-distributed.properties 수정 (필수):

# ----------------------------------------
# Kafka Connect Distributed Mode 설정 파일
# ----------------------------------------

#############################
# Kafka 클러스터 연결 설정
#############################
# Kafka 브로커 목록 (쉼표로 구분)
bootstrap.servers=192.168.56.101:9092,192.168.56.102:9092,192.168.56.103:9092

#############################
# Connect 클러스터 그룹 ID
#############################
# Connect 클러스터를 식별하기 위한 고유 ID (중복 불가)
group.id=connect-cluster

###################################
# Kafka 메시지 포맷 변환 설정
###################################
# Kafka 메시지의 key/value 변환 포맷
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# JSON 메시지에 스키마 포함 여부 (Sink에서 schema 필요 시 true)
key.converter.schemas.enable=true
value.converter.schemas.enable=true

###################################
# 오프셋, 설정, 상태 저장용 토픽
###################################
# Source 커넥터 오프셋 저장용 토픽
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25

# 커넥터 구성 저장용 토픽
config.storage.topic=connect-configs
config.storage.replication.factor=3

# 커넥터 및 태스크 상태 저장용 토픽
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=10

###################################
# 오프셋 Flush 주기 (밀리초)
###################################
offset.flush.interval.ms=10000

###################################
# 플러그인 경로 설정
###################################
# 커넥터 JAR 또는 디렉토리가 위치한 경로 (쉼표로 구분)
plugin.path=/home/kafka/connectors/plugins

###################################
# REST API 서버 설정
###################################
# REST API 바인딩 주소 및 포트
listeners=HTTP://0.0.0.0:8083

# 다른 Connect 워커 노드가 접근할 수 있는 외부 IP/포트
rest.advertised.host.name=192.168.56.103
rest.advertised.port=8083

 

Kafka Connect 실행 (분산 모드):

bin/connect-distributed.sh config/connect-distributed.properties

 

 

4. PostgreSQL Source Connector 설정

Source Connector 설정 파일 (postgres-source.json):

{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",

    "database.hostname": "192.168.56.1",
    "database.port": "5432",
    "database.user": "replicator",
    "database.password": "replpass",
    "database.dbname": "postgres",

    "topic.prefix": "pgserver",
    "table.include.list": "public.source_table",
    "plugin.name": "pgoutput",

    "slot.name": "debezium_slot",
    "publication.autocreate.mode": "disabled",
    "tombstones.on.delete": "false",
    "delete.handling.mode": "rewrite",
    "include.schema.changes": "false",

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": true,
    "value.converter.schemas.enable": true,

    "snapshot.mode": "initial",
    "time.precision.mode":"connect",
    "slot.drop.on.stop": "false",
    "heartbeat.interval.ms": "30000",
    "heartbeat.topics.prefix": "heartbeat",
    "max.batch.size": "2048",
    "max.queue.size": "8192"
  }
}

 

항목 설명
name 커넥터 이름. Kafka Connect에 등록될 고유한 이름입니다.
connector.class 사용할 커넥터 클래스. PostgreSQL용 Debezium 커넥터 (io.debezium.connector.postgresql.PostgresConnector)를 지정합니다.
topic.prefix Kafka에 생성될 토픽 이름의 접두어입니다. 테이블 이름이 붙어 최종 토픽 이름이 결정됩니다.
table.include.list 캡처 대상이 될 테이블 목록입니다. 쉼표로 여러 개 지정 가능합니다.
plugin.name PostgreSQL logical decoding plugin 이름입니다. (pgoutput, wal2json 등)
slot.name PostgreSQL replication slot의 이름입니다. WAL 로그 추적에 사용됩니다.
publication.autocreate.mode publication을 자동 생성할지 여부입니다. disabled, filtered, all_tables 중 설정 가능합니다.
tombstones.on.delete 삭제 이벤트 시 tombstone 메시지를 Kafka에 보낼지 여부입니다. (true 또는 false)
delete.handling.mode 삭제 이벤트 처리 방식입니다. rewrite를 사용하면 메시지에 __deleted: true 필드가 포함됩니다.
include.schema.changes PostgreSQL의 스키마 변경 이벤트를 Kafka로 보낼지 여부입니다. 보통 false로 설정합니다.
key.converter Kafka 메시지 key에 사용할 변환기입니다. 일반적으로 JSON 형식을 사용합니다.
value.converter Kafka 메시지 value에 사용할 변환기입니다. JSON으로 설정하면 메시지 파싱이 용이합니다.
key.converter.schemas.enable Kafka 메시지 key에 스키마 정보를 포함할지 여부입니다.
value.converter.schemas.enable Kafka 메시지 value에 스키마 정보를 포함할지 여부입니다.
snapshot.mode 초기 데이터 스냅샷을 수행할지 여부입니다. (initial, never, when_needed 등)
time.precision.mode 날짜 및 시간 필드의 Kafka 메시지 표현 방식을 지정합니다.  
slot.drop.on.stop 커넥터 중단 시 replication slot을 삭제할지 여부입니다. (true는 테스트 용도로 유용)
heartbeat.interval.ms heartbeat 메시지를 Kafka로 보낼 주기(ms)입니다.
heartbeat.topics.prefix heartbeat 메시지용 Kafka 토픽의 접두어입니다.
max.batch.size Kafka로 전송할 최대 레코드 수입니다.
max.queue.size Kafka Connect 내부 큐의 최대 크기입니다. 큐가 꽉 차면 메시지 수집이 지연될 수 있습니다.

 

  • time.precision.mode
항목 설명
용도 PostgreSQL의 시간 타입(TIMESTAMP, TIME 등)을 Kafka 메시지로 변환할 때 어떤 포맷과 정밀도로 보낼지 결정
값 종류 - connect: Kafka Connect 기본 논리 타입 사용 (밀리초 기준)
- adaptive_time_microseconds: 마이크로초 정밀도 기반 Debezium 타입 사용
- isostring: ISO-8601 문자열 포맷
- microseconds/nanoseconds: 초고정밀 숫자 기반 포맷
주의사항 connect는 JDBC Sink에서 가장 호환성 좋음
isostring은 사람이 읽기 쉽지만 변환 필요
권장값 - 일반 사용: connect
- 정밀 분석: adaptive_time_microseconds
- 시각화 또는 JSON 직렬화 목적: isostring
 
  • delete.handling.mode
항목 설명
용도 PostgreSQL에서 DELETE 발생 시, Kafka 메시지를 어떻게 전송할지 결정
값 종류 - none : 삭제 이벤트를 Kafka에 전송하지 않음

- drop : tombstone 메시지만 전송

- rewrite : Kafka 메시지에 __deleted=true 필드를 추가하여 보냄 (실무에서 자주 사용)
추천 rewrite 사용 → Sink에서 delete 판단/처리 가능
주의사항 Sink에서 __deleted=true 처리를 위한 설정 필요함 (transforms.unwrap.delete.handling.mode=rewrite)

 

  • publication.autocreate.mode
항목 설명
용도 Debezium이 PostgreSQL의 logical decoding용 publication을 자동 생성할지 여부
값 종류 - disabled : 자동 생성하지 않음 (사용자가 직접 CREATE PUBLICATION 해야 함)
- filtered :  table.include.list에 포함된 테이블만 포함하여 자동 생성
- all_tables : 전체 테이블을 포함하는 publication 자동 생성
주의사항 filtered, all_tables는 publication 생성 시 테이블의 owner 권한 필요
권장값 운영환경: disabled (보안상 안전)
개발/테스트: filtered (빠르게 설정 가능)

 

  • timestones.on.delete
항목 설명
용도 delete 이벤트 발생 시 null payload 메시지 (tombstone) 을 Kafka에 전송할지 여부
기본값 true
추천값 false (대부분의 경우 rewrite 방식과 함께 tombstone은 불필요)
Kafka Compact Topic 사용 시 주의 compact topic을 사용하는 경우 tombstone은 레코드 삭제 트리거 역할을 함 → 유지할 경우 주의 필요
Sink에서 영향 true일 경우 tombstone 메시지를 처리할 수 없는 Sink에서는 에러 발생 가능

 

커넥터 등록 및 확인:

curl -X POST -H "Content-Type: application/json" \
--data @postgres-source.json \
http://192.168.56.103:8083/connectors
curl -X GET  http://192.168.56.103:8083/connectors

["postgres-source"]

 

 

5. PostgreSQL Sink Connector 설정

Sink Connector 설정 파일 (postgres-sink.json):

{
  "name": "postgres-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

    "connection.url": "jdbc:postgresql://192.168.56.1:5432/postgres",
    "connection.user": "replicator",
    "connection.password": "replpass",

    "topics": "pgserver.public.source_table",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "table.name.format": "public.sink_table",

    "auto.create": "true",
    "auto.evolve": "true",

    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite",

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": true,
    "value.converter.schemas.enable": true,

    "delete.enabled": "true",
    "insert.retry.max.attempts": "5",
    "errors.log.enable": "true",
    "errors.deadletterqueue.topic.name": "dlq.postgres.sink"
  }
}

 

항목 설명
name 커넥터 이름. Kafka Connect에서 식별되는 고유 이름입니다.
connector.class 사용할 커넥터 클래스. PostgreSQL JDBC Sink의 경우 io.confluent.connect.jdbc.JdbcSinkConnector 사용.
topics Kafka에서 구독할 토픽 이름. source connector에서 발행한 토픽과 동일해야 합니다.
insert.mode Kafka 메시지를 DB에 삽입하는 방식: insert, update, upsert 중 선택 (upsert는 PK 충돌 시 update).
pk.mode Kafka 메시지에서 primary key를 추출하는 방식. record_key는 Kafka 메시지 key에서 추출.
pk.fields Primary key로 사용할 필드 이름. 복수 지정 가능. 예: id
table.name.format Kafka 메시지가 저장될 테이블 이름. 정적으로 지정하거나 토픽 기반 동적 생성 가능.
auto.create 테이블이 존재하지 않으면 자동 생성 여부. true 권장 (초기 개발 시 유용).
auto.evolve Kafka 메시지 필드가 바뀔 경우, 테이블 스키마 자동 수정 여부 (ALTER TABLE).
transforms 적용할 SMT(단계별 변환기) 목록. 쉼표로 구분. 예: unwrap
transforms.unwrap.type Debezium 메시지 envelope을 제거하고 payload만 추출하는 SMT 설정.
transforms.unwrap.drop.tombstones tombstone 메시지를 무시할지 여부. true로 설정 시 삭제 tombstone 무시.
transforms.unwrap.delete.handling.mode 삭제 이벤트 처리 방식. rewrite는 __deleted: true 필드로 표현.
key.converter Kafka 메시지 key의 포맷 변환기. 일반적으로 JSON 포맷 사용.
value.converter Kafka 메시지 value 포맷 변환기. Debezium은 JSON 기반 value 메시지 전송.
key.converter.schemas.enable Kafka 메시지 key에 schema 포함 여부. Debezium 사용 시 true 권장.
value.converter.schemas.enable Kafka 메시지 value에 schema 포함 여부. Sink에서 스키마 기반 매핑 시 필요.
delete.enabled __deleted=true 메시지를 실제 DB DELETE 쿼리로 처리할지 여부.
insert.retry.max.attempts DB insert 실패 시 재시도 횟수. 기본 10회, 여기선 5회 지정됨.
errors.log.enable 커넥터 처리 중 오류가 발생할 경우 로그 출력 여부.
errors.deadletterqueue.topic.name 처리 실패 메시지를 전송할 Kafka DLQ(Dead Letter Queue) 토픽 이름. 디버깅 및 재처리에 유용.

 

  • insert.mode
설명
insert 단순 insert. 같은 PK가 있으면 오류 발생
upsert 존재하면 update, 없으면 insert (PK 필수), 실시간 반영에 적합, CDC 환경에서 권장.
update 존재하는 행만 update, 없으면 무시

 

  • pk.mode
설명
none Primary Key를 설정하지 않음 (테이블에 PK 없음),  테스트 용도 또는 로그 테이블
record_key Kafka 메시지의 key에서 PK를 추출,  Debezium에서 id를 key로 보내는 경우
record_value Kafka 메시지의 value에서 PK 필드 추출, 일반 Kafka Producer 사용 시

 

  • auto.create
설명
true Kafka 메시지의 스키마를 기반으로 Sink 테이블을 자동 생성합니다.
false Sink 테이블은 직접 생성해야 하며, 일치하지 않으면 에러 발생

- 개발 초기에는 true로 설정하여 테이블을 자동 생성하는 것이 편리

- 운영 환경에서는 false로 설정 후 명시적 스키마 관리 권장

 

  • auto.evolve
설명
true Kafka 메시지에 새로운 필드가 추가되면 DB 테이블을 ALTER TABLE로 자동 진화
false 스키마가 불일치할 경우 에러 발생. 수동으로 테이블을 수정해야 함

- JSON 스키마 변경이 잦은 환경(예: Debezium CDC, 마이크로서비스)에서는 true가 편리

- 운영 환경에서는 스키마 고정 및 CI 기반 변경 관리가 좋음 (false 추천)

 

  • transforms
이름 설명
ExtractNewRecordState (unwrap) Debezium 메시지의 envelope (before, after, op, ts_ms...) 구조를 제거하고 after 값만 추출합니다.
Sink에서 메시지를 쉽게 처리할 수 있도록 함.
TimestampConverter (convert_ts) string 형식의 timestamp를 Java Timestamp 객체로 변환. 형식 오류 방지
SetSchemaMetadata, RegexRouter, ValueToKey 등 다양한 변형 작업 지원 (필드 이동, 주제 이름 변경, 키 지정 등)

- Kafka Connect에서 SMT(Single Message Transform) 를 적용할 수 있는 설정입니다.
- 여러 개의 transform을 쉼표로 나열하며, 적용 순서대로 실행됩니다.

1. ISO-8601 문자열을 Kafka Timestamp로 변환

"transforms.convert_ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convert_ts.field": "updated_at",
"transforms.convert_ts.target.type": "Timestamp",
"transforms.convert_ts.format": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSX"

입력 예시: "updated_at": "2025-04-10T10:15:23.123456Z"
동작: 문자열을 Kafka Connect의 timestamp logical type (밀리초)로 변환하여 JDBC Sink 등에 매핑 가능
실패 조건: 값이 "1970-01-01T00:00:00Z" 등 포맷과 맞지 않으면 ParseException

 

건넥터 등록

curl -X POST -H "Content-Type: application/json" \
--data @jdbc-sink.json \
http://localhost:8083/connectors

 

 

6. 테스트 및 데이터 흐름 확인

Kafka Connect가 제대로 작동하는지 확인하려면 source_table에 데이터를 삽입한 뒤, sink_table에 같은 내용이 자동으로 들어오는지 확인하면 됩니다.

1. Source 테이블에 데이터 삽입

INSERT INTO public.source_table(name) VALUES ('1111');
  • 위 SQL은 PostgreSQL의 source_table에 "111"라는 값을 가진 행을 추가합니다.
  • id는 SERIAL 타입이기 때문에 자동 증가하고, updated_at은 현재 시간이 자동으로 입력됩니다.
  • Debezium Source Connector가 이 INSERT 이벤트를 감지하고, Kafka로 변경 로그 이벤트를 전송합니다.

 

2. Kafka → Sink 커넥터가 이벤트 수신

  • Kafka에 전송된 메시지는 pgserver.public.source_table이라는 토픽에 저장됩니다.
  • Sink Connector는 이 토픽을 구독하고 있고,
  • 메시지 내용 중 id, name, updated_at을 읽어, PostgreSQL의 sink_table에 upsert 방식으로 저장합니다.

3. Sink 테이블에서 데이터 확인

SELECT * FROM public.sink_table;
  • 위 SQL을 실행하면 sink_table에 Kafka를 거쳐 전송된 동일한 데이터가 삽입되어 있어야 합니다.

  • delete.handling.mode": "rewrite" 일 경우: 실제 레코드는 남겨두되 __deleted = true 로 마킹합니다.
  • __deleted 필드가 생기는 조건 요약
조건 설명
transforms.unwrap.delete.handling.mode="rewrite" 삭제 이벤트를 null 레코드로 바꾸지 않고, __deleted = "true" 로 마킹
transforms.unwrap.type = io.debezium.transforms.ExtractNewRecordState 메시지를 평평하게 만들고 delete도 처리
Sink 테이블이 auto.create 설정일 경우 Kafka 메시지에 있는 필드대로 테이블 생성 → __deleted 컬럼 자동 생성됨, 권한 필요
또는 sink 테이블에 __deleted 필드가 없으면 Sink Connector가 INSERT 시 테이블에 컬럼 없다고 오류 발생함

 

4. consumer.sh 로 데이터 수신 확인하기

 $KAFKA_HOME/bin/kafka-console-consumer.sh \
  --bootstrap-server 192.168.56.102:9092 \
  --topic pgserver.public.source_table \
  --from-beginning \
  --property print.key=true \
  --property print.value=true \
  --property key.separator=" | "

 


Kafka Connect를 사용해 PostgreSQL Source & Sink 연동 예제를 실습해보았습니다.
Debezium PostgreSQL 커넥터를 통해 변경 데이터를 Kafka로 보내고, JDBC Sink 커넥터로 다시 PostgreSQL로 전달할 수 있습니다.

 

 

관련 글 링크

10. Kafka 3.9 KRaft 모드 설치 (JDK 17 + 단일 노드 구성)

20.Kafka Connect 이해 및 서버 설정하기 : REST API 명령어

11. Kafka 3.9 노드 구성별 server.properties 예시: KRaft 모드

https://docs.confluent.io/kafka-connectors/jdbc/current/overview.html

 

JDBC Source and Sink Connector for Confluent Platform | Confluent Documentation

JDBC Source and Sink Connector for Confluent Platform The JDBC connectors allow data transfer between relational databases and Apache Kafka®. The JDBC source connector allows you to import data from any relational database with a JDBC driver into Kafka to

docs.confluent.io

 

https://debezium.io/documentation/reference/stable/connectors/postgresql.html

 

Debezium connector for PostgreSQL :: Debezium Documentation

Tombstone events When a row is deleted, the delete event value still works with log compaction, because Kafka can remove all earlier messages that have that same key. However, for Kafka to remove all messages that have that same key, the message value must

debezium.io

 

728x90
반응형