-
Kafka Slack API Consumer 만들기 (Python)컴퓨터/Kafka 2020. 12. 1. 19:31728x90반응형
Kafka
예제 프로젝트
1. 소개
이 예제에서 만들 결과물은 다음과 같다.
Producer CLI를 통해서, SLACK-KAFKA 토픽에게 아래의 json 데이터를 보냈다.
(Producer를 만들어서 전송해도 된다.)
{"CLUB_STATUS":"platinum","EMAIL":"ikr@kakao.com","STARS":1,"MESSAGE":"Exceeded all my expectations!"}
그러면, Consumer는 자동으로 EMAIL과 MESSAGE를 읽어서,
내가 원하는 Slack 채널에 다음과 같이 메시지를 남겨준다.
("EMAIL"이 나쁜 리뷰를 남겼어요 :(, 이 문제를 당장 해결할 수 있는지 확인해주세요!)
2. Slack API Token
"KafkaTest"란 봇을 만들어서 "kafka" 채널에 추가하고,
kafka에 데이터가 전송되면, Consumer로 이 데이터를 json 파싱 하여,
봇이 자동으로 위 메시지를 남겨주는 시스템을 만들 것이다.
1. 우선, Slack App을 만들어준다. @링크
2. 앱 생성 후, 앱을 클릭하여 OAuth & Permissions에 들어간다.
3. chat:write scope 추가
(**FAILED: missing_scope 오류 방지)
4. workspace에 install 후 token 복사
5. 채널에 봇 추가
원하는 채널에 봇을 추가해준다.
("** FAILED: not_in_channel" 오류 방지)
/INVITE @MY_APP_NAME
3. Slack Kafka Consumer
자신의 token을 입력 후 테스트해본다.
from slack import WebClient from slack.errors import SlackApiError from confluent_kafka import Consumer, KafkaError import json import time # Bot User OAuth Access Token # Scope = chat:write token = "TOKEN" sc = WebClient(token) # Set 'auto.offset.reset': 'smallest' if you want to consume all messages # from the beginning of the topic settings = { "bootstrap.servers": "localhost:9092", "group.id": "python_kafka_notify.py", "default.topic.config": {"auto.offset.reset": "largest"}, } c = Consumer(settings) # Topic = "SLACK-KAFKA" c.subscribe(["SLACK-KAFKA"]) try: while True: msg = c.poll(0.1) time.sleep(10) if msg is None: continue elif not msg.error(): print("Received a message: {0}".format(msg.value())) if msg.value() is None: print("But the message value is empty.") continue try: app_msg = json.loads(msg.value().decode()) except: app_msg = json.loads(msg.value()) email = app_msg["EMAIL"] message = app_msg["MESSAGE"] channel = "kafka" text = ( "`%s` just left a bad review :disappointed:\n> %s\n\n_Please contact them immediately and see if we can fix the issue *right here, right now*_" % (email, message) ) print('\nSending message "%s" to channel %s' % (text, channel)) # 아래가 Slack API로 message 작성하는 코드 try: sc_response = sc.chat_postMessage( channel=channel, text=text, as_user=True, username="댓글 봇" ) # as_user은 new slack app에서 작동 안 함 except SlackApiError as e: assert e.response["ok"] is False print("\t** FAILED: %s" % e.response["error"]) elif msg.error().code() == KafkaError._PARTITION_EOF: print( "End of partition reached {0}/{1}".format(msg.topic(), msg.partition()) ) else: print("Error occured: {0}".format(msg.error().str())) except Exception as e: print(e) print(dir(e)) except KeyboardInterrupt: print("Pressed CTRL+C...") finally: c.close()
4. 결과
파이썬 코드를 실행 후, producer로 데이터를 전송한다.
토픽은 아래와 같이 만들었다. (복제 계수 = 1, 파티션 = 3)
kafka-topics --bootstrap-server localhost:9092 create --topic SLACK-KAFKA --create --partitions 3 --replication-factor 1
예제 데이터
{"CLUB_STATUS":"platinum","EMAIL":"ikr@kakao.com","STARS":1,"MESSAGE":"Exceeded all my expectations!"}
Kafka 공부 Github (Java, Python)
728x90'컴퓨터 > Kafka' 카테고리의 다른 글
Kafka Slack API Producer 만들기 (Python) (0) 2020.12.03 [#3] Kafka 이론 (Producer, Consumer, Zookeeper) (0) 2020.11.29 [#2] Kafka 이론 (topic, broker) (0) 2020.11.28