IT 일반

[Python] Child process 생성

직장인B 2022. 12. 3. 00:56

 작업 간에 별도의 프로세스를 만들어서 작업을 처리해야할 때가 있다. 가장 흔하게는 controller 역할을 하는 어플리케이션에서 worker 혹은 executor 등의 작업 주체를 만드는 경우가 있다. Airflow에서 Celery worker를 사용할 때가 그렇다. airflow는 worker들을 데몬으로 미리 띄워놓는게 아니라 작업이 수행되는 때에 필요에 따라 worker process를 띄워서 사용한다. 이번 포스트에서는 간략하게 python으로 child process를 띄우는 코드를 소개한다. 

 

import os

print(f'Current PID : {os.getpid()}')

pid = os.fork()
if pid > 0:
	_, ret = os.waitpid(pid, 0)

	print("Parent processing")
	print(f'Parent PID: {os.getpid()}, Child PID: {pid}')
	print(f'Child process exit code: {ret}')
else:
	print("Child processing")
	print(f'Child PID:{os.getpid()}')
	os._exit(os.EX_OK)

 

위 코드를 실행했을때의 출력은 다음과 같다. 

Current PID : 2977
Child processing
Child PID:2978
Parent processing
Parent PID: 2977, Child PID: 2978
Child process exit code: 0

 

code의 실행순서는 이렇게된다. 

import os

print(f'Current PID : {os.getpid()}') # 1

pid = os.fork() # 2 - child process 생성
if pid > 0:
	_, ret = os.waitpid(pid, 0) # 3 - child process 처리 & 대기

	print("Parent processing") #7
	print(f'Parent PID: {os.getpid()}, Child PID: {pid}') #8
	print(f'Child process exit code: {ret}') #9
else:
	print("Child processing") # 4
	print(f'Child PID:{os.getpid()}') # 5
	os._exit(os.EX_OK) # 6 - child process 완료

 

child process 가 실행될 때 process 목록을 보면 다음과 같다. 

child pid : 3357

parent pid : 3356

 


airflow 에선 이 코드를 어떻게 이용하는지 참고해보자

https://github.com/apache/airflow/blob/main/airflow/executors/celery_executor.py

def _execute_in_fork(command_to_exec: CommandType, celery_task_id: str | None = None) -> None:
    pid = os.fork()
    if pid:
        # In parent, wait for the child
        pid, ret = os.waitpid(pid, 0)
        if ret == 0:
            return

        msg = f"Celery command failed on host: {get_hostname()} with celery_task_id {celery_task_id}"
        raise AirflowException(msg)

    from airflow.sentry import Sentry

    ret = 1
    try:
        from airflow.cli.cli_parser import get_parser

        settings.engine.pool.dispose()
        settings.engine.dispose()

        parser = get_parser()
        # [1:] - remove "airflow" from the start of the command
        args = parser.parse_args(command_to_exec[1:])
        args.shut_down_logging = False
        if celery_task_id:
            args.external_executor_id = celery_task_id

        setproctitle(f"airflow task supervisor: {command_to_exec}")
        args.func(args)
        ret = 0
    except Exception as e:
        log.exception("[%s] Failed to execute task %s.", celery_task_id, str(e))
        ret = 1
    finally:
        Sentry.flush()
        logging.shutdown()
        os._exit(ret)