검색하기귀찮아서만든블로그

[python] multiprocessing 사용법 본문

개발

[python] multiprocessing 사용법

hellworld 2022. 4. 2. 15:38

[개요]

파이썬 스레드를 사용할 경우 멀티 코어를 활용하지 못하기 때문에 멀티 프로세싱 처리를 해야 한다.
싱글 코어를 사용한 스레딩 처리는 비동기 처리는 가능하지만 스트레스 테스트 환경에서 병목 현상을 발생시키기 때문에 멀티 프로세싱 처리를 해야 한다.

[코드]

아래 코드는 자식 프로세스 5개를 생성하고 자식 프로세스가 5초 동안 대기 후에 종료되는 로직이다.

자식 프로세스에서 처리한 결과를 수집하기 위해 multiprocessing 패키지는 다양한 오브젝트를 제공한다.
아래 코드에서는 Value, Array, Queue를 활용한 결과 수집과 Lock 오브젝트를 활용한 프로세스 동기화 예제를 구현하였다.

[중요]

- utf-8 코드 일 경우 상단에 # -*- coding: utf-8 -*- 추가.
. python2.7에서 한글 주석이 문제 발생시킴
- 자식 프로세스 처리 함수 상단에 signal.signal(signal.SIGINT, signal.SIG_IGN) 추가.
. 자식 프로세스는 Ctrl + C에 대한 키 이벤트 처리를 무시한다는 의미
. 사용자 의도에 의해 중단하더라도 자식 프로세스에서 마무리 작업을 수행할 수 있도록 처리하기 위함.
- 메인 문 최상단에 multiprocessing.freeze_support() 추가.
. 멀티 프로세스에서는 해당 메인 구문을 동작하지 않겠다는 의미

# -*- coding: utf-8 -*-
import multiprocessing
import signal
import ctypes
import time
import os

def ProcessFunction(processLock, processEvent, output_value, output_array, output_queue):
    # 자식 프로세스는 ctrl + c 종료 이벤트를 처리하지 않는다.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    # 자식 프로세스 동작 수행
    for i in range(5):
        if processEvent.is_set():
            output_queue.put('PID : {0} (set event)'.format(os.getpid()))
            return

        time.sleep(1)
        
        #프로세스 락
        processLock.acquire()
        # 동기화 처리가 필요한 동작을 수행한다.
        output_value.value += 1
        output_array[0] += i
        output_array[1] += i
        output_array[2] += i
        #프로세스 언락
        processLock.release()

    output_queue.put('PID : {0} (complete)'.format(os.getpid()))

if __name__ == '__main__':
    # 자식 프로세스 재귀 호출 방지 
    multiprocessing.freeze_support()
    
    # 프로세스 동기화
    processLock = multiprocessing.Lock()
    # 프로세스 이벤트
    processEvent =multiprocessing.Event()
    # 값 수집용 변수
    output_value = multiprocessing.Value(ctypes.c_int, 0)
    # 값 수집용 배열 (고정 크기)
    output_array = multiprocessing.Array(ctypes.c_int, [0,0,0])
    # 값 수집용 큐
    manager = multiprocessing.Manager()
    # multiprocessing.Queue() 는 버그가 있어서 Manager().Queue() 로 대체
    output_queue = manager.Queue()

    # 자식 프로세스 생성
    listProcessObj = list()
    for processIdx in range(5):
        processObj = multiprocessing.Process(target=ProcessFunction, args=(processLock, processEvent, output_value, output_array, output_queue,))
        processObj.daemon = True
        processObj.start()
        listProcessObj.append(processObj)

    # Ctrl + C 종료 인터럽트 및 자식 프로세스 종료를 대기한다.
    try:
        isAliveProcess = True
        while isAliveProcess:
            isAliveProcess = False
            for proccessObj in listProcessObj:
                if not proccessObj.is_alive():
                    continue
                isAliveProcess = True
    except KeyboardInterrupt:
        processEvent.set()

    # 자식 프로세스 소멸 대기 (Ctrl + C 로 이벤트 발생 후 프로세스 종료 대기)
    for proccessObj in listProcessObj:
        proccessObj.join()

    # 결과 출력
    while not output_queue.empty():
        print(output_queue.get())

 

[결과]

# 정상 처리 완료 결과
PID : 13996 (complete)
PID : 13168 (complete)
PID : 1804 (complete)
PID : 16656 (complete)
PID : 7900 (complete)



# 처리 중 ctrl + c 를 눌러 중단한 결과
PID : 4512 (set event)
PID : 20368 (set event)
PID : 18092 (set event)
PID : 20676 (set event)
PID : 21112 (set event)