Byte Friendly

Random thoughts on programming and related topics.

Pipeline Processing in Go


Pipeline processing is a very powerful design idiom. You have some simple building blocks that you can arrange in different combinations to perform complex tasks. A classic example is the Unix command line. Each tool is very simple and does only one job, yet it still regularly amazes me what you can achieve simply by combining those tools into a pipeline and feeding (piping) data through it.

Say, you’re building an RSS reader that shows new posts live. Implementing it in a regular procedural manner is easy. Something like this (pseudo-code):

  loop {
    fetch posts
    for each post {
      if we have not yet seen this post {
        mark post seen
        show it to user
      }
    }
  }

Now say we want to do some focused reading, meaning we only want to see the subset of posts that satisfy some arbitrary criteria (filter by tags, etc.). No problemo, we just add one conditional in there.

  loop {
    fetch posts
    for each post {
      if post is interesting {
        if we have not yet seen this post {
          mark post seen
          show it to user
        }
      }
    }
  }
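For concreteness, here is a minimal Go sketch of the filtered procedural loop above. The `Post` type, `fetchPosts` stub and the "go" tag rule are hypothetical stand-ins (the post defines none of them), and the endless loop is reduced to two passes so the deduplication is visible.

```go
package main

import "fmt"

// Post is a hypothetical feed item; the article does not define one.
type Post struct {
	ID   string
	Tags []string
}

// fetchPosts is a hypothetical stand-in for a real feed fetch.
func fetchPosts() []Post {
	return []Post{
		{ID: "a", Tags: []string{"go"}},
		{ID: "b", Tags: []string{"rust"}},
		{ID: "c", Tags: []string{"go", "unix"}},
	}
}

// isInteresting is the "focused reading" rule; matching the "go" tag is an assumption.
func isInteresting(p Post) bool {
	for _, t := range p.Tags {
		if t == "go" {
			return true
		}
	}
	return false
}

// pollOnce runs one pass of the procedural loop and returns the posts it showed.
// Fetching, filtering, deduplication and display are all tangled in one function.
func pollOnce(seen map[string]bool) []Post {
	var shown []Post
	for _, p := range fetchPosts() {
		if isInteresting(p) {
			if !seen[p.ID] {
				seen[p.ID] = true              // mark post seen
				fmt.Println("New post:", p.ID) // show it to user
				shown = append(shown, p)
			}
		}
	}
	return shown
}

func main() {
	seen := make(map[string]bool)
	// A real reader would loop forever (with a delay); two passes show that
	// the second one finds nothing new.
	for i := 0; i < 2; i++ {
		pollOnce(seen)
	}
}
```

Every new rule lands as another nested condition inside `pollOnce`, which is exactly the readability problem described next.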

Now it’s getting a little more difficult to read, and it will only get worse: all the business rules are in the same pile of code, and it may be difficult to tell them apart.

But if you think about it, there is a pipeline here that consists of several primitive segments:

  fetcher -> select unseen -> mark all seen -> show to user

Each block is completely independent of the others (well, except perhaps that “select unseen” and “mark all seen” might share the same storage). If we were to, say, remove caching (and stop serving cached content), we’d only have to take out those two segments. If we want to change how content is presented to the user (print to a terminal, send to a text-to-speech engine, …), we only have to replace the final segment. The rest of the pipeline stays untouched. As for the tag filtering mentioned earlier, we just insert it right after the fetcher:

  fetcher -> filter by tags -> select unseen -> mark all seen -> show to user

In the pipeline, each segment can either swallow a message or pass it on, optionally transforming it first. Two simple rules, infinite flexibility.
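The two rules translate almost directly into code. The minimal Go sketch below assumes a segment type like the one in the example later in this post, but with directional channels; `filter`, `transform` and `run` are hypothetical helpers, not part of the original code. `filter` builds a segment that swallows or passes values, `transform` builds one that always passes a modified value, and `run` wires them together with one goroutine per segment.

```go
package main

import "fmt"

// pipelineSegment reads values from in and writes values to out.
type pipelineSegment func(in <-chan int, out chan<- int)

// filter builds a segment that passes values satisfying keep and swallows the rest.
func filter(keep func(int) bool) pipelineSegment {
	return func(in <-chan int, out chan<- int) {
		defer close(out) // closing tells the next segment no more values are coming
		for v := range in {
			if keep(v) {
				out <- v
			}
		}
	}
}

// transform builds a segment that applies f to every value and passes the result on.
func transform(f func(int) int) pipelineSegment {
	return func(in <-chan int, out chan<- int) {
		defer close(out)
		for v := range in {
			out <- f(v)
		}
	}
}

// run feeds values into the first segment, starts every segment in its own
// goroutine connected by unbuffered channels, and collects what comes out the end.
func run(values []int, segments ...pipelineSegment) []int {
	src := make(chan int)
	go func() {
		defer close(src)
		for _, v := range values {
			src <- v
		}
	}()

	var cur <-chan int = src
	for _, seg := range segments {
		next := make(chan int)
		go seg(cur, next)
		cur = next
	}

	var results []int
	for v := range cur {
		results = append(results, v)
	}
	return results
}

func main() {
	isOdd := func(v int) bool { return v%2 != 0 }
	addOne := func(v int) int { return v + 1 }
	square := func(v int) int { return v * v }

	fmt.Println(run([]int{0, 1, 2, 3}, filter(isOdd), transform(addOne), transform(square), transform(addOne)))
}
```

Because each segment closes its output once its input is drained, closing the source propagates down the chain and the final `range` loop in `run` ends. Reordering segments changes the result: `run([]int{0, 1, 2, 3}, transform(addOne), filter(isOdd))` yields `[1 3]`.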

Go expresses this design naturally: goroutines as segments, connected by channels.

Here’s an intentionally primitive example, which filters and transforms a stream of integers.

  package main

  import (
    "fmt"
    "time"
  )

  // pipelineSegment is the type of one pipeline stage: a function that
  // reads integers from in and writes integers to out.
  type pipelineSegment func(in <-chan int, out chan<- int)

  func main() {
    // Construct the pipeline: generator first, then filters/modifiers.
    out := makePipeline(generator, onlyOdd, plusOne, squared, plusOne)

    for v := range out {
      fmt.Printf("Resulting value: %d\n", v)
    }
  }

  // generator ignores its input and emits sequential integers in an
  // infinite loop, pausing 100 ms after each one.
  func generator(_ <-chan int, out chan<- int) {
    for i := 0; ; i++ {
      out <- i
      time.Sleep(100 * time.Millisecond)
    }
  }

  // Filter. Selects only odd integers. Even integers are swallowed.
  func onlyOdd(in <-chan int, out chan<- int) {
    defer close(out) // tells the next segment that no more values will come
    for val := range in {
      if val%2 != 0 {
        out <- val
      }
    }
  }

  // Modifier. Adds 1 and passes on.
  func plusOne(in <-chan int, out chan<- int) {
    defer close(out)
    for val := range in {
      out <- val + 1
    }
  }

  // Modifier. Passes on the square of each incoming integer.
  func squared(in <-chan int, out chan<- int) {
    defer close(out)
    for val := range in {
      out <- val * val
    }
  }

  // makePipeline chains the segments, running each one in its own goroutine.
  // It returns the "exhaust pipe", from which fully processed integers can be read.
  func makePipeline(segments ...pipelineSegment) <-chan int {
    // The first segment (the generator) never reads its input, so nothing
    // is ever sent on this initial channel.
    currentInput := make(chan int)
    var currentOutput chan int

    for _, seg := range segments {
      currentOutput = make(chan int)
      go seg(currentInput, currentOutput)
      currentInput = currentOutput
    }

    return currentOutput
  }

Produced output:

  Resulting value: 5
  Resulting value: 17
  Resulting value: 37
  Resulting value: 65
  Resulting value: 101
  Resulting value: 145
  Resulting value: 197
  ...

The first generated integer is 0. It’s even (well, certainly not odd), so it does not make it past the filter.

The next one is 1. It passes the filter, then gets +1, so it’s now 2. Then it’s squared and becomes 4. Finally, one more +1 turns it into 5. There are no more segments, so this value is read from the output pipe and printed to the terminal.

Hope this was useful.
