El autor seleccionó el COVID-19 Relief Fund para que reciba una donación como parte del programa Write for DOnations.
Los subprocesos de Python son una especie de paralelismo que le permiten a su programa ejecutar varios procedimientos a la vez. Este paralelismo en Python también se puede lograr utilizando varios procesos, pero los subprocesos son particularmente adecuados para acelerar las aplicaciones que implican una cantidad considerable de E/S (entrada/salida).
Algunos ejemplos de operaciones limitadas por las E/S son la realización de solicitudes web y la lectura de datos de archivos. A diferencia de las operaciones limitadas por las E/S, las operaciones limitadas por la CPU (como realizar cálculos matemáticos con la biblioteca estándar de Python) no se benefician mucho de los subprocesos de Python.
Python 3 incluye la utilidad ThreadPoolExecutor
para ejecutar código en subprocesos.
En este tutorial, utilizaremos ThreadPoolExecutor
para realizar solicitudes de red de forma rápida. Definiremos una función que se pueda invocar desde subprocesos, utilizaremos ThreadPoolExecutor
para ejecutar esa función y procesaremos los resultados de esas ejecuciones.
Para los fines de este tutorial, realizaremos solicitudes de red para verificar la existencia de páginas de Wikipedia.
Nota: El hecho de que las operaciones limitadas por las E/S se beneficien más de los subprocesos que las limitadas por la CPU se debe a una idiosincrasia de Python denominada bloqueo global de intérpretes. Si desea obtener más información sobre el bloqueo global de intérpretes de Python, puede consultar la documentación oficial de Python.
Para aprovechar este tutorial al máximo, es recomendable tener algunos conocimientos de programación en Python y un entorno de programación local de Python con requests
instalado.
Puede revisar estos tutoriales para encontrar la información básica necesaria:
Cómo instalar Python 3 y configurar un entorno de programación local en Ubuntu 18.04
Para instalar el paquete requests
en su entorno de programación local de Python, puede ejecutar este comando:
- pip install --user requests==2.23.0
Vamos a comenzar por definir una función que nos gustaría ejecutar con la ayuda de subprocesos.
Puede abrir este archivo usando nano
o el editor de texto o entorno de desarrollo que prefiera:
- nano wiki_page_function.py
Para los fines de este tutorial, vamos a escribir una función que determine si una página de Wikipedia existe o no:
import requests
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
La función get_wiki_page_existence
acepta dos argumentos: una URL a una página de Wikipedia (wiki_page_url
) y un timeout
que indica la cantidad de segundos que se debe esperar una respuesta de esa URL.
get_wiki_page_existence
utiliza el paquete requests
para realizar una solicitud web a esa URL. Se devuelve una cadena que indica si la página existe o no dependiendo del código de estado de response
, la respuesta HTTP. Los diferentes códigos de estado representan los distintos resultados que puede tener una solicitud HTTP. En este procedimiento, se asume que un código de estado 200
, “correcto”, indica que la página de Wikipedia existe y un código de estado 404
, “no encontrada”, que la página de Wikipedia no existe.
Como se describe en la sección Requisitos previos, deberá tener el paquete requests
instalado para poder ejecutar esta función.
Intentemos ejecutar la función añadiendo la url
y la invocación a la función después de la función get_wiki_page_existence
:
. . .
url = "https://en.wikipedia.org/wiki/Ocean"
print(get_wiki_page_existence(wiki_page_url=url))
Una vez que haya añadido el código, guarde y cierre el archivo.
Si ejecutamos este código:
- python wiki_page_function.py
Veremos un resultado como el siguiente:
Outputhttps://en.wikipedia.org/wiki/Ocean - exists
La invocación de la función get_wiki_page_existence
con una página de Wikipedia válida devuelve una cadena que confirma que la página efectivamente existe.
Advertencia: En general, no es seguro compartir objetos o estados de Python entre subprocesos sin tener especial cuidado para evitar errores de simultaneidad. A la hora de definir una función que se ejecute en un subproceso, lo mejor es definir una que realice una sola tarea y no publique ni comparta su estado con otros subprocesos. La función get_wiki_page_existence
es un ejemplo de una función con estas características.
Ahora que tenemos una función que se puede invocar con subprocesos, podemos usar ThreadPoolExecutor
para invocar esa función varias veces de forma rápida.
Agregue el siguiente código resaltado a su programa en wiki_page_function.py
:
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = [
"https://en.wikipedia.org/wiki/Ocean",
"https://en.wikipedia.org/wiki/Island",
"https://en.wikipedia.org/wiki/this_page_does_not_exist",
"https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
for future in concurrent.futures.as_completed(futures):
print(future.result())
Veamos cómo funciona este código:
concurrent.futures
se importa para darnos acceso a ThreadPoolExecutor
.with
para crear un executor
de una instancia de ThreadPoolExecutor
que limpia de forma rápida los subprocesos al completarse.submit
) cuatro tareas al executor
para cada una de las URL de la lista wiki_page_urls
.submit
devuelve una instancia Future
que se almacena en la lista de futures
.as_completed
espera que se complete cada invocación a get_wiki_page_existence
de Future
para que podamos imprimir su resultado.Si volvemos a ejecutar este programa con el siguiente comando:
- python wiki_page_function.py
Veremos un resultado como el siguiente:
Outputhttps://en.wikipedia.org/wiki/Island - exists
https://en.wikipedia.org/wiki/Ocean - exists
https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist
https://en.wikipedia.org/wiki/Shark - exists
Este resultado es lógico: tres de las URL son páginas de Wikipedia válidas y una de ellas, this_page_does_not_exist
, no lo es. Tenga en cuenta que su resultado puede tener un orden distinto a este. La función concurrent.futures.as_completed
de este ejemplo devuelve resultados tan pronto estén disponibles, independientemente del orden en que se presentaron las tareas.
En el paso anterior, get_wiki_page_existence
devolvió correctamente un valor para todas nuestras invocaciones. En este paso, veremos que ThreadPoolExecutor
también puede crear excepciones generadas en invocaciones de funciones con subprocesos.
Consideremos el siguiente bloque de código de ejemplo:
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = [
"https://en.wikipedia.org/wiki/Ocean",
"https://en.wikipedia.org/wiki/Island",
"https://en.wikipedia.org/wiki/this_page_does_not_exist",
"https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(
executor.submit(
get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
)
)
for future in concurrent.futures.as_completed(futures):
try:
print(future.result())
except requests.ConnectTimeout:
print("ConnectTimeout.")
Este bloque de código es casi idéntico al que utilizamos en el paso 2, pero tiene dos diferencias clave:
timeout=0.00001
a get_wiki_page_existence
. Como el paquete requests
no puede completar su solicitud web a Wikipedia en 0.00001
segundos, creará una excepción ConnectTimeout
.ConnectTimeout
que generó future.result()
e imprimimos una cadena cada vez que lo hacemos.Si volvemos a ejecutar el programa, veremos el siguiente resultado:
OutputConnectTimeout.
ConnectTimeout.
ConnectTimeout.
ConnectTimeout.
Se imprimen cuatro mensajes de ConnectTimeout
(uno para cada una de nuestras cuatro wiki_page_urls
), dado que no se pudo completar ninguna de ellas en 0.00001
segundos y cada una de las cuatro invocaciones a get_wiki_page_existence
generó la excepción ConnectTimeout
.
Aprendió que si la invocación a una función enviada a ThreadPoolExecutor
crea una excepción, esa excepción se puede generar normalmente al invocar Future.result
. Invocar Future.result
en todas sus invocaciones enviadas garantiza que su programa no omita ninguna excepción que se haya generado a partir de su función de subprocesos.
Ahora, vamos a verificar que usar ThreadPoolExecutor
efectivamente hace que su programa sea más rápido.
Primero, vamos a medir el tiempo de ejecución de get_wiki_page_existence
sin subprocesos:
import time
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
print("Running without threads:")
without_threads_start = time.time()
for url in wiki_page_urls:
print(get_wiki_page_existence(wiki_page_url=url))
print("Without threads time:", time.time() - without_threads_start)
En el código de ejemplo, invocamos nuestra función get_wiki_page_existence
con cincuenta URL de páginas de Wikipedia distintas, una por una. Usamos la función time.time()
para imprimir la cantidad de segundos que toma la ejecución de nuestro programa.
Si volvemos a ejecutar este código como antes, veremos un resultado similar al siguiente:
OutputRunning without threads:
https://en.wikipedia.org/wiki/0 - exists
https://en.wikipedia.org/wiki/1 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Without threads time: 5.803015232086182
Se omitieron las entradas 2 a 47 de este resultado para mayor brevedad.
La cantidad de segundos que se imprima después de Without threads time
será distinta cuando lo ejecute en su máquina, lo que es normal, dado que solo está recibiendo un número de referencia para realizar una comparación con una solución que utiliza ThreadPoolExecutor
. En este caso, tomó ~5.803
segundos.
Ejecutemos las mismas cincuenta URL de Wikipedia a través de get_wiki_page_existence
, pero, esta vez, utilizando ThreadPoolExecutor
:
import time
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
print("Running threaded:")
threaded_start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
for future in concurrent.futures.as_completed(futures):
print(future.result())
print("Threaded time:", time.time() - threaded_start)
El código es el mismo que el que creamos en el Paso 2, solo agregamos algunas instrucciones de impresión que nos muestran la cantidad de segundos que toma la ejecución de nuestro código.
Si volvemos a ejecutar el programa, veremos lo siguiente:
OutputRunning threaded:
https://en.wikipedia.org/wiki/1 - exists
https://en.wikipedia.org/wiki/0 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Threaded time: 1.2201685905456543
Nuevamente, la cantidad de segundos que se imprime después de Threaded time
será diferente en su computadora (al igual que el orden de su resultado).
Ahora, puede comparar el tiempo de ejecución de la búsqueda de las cincuenta URL de páginas de Wikipedia con y sin subprocesos.
En la máquina utilizada en este tutorial, la ejecución sin subprocesos llevó ~5.803
segundos y con subprocesos, ~1.220
segundos. Nuestro programa se ejecutó considerablemente más rápido con subprocesos.
En este tutorial, aprendió a usar la utilidad ThreadPoolExecutor
de Python 3 para ejecutar de forma eficiente código limitado por las E/S. Creó una función que se puede invocar desde subprocesos, aprendió a recuperar resultados y excepciones de ejecuciones de esa función y notó la mejora de desempeño que se obtiene al utilizar subprocesos.
Ahora, puede obtener más información sobre otras funciones de simultaneidad que ofrece el módulo concurrent.futures
.
Thanks for learning with the DigitalOcean Community. Check out our offerings for compute, storage, networking, and managed databases.
This textbox defaults to using Markdown to format your answer.
You can type !ref in this text area to quickly search our full set of tutorials, documentation & marketplace offerings and insert the link!