TL;DR
- Python์ ๋์์ฑ/๋ณ๋ ฌ์ฑ์ thread์ process ๋ ๊ฐ๋์ด๋ฉฐ GIL ๋๋ฌธ์ CPU ๋ณ๋ ฌ์ process์ ๋ชซ
threading์ I/O-bound์ ์ ํฉํ ์ ์์ค,multiprocessing์ GIL์ ์ฐํํด CPU-bound์ ์ ํฉ,concurrent.futures๋ ๋์ ๊ฐ์ผ ๊ณ ์์ค- ์ ํ ๊ธฐ์ค์ I/O-bound๋ thread, CPU-bound๋ process, ๋๊ท๋ชจ I/O๋ asyncio
AI-assisted
1. ๋ ๊ฐ๋ โ thread์ process
Python์์ ์ฌ๋ฌ ์ผ์ ๊ฒน์น๊ฑฐ๋ ๋๋ ๋๋ฆฌ๋ ๊ธธ์ ํฌ๊ฒ ๋์ด๋ค.
- thread: ํ process ์์ ์ฌ๋ฌ ์คํ ํ๋ฆ. ๋ฉ๋ชจ๋ฆฌ๋ฅผ ๊ณต์ ํ๋ค. ์์ฑ์ด ๊ฐ๋ณ๋ค.
- process: ๋ ๋ฆฝ๋ ํ๋ก๊ทธ๋จ ์ธ์คํด์ค. ๋ฉ๋ชจ๋ฆฌ๊ฐ ๊ฒฉ๋ฆฌ๋๋ค. ์ฌ๋ฌ ์ฝ์ด์์ ์ง์ง ๋ณ๋ ฌ์ด ๊ฐ๋ฅํ๋ค.
๋ฌด์์ ๊ณ ๋ฅผ์ง๋ GIL(Global Interpreter Lock)์ด ๊ฐ๋ฅธ๋ค.
CPython์๋ ํ ์๊ฐ์ ํ๋์ thread๋ง Python ๋ฐ์ดํธ์ฝ๋๋ฅผ ์คํํ๊ฒ ๋ง๋ ์ ๊ธ์ด ์์ด์ thread๋ฅผ ์ฌ๋ฌ ๊ฐ ๋์๋ ์์ Python ๊ณ์ฐ์ ํ ๋ฒ์ ํ๋์ฉ๋ง ๋๋ค. ๊ทธ๋์ CPU-bound ์์
์ thread๋ก ๋๋ ๋ ๋นจ๋ผ์ง์ง ์๊ณ ์ฌ๋ฌ ์ฝ์ด๋ฅผ ์ค์ ๋ก ์ฐ๋ ค๋ฉด GIL์ด process๋ง๋ค ๋ฐ๋ก ์๋ multiprocessing์ผ๋ก ๊ฐ์ผ ํ๋ค.
๋ฐ๋๋ก I/O ๋๊ธฐ ์ค์๋ GIL์ด ํ๋ฆฌ๋ฏ๋ก, I/O-bound ์์ ์๋ thread๋ ์ถฉ๋ถํ ์ธ๋ชจ ์๋ค.
| thread | process | |
|---|---|---|
| ๋ฉ๋ชจ๋ฆฌ | ๊ณต์ (ํยท์ ์ญ) | ๋ ๋ฆฝ(๊ฒฉ๋ฆฌ) |
| CPU ๋ณ๋ ฌ | GIL์ด ๋ง์ | ๊ฐ๋ฅ |
| ์์ฑยท์ ํ ๋น์ฉ | ๊ฐ๋ฒผ์ | ๋ฌด๊ฑฐ์(์์ฑ + IPC) |
| ํต์ | ๊ณต์ ๋ฉ๋ชจ๋ฆฌ๋ก ๋ฐ๋ก | IPC ํ์(pickle ์ง๋ ฌํ) |
| ์ ํฉํ ์์ | I/O-bound | CPU-bound |
2. threading โ OS thread ์ ์์ค
threading์ OS thread ๊ธฐ๋ฐ ๋์์ฑ์ ์ ๊ณตํ๋ ํ์ค ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ค. I/O-bound ์์
์์ ๋๊ธฐ๋ฅผ ๊ฒน์น๊ฑฐ๋, ๋ฐฑ๊ทธ๋ผ์ด๋ ์์ปค๋ฅผ ๋๋ฆด ๋ ์ด๋ค.
thread ์์ฑ๊ณผ ์คํ
import threading
def worker(name):
print(f"{name} ์์
์ค")
t = threading.Thread(target=worker, args=("A",))
t.start() # non-blocking: worker๋ฅผ ์ thread์์ ์์, ๋ฉ์ธ์ ์ฆ์ ๋ค์ ์ค๋ก (async launch)
# ... ์ด ์ฌ์ด ์ฝ๋๋ worker์ ๊ฒน์ณ์ ๋๋ค (concurrent) ...
t.join() # blocking + synchronous: worker๊ฐ ๋๋ ๋๊น์ง ๋ฉ์ธ์ด Blocked๋ก ๋๊ธฐ (์๋ฃ๋ฅผ ์ง์ pull)start(): ์ thread๋ฅผ ๋ง๋ค์ดtargetํจ์๋ฅผ ์คํํ๋ค. non-blocking์ด๋ผ ์ฆ์ ๋ฆฌํดํ๊ณ worker๋ ๋ค์์ concurrentํ๊ฒ ๋๋ค โ ์์ ๋ง ๊ฑธ์ด๋๋ async launch๋ค.run()์ ์ง์ ๋ถ๋ฅด๋ฉด ์ thread ์์ด ํ์ฌ thread์์ ์คํ๋๋ ์ฃผ์ํ๋ค.join(): ํด๋น thread๊ฐ ๋๋ ๋๊น์ง ํธ์ถํ ์ชฝ์ด ๊ธฐ๋ค๋ฆฐ๋ค. blocking + synchronous๋ค โ ํธ์ถํ thread๊ฐ Blocked ์ํ๋ก ์ ๋ค์ด ์๋ฃ๋ฅผ ์ง์ ๊ธฐ๋ค๋ ค ํ์ํ๋ค(pull). ๋จ worker์ ๋ฆฌํด๊ฐ์ ์ฃผ์ง ์๊ณ โ๋๋ฌ๋คโ๋ ์ ํธ๋ง ์ค๋ค. ๊ฒฐ๊ณผ๊ฐ ํ์ํ๋ฉดqueue.Queue๋ 4์ ์Future๋ฅผ ์ด๋ค.daemon: ์ ์ต์ ์์์ ์์ง๋ง ์์ฑ ์Thread(target=..., daemon=True)(๋๋t.daemon = True)๋ก ์ง์ ํ๋ ์ต์ ์ด๋ค. ์ด๋ ๊ฒ ๋ง๋ thread๋ ๋ฉ์ธ thread๊ฐ ๋๋๋ฉด ํจ๊ป ๊ฐ์ ์ข ๋ฃ๋๋ค. ๋ฐฑ๊ทธ๋ผ์ด๋ ์์ฃผ ์์ ์ ์ฐ๋, ์ ๋ฆฌ ์์ด ์ฃฝ์ผ๋ ์ค์ํ ์์ ์ ๋ถ์ ํฉํ๋ค.
๋๊ธฐํ primitives
์ฌ๋ฌ thread๊ฐ ๊ฐ์ ์์์ ๋์์ ๊ฑด๋๋ฆฌ๋ฉด race condition์ด ์๊ธด๋ค. ์ ๊ทผ ์์๋ฅผ ๊ฐ์ ํ๋ ๋๊ตฌ๋ค์ด๋ค.
Lock: ๊ฐ์ฅ ๊ธฐ๋ณธ. ํ ๋ฒ์ ํ๋์ thread๋ง critical section์ ๋ค์ด๊ฐ๊ฒ ํ๋ค. ์์ ๊ถ ๊ฐ๋ ์ด ์์ด ์๋ฌด thread๋ releaseํ ์ ์๊ณ ๊ฐ์ thread๊ฐ ํ์ง ์๊ณ ๋ acquireํ๋ฉด ์๊ธฐ ์์ ์ ๊ธฐ๋ค๋ฆฌ๋ค deadlock์ ๋น ์ง๋ค(๋ฝ์ ๋ชป ์ก์ thread๋ฅผ ์ฌ์ฐ๋๋ ์คํ์ํค๋๋์ ๊ฐ๋ฆผ์ Busy Polling and Spinlock).RLock: reentrant lock. ํ๋ํ thread๊ฐ ์์ ๊ถ์ ๊ฐ์ง๋ฉฐ ๊ทธ thread๋ ํ์ง ์๊ณ ๋ ์ฌ๋ฌ ๋ฒ ๋ค์ acquireํ ์ ์๋ค(์ฌ์ง์ ). ํ๋ ํ์๋งํผ releaseํด์ผ ํ๋ฆฌ๊ณ ์์ ํ thread๋ง releaseํ ์ ์๋ค. ์ฌ๊ท ํจ์๋, ๋ฝ์ ์ฅ ๋ฉ์๋๊ฐ ๊ฐ์ ๋ฝ์ ์ฐ๋ ๋ค๋ฅธ ๋ฉ์๋๋ฅผ ๋ถ๋ฅผ ๋.Event: thread ์ฌ์ด ์ ํธ ๊น๋ฐ. ํ thread๊ฐset()ํ๋ฉดwait()ํ๋ thread๋ค์ด ๊นจ์ด๋๋ค. โ์ค๋น๋๋คโ๋ ํต์ง์ ์ด๋ค. ์ด๋ฆ์ ๋น์ทํด๋ asyncio์ event loop์๋ ๋ฌด๊ดํ๋ค โ ๊ทธ๋ฅ boolean ํ๋๊ทธ๋ค.Condition: ํน์ ์กฐ๊ฑด์ด ๋ ๋๊น์ง ๋๊ธฐยทํต์ง. producer-consumer์ ์ธ๋ฐํ ์ ์ด์.Semaphore: ๋์ ์ง์ ์๋ฅผ N๊ฐ๋ก ์ ํ. ์ปค๋ฅ์ ํ์ฒ๋ผ โ๋์ K๊ฐ๊น์ง๋งโ ํ์ฉํ ๋.
Lock์ผ๋ก ๊ณต์ ๋ณ์ ๋ณดํธ: ๊ฐ์ฅ ๊ธฐ๋ณธ ํจํด์ด๋ค.
lock = threading.Lock()
counter = 0
def increment():
global counter
with lock: # ์ด ๋ธ๋ก์ ํ thread์ฉ ์ง๋ ฌํ
counter += 1Lock vs RLock โ ์์ ๊ถ๊ณผ ์ฌ์ง์
. ์๋๋ Lock์ด๋ฉด ๋ ๋ฒ์งธ ํ๋์์ ์๊ธฐ ์์ ์ ๊ธฐ๋ค๋ฆฌ๋ฉฐ ๋ฉ์ถ๋ค(deadlock). RLock์ ๊ฐ์ thread์ ์ฌํ๋์ ํ์ฉํด ํต๊ณผํ๋ค.
lock = threading.RLock() # threading.Lock()์ด๋ฉด ์๋์์ deadlock
def outer():
with lock:
inner() # ๋ฝ์ ์ฅ ์ฑ ๋ค๋ฅธ ํจ์๋ฅผ ๋ถ๋ฅธ๋ค
def inner():
with lock: # ๊ฐ์ thread๊ฐ ๋ acquire โ RLock์ด๋ผ ํต๊ณผ
...์์ ๊ถ์ผ๋ก ์ ๋ฆฌํ๋ฉด, RLock์ ์์ ํ thread๋ง ํ ์ ์๋ ์ง์ง mutex๊ณ Lock์ ์์ ๊ถ์ด ์์ด ์๋ฐํ๋ binary semaphore์ ๊ฐ๊น๋ค. ์ค๋ฌด์์ ๋ ๋ค ๊ทธ๋ฅ โlockโ์ด๋ผ ๋ถ๋ฅธ๋ค.
Event๋ก ์ถ๋ฐ ์ ํธ ๋ง์ถ๊ธฐ: ์ฌ๋ฌ worker๋ฅผ ํ ์ ํธ์ ํจ๊ป ์ถ๋ฐ์ํจ๋ค. set()์ ํ๋๊ทธ๋ฅผ ์ฌ๋ฆฌ๊ณ wait()์ ์ฌ๋ผ๊ฐ ๋๊น์ง Blocked๋ก ์๋ค.
start = threading.Event()
def worker():
start.wait() # set๋ ๋๊น์ง Blocked๋ก ๋๊ธฐ
print("์ถ๋ฐ")
threading.Thread(target=worker).start()
start.set() # ๋๊ธฐํ๋ worker๋ฅผ ๊นจ์Semaphore๋ก ๋์ ์คํ ์ ์ ํ: โ๋์ K๊ฐ๊น์ง๋งโ์ ๊ฐ์ ํ๋ค.
sem = threading.Semaphore(3) # ๋์ 3๊ฐ๊น์ง ํ์ฉ
def fetch():
with sem: # ์์ 3๊ฐ๊ฐ ๋ค ์ฐจ ์์ผ๋ฉด 4๋ฒ์งธ๋ ๋๊ธฐ
... # ์ธ๋ถ API ํธ์ถ ๋ฑthread ๊ฐ ํต์ โ queue.Queue
๊ณต์ ๋ณ์๋ฅผ lock์ผ๋ก ์งํค๋ ๋์ , thread-safeํ queue.Queue๋ก ๊ฐ์ ์ฃผ๊ณ ๋ฐ๋ ํธ์ด ์์ ํ๋ค. producer๊ฐ put(), consumer๊ฐ get()ํ๋ฉฐ ๋ด๋ถ์ ์ผ๋ก lock์ ์์์ ์ฒ๋ฆฌํ๋ค. ๊ฐ์ ์ฃผ๊ณ ๋ฐ์ ๊ณต์ ํ๋ ์ด ๋ฐ์์ Go์ channel๊ณผ ๊ฐ๋ค(๋ฌด๋ฒํผ ๋๋ฐ๋ถยทselectยทclose ๊ฐ์ ์ฑ๋ ๊ณ ์ ๊ธฐ๋ฅ์ ์๋ค).
import queue, threading
q = queue.Queue()
def producer():
for i in range(5):
q.put(i)
def consumer():
while True:
item = q.get() # ํ๊ฐ ๋น๋ฉด ๋๊ธฐ(blocking)
print(item)
q.task_done()threading์ thread์ ์๋ช
์ฃผ๊ธฐ์ ๋๊ธฐํ๋ฅผ ์ธ๋ฐํ๊ฒ ์ ์ดํ ์ ์์ง๋ง ๊ทธ๋งํผ ์ฝ๋๊ฐ ๋๊ณ deadlockยทrace condition์ ์ง์ ์ ๊ฒฝ ์จ์ผ ํ๋ค. ๋จ์ํ โํจ์ N๊ฐ๋ฅผ ๋์์ ๋๋ฆฌ๊ณ ๊ฒฐ๊ณผ๋ฅผ ๋ชจ์ผ๋โ ์ฉ๋๋ผ๋ฉด 4์ ์ concurrent.futures๊ฐ ๋ ๊ฐ๊ฒฐํ๋ค.
3. multiprocessing โ process ์ ์์ค
CPU-bound ์์ ์ ์ฌ๋ฌ ์ฝ์ด์ ์ค์ ๋ก ๋๋๋ ค๋ฉด process๋ฅผ ์ด๋ค. ๊ฐ process๋ ์๊ธฐ๋ง์ Python ์ธํฐํ๋ฆฌํฐ์ GIL์ด ์์ผ๋ฏ๋ก GIL ์ ์ฝ์ ์ฐํํ๋ค.
process ์์ฑ๊ณผ ์คํ
from multiprocessing import Process
def worker(name):
print(f"{name} ๊ณ์ฐ ์ค")
if __name__ == "__main__": # spawn ๋ฐฉ์์์ ํ์
p = Process(target=worker, args=("A",))
p.start()
p.join()์ธํฐํ์ด์ค๋ threading.Thread์ ๋ฎ์์ง๋ง(start/join), ์คํ ๋จ์๊ฐ ๋
๋ฆฝ process๋ผ๋ ์ ์ด ๋ค๋ฅด๋ค.
Pool โ ์์ปค process ํ
process๋ฅผ ๋งค๋ฒ ๋ง๋ค์ง ์๊ณ ๋ฏธ๋ฆฌ ๋ช ๊ฐ ๋์๋๊ณ ์์ ์ ๋๋ ์ค๋ค.
from multiprocessing import Pool
def square(x):
return x * x
if __name__ == "__main__":
with Pool(4) as pool:
results = pool.map(square, range(10)) # 4๊ฐ process์ ๋ถ์ฐIPC โ process ๊ฐ ํต์
process๋ ๋ฉ๋ชจ๋ฆฌ๊ฐ ๊ฒฉ๋ฆฌ๋ผ ์์ด ๊ณต์ ๋ณ์๋ก ๊ฐ์ ๋ชป ๋๊ธด๋ค. ํต์ ์๋จ์ด ๋ฐ๋ก ํ์ํ๋ค.
Queue/Pipe: process ์ฌ์ด๋ก ๊ฐ์ ์ฃผ๊ณ ๋ฐ๋ ํต๋ก. ๋๊ธฐ๋ ๊ฐ์ pickle๋ก ์ง๋ ฌํ๋๋ค.- shared memory(
Value,Array): ์ฌ๋ฌ process๊ฐ ๊ณต์ ํ๋ ๋ฉ๋ชจ๋ฆฌ ๋ธ๋ก. ์ง๋ ฌํ ์์ด ๊ฐ์ ๊ฐ์ ๋ณธ๋ค.
๋๊ธฐ๋ ๋ฐ์ดํฐ๊ฐ ํฌ๋ฉด ์ด ์ง๋ ฌํยท์ ์ก ๋น์ฉ์ด ๊ณ์ฐ ์ด๋์ ๊น์๋จน์ ์ ์๋ค. ํฐ ๋ฐ์ดํฐ๋ฅผ ์์ฃผ ์ค๊ฐ๋ ๊ตฌ์กฐ๋ผ๋ฉด ์คํ๋ ค ๋๋ ค์ง๋ค.
fork์ spawn
์์ process๋ฅผ ๋ง๋๋ ๋ฐฉ์์ด ๋์ด๋ค.
- fork(๋ฆฌ๋ ์ค ๊ธฐ๋ณธ): ๋ถ๋ชจ process์ ๋ฉ๋ชจ๋ฆฌ๋ฅผ ๋ณต์ฌํด ์์์ ๋ง๋ ๋ค. ๋ถ๋ชจ ์ํ๋ฅผ ๊ทธ๋๋ก ๋ฌผ๋ ค๋ฐ๋๋ค.
- spawn(์๋์ฐยทmacOS ๊ธฐ๋ณธ): ์ Python ์ธํฐํ๋ฆฌํฐ๋ฅผ ์ฒ์๋ถํฐ ์์ํ๊ณ ํ์ํ ๊ฒ๋ง pickle๋ก ์ ๋ฌํ๋ค. ๊ทธ๋์ ์์์ด ๋ถ๋ชจ ์ํ๋ฅผ ์๋์ผ๋ก ๋ฌผ๋ ค๋ฐ์ง ์๊ณ ์คํ ์ฝ๋๋
if __name__ == "__main__":์๋์ ๋ฌ์ผ ๋ฌดํ ์ฌ์คํ์ ๋ง๋๋ค.
์ spawn์ __name__ ๊ฐ๋๊ฐ ํ์ํ๊ฐ
๊ฐ๋๊ฐ ํ์ํ ์ด์ ๋ spawn ์์์ด ๋ถ๋ชจ ์คํฌ๋ฆฝํธ๋ฅผ ๋ค์ importํ๊ธฐ ๋๋ฌธ์ด๋ค. ๋น ์ธํฐํ๋ฆฌํฐ๋ก ๋ฌ ์์์ ์คํํ ํจ์ ์ ์๋ฅผ ์ป์ผ๋ ค main ๋ชจ๋์ ์โ์๋๋ก ๋ค์ ์คํํ๋๋ฐ, ์ด๋ process๋ฅผ ๋ง๋๋ ์ฝ๋๊ฐ ์ต์๋จ์ ๋
ธ์ถ๋ผ ์์ผ๋ฉด ๊ทธ ์ฝ๋๋ ์ฌ์คํ๋ผ ์์์ด ๋ ์์์ ๋ณ๋ ์ฌ๊ท ์์ฑ์ด ๋๋ค(ํ๋ CPython์ ์ด๋ฅผ ๊ฐ์งํด RuntimeError๋ก ๋ง๋๋ค). __name__์ ์ง์ ์คํํ ๋ถ๋ชจ์์ "__main__", import๋ ์์์์ ๋ชจ๋ ์ด๋ฆ์ด๋ผ, ๊ฐ๋ ์์ ์ฝ๋๋ ๋ถ๋ชจ์์๋ง ๋๊ณ ์์์ ๊ฑด๋๋ด๋ค.
๊ทธ๋์ ๋ฐฐ์น ์์น์ด ๊ฐ๋ฆฐ๋ค โ ํจ์ยทํด๋์ค ์ ์๋ ๊ฐ๋ ๋ฐ์(์์์ด importํด ๊ฐ์ ธ๊ฐ์ผ ํ๋ฏ๋ก), process๋ฅผ ๋ง๋ค๊ณ ์์ํ๋ ์ฝ๋๋ ๊ฐ๋ ์์ ๋๋ค. ์ฐธ๊ณ ๋ก ์ด ๊ฐ๋๊ฐ ํ์ํ ๊ฑด Python์ด ๋ชจ๋์ importํ ๋ top-level ์ฝ๋๋ฅผ ์คํํ๊ธฐ ๋๋ฌธ์ด๊ณ ๋ช
์์ main() ์ง์
์ ์ ์ฐ๋ ์ธ์ด(CยทGoยทJava ๋ฑ)์๋ ์ด footgun์ด ์๋ค. fork๋ ์ฌimport๋ฅผ ์ ํ๋ ๊ฐ๋๊ฐ ์์ด๋ ๋์ง๋ง ์ด์์ฑ์ ์ํด ๋ถ์ฌ๋๋ ๊ฒ ๊ด๋ก๋ค.
4. concurrent.futures โ ๊ณ ์์ค ํต์ผ ์ธํฐํ์ด์ค
threadingยทmultiprocessing์ ์ง์ ๋ค๋ฃจ๋ ๋์ , ์์
์ ํ์ ๋์ง๊ณ ๊ฒฐ๊ณผ๋ฅผ Future๋ก ๋ฐ๋ ํต์ผ๋ ๊ณ ์์ค ์ธํฐํ์ด์ค๋ค. ์์ธยทํ์์์ยท์ทจ์ยท๊ฒฐ๊ณผ ์์ง์ด Future๋ก ์ ๋ฆฌ๋ผ ์ฝ๋๊ฐ ์งง๋ค.
ํต์ฌ 3์์ โ Executor, Future, submit
- Executor: ์์
์ ๋งก๋ ํ.
ThreadPoolExecutor(thread ํ)์ProcessPoolExecutor(process ํ) ๋. submit(func, *args): ์์ ์ ํ์ ๋์ง๊ณ ์ฆ์Future๋ฅผ ๋ฐํํ๋ค.Future: โ๋ฏธ๋์ ์ฑ์์ง ๊ฒฐ๊ณผโ๋ฅผ ๋ด๋ ๊ฐ์ฒด. ์ง๊ธ์ ๋น์ด ์๊ณ ์์ ์ด ๋๋๋ฉด ๊ฒฐ๊ณผ๋ ์์ธ๊ฐ ๋ด๊ธด๋ค. ์คํ ํ๋ฆ์ด ์๋๋ผ ๊ฒฐ๊ณผ๋ฅผ ๋ด๋ ๊ทธ๋ฆ ์ด๋ค.
from concurrent.futures import ThreadPoolExecutor
def fetch(url):
... # I/O ์์
return len(url)
with ThreadPoolExecutor(max_workers=8) as executor:
future = executor.submit(fetch, "https://example.com")
result = future.result() # ์๋ฃ๊น์ง ๋๊ธฐ ํ ๊ฒฐ๊ณผ ํ์- I/O-bound โ
ThreadPoolExecutor, CPU-bound โProcessPoolExecutor. ์ธํฐํ์ด์ค๊ฐ ๊ฐ์ ํ ์ค๋ง ๋ฐ๊พธ๋ฉด ๋๋ค.
์ฌ๋ฌ ์์ ์กฐ์จ
from concurrent.futures import ThreadPoolExecutor, as_completed
with ThreadPoolExecutor(max_workers=8) as executor:
futures = [executor.submit(fetch, u) for u in urls]
for f in as_completed(futures): # ์๋ฃ๋ ๊ฒ๋ถํฐ
print(f.result())์์
์ ์ฌ๋ฌ ๊ฐ submitํ๋ฉด max_workers ๊ฐ์๋งํผ๋ง ๋์์ ๋๊ณ ๋๋จธ์ง๋ ๋ด๋ถ FIFO ํ์์ ๋๊ธฐํ๋ค ์์ปค๊ฐ ๋น๋ ๋๋ก submit ์์๋๋ก ํฌ์
๋๋ค(์: max_workers=8์ 100๊ฐ๋ฅผ ๋์ง๋ฉด 8๊ฐ๊ฐ ๋๊ณ 92๊ฐ๋ ํ์์ ์์๋๋ก ๋๊ธฐ). ์์ ์์ ์ ๋ง์ถ๋ ค Event ๊ฐ์ ๊ฑธ ๊ฑธ ํ์ ์์ด executor๊ฐ ์์์ ๋ฐฐ๋ถํ๋ค. ๋จ ์๋ฃ ์์๋ ๋ณด์ฅ๋์ง ์์ ๋จผ์ ๋๋ ๊ฒ๋ถํฐ ๋ฐ์ผ๋ ค๋ฉด ์๋ as_completed๋ฅผ ์ด๋ค. submit์ ์์์ ๊ธฐ๋ค๋ฆฌ์ง ์๊ณ ์ฆ์ Future๋ฅผ ๋๋ ค์ฃผ๋ non-blocking ํธ์ถ์ด๋ค.
executor.map(func, iterable): ๊ฒฐ๊ณผ๋ฅผ ์ ๋ ฅ ์์๋๋ก ๋๋ ค์ค๋ค.as_completed(futures): ์๋ฃ๋ ์์๋๋ก ๊บผ๋ธ๋ค. ๋จผ์ ๋๋ ๊ฒ๋ถํฐ ์ฒ๋ฆฌํ ๋.wait(futures, ...): ์๋ฃ ์กฐ๊ฑด(์ ๋ถ/ํ๋/ํ์์์)์ ์ง์ ํด ๋๊ธฐ.
Future ๋ค๋ฃจ๊ธฐ
result(timeout=None): ์๋ฃ๊น์ง ๋๊ธฐ ํ ๊ฒฐ๊ณผ. ํ์์์ ์ง์ ๊ฐ๋ฅ. ์์ ์ด ์์ธ๋ก ๋๋ฌ์ผ๋ฉด ๊ทธ ์์ธ๊ฐ ์ฌ๊ธฐ์ ๋ค์ ํฐ์ง๋ค.done(): ์๋ฃ ์ฌ๋ถ๋ง non-blocking์ผ๋ก ํ์ธ.exception(): ์์ธ ๊ฐ์ฒด๋ฅผ ๊ฐ์ ธ์จ๋ค(์์ผ๋ฉด).cancel(): ์์ง ์์ ์ ์ด๋ฉด ์ทจ์๋ฅผ ์๋ํ๋ค.
5. ๋ฌด์์ ์ธ์ ์ฐ๋
| ์ํฉ | ๋๊ตฌ |
|---|---|
| I/O-bound, ์ ์ยท์ค๊ฐ ๊ท๋ชจ ๋์ ์์ | ThreadPoolExecutor / threading |
| CPU-bound | ProcessPoolExecutor / multiprocessing |
| I/O-bound, ๋๋(์์ฒ-์๋ง) | asyncio |
์ ํ์ ํฐ ๊ฐ๋๋ ์ด๋ ๋ค.
- ์์ ์ด ๋๋ฆฐ ์์ธ๋ถํฐ ๊ตฌ๋ถํ๋ค. CPU๊ฐ ๋ฐ์๋ฉด(CPU-bound) process, ๋๊ธฐ๊ฐ ๊ธธ๋ฉด(I/O-bound) thread๋ asyncio๋ค. GIL ๋๋ฌธ์ CPU-bound๋ฅผ thread๋ก ๋๋๋ ๊ฑด ํ์๊ณ ๋ค.
- ์ ์์ค ์ง์ vs ๊ณ ์์ค. threadยทprocess์ ์๋ช
์ฃผ๊ธฐ๋ ๋๊ธฐํ๋ฅผ ์ธ๋ฐํ๊ฒ ์ ์ดํด์ผ ํ๋ฉด
threadingยทmultiprocessing์ ์ง์ ์ฐ๊ณ โํจ์ N๊ฐ๋ฅผ ๋์์ ๋๋ ค ๊ฒฐ๊ณผ๋ฅผ ๋ชจ์ผ๋โ ํํ ํจํด์ด๋ฉดconcurrent.futures๊ฐ ์งง๊ณ ์์ ํ๋ค. - ๋์ ์์ ์ด ์์ฒ ๊ฐ ์ด์์ธ I/O๋ผ๋ฉด thread ์์ฒ ๊ฐ๋ ์ ํ ๋น์ฉ์ด ์ปค์ง๋ค. ์ด๋๋ ๋จ์ผ thread์์ coroutine์ ๊ตด๋ฆฌ๋ asyncio๊ฐ ๋ซ๋ค.
- ์์ฌ ์์ผ๋ฉด ๋๋ ๋งก๊ธด๋ค. ๋คํธ์ํฌ I/O๋ asyncio๋ก ๊ฒน์น๊ณ CPU ํ์ฒ๋ฆฌ๋ process pool๋ก ๋ถ๋ฆฌํ๊ณ ๋๊ธฐ ์ ์ฉ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ thread๋ก ์ฐํํ๋(
asyncio.to_thread) ์์ผ๋ก ์ค๋ฌด์์ ํํ ์กฐํฉํ๋ค.
๊ณต์ ์์์ ์ฌ๋ฟ์ด ๊ฑด๋๋ฆฌ๋ฉด ์ด๋ ๋ชจ๋ธ์ด๋ race condition์ ์ ๊ฒฝ ์จ์ผ ํ๋ค. thread๋ lockยทqueue๋ก, process๋ ๊ฒฉ๋ฆฌ ๋์ ๊ณต์ ๋ฉ๋ชจ๋ฆฌ ๊ฒฝ์์ ์ ์ง๋ง ๊ณต์ ํ์ผยทDB ๊ฒฝ์์ ๋จ๋๋ค.