-
Apache Kafka Connect 사용해보기컴퓨터/Kafka 2021. 1. 7. 13:34728x90반응형
Kafka Connect
1. Kafka Connect란
Apache Kafka Connect란, http://, sql server, mysql, syslog, csv, json 등 소스 데이터를 Kafka에 전송한 후 (source)
데이터를 변형 후 mongoDB, elasticserach, mysql, postgres, hadoop 등으로 데이터를 쉽게 보내는 역할을 한다. (sink)
ex) RDBMS -> Kafka Connect -> Kafka -> Kafka Connect -> Amazon S3, HDFS
ex) 애플리케이션 <-> 카프카 -> Kafka Connect -> 데이터 저장소
보면 데이터를 받을 때도 Connect 보낼 때도 Connect를 이용하는데 그 이유는,
카프카에 데이터는 바이너리 형태기 때문에 알맞은 형식으로 다시 파싱 해야 한다. (schemas)
그리고 프로듀서랑 컨슈머에서 작동하게 하면 되는 거 아닌가 할 수 있지만,
프로듀서랑 컨슈머의 코드가 길어지면 길어질수록 디버깅이랑 관리가 힘들어 질 것이다.
예를 들어 Twitter 트윗을 읽고 원하는 데이터를 필터링하여 사용한다고 할 때, 처음부터 작성할 수도 있지만
이미 누군가가 쉽게 Kafka Connect 패키지로 만들어 놓았을 가능성이 높다.
다음 Kafka Connect Twitter 예제를 보면, 설정 몇 줄만 적으면 알아서 realtime으로 트위터 데이터를 읽어준다.
2. 예제
사용할 예제는, docker 컨테이너에서 작동하며 계속해서 만들어지는 샘플 데이터를
Debezium MySQL 커넥터를 이용해 카프카 토픽에 전송한다.
샘플 데모 프로젝트 다운로드
docker가 설치되어 있어야 함
git clone https://github.com/confluentinc/demo-scene.git cd kafka-connect-zero-to-hero docker-compose up -d
Kafka Connect REST 서버 실행 확인
bash -c ' \ echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n" while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)" sleep 5 done echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n" '
내장된 Connector들 있나 확인하기
MySQL, Elastic, Neo 커넥터가 들어있다.
curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep 'Neo4jSinkConnector|MySqlConnector|ElasticsearchSinkConnector'
Docker MySQL demo 데이터베이스 불러오기
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
여기서 demo 데이터베이스가 없다고 나올 수 있다.
분명 docker-compose 셋업에서 data/mysql을 docker-entrypoint-initdb.d로 되어있음에도 작동이 잘 안 됐다.
그럴 땐 수동으로 sql 스크립트를 실행해야 한다.
아래 명령어를 3번 이용해 3개의 sql 스크립트를 실행해준다.
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD < ./data/mysql/00.sql'
데이터베이스 데이터 ingest
SELECT * FROM ORDERS ORDER BY CREATE_TS DESC LIMIT 1\G
실시간 샘플 데이터 생성기 실행
docker exec mysql /data/02_populate_more_orders.sh
새롭게 생성되는 데이터 스트림
watch -n 1 -x docker exec -t mysql bash -c 'echo "SELECT * FROM ORDERS ORDER BY CREATE_TS DESC LIMIT 1 \G" | mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
Kafka 데이터 ElasticsSearch에 전송하기
curl -i -X PUT -H "Content-Type:application/json" \ http://localhost:8083/connectors/sink-elastic-orders-00/config \ -d '{ "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "topics": "mysql-debezium-asgard.demo.ORDERS", "connection.url": "http://elasticsearch:9200", "type.name": "type.name=kafkaconnect", "key.ignore": "true", "schema.ignore": "true" }'
Kibana에서 필드 강제 새로고침
curl -s 'http://localhost:5601/api/saved_objects/_bulk_get' \ -H 'kbn-xsrf: nevergonnagiveyouup' \ -H 'Content-Type: application/json' \ -d '[{"id":"mysql-debezium-asgard.demo.orders","type":"index-pattern"}]'
Elasticsearch 데이터 뷰어
curl -s http://localhost:9200/mysql-debezium-asgard.demo.orders/_search \ -H 'content-type: application/json' \ -d '{ "size": 5, "sort": [ { "CREATE_TS": { "order": "desc" } } ] }' |\ jq '.hits.hits[]._source | .id, .CREATE_TS'
Kafka Connect Hub
이미 존재하는 Connect들을 보고 싶으면 Confluent Kafka Connect Hub으로 가면 된다.
S3 커넥터, GCD 커넥터, Elasticsearch 커넥터 등 웬만한 데이터 저장소는 다 있다.
참고
From Zero to Hero Apache Kafka Connect by rmoff @LJC VIRTUAL MEETUP
728x90'컴퓨터 > Kafka' 카테고리의 다른 글
Pinterest의 Apache Kafka 팁, 사용법 (0) 2021.01.27 Apache Kafka 서버 보안 (security) (0) 2020.12.30 Python Kafka: Cloud Karafka 이용하기 (0) 2020.12.24