-
Python Kafka: Cloud Karafka 이용하기컴퓨터/Kafka 2020. 12. 24. 12:14728x90반응형
CloudKarafka
CloudKarafka
TYou will, among other things, understand how you can benefit from using Apache Kafka in your architecture and how to optimize your Kafka Cluster.
www.cloudkarafka.com
1. 소개
CloudKarafka는 Google cloud나 AWS처럼 클라우드 서버를 제공하는데, Kafka 클러스터만을 위한 서버이다.
무료 버전은 최대 5개 토픽, 토픽 당 최대 10MB 데이터, 28일 데이터 보유 기간이다.
(최고 옵션은 매 달 1200달러)
최고 옵션 무료 버전 2. 서버 만들기
원하는 플랜으로 카프카 서버를 만든다. @플랜 링크
Plans - CloudKarafka
Free Shared Plan For testing and development we offer a multi tenant Kafka server on a shared cluster.
www.cloudkarafka.com
무료 기준으로, GCD 미국 서버 1개, AWS 미국 서버 2개밖에 선택이 안 된다.
클러스터 이름 설정 및 제일 마음에 드는 서버를 선택을 하면 바로 서버가 만들어진다.
서버 선택 서버
무료 기준으로 토픽 만들기, 데이터 주고/받기 기능밖에 안 된다.
토픽 만들기
토픽은 username-이름으로 밖에 설정이 안 된다.
적당한 파티션과 replicas를 고른 후 진행
테스트 데이터 만들기
Browser로 들어간 후 topic을 입력하고 오른쪽 Producer로 데이터를 보내면
Consumer가 정상 작동하는 모습을 볼 수 있다.
3. 파이썬으로 이용하기
온라인에서 데이터를 계속 보낼 수도 없으니, python/nodejs/go 등 다양한 언어로 예제가 있다.
로컬 카프카 공부하는 사람이라면, 서버를 계속 실행하고 있지 않아도 돼서 편하다.
우선 시스템 변수 설정에서 CLOUDKARAFKA 변수들을 만들어준다. (설정 없이 바로 입력해도 상관 X)
(변수 값들은 자신의 서버 - Details에 나온다.)
공식 예제
Producer
CloudKarafka 프로토콜은 SASL_SSL을 사용한다.
import sys import os from confluent_kafka import Producer if __name__ == '__main__': topic = os.environ['CLOUDKARAFKA_TOPIC'].split(",")[0] # Consumer configuration # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md conf = { 'bootstrap.servers': os.environ['CLOUDKARAFKA_BROKERS'], 'session.timeout.ms': 6000, 'default.topic.config': {'auto.offset.reset': 'smallest'}, 'security.protocol': 'SASL_SSL', 'sasl.mechanisms': 'SCRAM-SHA-256', 'sasl.username': os.environ['CLOUDKARAFKA_USERNAME'], 'sasl.password': os.environ['CLOUDKARAFKA_PASSWORD'] } p = Producer(**conf) def delivery_callback(err, msg): if err: sys.stderr.write('%% Message failed delivery: %s\n' % err) else: sys.stderr.write('%% Message delivered to %s [%d]\n' % (msg.topic(), msg.partition())) for line in sys.stdin: try: p.produce(topic, line.rstrip(), callback=delivery_callback) except BufferError as e: sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p)) p.poll(0) sys.stderr.write('%% Waiting for %d deliveries\n' % len(p)) p.flush()
Consumer
import sys import os from confluent_kafka import Consumer, KafkaException, KafkaError if __name__ == '__main__': topics = os.environ['CLOUDKARAFKA_TOPIC'].split(",") # Consumer configuration # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md conf = { 'bootstrap.servers': os.environ['CLOUDKARAFKA_BROKERS'], 'group.id': "%s-consumer" % os.environ['CLOUDKARAFKA_USERNAME'], 'session.timeout.ms': 6000, 'default.topic.config': {'auto.offset.reset': 'smallest'}, 'security.protocol': 'SASL_SSL', 'sasl.mechanisms': 'SCRAM-SHA-256', 'sasl.username': os.environ['CLOUDKARAFKA_USERNAME'], 'sasl.password': os.environ['CLOUDKARAFKA_PASSWORD'] } c = Consumer(**conf) c.subscribe(topics) try: while True: msg = c.poll(timeout=1.0) if msg is None: continue if msg.error(): # Error or event if msg.error().code() == KafkaError._PARTITION_EOF: # End of partition event sys.stderr.write('%% %s [%d] reached end at offset %d\n' % (msg.topic(), msg.partition(), msg.offset())) elif msg.error(): # Error raise KafkaException(msg.error()) else: # Proper message sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' % (msg.topic(), msg.partition(), msg.offset(), str(msg.key()))) print(msg.value()) except KeyboardInterrupt: sys.stderr.write('%% Aborted by user\n') # Close down consumer to commit final offsets. c.close()
728x90'컴퓨터 > Kafka' 카테고리의 다른 글
Apache Kafka 서버 보안 (security) (0) 2020.12.30 Python Kafka: Avro 사용하기 (2) 2020.12.23 Apache Kafka: local을 위한 명령어 인터페이스 (CLI) (0) 2020.12.18