-
Kafka Slack API Producer 만들기 (Python)컴퓨터/Kafka 2020. 12. 3. 15:56728x90반응형
Kafka
예제 프로젝트
이전 글: Kafka Slack API Consumer 만들기
1. 소개
이 예제에서 만들 결과물은 다음과 같다.
Producer를 통해 이전 글(Consumer)에게, 매 5초마다 Slack 채널 "일반"에서 메시지를 읽고,
누군가 글을 남겼는데 "bug"란 단어를 포함한 글이면, json으로 dump 시켜서 Consumer에게 전송한다.
{"USER": "ikr", "TEXT": "I found a bug, where I can copy my items over and over."}
그러면, Consumer는 자동으로 USER과 TEXT를 읽어서,
내가 원하는 Slack 채널에 다음과 같이 메시지를 남겨준다.
2. Slack API Token
"KafkaTest"란 봇을 만들어서 "kafka" 채널에 추가하고,
kafka에 데이터가 전송되면, Consumer로 이 데이터를 json 파싱 하여,
봇이 자동으로 위 메시지를 남겨주는 시스템을 만들 것이다.
1. 우선, Slack App을 만들어준다. @링크
2. 앱 생성 후, 앱을 클릭하여 OAuth & Permissions에 들어간다.
3. scope 추가
이 Producer는 (users:read, channels:history, channels:read, im:history, mpim:history)
위 scopes들을 사용할 것이다.
4. workspace에 install 후 token 복사
5. 채널에 봇 추가
유저가 있는 채널과 컨슈머가 메시지를 보낼 채널에 봇을 추가해준다.
("** FAILED: not_in_channel" 오류 방지)
/INVITE @MY_APP_NAME
3. Slack Kafka Producer
자신의 token을 입력 후 테스트해본다.
import json import time from config import Config from confluent_kafka import Producer from slack import WebClient from slack.errors import SlackApiError # Bot User OAuth Access Token token = "" # Slack API 초기화 sc = WebClient(token) # Kafka Producer 만들기 "localhost:9092" settings = {"bootstrap.servers": Config.MY_SERVER} p = Producer(settings) def acked(err, msg): # callback if err is not None: print("Failed to deliver message: {0}: {1}".format(msg.value(), err.str())) else: print("Message produced: {0}".format(msg.value())) # binary channel = "C01FVD0QD42" # 아래 sc.conversations_list로 id를 확인 # channel_name = "일반" # try: # sc_response = sc.conversations_list(channel=channel) # # for channel in sc_response["channels"]: # # print(channel["name"]) # # if channel["name"] == channel_name: # # channel_id = channel["id"] # except SlackApiError as e: # assert e.response["ok"] is False # print("\t** FAILED: %s" % e.response["error"]) posts = {} # 켤 때마다 중복 메시지 받음, 파일에 저장하는 형식으로 하면 더 나음. # 매 5초마다 메시지를 계속 읽어옴. # ratelimited 에러가 발생하면, 시간대를 늘려야 함. try: time.sleep(5) while True: try: sc_response = sc.conversations_history(channel=channel) for msg in sc_response["messages"]: if msg["ts"] not in posts: # 없는 메시지 posts[msg["ts"]] = True if "bug" in msg["text"].lower(): # bug를 포함한 글임 print("Someone posted a bug...") name = sc.users_info(user=msg["user"])["user"][ "name" ] # user id를 name으로 변환 data = {"USER": name, "TEXT": msg["text"]} # 데이터 Consumer에게 전송 p.produce( Config.SLACK_TOPID_ID, value=json.dumps(data), callback=acked, ) p.poll(0.5) else: # 파일에 저장할 수도 continue except SlackApiError as e: assert e.response["ok"] is False print("\t** FAILED: %s" % e.response["error"]) except Exception as e: print(type(e)) print(dir(e)) finally: print("Exiting...") p.flush(100)
4. 결과
powershell 창을 2개 띄운 후 (Producer과 Consumer), 작동을 확인한다.
#일반 채널에서 글을 올림
#kafka 채널 결과
Kafka 공부 Github (Java, Python)
728x90'컴퓨터 > Kafka' 카테고리의 다른 글
Kafdrop: Kafka 웹 UI (0) 2020.12.04 Kafka Slack API Consumer 만들기 (Python) (0) 2020.12.01 [#3] Kafka 이론 (Producer, Consumer, Zookeeper) (0) 2020.11.29