Complex Pipelines in Go (Part 4): Sane Coordination and Cancellation
Aug 19, 2020

This post is part 4 in a series:

  1. Part 1 - Introduction
  2. Part 2 - Storing Values in Batches
  3. Part 3 - Transforming Data to Tab Separated Values
  4. Part 4 - Sane Coordination and Cancellation (this post)
  5. Part 5 - Putting it All Together

Sane Cancellation and Coordination

In the previous two posts we covered the steps to implement the processes in charge of persisting and parsing data. Independently, those two processes work fine, but things become complicated once they need to communicate, especially in terms of propagating errors and coordinating cancellation.

Thankfully, errgroup covers everything we need to achieve this: it provides synchronization, error propagation, and Context cancellation for groups of goroutines working on subtasks of a common task.

Recall that our goal is to solve this problem using goroutines in parallel. Although it is technically possible to do so using only the standard library (via a bunch of chan error values, for example), not using errgroup makes the code much harder to implement and maintain. Just consider the different ways to deal with error propagation or cancellation between multiple goroutines: it is a tough problem and it gets out of control quickly.
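To give a feel for that bookkeeping, here is a rough sketch of a producer and a consumer coordinated with only the standard library. The function name runManually and the error channel layout are assumptions made for illustration; this is one of several possible ways to wire it by hand, not a recommended design.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// runManually is a hypothetical example: it coordinates two goroutines
// with a WaitGroup, an error channel, and a cancel function managed by hand.
func runManually() error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	messages := make(chan string)
	// Buffered with one slot per goroutine so reporting an error never blocks.
	errs := make(chan error, 2)

	var wg sync.WaitGroup
	wg.Add(2)

	go func() { // producer
		defer wg.Done()
		defer close(messages)
		for _, msg := range []string{"a", "b", "c", "d"} {
			select {
			case messages <- msg:
			case <-ctx.Done():
				errs <- ctx.Err()
				return
			}
		}
	}()

	go func() { // consumer
		defer wg.Done()
		for {
			select {
			case msg, open := <-messages:
				if !open {
					return
				}
				if msg == "c" {
					errs <- errors.New("I don't like c")
					cancel() // easy to forget: nothing cancels for us
					return
				}
			case <-ctx.Done():
				errs <- ctx.Err()
				return
			}
		}
	}()

	wg.Wait()
	close(errs)

	// Receiving from the closed channel yields the first error sent,
	// or nil when no goroutine reported one.
	return <-errs
}

func main() {
	fmt.Println(runManually()) // I don't like c
}
```

Every goroutine has to remember to report its error, cancel the context, and signal the WaitGroup, and each new pipeline step multiplies those obligations.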

Minimum Requirements

All the code relevant to this post is on GitHub; feel free to explore it for more details. The following is the minimum required for running the example:

  • Go 1.14
  • Packages required in part 2.

Understanding errgroup

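The errgroup package has a constructor function called errgroup.WithContext, which returns an errgroup.Group together with a Context derived from the one passed in. That derived Context is canceled the first time a function passed to Group.Go returns a non-nil error, or the first time Wait returns. The Group type has two methods that matter here: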

  • Group.Go, which receives a function that returns an error and invokes it in a new goroutine; and
  • Group.Wait, which blocks until all the functions passed to Group.Go have returned, and then returns the first non-nil error (if any) from them.
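To make these semantics concrete, the following is a stripped-down stand-in written with only the standard library. The names miniGroup, withContext, and demo are hypothetical, and this is a simplified illustration of the behavior described above, not the package's actual source.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// miniGroup is a hypothetical, simplified stand-in for errgroup.Group.
type miniGroup struct {
	cancel  context.CancelFunc
	wg      sync.WaitGroup
	errOnce sync.Once
	err     error
}

// withContext returns a group and a context that the group cancels
// as soon as one of its functions fails.
func withContext(parent context.Context) (*miniGroup, context.Context) {
	ctx, cancel := context.WithCancel(parent)
	return &miniGroup{cancel: cancel}, ctx
}

// Go runs f in a new goroutine and records only the first non-nil error.
func (g *miniGroup) Go(f func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				g.cancel()
			})
		}
	}()
}

// Wait blocks until every function has returned, then reports the first error.
func (g *miniGroup) Wait() error {
	g.wg.Wait()
	g.cancel()
	return g.err
}

// demo starts one failing function and one that only stops on cancellation.
func demo() error {
	g, ctx := withContext(context.Background())
	g.Go(func() error { return errors.New("first failure") })
	g.Go(func() error {
		<-ctx.Done() // unblocked by the other function's failure
		return ctx.Err()
	})
	return g.Wait()
}

func main() {
	fmt.Println(demo()) // first failure
}
```

Note that Wait does not return early: the second function has to notice the cancellation and return by itself before Wait can hand back the first error.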

This API is simple, yet powerful. In practice, the key thing to keep in mind is that the function passed to Group.Go must handle <-ctx.Done(), typically via select. Because Wait only returns once every goroutine has returned, this is what lets the remaining goroutines stop promptly when one of them fails or the Context is canceled.
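One way to keep that select from being repeated everywhere is to wrap it in a small helper. The sketch below uses only the standard library; send, sendAfterCancel, and sendWithRoom are hypothetical names introduced for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// send is a hypothetical helper: it delivers msg on out, or gives up
// with ctx.Err() as soon as ctx is canceled.
func send(ctx context.Context, out chan<- string, msg string) error {
	select {
	case out <- msg:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// sendAfterCancel tries to send on a channel nobody reads from,
// using a context that is already canceled.
func sendAfterCancel() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	out := make(chan string) // unbuffered, no receiver
	return send(ctx, out, "a")
}

// sendWithRoom sends on a buffered channel while the context is still live.
func sendWithRoom() error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	out := make(chan string, 1)
	return send(ctx, out, "a")
}

func main() {
	fmt.Println(sendAfterCancel()) // context canceled
	fmt.Println(sendWithRoom())    // <nil>
}
```

Without the ctx.Done() case, the first call would block forever, which is exactly the situation a goroutine is in when its peer has already failed.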

Consider this complete example:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	ctx, cancel := context.WithDeadline(context.Background(),
		time.Now().Add(200*time.Millisecond)) // Replace "200" with anything larger than 400
	defer cancel()

	messages := make(chan string)

	g, ctx := errgroup.WithContext(ctx)

	g.Go(func() error {
		defer close(messages)

		for _, msg := range []string{"a", "b", "c", "d"} {
			select {
			case messages <- msg:
			case <-ctx.Done():
				return ctx.Err()
			}
		}

		return nil
	})

	g.Go(func() error {
		for {
			select {
			case msg, open := <-messages:
				if !open {
					return nil
				}
				fmt.Println(msg)
			case <-ctx.Done():
				return ctx.Err()
			}

			time.Sleep(100 * time.Millisecond)
		}
	})

	if err := g.Wait(); err != nil {
		log.Fatalln("wait", err)
	}
}

Running the example above will always fail (g.Wait returns context.DeadlineExceeded), for three reasons:

  • We defined a deadline of 200 milliseconds,
  • Processing each message takes 100 milliseconds,
  • We are trying to process 4 messages, therefore we need at least ~400 milliseconds to complete the whole process.

If we replace the 200 milliseconds value with something comfortably larger than 400 milliseconds, the example should complete successfully every time.
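A side note on the receiver in that example: time.Sleep does not observe the Context, so an expired deadline is only noticed at the next select. If that delay matters, the pause itself can be made cancellable. The following is a minimal sketch using only the standard library; sleepCtx and interruptedSleep are hypothetical names, and the durations are arbitrary.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sleepCtx is a hypothetical helper: it pauses for d, but returns early
// with ctx.Err() if ctx is canceled or its deadline expires first.
func sleepCtx(ctx context.Context, d time.Duration) error {
	t := time.NewTimer(d)
	defer t.Stop()

	select {
	case <-t.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// interruptedSleep asks for a one second pause under a 50 millisecond timeout.
func interruptedSleep() error {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	return sleepCtx(ctx, time.Second)
}

func main() {
	start := time.Now()
	err := interruptedSleep()
	// The pause ends when the timeout fires, long before the full second.
	fmt.Println(err, time.Since(start) < time.Second)
}
```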

Now, let’s change this a bit: instead of defining a deadline, what if one of the goroutines fails? For example, the one meant to receive the messages:

package main

import (
	"context"
	"errors"
	"log"

	"golang.org/x/sync/errgroup"
)

func main() {
	messages := make(chan string)

	g, ctx := errgroup.WithContext(context.Background())

	g.Go(func() error {
		defer close(messages)

		for _, msg := range []string{"a", "b", "c", "d"} {
			select {
			case messages <- msg:
			case <-ctx.Done():
				return ctx.Err()
			}
		}

		return nil
	})

	g.Go(func() error {
		for {
			select {
			case msg, open := <-messages:
				if !open {
					return nil
				}

				if msg == "c" {
					return errors.New("I don't like c")
				}

			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	if err := g.Wait(); err != nil {
		log.Fatalln("wait", err)
	}
}

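The error is still propagated correctly to the other goroutine: when the receiver returns its error, errgroup cancels ctx, the sender's select takes the <-ctx.Done() case, and g.Wait returns the first error, "I don't like c". Both are admittedly simple examples, but once you start adding steps to the pipeline you immediately see how things can get complicated really quickly.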

I don’t want to sound like a broken record, but it’s clear that using select and handling the <-ctx.Done() case is what makes cancellation with this package actually work.

What’s next?

The next blog post will be the conclusion of this series: we will combine everything into the final tool that solves the problem we defined in part 1. We are almost there.

