Programació en paral.lel

Un programa Python només utilitza 1 CPU del processador encara que el processador tingui vàries CPUs disponbibles.

Amb el mòdul multiprocessing podem executar una part del nostre programa en un nou procés que s'executarà en una altre CPU del processador.

Això es fa quan tens una funció computacionalment intensiva, que vol dir que ha de fer moltes coses i tarda molt en fer-les com es calcular el Factorial d'un número bastant gran.

A continuació tens una funció factorial:

def factorial(n):

    start_time = time.perf_counter()
    fact = 1
    for i in range(1, n + 1):
        fact = fact * i

    print(f"n = {n}: {time.perf_counter() - start_time : .4f} seconds")

Si executes dos cops la funció factorial() pots veure que el programa tarda més d'1 segon en executar-se perquè fins que no s'ha acabat d'executar la primera crida a la funció factorial()no és pot executar la segona crida a la funció factorial().

import time

def factorial(n):

    start_time = time.perf_counter()
    fact = 1
    for i in range(1, n + 1):
        fact = fact * i

    print(f"n = {n}: {time.perf_counter() - start_time : .4f} seconds")


if __name__ == "__main__":

    factorial(100000)
    factorial(90000)

    print("Program finished")

Si executes el programa pots veure que el progama tarda uns 5 segons en executar-se perquè només utiliza un procés i una CPU:

$ time python3 multi.py 
n = 100000:  3.1057 seconds
n = 90000:  2.1048 seconds
Program finished

real    0m5,245s
user    0m4,799s
sys     0m0,446s

Amb la llibreria multiprocessing podem crear un procés per executar la tasca factorial.

...
import multiprocessing as mp

process = mp.Process(target=factorial, args= (100000,))

Quan cridem la funció start() de l'objecte Process el sistema operatiu crearà un nou procés per executar la funció corresponent.

import multiprocessing as mp
import time

def factorial(n):

    start_time = time.perf_counter()
    fact = 1
    for i in range(1, n + 1):
        fact = fact * i

    print(f"n = {n}: {time.perf_counter() - start_time : .4f} seconds")


if __name__ == "__main__":

    mp.Process(target=factorial, args= (100000,)).start()
    mp.Process(target=factorial, args= (90000,)).start()

    print("Program finished")

Pots veure que el programa s'executa en poc més d'3 segons perquè les tasques s'executen en paral.lel en procesos diferents en CPUs diferents:

$ time python3 multi.py 
Program finished
n = 90000:  2.4677 seconds
n = 100000:  3.2331 seconds

real    0m3,309s
user    0m5,208s
sys     0m0,572s

El temps real és el temps que ha tardat en executar-se el programa mentres que user i sys és temps d'execució total en CPUs.

Si et fixes en la sortida, les tasques s'executen més tard que el print('Program finished').

Amb la funció join() podem indicar que el procés que està executant el programa ha d'esperar a que acabi el nou procés que ha creat abans de seguir executant més codi i imprimir per pantalla Program finished.

Per exemple, podem fer un join() amb el segon procés:

import multiprocessing as mp
import time

def factorial(n):

    start_time = time.perf_counter()
    fact = 1
    for i in range(1, n + 1):
        fact = fact * i

    print(f"n = {n}: {time.perf_counter() - start_time : .4f} seconds")


if __name__ == "__main__":

    p1 = mp.Process(target=factorial, args= (100000,))
    p2 = mp.Process(target=factorial, args= (90000,))

    p1.start()
    p2.start()

    p2.join()

    print("Program finished")

Pots veure que el procés principal només espera el procés p2 per seguir executant codi:

$ time python3 multi.py 
n = 90000:  2.5013 seconds
Program finished
n = 100000:  3.2731 seconds

real    0m3,356s
user    0m5,241s
sys     0m0,623s

Activitat. Modifica el codi perquè el procés principal esperi que acabi el procés p1 enlloc del procés p2.

CPUs bound

L'execució de procesos en paral.lel està limitat pel les CPUs que té el processador.

En el meu cas el meu processador és un Intel(R) Xeon(R) Platinum 8358 CPU @ 2.60GHz amb 8 CPUs tal com es pot veure a continuació:

$ lscpu
...
CPU(s):                  8
Vendor ID:               GenuineIntel
  Model name:            Intel(R) Xeon(R) Platinum 8358 CPU @ 2.60GHz

Tu has d'adaptar els exemples al teu processador.

Anem a provar que passa executan 6 procesos al mateix temps:

import multiprocessing as mp
import time


def factorial(n):

    start_time = time.perf_counter()
    fact = 1
    for i in range(1, n + 1):
        fact = fact * i

    print(f"n = {n}: {time.perf_counter() - start_time : .4f} seconds")


if __name__ == "__main__":

    processes = [mp.Process(target=factorial, args=(100000 + x,)) for x in range(6)]
    [p.start() for p in processes]
    [p.join() for p in processes]

    print("Program finished")

Pots veure que el programa s'executa en 3,216 segons, però el temps d'execució de CPUs és de 16,445 segons en espai d'usuari:

$ time python3 multi.py 
n = 100003:  3.0745 seconds
n = 100004:  3.1588 seconds
n = 100005:  3.1770 seconds
n = 100000:  3.2295 seconds
n = 100002:  3.3566 seconds
n = 100001:  3.5194 seconds
Program finished

real    0m3,606s
user    0m17,296s
sys     0m2,327s

En canvi si executem més procesos que CPUs té el procesador, no tots els procesos es poden executar a la vegada i el temps real será superior als 3 segons perquè els procesos es van executant de manera cooperativa compartint les CPUs.

Si executem 16 processos el temps real d'execució és de 5,621 segons, i tots els processos tarden entre 4 i 6 segons en executar-se encara que es poden executar en 3 segons si tenen una CPU en exclusiva només per a ells:

$ time python3 multi.py 
n = 100000:  4.1471 seconds
n = 100000:  4.4849 seconds
n = 100000:  4.9202 seconds
n = 100000:  5.0799 seconds
n = 100000:  5.1861 seconds
n = 100000:  5.2199 seconds
n = 100000:  5.3791 seconds
n = 100000:  5.3963 seconds
n = 100000:  5.4531 seconds
n = 100000:  5.4804 seconds
n = 100000:  5.5332 seconds
n = 100000:  5.5426 seconds
n = 100000:  5.6940 seconds
n = 100000:  5.7076 seconds
Program finished

real    0m5,796s
user    0m38,791s
sys     0m4,952s

Pool manager

Tenir molts procesos en execució al mateix temps no és molt bona idea.

Imagina`t una classe de 80 alumnes amb només 4 ordinadors tots junts a la mateixa aula compartint els ordinadors per fer una tasca.

És millor que només hi hagi 4 alumnes treballant i a mida que un acabi una altre alumne ocupi l'ordinador.

Ja ser que hi haurà discusions de qui va primer, que aquest alumne tarda massa, etc. , i el professor ha de decidir de la mateixa manera que tu com a programador tens que decidir quins processos tenen prioritat, etc.

Per limitar el número de processos que estan en execució a la vegada pots utilitzar un Pool:

if __name__ == "__main__":

    print (f"CPU count: {mp.cpu_count()}")
    pool =mp.Pool()
    processes = [pool.apply_async(factorial, args=(100000 + x,)) for x in range(16)]
    [p.get() for p in processes]

    print("Program finished")

Per defecte mp.Pool() crea un pool amb el número de CPUS que té el processador, encara que pots passar per argument un número diferent.

Amb la funció apply_async passem la funció que s'ha d'executar en el nou procés.

Com que volem es esperar a que el procés acabi d'executarse fem servir la funció get().

Pots verificar que tots els processos s'executen en uns 3 segons.

$ time python3 multi.py
CPU count: 8 
n = 100007:  3.1095 seconds
n = 100002:  3.1361 seconds
n = 100006:  3.1888 seconds
n = 100005:  3.2014 seconds
n = 100000:  3.2473 seconds
n = 100004:  3.2495 seconds
n = 100003:  3.2950 seconds
n = 100001:  3.3974 seconds
n = 100009:  2.7888 seconds
n = 100008:  2.8171 seconds
n = 100011:  2.8187 seconds
n = 100012:  2.8125 seconds
n = 100013:  2.8400 seconds
n = 100010:  2.9230 seconds
n = 100014:  2.9810 seconds
n = 100015:  2.9160 seconds
Program finished

real    0m6,401s
user    0m43,894s
sys     0m4,687s

Si vols escriure menys codi pots utilitzar la funció map():

if __name__ == "__main__":

    pool = mp.Pool()
    pool.map(factorial, [100000 + x for x in range(16)])

    print("Program finished")

We don’t have the start and join here because it is hidden behind the pool.map() function. What it does is split the iterable range(1,1000) into chunks and runs each chunk in the pool.

Amb la funció map no tenim que fer un start i un join perqè la funció map ja s'ocupa de fer-ho.

concurrent.futures

Enlloc de fer servir el mòdul multiprocessing podem utilitzar el mòdul concurrent.features:

if __name__ == "__main__":

    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.map(factorial, [100000 + x for x in range(16)])

    print("Program finished")

This code is running the multiprocessing module under the hood. The beauty of doing so is that we can change the program from multiprocessing to multithreading by simply replacing ProcessPoolExecutor with ThreadPoolExecutor. Of course, you have to consider whether the global interpreter lock is an issue for your code.

La funció ProcessPoolExecutor utilitza el mòdul multiprocessing.

L'avantatge més important és que la lógica del codi és la mateixa en multiprocessament que en multithreading: només has de substituir ProcessPoolExecutor per ThreadPoolExecutor.

joblib

The package joblib is a set of tools to make parallel computing easier. It is a common third-party library for multiprocessing. It also provides caching and serialization functions. To install the joblib package, use the command in the terminal: pip install joblib

We can convert our previous example into the following to use joblib: import time from joblib import Parallel, delayed

def cube(x): return x**3

start_time = time.perf_counter() result = Parallel(n_jobs=3)(delayed(cube)(i) for i in range(1,1000)) finish_time = time.perf_counter() print(f"Program finished in {finish_time-start_time} seconds") print(result)

Indeed, it is intuitive to see what it does. The delayed() function is a wrapper to another function to make a “delayed” version of the function call. Which means it will not execute the function immediately when it is called.

Then we call the delayed function multiple times with different sets of arguments we want to pass to it. For example, when we give integer 1 to the delayed version of the function cube, instead of computing the result, we produce a tuple, (cube, (1,), {}) for the function object, the positional arguments, and keyword arguments, respectively.

We created the engine instance with Parallel(). When it is invoked like a function with the list of tuples as an argument, it will actually execute the job as specified by each tuple in parallel and collect the result as a list after all jobs are finished. Here we created the Parallel() instance with n_jobs=3, so there will be three processes running in parallel.

We can also write the tuples directly. Hence the code above can be rewritten as: result = Parallel(n_jobs=3)((cube, (i,), {}) for i in range(1,1000))

The benefit of using joblib is that we can run the code in multithread by simply adding an additional argument: result = Parallel(n_jobs=3, prefer="threads")(delayed(cube)(i) for i in range(1,1000))

And this hides all the details of running functions in parallel. We simply use a syntax not too much different from a plain list comprehension.

Referències