brunch

You can make anything
by writing

C.S.Lewis

by 유윤식 Mar 21. 2021

Python:ProcessPoolExecutor

#multi-process

예를 들어보면.

10GB 파일을 쓴다고 가정하자.


라인 by 라인 형태로 데이터를 쓴다. JSON 형태의 데이터를 하나씩 하나씩.

몇 시간이나 걸릴까?

약 3시간 정도 걸린다.


대략 코드를 이렇게 만들어보면,


with gzip.open('/data/candidates_cnt.gz', 'wb') as fw:

        for idx, (cand, score) in enumerate(zip(cands, scores)):

        candidates_ = []

            for c, s in zip(cand, score):

                try:

                    if s >= 0.3 and c != 0:

                        candidates_.append(

                            {'track_id': int(index_to_word2[str(track)]),

                             'score': score

                             }

                        )

                    else:

                        break

                except Exception as e:

                    continue

            if len(candidates_) >= 50:

                txt = json.dumps({'key': custom_dict[idx], 'value': candidates_})

                fw.write(txt.encode())

                fw.write('\n'.encode())


두번의 for 구문.

딱 봐도 오래걸릴 것 같이 생겼다.


코드상의 로직을 대충 훑어만봐도 알 수 있지만 JSON 형태를 뭔가 파일에 적는 형태로 진행된다.

이를 또 gzip 형태로 바로 써내려가는 with 구문도 볼 수 있다.


파이썬(Python) 제공하는 ProcessPoolExecutor

파이썬은 이를 활용해서 마법같은 퍼포먼스와 실행시간을 단축 할 수 있다고 설명한다.


실제 프로젝트에 적용한 결과,

20분 내로 작업을 마칠 수 있었다.


일단 8개의 Core 를 사용했다. 전체 적어야하는 라인의 수가 1,000,000 이라고 가정하면,

Tensorflow 학습을 시킬때와 비슷하게 batch 를 정해서 하나의 프로세스에서 5,000 라인씩 하나의 파일에 적어나갈 수 있도록 로직을 바꾸었다.


그럼 총 200개의 *.gz 파일이 생성된다.

파이썬의 shell 명령어로 cat *.gz > final_res.gz 를 생성하면 5초안에 전체 200개 파일이 하나의 gz 파일로 생성된다.


대략 코드를 보면,


from concurrent.futures import ProcessPoolExecutor


with ProcessPoolExecutor(max_workers=8) as ppexec:

    ppe_res = []

    for i in range(len(total_cands_data)//_batch+1):

        print(i, ' count is running / ', len(total_cands_data)//_batch+1)

        ppe_res.append(ppexec.submit(write_candidates

                                     str(i*_batch),

                                     total_cands_data[i*_batch:(i+1)*_batch], total_scores_data[i*_batch:(i+1)*_batch]))


    for ppe in concurrent.futures.as_completed(ppe_res):

        try:

            data = ppe.result()

        except Exception as e:

            print(e)

        else:

            print(data)


num_workers 는 8개로 지정한다. 더 많은 Resource를 운영하는 환경에서는 더 많은 수를 지정 할 수 있다.


중요한 부분은 submit 을 통해서 필요한 JOB을 넘겨서 멀티프로세스를 구현한다.

Param 은 작업을 실행하는 주체(함수)와 그 함수에 필요한 파라미터로 구성된다.


다행히(?) 문서가 길지 않아서 금방 읽고 이해할 수 있으며,

쉽게 구현해 볼 수 있도록 친절한 설명을 제공한다.


https://docs.python.org/3/library/concurrent.futures.html



"3시간 작업 --> 30분 작업"

내 프로젝트에 적용시킬 수 있는지 파악하는게 우선이다.

잘 쓰면 너무 유용하고 다음 작업들이 밀리지 않고 빠르게 진행시킬 수 있다.

 

브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari