rss twitter gitlab github linkedin
Complex Pipelines in Go (Part 1): Introduction
Jul 23, 2020

Just recently I had the opportunity to work on a tool in charge of processing gigabytes of data over the wire, the end goal was to download that data, process the values and finally insert them into persistent storage in batches.

This is the first of a series of posts covering all the different pieces involved to achieve the final tool.

  1. Part 1 - Introduction (this post)
  2. Part 2 - Storing Values in Batches
  3. Part 3 - Transforming Data to Tab Separated Values
  4. Part 4 - Sane Coordination and Cancellation
  5. Part 5 - Putting All Together

Big Picture

Succinctly this solution will consist of 3 processes:

  1. Data Producer Process: reads input data and sends that to a destination for further processing,
  2. Data Consumer Process: receives raw data, parses those values using the expected format and sends them to a different process,
  3. Persistent Storage Process: receives parsed data and stores that in batches.

This is the classic problem solved using Pipelines. The biggest difference between that classic post and this new series is how cancellation comes into place when working with multiple goroutines. This means defining rules regarding the expected behavior when anything fails, all of this handled using two great Go packages: context and errgroup.

For our example we will be using a file part of IMDB’s datasets. Those files are gzipped, tab-separated-values (TSV) formatted in the UTF-8 character set. The specific file to use will be name.basics.tsv.gz which defines the following fields:

|-------------------|-----------|---------------------------------------------------|
|       Field       | Data Type |                   Description                     |
|-------------------|-----------|---------------------------------------------------|
| nconst            | string    | alphanumeric unique identifier of the name/person |
| primaryName       | string    | name by which the person is most often credited   |
| birthYear         | string    | in YYYY format                                    |
| deathYear         | string    | in YYYY format if applicable, else '\N'           |
| primaryProfession | []string  | the top-3 professions of the person               |
| knownForTitles    | []string  | titles the person is known for                    |
|-------------------|-----------|---------------------------------------------------|

Data Producer Process: Input Data Format

Because of the location (http resource) and the file data format (gzip) of this input file, our Data Producer Process will request the file using net/http, uncompres the received values using compress/gzip and send them to the Data Consumer Process as raw []byte.

Data Consumer Process: Input Data Format

Those raw []byte values will be read into TSV records using encoding/csv and from there they will be converted into values of a new struct type Name that our next step in our pipeline can understand.

Persistent Storage Process: Input Data Format

The following will be used as the type containing the values to be eventually persisted in a database:

type Name struct {
	NConst             string
	PrimaryName        string
	BirthYear          string
	DeathYear          string
	PrimaryProfessions []string
	KnownForTitles     []string
}

We will be using PostgreSQL as the relational database for this, and specifically github.com/jackc/pgx will be imported for storing the values in batches.

What’s next?

The next blog post will cover the implementation of the PostgreSQL Batcher, as we progress in the series we will continously connect all the pieces together to eventually complete our final tool.


The more you know


Back to posts