rss resume / curriculum vitae linkedin linkedin gitlab github twitter mastodon instagram
Complex Pipelines in Go (Part 5): Putting it all together
Aug 27, 2020

This is the last post of the series Complex Pipelines in Go!

  1. Part 1 - Introduction
  2. Part 2 - Storing Values in Batches
  3. Part 3 - Transforming Data to Tab Separated Values
  4. Part 4 - Sane Coordination and Cancellation
  5. Part 5 - Putting it All Together (this post)

Putting it All Together

It’s time to connect all the dots and build the final tool. The most important components were already covered in the previous posts but are missing something else: the initial HTTP request meant to be used for downloading the gzip file.

Let’s work on that first and then we can put everything together.

Minimum Requirements

All the code relevant to this post is on Github, feel free to explore it for more details, the following is the minimum required for running the example:

  • Go 1.14
  • PostgreSQL 12.3: in theory any recent version should work, the README.md includes specific instructions for running it with Docker.

Downloading the file

To download the file via HTTP we have to use the standard library, specifically the types net/http.Client and compress/gzip.Reader, this is because the file we expect to download is a gzipped one.

For both requirements the following short snippet should cover that:

 // XXX omiting error handling to keep code short
req, _ := http.NewRequest(http.MethodGet, "https://datasets.imdbws.com/name.basics.tsv.gz", nil)

client := &http.Client{
	Timeout: 10 * time.Minute, // XXX: use something reasonable
}

resp, _ := client.Do(req)
 defer resp.Body.Close()

gr, _ := gzip.NewReader(resp.Body)
defer gr.Close()

for {
	line, err := cr.ReadString('\n')
	if err == io.EOF {
		return
	}

   // XXX: do something with the read value!
}

Connecting all the dots

The biggest and most important thing to consider during this integration (of all the previous posts, that is) is how we should handle downstream errors coming from PostgreSQL, specifically the change in batcher, which in the end consolidates a type introduced in part 2 (Storing Values in Batches) called copyFromSourceMediator this is with the idea of handling errors more closely.

The reason being of this change is the delay between the actual pgx calls (and therefore PostgreSQL) and ours, in practice what this means is that we require sync.Mutex for synchronizing the two goroutines handling sending messages to PostgreSQL and receiving messages from upstream.

See:

func (b *batcher) Copy(ctx context.Context, namesC <-chan name) <-chan error {
	outErrC := make(chan error)
	var mutex sync.Mutex
	var copyFromErr error

	copyFrom := func(batchNamesC <-chan name, batchErrC <-chan error) <-chan error {
		cpOutErrorC := make(chan error)

		go func() {
			defer close(cpOutErrorC)

			copier := newCopyFromSource(batchNamesC, batchErrC)

			_, err := b.conn.CopyFrom(ctx,
				pgx.Identifier{"names"},
				[]string{
					"nconst",
					"primary_name",
					"birth_year",
					"death_year",
					"primary_professions",
					"known_for_titles",
				},
				copier)

			if err != nil {
				mutex.Lock()
				copyFromErr = err
				mutex.Unlock()
			}
		}()

		return cpOutErrorC
	}

	go func() {
		batchErrC := make(chan error)
		batchNameC := make(chan name)

		cpOutErrorC := copyFrom(batchNameC, batchErrC)

		defer func() {
			close(batchErrC)
			close(batchNameC)
			close(outErrC)
		}()

		var index int64

		for {
			select {
			case n, open := <-namesC:
				if !open {
					return
				}

				mutex.Lock()
				if copyFromErr != nil {
					namesC = nil
					mutex.Unlock()
					outErrC <- copyFromErr
					return
				}
				mutex.Unlock()

				batchNameC <- n

				index++

				if index == b.size {
					close(batchErrC)
					close(batchNameC)

					if err := <-cpOutErrorC; err != nil {
						outErrC <- err
						return
					}

					batchErrC = make(chan error)
					batchNameC = make(chan name)

					cpOutErrorC = copyFrom(batchNameC, batchErrC)
					index = 0
				}
			case <-ctx.Done():
				if err := ctx.Err(); err != nil {
					batchErrC <- err
					outErrC <- err
					return
				}
			}
		}
	}()

	return outErrC
}

The code looks like a lot but really the important bits are in the variable/function copyFrom which still uses copyFromSource for dealing with events coming from upstream and also uses a mutex to set errors coming from pgx’s CopyFrom.

What’s next?

This is the last post of the series but I don’t think this is the end, I will follow up and improve the existing code in the future. I will answer (at least) the two following questions:

  1. How to provide processing status? How many events left? How many events processed?
  2. How to resume processing events in case of failures?

I will improve this implementation, wait for it.


Back to posts