ABOUT ME

-

Total
-
  • Kafka Slack API Consumer 만들기 (Python)
    컴퓨터/Kafka 2020. 12. 1. 19:31
    728x90
    반응형

    Kafka

    예제 프로젝트

     

    Apache Kafka

    Apache Kafka: A Distributed Streaming Platform.

    kafka.apache.org


     

    1. 소개

    이 예제에서 만들 결과물은 다음과 같다.

    Producer CLI를 통해서, SLACK-KAFKA 토픽에게 아래의 json 데이터를 보냈다.

    (Producer를 만들어서 전송해도 된다.)

    {"CLUB_STATUS":"platinum","EMAIL":"ikr@kakao.com","STARS":1,"MESSAGE":"Exceeded all my expectations!"}

     

    그러면, Consumer는 자동으로 EMAILMESSAGE를 읽어서,

    내가 원하는 Slack 채널에 다음과 같이 메시지를 남겨준다.

     

    ("EMAIL"이 나쁜 리뷰를 남겼어요 :(, 이 문제를 당장 해결할 수 있는지 확인해주세요!)

     

    2. Slack API Token

    "KafkaTest"란 봇을 만들어서 "kafka" 채널에 추가하고,

    kafka에 데이터가 전송되면, Consumer로 이 데이터를 json 파싱 하여,

    봇이 자동으로 위 메시지를 남겨주는 시스템을 만들 것이다.

     

    1. 우선, Slack App을 만들어준다. @링크

     

    2. 앱 생성 후, 앱을 클릭하여 OAuth & Permissions에 들어간다.

    3. chat:write scope 추가

    (**FAILED: missing_scope 오류 방지)

     

    4. workspace에 install 후 token 복사

     

    5. 채널에 봇 추가

    원하는 채널에 봇을 추가해준다.

    ("** FAILED: not_in_channel" 오류 방지)

    /INVITE @MY_APP_NAME

     

    3. Slack Kafka Consumer

    자신의 token을 입력 후 테스트해본다.

    from slack import WebClient
    from slack.errors import SlackApiError
    
    from confluent_kafka import Consumer, KafkaError
    import json
    import time
    
    # Bot User OAuth Access Token
    # Scope = chat:write
    token = "TOKEN"
    
    sc = WebClient(token)
    
    # Set 'auto.offset.reset': 'smallest' if you want to consume all messages
    # from the beginning of the topic
    settings = {
        "bootstrap.servers": "localhost:9092",
        "group.id": "python_kafka_notify.py",
        "default.topic.config": {"auto.offset.reset": "largest"},
    }
    c = Consumer(settings)
    
    # Topic = "SLACK-KAFKA"
    c.subscribe(["SLACK-KAFKA"])
    
    try:
        while True:
            msg = c.poll(0.1)
            time.sleep(10)
            if msg is None:
                continue
            elif not msg.error():
                print("Received a message: {0}".format(msg.value()))
                if msg.value() is None:
                    print("But the message value is empty.")
                    continue
    
                try:
                    app_msg = json.loads(msg.value().decode())
                except:
                    app_msg = json.loads(msg.value())
    
                email = app_msg["EMAIL"]
                message = app_msg["MESSAGE"]
                channel = "kafka"
                text = (
                    "`%s` just left a bad review :disappointed:\n> %s\n\n_Please contact them immediately and see if we can fix the issue *right here, right now*_"
                    % (email, message)
                )
                print('\nSending message "%s" to channel %s' % (text, channel))
                
                # 아래가 Slack API로 message 작성하는 코드
                try:
                    sc_response = sc.chat_postMessage(
                        channel=channel, text=text, as_user=True, username="댓글 봇"
                    )  # as_user은 new slack app에서 작동 안 함
                except SlackApiError as e:
                    assert e.response["ok"] is False
                    print("\t** FAILED: %s" % e.response["error"])
    
            elif msg.error().code() == KafkaError._PARTITION_EOF:
                print(
                    "End of partition reached {0}/{1}".format(msg.topic(), msg.partition())
                )
            else:
                print("Error occured: {0}".format(msg.error().str()))
    
    except Exception as e:
        print(e)
        print(dir(e))
        
    except KeyboardInterrupt:
        print("Pressed CTRL+C...")
    
    finally:
        c.close()
    

     

    4. 결과

    파이썬 코드를 실행 후, producer로 데이터를 전송한다.

     

    토픽은 아래와 같이 만들었다. (복제 계수 = 1, 파티션 = 3)

    kafka-topics --bootstrap-server localhost:9092 create --topic SLACK-KAFKA --create --partitions 3 --replication-factor 1

     

    예제 데이터

    {"CLUB_STATUS":"platinum","EMAIL":"ikr@kakao.com","STARS":1,"MESSAGE":"Exceeded all my expectations!"}

     

    오른쪽은 producer, 왼쪽은 consumer이다.

     

    VSCode 실행 결과 창

     

    kafka 채널

     

     

    Kafka 공부 Github (Java, Python)

     

    Alfex4936/kafka-Studies

    Apache Kafka 공부한 걸 Github에 정리

    github.com

     

    728x90

    댓글