ABOUT ME

-

  • Python: luigi 배치 job 파이프라인 생성기
    컴퓨터/파이썬 2020. 11. 21. 22:02
    728x90
    반응형

    Luigi by Spotify

     

    spotify/luigi

    Luigi is a Python module that helps you build complex pipelines of batch jobs.

    github.com

     

    1. 소개

    task 구조

    luigi[루이지]는 배치 job들의 파이프라인을 만들 수 있게 도와주는 라이브러리이다.

    주목적은 오래 실행되는 배치 프로세스들을 처리하기 위함인데,

    아파치 Hadoop처럼 데이터베이스에 데이터를 저장/추출하거나, 머신 러닝 알고리즘을 돌리거나,

    이 배치 job은 아무거나 될 수 있다.

     

    장점

    • 파이썬이므로 어떤 작업이든 쉽게 만들 수 있다.
    • 파이프라인을 거꾸로 돌기 때문에 태스크를 재실행하기보다, 실패한 테스크로부터 돌릴 수 있다.
    • 아마존 ECS로 바로 작업할 수 있는 AWS 배치 wrapper
    • GUI 포함
    python
    # 파이프라인 Start -> Node A -> Node B -> Node C -> End # luigi에선 Start -> Node C -> Node B -> Node A -> Node B -> Node C -> End

     

    단점

    • 테스트가 어렵다.
    • 중앙 스케쥴러가 있어서 병렬 작업을 다루기 어렵다.
    • 트리거가 없어서, cronjob을 만들어서 파이프라인을 시작해야 한다.

     

    visualizer 기능 포함

     

    2. 사용법

    예제로 만들어 볼 것은, 공식 문서에 있는 예제를 수정한 버전이다.

    테스크: Count Letters, Generate Words

    powershell
    pip install luigi pip install Faker

     

    1. CountLetters Task는 GenerateWords를 필요로 한다.
    2. GenerateWords는 랜덤 미국식 이름으로 이루어진 크기 1,000의 배열을 txt에 저장한다.
    3. CountLetters는 위 생성된 txt 이름들을 읽어서 각각 이름의 길이를 계산한다. (공백 제거)

    결과 미리보기

     

    CountLetters Task

    python
    import luigi class CountLetters(luigi.Task): def requires(self): return GenerateWords() # txt 파일을 먼저 만들어야 def output(self): # temp_letter_counts.txt 저장할 것임 return luigi.LocalTarget("temp_letter_counts.txt") def run(self): # 테스크 실행 # 줄마다 잘라서 배열에 저장 with self.input().open("r") as infile: words = infile.read().splitlines() # 줄마다 이름 길이 계산 with self.output().open("w") as outfile: for word in words: outfile.write( "{word} | {letter_count}\n".format( word=word, letter_count=self.counter(word) ) ) # ) "Sabrina Bates | 12" 형식으로 output @staticmethod def counter(word): # 공백 제거 길이 return word = word.replace(" ", "") return len(word)

     

    GenerateWords Task

    python
    from faker import Faker class GenerateWords(luigi.Task): def output(self): return luigi.LocalTarget("temp_words.txt") # temp_words.txt 저장 # 자동으로 CountLetters input 된다. def run(self): fake = Faker() names = [fake.name() for _ in range(1000)] # faker 통한 미국식 이름 랜덤 생성 with self.output().open("w") as f: for name in names: f.write("{word}\n".format(word=name))

     

    3. 실행법

    콘솔에서 아래를 입력

    powershell
    python -m luigi --module file_name TaskName --local-schedule

     

    잘 안되거나, localhost에서 실행해보고 싶으면, 아래로 먼저 포트를 열고, 서버를 실행한다.

    powershell
    luigid --port 8082

    CountLetters 테스크 실행

    powershell
    python mytask.py CountLetters --scheduler-host localhost

     

     

    참고

    luigi 공식 문서 @링크

    728x90

    댓글