Channels in Go: Bequem parallel

Viele fürchten nebenläufige Programmierung. Channels in Go können diese Art der Softwareentwicklung aber wesentlich erleichtern.

In Pocket speichern vorlesen Druckansicht 7 Kommentare lesen
Go
Lesezeit: 16 Min.
Von
  • Dr. Reinhard Wobst
Inhaltsverzeichnis

Moderne Prozessoren bieten viele CPU-Kerne, aber die meisten Programme nutzen diese nicht aus. Entwickler haben dafür eine einleuchtende Erklärung: Sie fürchten kaum mehr als sogenannte Race Conditions. Hierbei handelt es sich um Fehler, die aus unterschiedlicher Reihenfolge der Ausführung parallel laufender Programme und Threads resultieren und überwiegend nicht reproduzierbar sind – von sicheren Tests ganz zu schweigen.

Das ist zum Teil auch der Implementierung der Nebenläufigkeit geschuldet. Entwickler greifen üblicherweise auf Mutexe zurück, auch Locks genannt: Dabei handelt es sich um Status-Flags, die das Programm Thread-sicher verändern und somit anderen Threads den Zugriff auf Ressourcen sperren können –, wenn die anderen die Sperre berücksichtigen.

Der verbreitete Einsatz von Mutexen geht unter anderem auf eine Arbeit von Birell vom System Research Center aus dem Jahr 1989 zurück. Damals waren Threads noch etwas ganz Neues, und bei Parallelisierung dachte er wohl eher noch an Vektorprozessoren in Supercomputern.

Dabei entwarf der britische Informatiker Tony Hoare schon elf Jahre früher mit den Communicating Sequential Processes (CSP) ein anderes Konzept. Der Gedanke ist einfach: CSPs regeln den Zugriff auf Daten nicht per Mutex, denn das erinnert unangenehm an das kooperative Multitasking. Stattdessen reicht diese Technik die Daten einfach herüber: Wer sie bekommt, darf damit arbeiten. Letztlich beruht die viel gepriesene Sicherheit von Rust auch auf dieser Art des Vorgehens. Leider werden konkurrierende Datenzugriffe aber immer noch zuerst mit Mutexen assoziiert – außer bei Sprachen wie Go und Rust, denn Go hat das Konzept der CSP von Anfang an eingebaut. Das ist vielleicht das nützlichste und oft in seiner Bedeutung übersehene Feature dieser Sprache, streng nach dem Motto: "Kommuniziere nicht, indem du Speicher teilst, sondern teile Speicher, indem du kommunizierst."

Dieses Kommunizieren geschieht über Channels, also typstrenge Datenleitungen, die die Integrität der Daten sichern. Wenn ein Thread ein Objekt in einen Channel schreibt, muss ein anderer Thread, der in den gleichen Channel schreiben will, warten (so wie die Queue in Python). Dadurch kommt ein Objekt immer genauso am anderen Ende an, wie es gesendet wurde – niemand darf "dazwischenreden". Channels bilden eine eigene Typklasse und der CSP erzeugt sie unabhängig von Sender oder Empfänger.

Wer sich schon einmal mit drei oder mehr voneinander abhängigen Mutexen herumschlagen musste, weiß, dass diese Logik extrem fehleranfällig ist. Bei Channels ist es einfacher: Man muss nicht darüber nachdenken, wer wann auf was zugreifen darf, sondern hat den Strom der Daten sozusagen vor der Nase.

Ein Beispiel: Jemand muss häufig viele Bilder verkleinern; er ist ungeduldig und möchte gern die 16 CPU-Kerne seines Ryzen-Prozessors auslasten. Unter Linux eigentlich kein Problem mit xargs -P16 und dem ImageMagick-Befehl convert. Allerdings stellt sich heraus, dass das Paket golang.org/x/image/draw die Bilder mit CatmullRom.Scale() meist ohne erkennbaren Qualitätsverlust drei- bis fünfmal kleiner rechnet als mit ImageMagick. Damit ist die Entscheidung für ein eigenes Programm gefallen – eine Implementierung in Go könnte im einfachsten Fall so aussehen, wie es das folgende Listing zeigt:

func main() {
	pics, _ := filepath.Glob("*.jpg")
	for _, filename := range pics {
		go do_scale_image(filename)
	}
}

Dieser Code setzt eine primitive Parallelisierung über Go-Routinen um.

Das go vor do_scale_imagefile() startet die Funktion als Go-Routine, also nebenläufig. Go-Routinen sind mit Threads vergleichbar, sie sind aber je nach Runtime ressourcenschonender. Das Laufzeitsystem von Go kümmert sich um das Scheduling der Go-Routinen und die Auslastung der CPUs. Dieses primitive Vorgehen hat jedoch einige Nachteile:

  • Fehlermeldungen geraten durcheinander und werden möglicherweise zerstückelt, also unlesbar.
  • Jede Go-Routine alloziert für sich benötigten Speicher und gibt ihn nach Beendigung wieder frei.

Es hängt vom Laufzeitsystem ab, wie viel Speicher jede dieser Routinen bereits beim Aufruf oder erst dann reserviert, wenn eine CPU frei wird. Bei hunderten großen Bildern könnte das sehr viel Ressourcen benötigen. Außerdem eignet sich ein derart simples Programm nicht für Sonderwünsche – vielleicht soll es zum Beispiel noch für jedes Bild den Kompressionsgrad protokollieren. Paralleles Schreiben in eine Datei erzeugt Chaos, also senden die Go-Routinen ihre Ausgaben über einen Channel msgchan, den das Programm zentral liest:

for msg := range msgchan {
fmt.Fprintln(fd, msg) // fd = Filedeskriptor
}

Die range-Schleife würde mit dem Schließen des Channels verlassen (close(msgchan)), was hier aber nicht passiert. Eine weitere Go-Routine und eine WaitGroup wg können dabei helfen, wie sie im folgenden Listing zu sehen sind:

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

var wg sync.WaitGroup
var msgchan chan string

func main() {
	msgchan = make(chan string)
	pics,_ := filepath.Glob("*.jpg")
	fd,_ := os.Create("protocol.txt")

	for _, filename := range pics {
		wg.Add(1)
		go scale_image(filename)
	}

	go func() {
		for msg := range msgchan {
			fmt.Fprintln(fd, msg)
		}
	}()

	wg.Wait()
	close(msgchan)
	// weitere Aktionen
}

func scale_image(fn string) {
	defer wg.Done()
	...
	msgchan <- message
}

Auf diese Weise können Entwickler Ausgaben geordnet erfassen.

Dabei handelt es sich um ein Semaphor, den das Programm explizit vor jedem Start einer Go-Routine um 1 erhöht. Es führt das defer() automatisch beim Verlassen einer Funktion aus und senkt den Zähler wieder. wg.Wait() blockiert so lange, bis alle erfassten Go-Routinen beendet sind. Das close(msgchan) beendet schließlich die Protokoll-Go-Routine. Allerdings hat auch diese Umsetzung noch Probleme:

  • Die Zahl der Go-Routinen ist nach wie vor nicht begrenzt.
  • Es ist aus technischen Gründen leider nicht möglich, über die WaitGroup die Zahl laufender Go-Routinen abzufragen – Wait() blockiert den Ablauf. Die aufrufende Funktion main() muss also warten.

Eine Standardumsetzung für diese Probleme findet sich im folgenden Listing:

import (
	"fmt"
	"os"
	"path/filepath"
	"runtime"
)

func main() {
	fnchan := make(chan string)
	msgchan := make(chan string)
	done := make(chan bool)

	pics,_ := filepath.Glob("*.jpg")
	fd,_ := os.Create("protocol.txt")
	defer fd.Close()

	numcpu := runtime.NumCPU()

	for i := 0; i < numcpu; i++ {
		go scaler(fnchan, msgchan)
	}

	for _, filename := range pics {
		fnchan <- filename
	}

	close(fnchan)

	// Empfänger-Goroutine
	go func() {
		for i := numcpu; i > 0; {
			for msg := range msgchan {
				if len(msg) == 0 {
					i--
					continue
				}
				fmt.Fprintln(fd, msg)
			}
		}
		done<- true
	}()

	close(msgchan)
	<-done
	// weitere Aktionen
}

func scaler(fnchan, msgchan chan string) {
	for fn := range fnchan {
		...
		msgchan <- message
	}
	msgchan <- "" // end marker
}

Eine Standardlösung, die mit beschränkter Zahl von Routinen arbeitet.

Go kann die Zahl der verfügbaren CPUs zur Laufzeit ermitteln, hier per numcpu. Bei I/O-intensiven Go-Routinen und ähnlichen Situationen kann die Anwendung den Wert auch mit einem Faktor versehen. Sie startet dann numcpu-Instanzen, denen es über den Channel fnchan den Dateinamen übergibt. Welche Go-Routine das nächste Datum aus nchan erhält, entscheidet die Go-Runtime. Sind alle Go-Routinen beschäftigt, wartet der Aufrufer. Die Go-Routinen senden Werte – oder Fehlermeldungen – an eine Empfänger-Routine. main() kann sich derweil anderen Dingen widmen.

Mit dem Schließen von fnchan enden auch die Go-Routinen. Mit einem Trick lässt sich die WaitGroup umgehen: scaler() sendet vor dem Return einen leeren String, und der Empfänger zählt, bis von numcpu leere Strings empfangen wurden. Der leere String ist nur ein Beispiel – eine Struktur mit einem Boolean tut es ebenso wie bei Pointern der Wert nil.

Auf eine Falle ist hier zu achten: Die Schleife im Empfänger wird zwar durch close(msgchan) verlassen, aber vielleicht geschieht das nicht sofort. Um danach auf die Ergebnisse zuzugreifen, wartet main() auf die Fertigmeldung über den Bool-Channel done. Erst wenn diese Meldung eintrifft, sind sowohl die Go-Routinen scale als auch die Empfänger-Go-Routine beendet.

Diese Umsetzung reicht in den meisten Fällen aus. Sollte es im Channel msgchan häufig Stau geben, kann das Programm diesen Kanal auch mit einem Puffer für mehrere Objekte generieren. Dabei können mehrere Empfängerroutinen parallel laufen, sofern dabei keine Konflikte auftreten können. Zumindest der Empfang von Fehlermeldungen dürfte in der Regel über eine gesonderte Go-Routine laufen, wenn das Paket log nicht ausreicht (Logging ist Thread-sicher). Wer das mit Mutexen realisieren möchte, muss mit deutlich mehr Schwierigkeiten rechnen.

Der Code aus dem vorherigen Listing taugt für die meisten Fälle, aber nicht für alle. Beispielsweise dauerte das rekursive Bearbeiten eines großen Dateibaums zu lange. Eine bei jedem gefundenen Verzeichnis neu gestartete Go-Routine würde ähnlich wie im vorigen Abschnitt funktionieren, doch existiert ein einfacherer Weg, der in diesem Listing zu finden ist:

import (
	"io/fs"
	"os"
	"runtime"
)

var gocnt chan bool

func main() {
	gocnt = make(chan bool, 4*runtime.NumCPU())
	parseDir(".")
}

func parseDir(dir string) {
	defer func() {
		<-gocnt
	}()

	gocnt <- true

	flist, _ := os.ReadDir(dir)
	for _, entry := range flist {
		if entry.Type() & fs.ModeType == 0 {
			// entry ist eine Datei
			do_something_heavy_with(entry)
			return
		}
		if entry.IsDir() {
			parseDir(entry.Name())
		}
	}
}

Dieser Code zeigt die einfache Beschränkung der Zahl der Go-Routinen.

Dort erzeugt das Programm einen Channel mit Puffer – in diesem Fall kommt die vierfache Anzahl der CPU-Kerne zum Einsatz, weil es um I/O-intensive Operationen geht und weniger gerechnet wird. Ruft das Programm parseDir() auf, wird ein true hineingeschrieben. Wenn der Channel aber gefüllt ist und zu viele Go-Routinen aktiv sind, wartet die Routine an dieser Stelle, noch bevor weitere Aktionen folgen oder Ressourcen beansprucht werden. Beim Verlassen der Routine entfernt sie den Wert per defer() aus dem Channel. So können höchstens 4*numcpu Goroutinen gleichzeitig aktiv sein. Dieses Vorgehen brachte einen deutlichen Performancezuwachs beim Parsen und sogar beim parallelen Schreiben auf eine SSD-Platte dank intelligentem Controller hervor. Ein elegantes Beispiel zeigt folgendes Listing, das von Eli Bendersky stammt.

func limitNumClients(handle http.HandlerFunc, maxClients int) http.HandlerFunc {
	sema := make(chan bool, maxClients)

	return func(w http.ResponseWriter, req *http.Request) {
		sema <- true
		handle(w, req)
		<-sema
	}
}

// Limit to max 10 connections for this handler.
http.HandleFunc("/inc", limitNumClients(inchandler, 10))

Das Programm zeigt eine Möglichkeit zur eleganten Beschränkung der HTTP-Rufe im Webserver.

Dieses Programm gibt eine unbenannte Funktion zurück, die auf den Channel sema (der hier wie ein Semaphore agiert) zurückgreifen kann, da er in ihrem Kontext definiert ist. So kann man die gleichzeitige Nutzerzahl einer Webseite einfach beschränken. Channels können ihrerseits Channels oder sogar Funktionen mit einheitlicher Signatur übermitteln. Da Go typsicher ist, stellt dieses Vorgehen aber dank Go-Interfaces kein Problem dar. Programmierer und Programmiererinnen können bei Bedarf variable Aufgaben an eine Menge von "Arbeitsroutinen" (Workern) verteilen. In jedem Fall müssen sie nur einen Ablauf entwerfen und sich nicht mit unübersichtlicher Mutex-Logik plagen.

Die bisherigen Beispielprogramme haben nur Booleans und Strings über Channels gesendet. Das ist selbst bei langen Zeichenketten kein Problem, denn Go verwaltet sie intern als Struktur, die Länge und einen Pointer auf unveränderliche Daten enthält. Dabei übermittelt der Code nur Pointer und Länge. Der Empfänger kann den String wegen des Datentyps nicht verändern.

Anders verhält es sich, wenn Programme mit beliebig großen Ganzzahlen aus dem Paket math/big rechnen. Ein Beispiel: Bei einer Anwendung stellt man per Profiling (Paket runtime/pprof) fest, dass die Division den Löwenanteil der Rechenzeit benötigte. Auch hier bietet sich die Parallelisierung an, die Go-Routinen erhalten eine Struktur mit Zähler und Nenner. Sie senden die Ergebnisse jeweils an eine weitere Go-Routine, die die Quotienten aufsummieren. Zwar übergibt auch dieses Programm die Daten wieder nur als Pointer, aber die Empfänger ändern diesmal die übermittelten Werte. Deshalb muss das Programm die Werte vor dem Rechnen auf lokale Variable kopieren. Diese Umsetzung ist effektiv: Es legt die lokalen Variablen nur einmal an und überscheibt sie immer wieder, da es die Variablen in den Go-Routinen außerhalb einer Schleife definiert. Go erzwingt im Unterschied zu Rust aber ein derartiges lokales Umkopieren nicht. Andererseits lassen sich Go-Programme meist übersichtlich und einfach aufbauen, wodurch Denkfehler schneller sichtbar werden.

Weil Go-Routinen sehr schlank sind, ist es auch kein Problem, selbst einige tausend parallel laufen zu lassen. Allerdings gilt es dabei zu bedenken, dass der Start vieler Routinen – und mehrfaches Beenden – Zeit und Speicher verbraucht. Ebenso kostet das Senden über Channels mehr Zeit als ein simpler Funktionsaufruf. Es ist also durchaus sinnvoll, größere Datenmengen an einer Stelle zu halten und die Freigabe für konkurrierende Routinen per Channel oder per Mutex zu regeln. Wie der folgende Code-Ausschnitt zeigt, lauert eine weitere Falle häufig in der richtigen Reihenfolge von Anweisungen:

    close(fnchan)
    wg.Wait()
    close(prtout)
    <-done

In diesem Beispiel schließt das Programm zunächst den Kanal fnchan, der range-Schleifen in mehreren Go-Routinen beendet. Danach wartet der Code per WaitGroup, bis alle Go-Routinen beendet sind. Erst dann haben die Schleifen wirklich ihre Ergebnisse per Channel prtout an einen Empfänger geschickt, der nun per close(prtout) beendet wird. Davor schickt dieser Empfänger aber über den Bool-Channel done eine Meldung an main() zurück: "Ich bin fertig, das Ergebnis kann verwendet werden." Der nächste Abschnitt zeigt, was bei Fehlern in der Reihenfolge passiert.

Das Entwicklungstempo in einer Sprache hängt wesentlich davon ab, wie schnell das Entwicklerteam Fehler findet. Der Go-Compiler besticht durch klare, fast immer einzeilige Meldungen, wodurch man erstaunlich schnell zu einem kompilierbaren Programm kommt. Wer sich je mit seitenlangen, unverständlichen Template-Fehlern in C++ herumschlagen musste, weiß das zu schätzen. Doch an dieser Stelle geht es um die berüchtigten Race Conditions, bei denen der Compiler und in der Regel selbst ein Debugger nicht helfen können: Solche Fehler hängen von der zufälligen Abarbeitungsreihenfolge ab. Dafür bietet das Runtime-System von Go mit der Deadlock-Erkennung zur Laufzeit eine nützliche Besonderheit. Sie schlägt an, wenn Channels keinen aktiven Sender oder Empfänger mehr haben. Wenn man im letzten Beispiel zwei Anweisungen tauscht (etwa ein close() und das Wait()), endet das Programm mit einem präzisen Stacktrace, wie ihn das nächste Listing zeigt. Anhand von Dateinamen und Zeilennummern kann man in der Regel schon herausfinden, wo der Denkfehler steckt:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
        /datadisk/wobst/tmp/tmp/picscale.go:337 +0x6a6

goroutine 6 [chan receive]:
main.main.func3(0x0?)
        /datadisk/wobst/tmp/tmp/picscale.go:277 +0x89
created by main.main
        /datadisk/wobst/tmp/tmp/picscale.go:276 +0x233
exit status 2

Die präzise Quellangabe des Fehlers durch die Deadlock-Erkennung bei Go.

Grundsätzlich ist eine Deadlock-Erkennung nicht trivial. So weiß Go zum Beispiel, dass das vergebliche Warten auf ein Netzwerkinterface nicht dazu gehört. In der Praxis kommen Programmierer und Programmiererinnen überraschend schnell zum Ziel – ganz im Unterschied zu Sprachen, bei denen das Programm einfach hängenbleibt. Für Race Conditions im engeren Sinne (genauer: Data Races) steht hingegen die Compile- und Laufzeitoption -race zur Verfügung:

  go run -race program.go
 	 oder
  go build -race _paketname_

Sie verlangsamt die Ausführung erheblich bis zu 20-fach, entdeckt aber nicht synchronisierte Schreib-/Lesezugriffe auf Speicherbereiche. Solche Fehler werden durch saubere Verwendung von Channels zwar unwahrscheinlicher, aber wenn sie auftreten, ist häufig guter Rat teuer. Allerdings kann -race nur anschlagen, wenn ein solcher Fehler tatsächlich auftritt, was vom Timing abhängt. Doch dann steht wenigstens eine Diagnose zur Verfügung.

Wer sich hingegen fragt, warum das parallelisierte Programm nicht so schnell läuft wie erhofft, dem sei das Paket runtime/trace wärmstens empfohlen. Es erzeugt ähnlich wie der Profiler unter runtime/pprof eine Protokolldatei, die sich mit go tool trace _datei_ auswerten lässt. Das Kommando öffnet im Webbrowser eine Seite mit dem Link "View trace". Das folgende Bild zeigt, welche Informationen sich dahinter finden:

Das Paket runtime/trace erzeugt eine Protokolldatei, die Entwickler und Entwicklerinnen auswerten können.

(Bild: Dr. Reinhard Wobst)

Wann ist welche Funktion als Go-Routine gelaufen, welche System- und Funktionsrufe wurden dabei ausgeführt (braune Striche, hineinzoomen) und wann war denn der Garbage Collector (GC) überhaupt aktiv? So ist erkennbar, dass der GC teilweise parallel zu vielen Go-Routinen läuft. Das Tool ist noch in der Entwicklung und nicht vollständig dokumentiert, aber es liefert schnell eine Vorstellung davon, wie gut die Parallelisierung wirkte und wo es noch hakt.

Channels nehmen viel vom Schrecken der Parallelprogrammierung, denn mit ihnen können Entwickler und Entwicklerinnen nebenläufige Programme besser verstehen. Mutexe werden durch sie nicht überflüssig, sondern neben den atomaren Operationen aus sync/atomic auch in Go verwendet. Für die hier beschriebenen Fälle sind Channels jedoch einfach besser geeignet und erlauben größere Flexibilität ohne Verkomplizierung. Entwicklungszeit nebst Fehlersicherheit eines Programms sind nicht selten entscheidender als einige Prozente Performance mehr. Go ist als kompilierte Sprache trotz des Garbage Collector – der unzählige Fehler vermeiden hilft und die Programmierung extrem vereinfacht – schnell genug.

Channels sind nicht auf Go beschränkt. In Python gibt es das Modul Queue, das funktional mithalten kann, obgleich es ungleich ineffizienter und langsamer als Go-Channels ist. Dafür geht Python Queues sogar prozessübergreifend an (Modul Multiprocessing), die zudem echte Parallelisierung im Unterschied zum Multithreading erlauben. Python führt Threads nach wie vor nur auf einem Core aus. Auch Rust bietet Channels und in C++ lassen sie sich programmieren.

Dr. Reinhard Wobst
ist Mathematiker und arbeitet als selbstständiger Programmierer, Autor und Berater mit Spezialisierung auf die Gebiete Kryptologie und Unix/Python.

(fms)