ABOUT ME

-

Total
-
  • Kafka Slack API Producer 만들기 (Python)
    컴퓨터/Kafka 2020. 12. 3. 15:56
    728x90
    반응형

    Kafka

    예제 프로젝트

     

    Apache Kafka

    A Distributed Streaming Platform.

    kafka.apache.org

    이전 글: Kafka Slack API Consumer 만들기


     

    1. 소개

    이 예제에서 만들 결과물은 다음과 같다.

    Producer를 통해 이전 글(Consumer)에게, 매 5초마다 Slack 채널 "일반"에서 메시지를 읽고,

    누군가 글을 남겼는데 "bug"란 단어를 포함한 글이면, json으로 dump 시켜서 Consumer에게 전송한다.

    {"USER": "ikr", "TEXT": "I found a bug, where I can copy my items over and over."}

     

    그러면, Consumer는 자동으로 USER과 TEXT를 읽어서,

    버그 찾았네요, 제 아이템 계속 복사 가능합니다.

     

    내가 원하는 Slack 채널에 다음과 같이 메시지를 남겨준다.

    유저 "ikr"이 버그를 찾았네요: 메시지, 지금 고칠 수 있는지 확인해주세요.

     

    2. Slack API Token

    "KafkaTest"란 봇을 만들어서 "kafka" 채널에 추가하고,

    kafka에 데이터가 전송되면, Consumer로 이 데이터를 json 파싱 하여,

    봇이 자동으로 위 메시지를 남겨주는 시스템을 만들 것이다.

     

    1. 우선, Slack App을 만들어준다. @링크

     

    2. 앱 생성 후, 앱을 클릭하여 OAuth & Permissions에 들어간다.

    3. scope 추가

    이 Producer는 (users:read, channels:history, channels:read, im:history, mpim:history)

    위 scopes들을 사용할 것이다.

     

     

    4. workspace에 install 후 token 복사

     

    5. 채널에 봇 추가

    유저가 있는 채널과 컨슈머가 메시지를 보낼 채널에 봇을 추가해준다.

    ("** FAILED: not_in_channel" 오류 방지)

    /INVITE @MY_APP_NAME

     

    3. Slack Kafka Producer

    자신의 token을 입력 후 테스트해본다.

    import json
    import time
    
    from config import Config
    from confluent_kafka import Producer
    from slack import WebClient
    from slack.errors import SlackApiError
    
    
    # Bot User OAuth Access Token
    token = ""
    
    # Slack API 초기화
    sc = WebClient(token)
    
    # Kafka Producer 만들기  "localhost:9092"
    settings = {"bootstrap.servers": Config.MY_SERVER}
    p = Producer(settings)
    
    
    def acked(err, msg):  # callback
        if err is not None:
            print("Failed to deliver message: {0}: {1}".format(msg.value(), err.str()))
        else:
            print("Message produced: {0}".format(msg.value()))  # binary
    
    
    channel = "C01FVD0QD42"  # 아래 sc.conversations_list로 id를 확인
    
    
    # channel_name = "일반"
    # try:
    #     sc_response = sc.conversations_list(channel=channel)
    #     # for channel in sc_response["channels"]:
    #     #     print(channel["name"])
    #     # if channel["name"] == channel_name:
    #     #     channel_id = channel["id"]
    # except SlackApiError as e:
    #     assert e.response["ok"] is False
    #     print("\t** FAILED: %s" % e.response["error"])
    
    
    posts = {}  # 켤 때마다 중복 메시지 받음, 파일에 저장하는 형식으로 하면 더 나음.
    
    # 매 5초마다 메시지를 계속 읽어옴.
    # ratelimited 에러가 발생하면, 시간대를 늘려야 함.
    try:
        time.sleep(5)
        while True:
            try:
                sc_response = sc.conversations_history(channel=channel)
                for msg in sc_response["messages"]:
                    if msg["ts"] not in posts:  # 없는 메시지
                        posts[msg["ts"]] = True
                        if "bug" in msg["text"].lower():  # bug를 포함한 글임
                            print("Someone posted a bug...")
                            name = sc.users_info(user=msg["user"])["user"][
                                "name"
                            ]  # user id를 name으로 변환
                            data = {"USER": name, "TEXT": msg["text"]}
    
                            # 데이터 Consumer에게 전송
                            p.produce(
                                Config.SLACK_TOPID_ID,
                                value=json.dumps(data),
                                callback=acked,
                            )
                            p.poll(0.5)
                        else:
                            # 파일에 저장할 수도
                            continue
            except SlackApiError as e:
                assert e.response["ok"] is False
                print("\t** FAILED: %s" % e.response["error"])
    
    
    except Exception as e:
        print(type(e))
        print(dir(e))
    
    finally:
        print("Exiting...")
        p.flush(100)
    

     

    4. 결과

    powershell 창을 2개 띄운 후 (Producer과 Consumer), 작동을 확인한다.

     

    #일반 채널에서 글을 올림

    버그 찾았네요, 제 아이템 계속 복사 가능합니다.

     

    Producer.py

     

    Consumer.py

     

    #kafka 채널 결과

    유저 "ikr"이 버그를 찾았네요: 메시지, 지금 고칠 수 있는지 확인해주세요.

     

    Kafka 공부 Github (Java, Python)

     

    Alfex4936/kafka-Studies

    Apache Kafka 공부한 걸 Github에 정리

    github.com

     

    728x90

    '컴퓨터 > Kafka' 카테고리의 다른 글

    Kafdrop: Kafka 웹 UI  (0) 2020.12.04
    Kafka Slack API Consumer 만들기 (Python)  (0) 2020.12.01
    [#3] Kafka 이론 (Producer, Consumer, Zookeeper)  (0) 2020.11.29

    댓글