ABOUT ME

-

Total
-
  • Python Kafka: Avro 사용하기
    컴퓨터/Kafka 2020. 12. 23. 17:40
    728x90
    반응형

    Apache Avro

     

    Welcome to Apache Avro!

    Welcome to Apache Avro! Apache Avro™ is a data serialization system. To learn more about Avro, please read the current documenta

    avro.apache.org

     

    1. AVRO란

    Apache Kafka는 기본적으로 데이터를 binary형식으로 받아서, binary로 보낸다.

    (010100001..., 카프카 자체는 데이터를 확인하지 않는다. [0% CPU usage, zero copy])

     

    그렇기 때문에 만약 아래와 같은 상황이라면

    • Producer가 잘못된 데이터를 보낼 때
    • field 이름이 변경됐을 때
    • 데이터 포맷 형식을 바꿔버릴 때
    • 등등

     

    당연히, Consumer는 작동을 못하게 된다.

     

    그래서 데이터 자체가 자신을 설명할 수 있어야 하고,

    Consumer를 break하지 않고 데이터를 변경될 수 있게 만들어야 한다.

     

    그래서 schema(대략적인 계획이나 도식) 방식으로 avro가 나오게 됐다.

    (JSON 형식을 사용함)

     

    (Scheme를 사용할 때 프로그램 작동 방식)

    @udemy Stephane Maarek

     

    2. Python 사용법

    Python으로 avro를 사용할 때 크게 다음 3가지를 선택할 수 있다.

    1. avro by Apache

    2. fastavro (C언어로 작성한 avro)

    3. confluent avro (fastavro 기반 HTTPS 통신 schema)

     

    당연히 python보단 C언어로 작성된 버전이 빠르기 때문에, 이 글에선 fastavro로 예제를 만들 것이다.

     

    설치

    pip install fastavro

     

    스키마 예제

    schema는 avsc로 파일로 저장된 json이나 dictionary로 바로 파싱 할 수 있다.

    (아래는 dictionary로 schema 파싱)

    from fastavro import parse_schema
    
    
    schema = {  # avsc
        "namespace": "ajou.parser",
        "name": "Notice",  # namespace.name으로 변경된다.
        "doc": "A notice parser",
        "type": "record",
        "fields": [
            {"name": "id", "type": "int"},
            {"name": "title", "type": "string"},
            {"name": "date", "type": "string"},
            {"name": "link", "type": "string"},
            {"name": "writer", "type": "string"},
        ],
    }
    
    """ 아래 message를 보내는 것이랑 같다.
    message = {
        "id": 10000,
        "title": "[FastAVRO] 테스트 공지 제목",
        "date": "20.12.23",
        "link": "https://somelink",
        "writer": "alfex4936",
    }
    """
    parsed_schema = parse_schema(schema)  # avro schema로 변환

     

    avro 파일 write 하기

    # 'records' can be an iterable (including generator)
    records = [
        {
            "id": 10005,
            "title": "공지 1",
            "date": "2020-12-01",
            "link": "https://link=10005",
            "writer": "CSW",
        },
        {
            "id": 10006,
            "title": "공지 2",
            "date": "2020-12-02",
            "link": "https://link=10006",
            "writer": "CSW",
        },
        {
            "id": 10007,
            "title": "공지 3",
            "date": "2020-12-04",
            "link": "https://link=10007",
            "writer": "CSW",
        },
    ]
    
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))
    AVRO_PATH = os.path.join(BASE_DIR, "test.avro")  # 현재 폴더에 test.avro 저장
    
    # Writing
    with open(AVRO_PATH, "wb") as out:
        writer(out, parsed_schema, records)
    

     

    avro 파일 read 하기

    # Reading
    with open(AVRO_PATH, "rb") as fo:
        for record in reader(fo):
            print(record)
            assert isinstance(record["id"], int)
            
            
     """ record는 dictionary로 출력
     
    {'id': 10005, 'title': '공지 1', 'date': '2020-12-01', 'link': 'https://link=10005', 'writer': 'CSW'}
    {'id': 10006, 'title': '공지 2', 'date': '2020-12-02', 'link': 'https://link=10006', 'writer': 'CSW'}
    {'id': 10007, 'title': '공지 3', 'date': '2020-12-04', 'link': 'https://link=10007', 'writer': 'CSW'}
     """

     

    Producer, Consumer 예제

    Producer로 하나의 avro record를 Consumer로 보내는 방법은 아래와 같다.

    from io import BytesIO
    from fastavro import parse_schema, schemaless_reader, writer
    
    
    # 아래와 같은 데이터가 있을 때
    message = {
        "id": 10000,
        "title": "[FastAVRO] 테스트 공지 제목",
        "date": "20.12.23",
        "link": "https://somelink",
        "writer": "alfex4936",
    }
    
    # Producer part
    producer_rb = BytesIO()  # 바이트로 변환
    schemaless_writer(producer_rb, parsed_schema, message)  # 1개의 data write
    produced_data = producer_rb.getvalue()
    producer.produce(produced_data)  # Producer
    
    ... 각각 다른 py
    
    # Consumer part
    # SIGINT can't be handled when polling, limit timeout to 1 second.
    msg = c.poll(1.0)  # consume data from producer
    
    message = msg.value()
    
    consumer_rb = BytesIO(message)
    decoded = schemaless_reader(consumer_rb, parsed_schema)  # 1개의 data read
    
    assert decoded == {
        "id": 10000,
        "title": "[FastAVRO] test title",
        "date": "20.12.23",
        "link": "https://somelink",
        "writer": "alfex4936",
    }
    728x90

    댓글