10.31.2010

scrapper multiproceso en python

Nota inicial: Si no te gusta python puede que este post te haga cambiar de opinión :)

Una de las mejoras de Python 2.6 (en estos momentos vamos por la 2.7, que será la última de la rama 2.x) es el módulo multiprocessing. En pocas palabras viene a ser un módulo para trabajar con procesos de la misma forma que se hace con threads, de hecho en un subconjunto de la funcionalidad puedes cambiar threads por procesos cambiando un solo import.

Sin embargo el módulo multiprocessing añade cosas muy interesantes como la posibilidad de trabajar con pool de procesos. Veamos un ejemplo.

Imaginemos que tenemos que bajar una serie de ficheros pdf para posteriormente extraer información de ellos. Una primera aproximación sería esta:



import urllib
import urllib2

reg_nos = [16738, 17288, 18162, 18776, 18868, 19116, 19223, 19505];
pdf_url = 'http://www.mapa.es/agricultura/pags/fitos/registro/sustancias/pdf/%s.pdf'

def fetch_url(url, params={}):
return urllib2.urlopen(url).read()

def save_url_as_file(url, filename):
open(filename,'wb').write(fetch_url(url))

def download_pdf(reg_no):
f = '%d.pdf' % reg_no
save_url_as_file(pdf_url % reg_no, f)
print "\t- %s downloaded" % f

# tests
def single(regs):
for u in regs:
download_pdf(u)

single(reg_nos)

(puedes verlo mejor con sintáxis coloreada en github)

Para 4 míseros ficheros no merece la pena hacer más, pero imaginemos que queremos bajarnos miles y que además lo tenemos que hacer periódicamente, el tiempo en bajarse todos esos ficheros es alto. Lo primero que se nos ocurre es usar concurrencia: lanzando una serie de hilos/procesos que vayan bajando los ficheros aceleraría sensiblemente el proceso (de hecho así lo hacen los navegadores cuando se bajan los ficheros que referencia el HTML).

En python esto traducido a código ocupa mucho menos que explicarlo:


def download_multi(regs, nprocesses=4):
pool = Pool(processes=nprocesses)
pool.map_async(download_pdf, regs).get()


Usando multiprocessing.Pool python se encarga de lanzar los procesos y preparar una cola para enviarle a la función que especificamos en el primer parámetro.

Este es un uso de multiprocessing, pero tiene otros muchos muy interesantes.

Podéis ver todo el código en github y ejecutar el pequeño benchmark:

q6:smll javi$ python fetch.py
- 16738.pdf downloaded
- 17288.pdf downloaded
- 18162.pdf downloaded
- 18776.pdf downloaded
- 18868.pdf downloaded
- 19116.pdf downloaded
- 19223.pdf downloaded
- 19505.pdf downloaded
2.30190205574
- 18776.pdf downloaded
- 17288.pdf downloaded
- 18162.pdf downloaded
- 16738.pdf downloaded
- 19116.pdf downloaded
- 18868.pdf downloaded
- 19505.pdf downloaded
- 19223.pdf downloaded
0.807252883911

Un incremento un poco menor de 4X, el número de procesos que lanzo en el pool.

Últimamente uso este módulo para muchísimas tareas ya que el uso es prácticamente directo si la aplicación está bien modularizada y permite aprovechar la potencia de las máquinas actuales (en mi caso un dual core).

Bonus Track - threads

Con threads también es posible hacerlo, pero lamentablemente el módulo threading no tiene la funcionalidad Pool, así que debemos emularla.

Antes de pasar a la implementeación está bien decir que desde hace cosa de dos años hasta ahora se ha criticado mucho el modelo multithread de python debido a que existe una cosa llamada GIL (Global Interpreter Lock) que hace que solo pueda estar ejecutándose un hilo al mismo tiempo en el intérprete python. A pesar de ser hilos nativos hay un lock que evita que dos hilos se puedan ejecutar al mismo tiempo. Si quieres saber un poco más sobre el GIL hay una presentación excelente de maestro Dave Beazley.

Es para llevarse las manos a la cabeza, pero esto no quiere decir que el desarrollo con hilos en python esté "prohibido", símplemente hay que saber para qué se puede o no usar. En este caso el uso de threads, a pesar del Lock es muy interesante, ya que al ser tareas fundamentalmente de Entrada/Salida no hay problemas de bloqueo entre hilos (la explicación más en detalle en la presentación que he citado antes).

Sin más, usando Queue (otro módulo python mágico), una cola FIFO sincronizada la tarea es más o menos simple:


def threaded(regs, nthreads=4):
# ripped from http://www.dabeaz.com/generators/Generators.pdf
def consumer(q):
while True:
item = q.get()
if not item: break
download_pdf(item)

in_q = Queue.Queue()

# start threads
ths = [threading.Thread(target=consumer,args=(in_q,))
for th in xrange(nthreads)]
for x in ths: x.start()

# put files to download
for i in regs:
in_q.put(i)

# put end guards
for th in xrange(nthreads): in_q.put(None)

# wait to finish
for x in ths: x.join()

2 comentarios:

agh dijo...

Interesante post. El soporte para concurrencia en Ruby es un horror, y en Python por lo que veo no mucho mejor (lock gigante). Este uso de un pool de procesos no esta mal como parche.

La versión secuencial en clojure:

http://gist.github.com/657830


Y la versión threaded usando futuros:

http://gist.github.com/657828

Esta última tiene un montón de código extra para que el hilo inicial sepa cuando todos los workers han acabado y poder calcular el tiempo transcurrido.

De todos modos, aquí hay código bloqueante para dar y tomar. Una solución 'evented' a lo mejor no era mala idea.

Anónimo dijo...

Hola eth0, eres el culpable de que me haya puesto a mirarme python, tanto leer las bondades que comentas sobre él que me acabó picando el gusanillo.
Buen artículo, sigues así :)