ABOUT ME

-

Total
-
  • Python Kafka: Cloud Karafka 이용하기
    컴퓨터/Kafka 2020. 12. 24. 12:14
    728x90
    반응형

    CloudKarafka

     

    CloudKarafka

    TYou will, among other things, understand how you can benefit from using Apache Kafka in your architecture and how to optimize your Kafka Cluster.

    www.cloudkarafka.com

     

    1. 소개

    CloudKarafka는 Google cloud나 AWS처럼 클라우드 서버를 제공하는데, Kafka 클러스터만을 위한 서버이다.

    무료 버전은 최대 5개 토픽, 토픽 당 최대 10MB 데이터, 28일 데이터 보유 기간이다.

     

    (최고 옵션은 매 달 1200달러)

    최고 옵션
    무료 버전

     

    2. 서버 만들기

    원하는 플랜으로 카프카 서버를 만든다. @플랜 링크

     

    Plans - CloudKarafka

    Free Shared Plan For testing and development we offer a multi tenant Kafka server on a shared cluster.

    www.cloudkarafka.com

     

    무료 기준으로, GCD 미국 서버 1개, AWS 미국 서버 2개밖에 선택이 안 된다.

    클러스터 이름 설정 및 제일 마음에 드는 서버를 선택을 하면 바로 서버가 만들어진다.

    서버 선택

     

    서버

    무료 기준으로 토픽 만들기, 데이터 주고/받기 기능밖에 안 된다.

     

    토픽 만들기

    토픽은 username-이름으로 밖에 설정이 안 된다.

    적당한 파티션과 replicas를 고른 후 진행

     

    테스트 데이터 만들기

    Browser로 들어간 후 topic을 입력하고 오른쪽 Producer로 데이터를 보내면

    Consumer가 정상 작동하는 모습을 볼 수 있다.

     

    3. 파이썬으로 이용하기

    온라인에서 데이터를 계속 보낼 수도 없으니, python/nodejs/go 등 다양한 언어로 예제가 있다.

    로컬 카프카 공부하는 사람이라면, 서버를 계속 실행하고 있지 않아도 돼서 편하다.

     

    우선 시스템 변수 설정에서 CLOUDKARAFKA 변수들을 만들어준다. (설정 없이 바로 입력해도 상관 X)

    (변수 값들은 자신의 서버 - Details에 나온다.)

     

    공식 예제

    Producer

    CloudKarafka 프로토콜은 SASL_SSL을 사용한다.

    import sys
    import os
    
    from confluent_kafka import Producer
    
    if __name__ == '__main__':
        topic = os.environ['CLOUDKARAFKA_TOPIC'].split(",")[0]
    
        # Consumer configuration
        # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
        conf = {
            'bootstrap.servers': os.environ['CLOUDKARAFKA_BROKERS'],
            'session.timeout.ms': 6000,
            'default.topic.config': {'auto.offset.reset': 'smallest'},
            'security.protocol': 'SASL_SSL',
    	    'sasl.mechanisms': 'SCRAM-SHA-256',
            'sasl.username': os.environ['CLOUDKARAFKA_USERNAME'],
            'sasl.password': os.environ['CLOUDKARAFKA_PASSWORD']
        }
    
        p = Producer(**conf)
    
        def delivery_callback(err, msg):
            if err:
                sys.stderr.write('%% Message failed delivery: %s\n' % err)
            else:
                sys.stderr.write('%% Message delivered to %s [%d]\n' %
                                 (msg.topic(), msg.partition()))
    
        for line in sys.stdin:
            try:
                p.produce(topic, line.rstrip(), callback=delivery_callback)
            except BufferError as e:
                sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                                 len(p))
            p.poll(0)
    
        sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
        p.flush()

     

    Consumer 

    import sys
    import os
    
    from confluent_kafka import Consumer, KafkaException, KafkaError
    
    if __name__ == '__main__':
        topics = os.environ['CLOUDKARAFKA_TOPIC'].split(",")
    
        # Consumer configuration
        # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
        conf = {
            'bootstrap.servers': os.environ['CLOUDKARAFKA_BROKERS'],
            'group.id': "%s-consumer" % os.environ['CLOUDKARAFKA_USERNAME'],
            'session.timeout.ms': 6000,
            'default.topic.config': {'auto.offset.reset': 'smallest'},
            'security.protocol': 'SASL_SSL',
    	'sasl.mechanisms': 'SCRAM-SHA-256',
            'sasl.username': os.environ['CLOUDKARAFKA_USERNAME'],
            'sasl.password': os.environ['CLOUDKARAFKA_PASSWORD']
        }
    
        c = Consumer(**conf)
        c.subscribe(topics)
        try:
            while True:
                msg = c.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    # Error or event
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                         (msg.topic(), msg.partition(), msg.offset()))
                    elif msg.error():
                        # Error
                        raise KafkaException(msg.error())
                else:
                    # Proper message
                    sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                     (msg.topic(), msg.partition(), msg.offset(),
                                      str(msg.key())))
                    print(msg.value())
    
        except KeyboardInterrupt:
            sys.stderr.write('%% Aborted by user\n')
    
        # Close down consumer to commit final offsets.
        c.close()

     

    728x90

    댓글