Nota inicial: Si no te gusta python puede que este post te haga cambiar de opinión :)
Sin embargo el módulo multiprocessing añade cosas muy interesantes como la posibilidad de trabajar con pool de procesos. Veamos un ejemplo.
Imaginemos que tenemos que bajar una serie de ficheros pdf para posteriormente extraer información de ellos. Una primera aproximación sería esta:
import urllib
import urllib2
reg_nos = [16738, 17288, 18162, 18776, 18868, 19116, 19223, 19505];
pdf_url = 'http://www.mapa.es/agricultura/pags/fitos/registro/sustancias/pdf/%s.pdf'
def fetch_url(url, params={}):
return urllib2.urlopen(url).read()
def save_url_as_file(url, filename):
open(filename,'wb').write(fetch_url(url))
def download_pdf(reg_no):
f = '%d.pdf' % reg_no
save_url_as_file(pdf_url % reg_no, f)
print "\t- %s downloaded" % f
# tests
def single(regs):
for u in regs:
download_pdf(u)
single(reg_nos)
(puedes verlo mejor con sintáxis coloreada en github)
Para 4 míseros ficheros no merece la pena hacer más, pero imaginemos que queremos bajarnos miles y que además lo tenemos que hacer periódicamente, el tiempo en bajarse todos esos ficheros es alto. Lo primero que se nos ocurre es usar concurrencia: lanzando una serie de hilos/procesos que vayan bajando los ficheros aceleraría sensiblemente el proceso (de hecho así lo hacen los navegadores cuando se bajan los ficheros que referencia el HTML).
En python esto traducido a código ocupa mucho menos que explicarlo:
def download_multi(regs, nprocesses=4):
pool = Pool(processes=nprocesses)
pool.map_async(download_pdf, regs).get()
Usando multiprocessing.Pool python se encarga de lanzar los procesos y preparar una cola para enviarle a la función que especificamos en el primer parámetro.
Este es un uso de multiprocessing, pero tiene otros muchos muy interesantes.
Podéis ver todo el código en github y ejecutar el pequeño benchmark:
q6:smll javi$ python fetch.py
- 16738.pdf downloaded
- 17288.pdf downloaded
- 18162.pdf downloaded
- 18776.pdf downloaded
- 18868.pdf downloaded
- 19116.pdf downloaded
- 19223.pdf downloaded
- 19505.pdf downloaded
2.30190205574
- 18776.pdf downloaded
- 17288.pdf downloaded
- 18162.pdf downloaded
- 16738.pdf downloaded
- 19116.pdf downloaded
- 18868.pdf downloaded
- 19505.pdf downloaded
- 19223.pdf downloaded
0.807252883911
Un incremento un poco menor de 4X, el número de procesos que lanzo en el pool.
Últimamente uso este módulo para muchísimas tareas ya que el uso es prácticamente directo si la aplicación está bien modularizada y permite aprovechar la potencia de las máquinas actuales (en mi caso un dual core).
Bonus Track - threads
Con threads también es posible hacerlo, pero lamentablemente el módulo threading no tiene la funcionalidad Pool, así que debemos emularla.
Antes de pasar a la implementeación está bien decir que desde hace cosa de dos años hasta ahora se ha criticado mucho el modelo multithread de python debido a que existe una cosa llamada GIL (Global Interpreter Lock) que hace que solo pueda estar ejecutándose un hilo al mismo tiempo en el intérprete python. A pesar de ser hilos nativos hay un lock que evita que dos hilos se puedan ejecutar al mismo tiempo. Si quieres saber un poco más sobre el GIL hay una presentación excelente de maestro Dave Beazley.
Es para llevarse las manos a la cabeza, pero esto no quiere decir que el desarrollo con hilos en python esté "prohibido", símplemente hay que saber para qué se puede o no usar. En este caso el uso de threads, a pesar del Lock es muy interesante, ya que al ser tareas fundamentalmente de Entrada/Salida no hay problemas de bloqueo entre hilos (la explicación más en detalle en la presentación que he citado antes).
Sin más, usando Queue (otro módulo python mágico), una cola FIFO sincronizada la tarea es más o menos simple:
def threaded(regs, nthreads=4):
# ripped from http://www.dabeaz.com/generators/Generators.pdf
def consumer(q):
while True:
item = q.get()
if not item: break
download_pdf(item)
in_q = Queue.Queue()
# start threads
ths = [threading.Thread(target=consumer,args=(in_q,))
for th in xrange(nthreads)]
for x in ths: x.start()
# put files to download
for i in regs:
in_q.put(i)
# put end guards
for th in xrange(nthreads): in_q.put(None)
# wait to finish
for x in ths: x.join()