#multi-process
예를 들어보면.
10GB 파일을 쓴다고 가정하자.
라인 by 라인 형태로 데이터를 쓴다. JSON 형태의 데이터를 하나씩 하나씩.
몇 시간이나 걸릴까?
약 3시간 정도 걸린다.
대략 코드를 이렇게 만들어보면,
with gzip.open('/data/candidates_cnt.gz', 'wb') as fw:
for idx, (cand, score) in enumerate(zip(cands, scores)):
candidates_ = []
for c, s in zip(cand, score):
try:
if s >= 0.3 and c != 0:
candidates_.append(
{'track_id': int(index_to_word2[str(track)]),
'score': score
}
)
else:
break
except Exception as e:
continue
if len(candidates_) >= 50:
txt = json.dumps({'key': custom_dict[idx], 'value': candidates_})
fw.write(txt.encode())
fw.write('\n'.encode())
두번의 for 구문.
딱 봐도 오래걸릴 것 같이 생겼다.
코드상의 로직을 대충 훑어만봐도 알 수 있지만 JSON 형태를 뭔가 파일에 적는 형태로 진행된다.
이를 또 gzip 형태로 바로 써내려가는 with 구문도 볼 수 있다.
파이썬(Python) 제공하는 ProcessPoolExecutor
파이썬은 이를 활용해서 마법같은 퍼포먼스와 실행시간을 단축 할 수 있다고 설명한다.
실제 프로젝트에 적용한 결과,
20분 내로 작업을 마칠 수 있었다.
일단 8개의 Core 를 사용했다. 전체 적어야하는 라인의 수가 1,000,000 이라고 가정하면,
Tensorflow 학습을 시킬때와 비슷하게 batch 를 정해서 하나의 프로세스에서 5,000 라인씩 하나의 파일에 적어나갈 수 있도록 로직을 바꾸었다.
그럼 총 200개의 *.gz 파일이 생성된다.
파이썬의 shell 명령어로 cat *.gz > final_res.gz 를 생성하면 5초안에 전체 200개 파일이 하나의 gz 파일로 생성된다.
대략 코드를 보면,
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=8) as ppexec:
ppe_res = []
for i in range(len(total_cands_data)//_batch+1):
print(i, ' count is running / ', len(total_cands_data)//_batch+1)
ppe_res.append(ppexec.submit(write_candidates,
str(i*_batch),
total_cands_data[i*_batch:(i+1)*_batch], total_scores_data[i*_batch:(i+1)*_batch]))
for ppe in concurrent.futures.as_completed(ppe_res):
try:
data = ppe.result()
except Exception as e:
print(e)
else:
print(data)
num_workers 는 8개로 지정한다. 더 많은 Resource를 운영하는 환경에서는 더 많은 수를 지정 할 수 있다.
중요한 부분은 submit 을 통해서 필요한 JOB을 넘겨서 멀티프로세스를 구현한다.
Param 은 작업을 실행하는 주체(함수)와 그 함수에 필요한 파라미터로 구성된다.
다행히(?) 문서가 길지 않아서 금방 읽고 이해할 수 있으며,
쉽게 구현해 볼 수 있도록 친절한 설명을 제공한다.
https://docs.python.org/3/library/concurrent.futures.html
"3시간 작업 --> 30분 작업"
내 프로젝트에 적용시킬 수 있는지 파악하는게 우선이다.
잘 쓰면 너무 유용하고 다음 작업들이 밀리지 않고 빠르게 진행시킬 수 있다.