Der Autor hat den COVID-19 Relief Fund dazu ausgewählt, eine Spende im Rahmen des Programms Write for DOnations zu erhalten.
Python-Threads stellen eine Form von Parallelismus dar, mit der Ihr Programm verschiedene Operationen gleichzeitig ausführen kann. Parallelismus in Python lässt sich auch durch Verwendung mehrerer Prozesse erzielen; Threads eignen sich jedoch besonders gut für die Beschleunigung von Anwendungen, die hohe I/O-Leistung benötigen.
Beispiel: I/O-gerichtete Operationen umfassen die Erstellung von Webanfragen und das Lesen von Daten aus Dateien. Im Gegensatz zu I/O-gerichteten Operationen werden CPU-gerichtete Operationen (wie die Ausführung von Berechnungen mit der Python-Standardbibliothek) von Python-Threads nur wenig profitieren.
Python 3 enthält das Dienstprogramm ThreadPoolExecutor
zur Ausführung von Code in einem Thread.
In diesem Tutorial werden wir ThreadPoolExecutor
verwenden, um zügige Netzwerkanfragen zu erstellen. Wir werden eine Funktion definieren, die für Aufrufe innerhalb von Threads geeignet ist, ThreadPoolExecutor
zur Ausführung dieser Funktion nutzen und Ergebnisse aus diesen Ausführungen verarbeiten.
In diesem Tutorial werden wir Netzwerkanfragen stellen, um die Existenz von Wikipedia-Seiten zu überprüfen.
Anmerkung: Die Tatsache, dass I/O-gerichtete Operationen mehr von Threads profitieren als I/O-orientierte Operationen, hängt mit einer Eigenart von Python zusammen, die_ global interpreter loc_k genannt wird. Wenn Sie möchten, können Sie in der offiziellen Python-Dokumentation mehr über „global interpreter lock“ von Python erfahren.
Für eine optimale Nutzung des Tutorials empfiehlt sich Vertrautheit mit der Programmierung in Python und einer lokalen Python-Programmierumgebung mit installiertem requests
-Paket.
Sie können für die notwendigen Hintergrundinformationen diese Tutorials durchsehen:
Installieren von Python 3 und Einrichten einer lokalen Programmierumgebung unter Ubuntu 18.04
Um das requests
-Paket in Ihrer lokalen Python-Programmierumgebung zu installieren, können Sie folgenden Befehl ausführen:
- pip install --user requests==2.23.0
Definieren wir zunächst eine Funktion, die wir mithilfe von Threads ausführen möchten.
Mit nano
oder Ihrem bevorzugten Texteditor/Ihrer bevorzugten Entwicklungsumgebung können Sie diese Datei öffnen:
- nano wiki_page_function.py
In diesem Tutorial werden wir eine Funktion schreiben, die ermittelt, ob eine Wikipedia-Seite vorhanden ist oder nicht:
import requests
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
Die Funktion get_wiki_page_existence
akzeptiert zwei Argumente: eine URL zu einer Wikipedia-Seite (wiki_page_url
) und eine timeout
-Anzahl von Sekunden, während der auf eine Antwort von dieser URL gewartet werden soll.
get_wiki_page_existence
nutzt das requests
-Paket, um eine Webanfrage an diese URL zu stellen. Je nach Statuscode der HTTP-Antwort
wird eine Zeichenfolge zurückgegeben, die beschreibt, ob die Seite vorhanden ist oder nicht. Verschiedene Statuscodes stellen verschiedene Ergebnisse einer HTTP-Anfrage dar. Hier gehen wir davon aus, dass ein 200
-Statuscode („Erfolg“) bedeutet, dass die Wikipedia-Seite existiert, und ein 404
-Statuscode („Nicht gefunden“) bedeutet, dass die Wikipedia-Seite nicht existiert.
Wie im Abschnitt zu den Voraussetzungen beschrieben, benötigen Sie das installierte requests
-Paket, um diese Funktion ausführen zu können.
Versuchen wir, die Funktion auszuführen, indem wir die url
und den Funktionsaufruf nach der Funktion get_wiki_page_existence
hinzufügen:
. . .
url = "https://en.wikipedia.org/wiki/Ocean"
print(get_wiki_page_existence(wiki_page_url=url))
Nachdem Sie den Code hinzugefügt haben, speichern und schließen Sie die Datei.
Wenn wir diesen Code ausführen:
- python wiki_page_function.py
Erhalten wir eine Ausgabe wie die folgende:
Outputhttps://en.wikipedia.org/wiki/Ocean - exists
Bei Aufruf der Funktion get_wiki_page_existence
mit einer gültigen Wikipedia-Seite wird eine Zeichenfolge zurückgegeben, die bestätigt, dass die Seite tatsächlich existiert.
Achtung: Im Allgemeinen ist es nicht sicher, Python-Objekte oder -Status zwischen Threads zu teilen, ohne sorgfältig darauf zu achten, dass keine Parallelitätsfehler auftreten. Wenn Sie eine Funktion definieren, die in einem Thread ausgeführt werden soll, ist es am besten, eine Funktion festzulegen, die einen einzelnen Auftrag ausführt und den Status nicht an andere Threads weitergibt oder veröffentlicht. get_wiki_page_existence
ist ein Beispiel für eine solche Funktion.
Nachdem wir nun über eine Funktion verfügen, die sich in Threads aufrufen lässt, können wir ThreadPoolExecutor
verwenden, um zügig mehrere Aufrufe dieser Funktion auszuführen.
Fügen Sie Ihrem Programm in wiki_page_function.py
den folgenden hervorgehobenen Code hinzu:
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = [
"https://en.wikipedia.org/wiki/Ocean",
"https://en.wikipedia.org/wiki/Island",
"https://en.wikipedia.org/wiki/this_page_does_not_exist",
"https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
for future in concurrent.futures.as_completed(futures):
print(future.result())
Werfen wir einen Blick auf die Funktionsweise dieses Codes:
concurrent.futures
wird importiert, um uns Zugriff auf ThreadPoolExecutor
zu gewähren.with
-Anweisung dient der Erstellung eines ThreadPoolExecutor
-Instanz-Executors
, der Threads unmittelbar nach dem Abschluss bereinigt.Executor
übergeben
: einer für jede der URLs in der Liste wiki_page_urls
.submit
gibt eine Future
-Instanz zurück, die in der futures
-Liste gespeichert ist.as_completed
wartet, bis jeder Future
get_wiki_page_existence
-Aufruf abgeschlossen ist, damit wir das Ergebnis ausgeben können.Wenn wir dieses Programm mit dem folgenden Befehl erneut ausführen:
- python wiki_page_function.py
Erhalten wir eine Ausgabe wie die folgende:
Outputhttps://en.wikipedia.org/wiki/Island - exists
https://en.wikipedia.org/wiki/Ocean - exists
https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist
https://en.wikipedia.org/wiki/Shark - exists
Diese Ausgabe ergibt Sinn: drei der URLs sind gültige Wikipedia-Seiten, eine nicht (this_page_does_not_exist
). Beachten Sie, dass Ihre Ausgabe eine andere Reihenfolge aufweisen kann als diese Ausgabe. Die Funktion concurrent.futures.as_completed
in diesem Beispiel gibt Ergebnisse zurück, sobald sie verfügbar sind. Dabei ist es egal, in welcher Reihenfolge die Aufträge übermittelt wurden.
Im vorherigen Schritt hat get_wiki_page_existence
bei allen unseren Aufrufen erfolgreich einen Wert zurückgegeben. In diesem Schritt sehen wir, dass ThreadPoolExecutor
auch Ausnahmen auslösen kann, die in Threaded-Funktionsaufrufen generiert werden.
Betrachten wir den folgenden beispielhaften Codeblock:
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = [
"https://en.wikipedia.org/wiki/Ocean",
"https://en.wikipedia.org/wiki/Island",
"https://en.wikipedia.org/wiki/this_page_does_not_exist",
"https://en.wikipedia.org/wiki/Shark",
]
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(
executor.submit(
get_wiki_page_existence, wiki_page_url=url, timeout=0.00001
)
)
for future in concurrent.futures.as_completed(futures):
try:
print(future.result())
except requests.ConnectTimeout:
print("ConnectTimeout.")
Dieser Codeblock ist fast identisch mit dem, den wir in Schritt 2 verwendet haben; er weist jedoch zwei wichtige Unterschiede auf:
timeout=0.00001
an get_wiki_page_existence
. Da das requests
-Paket seine Webanfrage an Wikipedia in 0,00001
Sekunden nicht abschließen kann, wird eine ConnectTimeout
-Ausnahme ausgelöst.ConnectTimeout
-Ausnahmen, die durch future.result()
ausgelöst werden, und drucken dabei jedes Mal eine Zeichenfolge aus.Wenn wir das Programm erneut ausführen, sehen wir die folgende Ausgabe:
OutputConnectTimeout.
ConnectTimeout.
ConnectTimeout.
ConnectTimeout.
Vier ConnectTimeout
-Nachrichten werden ausgegeben (eine für jede unserer vier wiki_page_urls
), da keine davon in 0,00001
Sekunden abgeschlossen werden konnte und jede der vier get_wiki_page_existence
-Aufrufe eine ConnectTimeout
-Ausnahme ausgelöst hat.
Sie haben gesehen, dass wenn ein Funktionsaufruf an einen ThreadPoolExecutor
eine Ausnahme auslöst, diese Ausnahme normalerweise durch Aufruf von Future.result
ausgelöst werden kann. Ein Aufruf von Future.result
bei all Ihren übermittelten Aufrufen stellt sicher, dass Ihr Programm keine Ausnahmen verpasst, die von Ihrer Threaded-Funktion ausgelöst werden.
Überprüfen wir nun, ob die Verwendung von ThreadPoolExecutor
Ihr Programm tatsächlich schneller macht.
Lassen Sie uns zunächst die Ausführung von get_wiki_page_existence
ohne Threads messen:
import time
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
print("Running without threads:")
without_threads_start = time.time()
for url in wiki_page_urls:
print(get_wiki_page_existence(wiki_page_url=url))
print("Without threads time:", time.time() - without_threads_start)
Im Codebeispiel rufen wir unsere get_wiki_page_existence
-Funktion mit fünfzig verschiedenen URLs von Wikipedia-Seiten hintereinander auf. Wir verwenden die Funktion time.time()
, um die Anzahl der Sekunden auszugeben, die für die Ausführung unseres Programms benötigt wurde.
Wenn wir diesen Code wie zuvor erneut ausführen, erhalten wir eine Ausgabe wie die folgende:
OutputRunning without threads:
https://en.wikipedia.org/wiki/0 - exists
https://en.wikipedia.org/wiki/1 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Without threads time: 5.803015232086182
Einträge 2 bis 47 in dieser Ausgabe wurden der Kürze halber ausgelassen.
Die Anzahl der Sekunden, die nach Without threads time
(Zeit ohne Threads) ausgegeben wird, wird sich bei Ausführung auf Ihrem Computer unterscheiden. Das ist in Ordnung; Sie erhalten einfach eine Baseline-Zahl, die Sie mit einer Lösung vergleichen können, die ThreadPoolExecutor
nutzt. In diesem Fall waren es ~5,803
Sekunden.
Führen wir nun die gleichen fünfzig Wikipedia-URLs über get_wiki_page_existence
aus, diesmal jedoch mit ThreadPoolExecutor
:
import time
import requests
import concurrent.futures
def get_wiki_page_existence(wiki_page_url, timeout=10):
response = requests.get(url=wiki_page_url, timeout=timeout)
page_status = "unknown"
if response.status_code == 200:
page_status = "exists"
elif response.status_code == 404:
page_status = "does not exist"
return wiki_page_url + " - " + page_status
wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)]
print("Running threaded:")
threaded_start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = []
for url in wiki_page_urls:
futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
for future in concurrent.futures.as_completed(futures):
print(future.result())
print("Threaded time:", time.time() - threaded_start)
Der Code ist der gleiche Code, den wir in Schritt 2 erstellt haben; diesmal enthält er jedoch zusätzlich einige Druckanweisungen, um die Anzahl der Sekunden anzuzeigen, die zur Ausführung unseres Codes benötigt wurden.
Wenn wir das Programm erneut ausführen, erhalten wir die folgende Ausgabe:
OutputRunning threaded:
https://en.wikipedia.org/wiki/1 - exists
https://en.wikipedia.org/wiki/0 - exists
. . .
https://en.wikipedia.org/wiki/48 - exists
https://en.wikipedia.org/wiki/49 - exists
Threaded time: 1.2201685905456543
Auch die Anzahl der Sekunden, die nach Threaded time
(Zeit mit Threads) ausgegeben wird, wird sich auf Ihrem Computer unterscheiden (ebenso die Reihenfolge Ihrer Ausgabe).
Jetzt können Sie die Ausführungszeit beim Abrufen der fünfzig URLs von Wikipedia-Seiten mit und ohne Threads miteinander vergleichen.
Auf dem in diesem Tutorial verwendeten Rechner dauerte es ohne Threads ~5,803
Sekunden; mit Threads waren es ~1,220
Sekunden. Unser Programm lief mit Threads also deutlich schneller.
In diesem Tutorial haben Sie erfahren, wie Sie das Dienstprogramm ThreadPoolExecutor
in Python 3 verwenden können, um I/O-gerichteten Code effizient auszuführen. Sie haben eine Funktion erstellt, die sich für Aufrufe innerhalb von Threads eignet, gelernt, wie man sowohl Ausgaben als auch Ausnahmen von Threaded-Ausführungen dieser Funktion abruft, und den Leistungsschub beobachten können, der durch Verwendung von Threads entsteht.
Nun können Sie mehr über andere Parallelitätsfunktionen des concurrent.futures
-Moduls erfahren.
Thanks for learning with the DigitalOcean Community. Check out our offerings for compute, storage, networking, and managed databases.
This textbox defaults to using Markdown to format your answer.
You can type !ref in this text area to quickly search our full set of tutorials, documentation & marketplace offerings and insert the link!