ABOUT ME

-

Total
-
  • Python: luigi 배치 job 파이프라인 생성기
    컴퓨터/파이썬 2020. 11. 21. 22:02
    728x90
    반응형

    Luigi by Spotify

     

    spotify/luigi

    Luigi is a Python module that helps you build complex pipelines of batch jobs.

    github.com

     

    1. 소개

    task 구조

    luigi[루이지]는 배치 job들의 파이프라인을 만들 수 있게 도와주는 라이브러리이다.

    주목적은 오래 실행되는 배치 프로세스들을 처리하기 위함인데,

    아파치 Hadoop처럼 데이터베이스에 데이터를 저장/추출하거나, 머신 러닝 알고리즘을 돌리거나,

    이 배치 job은 아무거나 될 수 있다.

     

    장점

    • 파이썬이므로 어떤 작업이든 쉽게 만들 수 있다.
    • 파이프라인을 거꾸로 돌기 때문에 태스크를 재실행하기보다, 실패한 테스크로부터 돌릴 수 있다.
    • 아마존 ECS로 바로 작업할 수 있는 AWS 배치 wrapper
    • GUI 포함
    # 파이프라인
    Start -> Node A -> Node B -> Node C -> End
    
    # luigi에선
    Start -> Node C -> Node B -> Node A -> Node B -> Node C -> End

     

    단점

    • 테스트가 어렵다.
    • 중앙 스케쥴러가 있어서 병렬 작업을 다루기 어렵다.
    • 트리거가 없어서, cronjob을 만들어서 파이프라인을 시작해야 한다.

     

    visualizer 기능 포함

     

    2. 사용법

    예제로 만들어 볼 것은, 공식 문서에 있는 예제를 수정한 버전이다.

    테스크: Count Letters, Generate Words

    pip install luigi
    pip install Faker
    

     

    1. CountLetters Task는 GenerateWords를 필요로 한다.
    2. GenerateWords는 랜덤 미국식 이름으로 이루어진 크기 1,000의 배열을 txt에 저장한다.
    3. CountLetters는 위 생성된 txt 이름들을 읽어서 각각 이름의 길이를 계산한다. (공백 제거)

    결과 미리보기

     

    CountLetters Task

    import luigi
    
    class CountLetters(luigi.Task):
        def requires(self):
            return GenerateWords()  # txt 파일을 먼저 만들어야 함
    
        def output(self):  # temp_letter_counts.txt에 저장할 것임
            return luigi.LocalTarget("temp_letter_counts.txt")
    
        def run(self):  # 테스크 실행
            # 줄마다 잘라서 배열에 저장
            with self.input().open("r") as infile:
                words = infile.read().splitlines()
                
            # 줄마다 이름 길이 계산
            with self.output().open("w") as outfile:
                for word in words:
                    outfile.write(
                        "{word} | {letter_count}\n".format(
                            word=word, letter_count=self.counter(word)
                        )
                    )  # 예) "Sabrina Bates | 12" 형식으로 output
    
        @staticmethod
        def counter(word): # 공백 제거 후 길이 return
            word = word.replace(" ", "")
            return len(word)
    

     

    GenerateWords Task

    from faker import Faker
    
    
    class GenerateWords(luigi.Task):
        def output(self):
            return luigi.LocalTarget("temp_words.txt")
            # temp_words.txt에 저장
            # 자동으로 CountLetters의 input이 된다.
    
        def run(self):
            fake = Faker()
            names = [fake.name() for _ in range(1000)]  # faker를 통한 미국식 이름 랜덤 생성
            with self.output().open("w") as f:
                for name in names:
                    f.write("{word}\n".format(word=name))

     

    3. 실행법

    콘솔에서 아래를 입력

    python -m luigi --module file_name TaskName --local-schedule

     

    잘 안되거나, localhost에서 실행해보고 싶으면, 아래로 먼저 포트를 열고, 서버를 실행한다.

    luigid --port 8082

    CountLetters 테스크 실행

    python mytask.py CountLetters --scheduler-host localhost

     

     

    참고

    luigi 공식 문서 @링크

    728x90

    댓글