L’auteur a choisi le COVID-19 Relief Fund pour recevoir un don dans le cadre du programme Write for DOnations.
Les threads Python sont une forme de parallélisme qui permet à votre programme d’exécuter plusieurs procédures à la fois. Le parallélisme en Python peut également être réalisé en utilisant des processus multiples, mais les threads sont particulièrement bien adaptés pour accélérer les applications qui impliquent des quantités importantes d’entrées/sorties (input/output).
Les opérations liées aux entrées/sorties comprennent, par exemple, les requêtes web et la lecture des données des fichiers. Contrairement aux opérations liées aux entrées/sorties, les opérations liées au CPU (comme l’exécution de calculs mathématiques avec la bibliothèque standard Python) ne bénéficieront pas beaucoup des threads Python.
Python 3 inclut l’utilitaire ThreadPoolExecutor
pour exécuter du code dans un thread.
Au cours de ce tutoriel, nous utiliserons ThreadPoolExecutor
pour effectuer rapidement des requêtes réseau. Nous allons définir une fonction bien adaptée à l’invocation dans les threads, utiliser ThreadPoolExecutor
pour exécuter cette fonction, et traiter les résultats de ces exécutions.
Pour ce tutoriel, nous allons faire des requêtes réseau pour vérifier l’existence de pages Wikipédia.
Note : le fait que les opérations liées aux entrées/sorties bénéficient davantage des threads que les opérations liées au CPU est causé par une idiosyncrasie en Python appelée verrouillage global de l’interpréteur. Si vous le souhaitez, vous pouvez en apprendre davantage sur le verrouillage global de l’interpréteur de Python dans la documentation officielle de Python.
Pour tirer le meilleur parti de ce tutoriel, il est recommandé de se familiariser avec la programmation en Python et d’avoir un environnement de programmation Python local avec des requêtes
installé.
Vous pouvez consulter ces tutoriels pour obtenir les informations de base nécessaires :
Comment installer Python 3 et configurer un environnement de programmation local sur Ubuntu 18.04
Pour installer le paquet requests
dans votre environnement de programmation Python local, vous pouvez exécuter cette commande :
- pip install --user requests==2.23.0
Commençons par définir une fonction que nous aimerions exécuter à l’aide de threads.
En utilisant nano
ou votre éditeur de texte/environnement de développement préféré, vous pouvez ouvrir ce fichier :
- nano wiki_page_function.py
Pour ce tutoriel, nous allons écrire une fonction qui détermine si une page Wikipédia existe ou non :
import requests
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
La fonction get_wiki_page_existence
accepte deux arguments : une URL vers une page Wikipédia (wiki_page_url
), et un timeout
de quelques secondes pour obtenir une réponse de cette URL.
get_wiki_page_existence
utilise le paquet requests
pour faire une requête web à cette URL. En fonction du code d’état de la response
HTTP, une chaîne de caractères qui décrit si la page existe ou non est renvoyée. Les différents codes d’état représentent les différents résultats d’une requête HTTP. Cette procédure suppose qu’un code d’état 200
“réussi” signifie que la page Wikipédia existe, et qu’un code d’état 404
“non trouvé” signifie que la page Wikipédia n’existe pas.
Comme décrit dans la section Prérequis, vous aurez besoin du paquet requests
installé pour exécuter cette fonction.
Essayons d’exécuter la fonction en ajoutant l’url
et l’appel de fonction après la fonction get_wiki_page_existence
:
. . .
url = "https://en.wikipedia.org/wiki/Ocean"
print(get_wiki_page_existence(wiki_page_url=url))
Une fois que vous avez ajouté le code, enregistrez et fermez le fichier.
Si nous exécutons ce code :
- python wiki_page_function.py
Nous verrons une sortie comme celle-ci :
Outputhttps://en.wikipedia.org/wiki/Ocean - exists
L’appel de la fonction get_wiki_page_existence
avec une page Wikipédia valide renvoie une chaîne de caractères qui confirme que la page existe bel et bien.
Avertissement : en général, il n’est pas sûr de partager des objets ou des états Python entre les threads sans prendre un soin particulier pour éviter les bogues de concurrence. Lors de la définition d’une fonction à exécuter dans un thread, il est préférable de définir une fonction qui effectue un seul travail et qui ne partage ni ne publie l’état à d’autres threads. get_wiki_page_existence
est un exemple d’une telle fonction.
Maintenant que nous disposons d’une fonction bien adaptée à l’invocation avec des threads, nous pouvons utiliser ThreadPoolExecutor
pour effectuer de multiples invocations de cette fonction de manière opportune.
Ajoutons le code surligné suivant à votre programme dans wiki_page_function.py
:
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = [
"https://en.wikipedia.org/wiki/Ocean",
"https://en.wikipedia.org/wiki/Island",
"https://en.wikipedia.org/wiki/this_page_does_not_exist",
"https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
for future in concurrent.futures.as_completed(futures):
print(future.result())
Voyons comment ce code fonctionne :
concurrent.futures
est importé pour nous donner accès à ThreadPoolExecutor
.with
est utilisé pour créer un executor
d’instance ThreadPoolExecutor
qui nettoiera rapidement les threads dès leur achèvement.submitted
à l’executor
: un pour chacune des URL de la liste wiki_page_urls
.submit
renvoie une instance Future
qui est stockée dans la liste futures
.as_completed
attend que chaque appel Future
get_wiki_page_existence
soit terminé pour que nous puissions imprimer son résultat.Si nous exécutons à nouveau ce programme, avec la commande suivante :
- python wiki_page_function.py
Nous verrons une sortie comme celle-ci :
Outputhttps://en.wikipedia.org/wiki/Island - exists
https://en.wikipedia.org/wiki/Ocean - exists
https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist
https://en.wikipedia.org/wiki/Shark - exists
Cette sortie est logique : 3 des URLs sont des pages Wikipédia valides, et l’une d’entre elles, this_page_does_not_exist
, ne l’est pas. Notez que votre sortie peut être ordonnée différemment de cette sortie. Dans cet exemple, la fonction concurrent.futures.as_completed
renvoie les résultats dès qu’ils sont disponibles, quel que soit l’ordre dans lequel les emplois ont été soumis.
Au cours de l’étape précédente, get_wiki_page_existence
a réussi à retourner une valeur pour toutes nos invocations. Dans cette étape, nous verrons que ThreadPoolExecutor
peut également lever les exceptions générées dans les invocations de fonctions threadées.
Considérons l’exemple de bloc de code suivant :
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = [
"https://en.wikipedia.org/wiki/Ocean",
"https://en.wikipedia.org/wiki/Island",
"https://en.wikipedia.org/wiki/this_page_does_not_exist",
"https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(
executor.submit(
get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
)
)
for future in concurrent.futures.as_completed(futures):
try:
print(future.result())
except requests.ConnectTimeout:
print("ConnectTimeout.")
Ce bloc de code est presque identique à celui que nous avons utilisé à l’étape 2, mais il présente deux différences essentielles :
timeout=0.00001
à get_wiki_page_existence
. Comme le paquet requests
ne pourra pas terminer sa requête web à Wikipedia en 0,0000
1 seconde, cela entraînera une exception Connect
Timeout.ConnectTimeout
soulevées par future.result()
et imprimons une chaîne de caractères à chaque fois que nous le faisons.Si nous relançons le programme, nous obtiendrons la sortie suivante :
OutputConnectTimeout.
ConnectTimeout.
ConnectTimeout.
ConnectTimeout.
Quatre messages ConnectTimeout
sont imprimés - un pour chacun de nos quatre wiki_page_urls
, car aucun d’entre eux n’a pu être terminé en 0.00001
seconde et chacun des quatre appels get_wiki_page_existence
a soulevé l’exception ConnectTimeout
.
Vous avez maintenant vu que si un appel de fonction soumis à un ThreadPoolExecutor
soulève une exception, cette exception peut être soulevée normalement en appelant Future.result
. Appeler Future.result
sur toutes vos invocations soumises garantit que votre programme ne manquera aucune exception soulevée par votre fonction threadée.
Maintenant, vérifions que l’utilisation de ThreadPoolExecutor
rend réellement votre programme plus rapide.
Tout d’abord, chronométrons get_wiki_page_existence
si nous le faisons fonctionner sans threads :
import time
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
print("Running without threads:")
without_threads_start = time.time()
for url in wiki_page_urls:
print(get_wiki_page_existence(wiki_page_url=url))
print("Without threads time:", time.time() - without_threads_start)
Dans l’exemple de code, nous appelons notre fonction get_wiki_page_existence
avec cinquante URL de pages Wikipédia différentes, une par une. Nous utilisons la fonction time.time()
pour imprimer le nombre de secondes qu’il faut pour exécuter notre programme.
Si nous exécutons à nouveau ce code comme auparavant, nous obtiendrons la sortie suivante :
OutputRunning without threads:
https://en.wikipedia.org/wiki/0 - exists
https://en.wikipedia.org/wiki/1 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Without threads time: 5.803015232086182
Les entrées 2–47 dans cette sortie ont été omises par concision.
Le nombre de secondes affiché après Without threads time
(Temps écoulé sans threads) sera différent lorsque vous l’exécuterez sur votre machine - ce n’est pas grave, vous obtenez juste un nombre de base à comparer avec une solution qui utilise ThreadPoolExecutor
. Dans ce cas, il était de ~5.803
secondes.
Exécutons les mêmes cinquante URLs de Wikipédia dans get_wiki_page_existence
, mais cette fois en utilisant ThreadPoolExecutor
:
import time
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
print("Running threaded:")
threaded_start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
for future in concurrent.futures.as_completed(futures):
print(future.result())
print("Threaded time:", time.time() - threaded_start)
Le code est le même que celui que nous avons créé à l’étape 2, mais avec l’ajout de quelques énoncés imprimés qui nous indiquent le nombre de secondes qu’il faut pour exécuter notre code.
Si nous relançons le programme, nous obtiendrons la sortie suivante :
OutputRunning threaded:
https://en.wikipedia.org/wiki/1 - exists
https://en.wikipedia.org/wiki/0 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Threaded time: 1.2201685905456543
Là encore, le nombre de secondes imprimées après Threaded time
(temps threadé écoulé) sera différent sur votre ordinateur (ainsi que l’ordre de votre sortie).
Vous pouvez maintenant comparer le temps d’exécution pour récupérer les cinquante URLs des pages Wikipédia avec et sans threads.
Sur la machine utilisée dans ce tutoriel, l’exécution sans threads a pris ~5.803
secondes, et celle avec threads a pris ~1.220
secondes. Notre programme a fonctionné beaucoup plus rapidement avec les threads.
Dans ce tutoriel, vous avez appris à utiliser l’utilitaire ThreadPoolExecutor
en Python 3 pour exécuter efficacement du code lié aux entrées/sorties. Vous avez créé une fonction bien adaptée à l’invocation dans les threads, appris comment récupérer à la fois la sortie et les exceptions des exécutions threadées de cette fonction, et observé l’augmentation des performances obtenue en utilisant les threads.
De là, vous pouvez en savoir plus sur les autres fonctions de concurrence offertes par le module concurrent.futures
.
Thanks for learning with the DigitalOcean Community. Check out our offerings for compute, storage, networking, and managed databases.
This textbox defaults to using Markdown to format your answer.
You can type !ref in this text area to quickly search our full set of tutorials, documentation & marketplace offerings and insert the link!