ABOUT ME

-

  • Kafka Slack API Producer 만들기 (Python)
    컴퓨터/Kafka 2020. 12. 3. 15:56
    728x90
    반응형

    Kafka

    예제 프로젝트

     

    Apache Kafka

    A Distributed Streaming Platform.

    kafka.apache.org

    이전 글: Kafka Slack API Consumer 만들기


     

    1. 소개

    이 예제에서 만들 결과물은 다음과 같다.

    Producer를 통해 이전 글(Consumer)에게, 매 5초마다 Slack 채널 "일반"에서 메시지를 읽고,

    누군가 글을 남겼는데 "bug"란 단어를 포함한 글이면, json으로 dump 시켜서 Consumer에게 전송한다.

    json
    {"USER": "ikr", "TEXT": "I found a bug, where I can copy my items over and over."}

     

    그러면, Consumer는 자동으로 USER과 TEXT를 읽어서,

    버그 찾았네요, 제 아이템 계속 복사 가능합니다.

     

    내가 원하는 Slack 채널에 다음과 같이 메시지를 남겨준다.

    유저 "ikr"이 버그를 찾았네요: 메시지, 지금 고칠 수 있는지 확인해주세요.

     

    2. Slack API Token

    "KafkaTest"란 봇을 만들어서 "kafka" 채널에 추가하고,

    kafka에 데이터가 전송되면, Consumer로 이 데이터를 json 파싱 하여,

    봇이 자동으로 위 메시지를 남겨주는 시스템을 만들 것이다.

     

    1. 우선, Slack App을 만들어준다. @링크

     

    2. 앱 생성 후, 앱을 클릭하여 OAuth & Permissions에 들어간다.

    3. scope 추가

    이 Producer는 (users:read, channels:history, channels:read, im:history, mpim:history)

    위 scopes들을 사용할 것이다.

     

     

    4. workspace에 install 후 token 복사

     

    5. 채널에 봇 추가

    유저가 있는 채널과 컨슈머가 메시지를 보낼 채널에 봇을 추가해준다.

    ("** FAILED: not_in_channel" 오류 방지)

    powershell
    /INVITE @MY_APP_NAME

     

    3. Slack Kafka Producer

    자신의 token을 입력 후 테스트해본다.

    python
    import json import time from config import Config from confluent_kafka import Producer from slack import WebClient from slack.errors import SlackApiError # Bot User OAuth Access Token token = "" # Slack API 초기화 sc = WebClient(token) # Kafka Producer 만들기 "localhost:9092" settings = {"bootstrap.servers": Config.MY_SERVER} p = Producer(settings) def acked(err, msg): # callback if err is not None: print("Failed to deliver message: {0}: {1}".format(msg.value(), err.str())) else: print("Message produced: {0}".format(msg.value())) # binary channel = "C01FVD0QD42" # 아래 sc.conversations_list id 확인 # channel_name = "일반" # try: # sc_response = sc.conversations_list(channel=channel) # # for channel in sc_response["channels"]: # # print(channel["name"]) # # if channel["name"] == channel_name: # # channel_id = channel["id"] # except SlackApiError as e: # assert e.response["ok"] is False # print("\t** FAILED: %s" % e.response["error"]) posts = {} # 때마다 중복 메시지 받음, 파일에 저장하는 형식으로 하면 나음. # 5초마다 메시지를 계속 읽어옴. # ratelimited 에러가 발생하면, 시간대를 늘려야 . try: time.sleep(5) while True: try: sc_response = sc.conversations_history(channel=channel) for msg in sc_response["messages"]: if msg["ts"] not in posts: # 없는 메시지 posts[msg["ts"]] = True if "bug" in msg["text"].lower(): # bug 포함한 글임 print("Someone posted a bug...") name = sc.users_info(user=msg["user"])["user"][ "name" ] # user id name으로 변환 data = {"USER": name, "TEXT": msg["text"]} # 데이터 Consumer에게 전송 p.produce( Config.SLACK_TOPID_ID, value=json.dumps(data), callback=acked, ) p.poll(0.5) else: # 파일에 저장할 수도 continue except SlackApiError as e: assert e.response["ok"] is False print("\t** FAILED: %s" % e.response["error"]) except Exception as e: print(type(e)) print(dir(e)) finally: print("Exiting...") p.flush(100)

     

    4. 결과

    powershell 창을 2개 띄운 후 (Producer과 Consumer), 작동을 확인한다.

     

    #일반 채널에서 글을 올림

    버그 찾았네요, 제 아이템 계속 복사 가능합니다.

     

    Producer.py

     

    Consumer.py

     

    #kafka 채널 결과

    유저 "ikr"이 버그를 찾았네요: 메시지, 지금 고칠 수 있는지 확인해주세요.

     

    Kafka 공부 Github (Java, Python)

     

    Alfex4936/kafka-Studies

    Apache Kafka 공부한 걸 Github에 정리

    github.com

     

    728x90

    '컴퓨터 > Kafka' 카테고리의 다른 글

    댓글