ABOUT ME

-

Total
-
  • Apache Kafka Connect 사용해보기
    컴퓨터/Kafka 2021. 1. 7. 13:34
    728x90
    반응형

    Kafka Connect

     

    Apache Kafka

    A Distributed Streaming Platform.

    kafka.apache.org


     

    1. Kafka Connect란

    Apache Kafka Connect란, http://, sql server, mysql, syslog, csv, json 등 소스 데이터를 Kafka에 전송한 후 (source)

    데이터를 변형 후 mongoDB, elasticserach, mysql, postgres, hadoop 등으로 데이터를 쉽게 보내는 역할을 한다. (sink)

     

    ex) RDBMS -> Kafka Connect -> Kafka -> Kafka Connect -> Amazon S3, HDFS

    ex) 애플리케이션 <-> 카프카 -> Kafka Connect -> 데이터 저장소

     

    By rmoff @LJC Virtual Event

    보면 데이터를 받을 때도 Connect 보낼 때도 Connect를 이용하는데 그 이유는,

    카프카에 데이터는 바이너리 형태기 때문에 알맞은 형식으로 다시 파싱 해야 한다. (schemas)

     

    그리고 프로듀서랑 컨슈머에서 작동하게 하면 되는 거 아닌가 할 수 있지만,

    프로듀서랑 컨슈머의 코드가 길어지면 길어질수록 디버깅이랑 관리가 힘들어 질 것이다.

     

    예를 들어 Twitter 트윗을 읽고 원하는 데이터를 필터링하여 사용한다고 할 때, 처음부터 작성할 수도 있지만

    이미 누군가가 쉽게 Kafka Connect 패키지로 만들어 놓았을 가능성이 높다.

    다음 Kafka Connect Twitter 예제를 보면, 설정 몇 줄만 적으면 알아서 realtime으로 트위터 데이터를 읽어준다.

     

    jcustenborder/kafka-connect-twitter

    Kafka Connect connector to stream data in real time from Twitter. - jcustenborder/kafka-connect-twitter

    github.com

     

    2. 예제

     

    confluentinc/demo-scene

    Scripts and samples to support Confluent Platform talks. May be rough around the edges. For automated tutorials and QA'd code, see https://github.com/confluentinc/examples/ - confluentinc/demo-...

    github.com

    사용할 예제는, docker 컨테이너에서 작동하며 계속해서 만들어지는 샘플 데이터를

    Debezium MySQL 커넥터를 이용해 카프카 토픽에 전송한다.

     

    샘플 데모 프로젝트 다운로드

    docker가 설치되어 있어야 함

    git clone https://github.com/confluentinc/demo-scene.git
    cd kafka-connect-zero-to-hero
    docker-compose up -d

     

    Kafka Connect REST 서버 실행 확인

    bash -c ' \
    echo -e "\n\n=============\nWaiting for Kafka Connect to start listening on localhost ⏳\n=============\n"
    while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ] ; do
      echo -e "\t" $(date) " Kafka Connect listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) " (waiting for 200)"
      sleep 5
    done
    echo -e $(date) "\n\n--------------\n\o/ Kafka Connect is ready! Listener HTTP state: " $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) "\n--------------\n"
    '

     

    내장된 Connector들 있나 확인하기

    MySQL, Elastic, Neo 커넥터가 들어있다.

    curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep 'Neo4jSinkConnector|MySqlConnector|ElasticsearchSinkConnector'

     

    Docker MySQL demo 데이터베이스 불러오기

    docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'

     

    여기서 demo 데이터베이스가 없다고 나올 수 있다.

    분명 docker-compose 셋업에서 data/mysql을 docker-entrypoint-initdb.d로 되어있음에도 작동이 잘 안 됐다.

    그럴 땐 수동으로 sql 스크립트를 실행해야 한다.

     

    아래 명령어를 3번 이용해 3개의 sql 스크립트를 실행해준다.

    docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD < ./data/mysql/00.sql'

     

    데이터베이스 데이터 ingest

    SELECT * FROM ORDERS ORDER BY CREATE_TS DESC LIMIT 1\G

     

    실시간 샘플 데이터 생성기 실행

    docker exec mysql /data/02_populate_more_orders.sh

     

    새롭게 생성되는 데이터 스트림

    watch -n 1 -x docker exec -t mysql bash -c 'echo "SELECT * FROM ORDERS ORDER BY CREATE_TS DESC LIMIT 1 \G" | mysql -u root -p$MYSQL_ROOT_PASSWORD demo'

     

    Kafka 데이터 ElasticsSearch에 전송하기

    curl -i -X PUT -H  "Content-Type:application/json" \
        http://localhost:8083/connectors/sink-elastic-orders-00/config \
        -d '{
            "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
            "topics": "mysql-debezium-asgard.demo.ORDERS",
            "connection.url": "http://elasticsearch:9200",
            "type.name": "type.name=kafkaconnect",
            "key.ignore": "true",
            "schema.ignore": "true"
        }'

    Kibana에서 필드 강제 새로고침

    curl -s 'http://localhost:5601/api/saved_objects/_bulk_get' \
      -H 'kbn-xsrf: nevergonnagiveyouup' \
      -H 'Content-Type: application/json' \
      -d '[{"id":"mysql-debezium-asgard.demo.orders","type":"index-pattern"}]'

     

    Elasticsearch 데이터 뷰어

    curl -s http://localhost:9200/mysql-debezium-asgard.demo.orders/_search \
        -H 'content-type: application/json' \
        -d '{ "size": 5, "sort": [ { "CREATE_TS": { "order": "desc" } } ] }' |\
        jq '.hits.hits[]._source | .id, .CREATE_TS'

     

    Kafka Connect Hub

    이미 존재하는 Connect들을 보고 싶으면 Confluent Kafka Connect Hub으로 가면 된다.

    S3 커넥터, GCD 커넥터, Elasticsearch 커넥터 등 웬만한 데이터 저장소는 다 있다.

     

    Home

    Confluent, founded by the creators of Apache Kafka, delivers a complete execution of Kafka for the Enterprise, to help you run your business in real time.

    www.confluent.io

     

    참고

    From Zero to Hero Apache Kafka Connect by rmoff @LJC VIRTUAL MEETUP

     

    728x90

    '컴퓨터 > Kafka' 카테고리의 다른 글

    Pinterest의 Apache Kafka 팁, 사용법  (0) 2021.01.27
    Apache Kafka 서버 보안 (security)  (0) 2020.12.30
    Python Kafka: Cloud Karafka 이용하기  (0) 2020.12.24

    댓글