-
Apache Kafka Connect 사용해보기컴퓨터/Kafka 2021. 1. 7. 13:34728x90반응형
Kafka Connect
Apache Kafka
A Distributed Streaming Platform.
kafka.apache.org
1. Kafka Connect란
Apache Kafka Connect란, http://, sql server, mysql, syslog, csv, json 등 소스 데이터를 Kafka에 전송한 후 (source)
데이터를 변형 후 mongoDB, elasticserach, mysql, postgres, hadoop 등으로 데이터를 쉽게 보내는 역할을 한다. (sink)
ex) RDBMS -> Kafka Connect -> Kafka -> Kafka Connect -> Amazon S3, HDFS
ex) 애플리케이션 <-> 카프카 -> Kafka Connect -> 데이터 저장소
By rmoff @LJC Virtual Event 보면 데이터를 받을 때도 Connect 보낼 때도 Connect를 이용하는데 그 이유는,
카프카에 데이터는 바이너리 형태기 때문에 알맞은 형식으로 다시 파싱 해야 한다. (schemas)
그리고 프로듀서랑 컨슈머에서 작동하게 하면 되는 거 아닌가 할 수 있지만,
프로듀서랑 컨슈머의 코드가 길어지면 길어질수록 디버깅이랑 관리가 힘들어 질 것이다.
예를 들어 Twitter 트윗을 읽고 원하는 데이터를 필터링하여 사용한다고 할 때, 처음부터 작성할 수도 있지만
이미 누군가가 쉽게 Kafka Connect 패키지로 만들어 놓았을 가능성이 높다.
다음 Kafka Connect Twitter 예제를 보면, 설정 몇 줄만 적으면 알아서 realtime으로 트위터 데이터를 읽어준다.
jcustenborder/kafka-connect-twitter
Kafka Connect connector to stream data in real time from Twitter. - jcustenborder/kafka-connect-twitter
github.com
2. 예제
confluentinc/demo-scene
Scripts and samples to support Confluent Platform talks. May be rough around the edges. For automated tutorials and QA'd code, see https://github.com/confluentinc/examples/ - confluentinc/demo-...
github.com
사용할 예제는, docker 컨테이너에서 작동하며 계속해서 만들어지는 샘플 데이터를
Debezium MySQL 커넥터를 이용해 카프카 토픽에 전송한다.
샘플 데모 프로젝트 다운로드
docker가 설치되어 있어야 함
git clone https://github.com/confluentinc/demo-scene.git cd kafka-connect-zero-to-hero docker-compose up -d
Kafka Connect REST 서버 실행 확인
bash -c ' \ echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n" while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)" sleep 5 done echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n" '
내장된 Connector들 있나 확인하기
MySQL, Elastic, Neo 커넥터가 들어있다.
curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep 'Neo4jSinkConnector|MySqlConnector|ElasticsearchSinkConnector'
Docker MySQL demo 데이터베이스 불러오기
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
여기서 demo 데이터베이스가 없다고 나올 수 있다.
분명 docker-compose 셋업에서 data/mysql을 docker-entrypoint-initdb.d로 되어있음에도 작동이 잘 안 됐다.
그럴 땐 수동으로 sql 스크립트를 실행해야 한다.
아래 명령어를 3번 이용해 3개의 sql 스크립트를 실행해준다.
docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD < ./data/mysql/00.sql'
데이터베이스 데이터 ingest
SELECT * FROM ORDERS ORDER BY CREATE_TS DESC LIMIT 1\G
실시간 샘플 데이터 생성기 실행
docker exec mysql /data/02_populate_more_orders.sh
새롭게 생성되는 데이터 스트림
watch -n 1 -x docker exec -t mysql bash -c 'echo "SELECT * FROM ORDERS ORDER BY CREATE_TS DESC LIMIT 1 \G" | mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
Kafka 데이터 ElasticsSearch에 전송하기
curl -i -X PUT -H "Content-Type:application/json" \ http://localhost:8083/connectors/sink-elastic-orders-00/config \ -d '{ "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "topics": "mysql-debezium-asgard.demo.ORDERS", "connection.url": "http://elasticsearch:9200", "type.name": "type.name=kafkaconnect", "key.ignore": "true", "schema.ignore": "true" }'
Kibana에서 필드 강제 새로고침
curl -s 'http://localhost:5601/api/saved_objects/_bulk_get' \ -H 'kbn-xsrf: nevergonnagiveyouup' \ -H 'Content-Type: application/json' \ -d '[{"id":"mysql-debezium-asgard.demo.orders","type":"index-pattern"}]'
Elasticsearch 데이터 뷰어
curl -s http://localhost:9200/mysql-debezium-asgard.demo.orders/_search \ -H 'content-type: application/json' \ -d '{ "size": 5, "sort": [ { "CREATE_TS": { "order": "desc" } } ] }' |\ jq '.hits.hits[]._source | .id, .CREATE_TS'
Kafka Connect Hub
이미 존재하는 Connect들을 보고 싶으면 Confluent Kafka Connect Hub으로 가면 된다.
S3 커넥터, GCD 커넥터, Elasticsearch 커넥터 등 웬만한 데이터 저장소는 다 있다.
Home
Confluent, founded by the creators of Apache Kafka, delivers a complete execution of Kafka for the Enterprise, to help you run your business in real time.
www.confluent.io
참고
From Zero to Hero Apache Kafka Connect by rmoff @LJC VIRTUAL MEETUP
728x90'컴퓨터 > Kafka' 카테고리의 다른 글
Pinterest의 Apache Kafka 팁, 사용법 (0) 2021.01.27 Apache Kafka 서버 보안 (security) (0) 2020.12.30 Python Kafka: Cloud Karafka 이용하기 (0) 2020.12.24