
Go Pipeline Pattern: Turning Streams into Useful Data

Pipeline Pattern in Go#

Introduction#

Sometimes, the hard part of concurrent programming is not making things run in parallel. The hard part is keeping the flow of data understandable.

A pipeline is a simple way to do that. Instead of putting every responsibility inside one large loop, you split the work into small stages. Each stage receives values from a channel, does one transformation, and sends the result to the next stage.

source -> parse -> filter -> enrich -> sink

In Go, this pattern feels natural because goroutines and channels already give us the building blocks. A generator can create the initial stream, each pipeline stage can transform it, and a final consumer can collect or print the result.

This article continues the Go Patterns series after Producer-Consumer, Generator, and Worker Pool. The goal is not to build a framework. The goal is to learn how to structure data processing without turning one function into a pile of responsibilities.

When to Use#

Use the Pipeline Pattern when data moves through multiple steps:

  • Processing log streams
  • Reading, validating, and transforming CSV rows
  • Cleaning API responses before storing them
  • Building small ETL-like flows
  • Splitting parsing, filtering, enrichment, and reporting into separate responsibilities

The pattern is especially useful when each step can be described as:

take a stream of values, transform or filter it, and return another stream of values.

Why Use It#

Pipelines are useful because they keep each stage focused.

  • Separation of concerns: parsing, filtering, and reporting do not live in the same loop
  • Composability: stages can be reused and reordered
  • Natural backpressure: channels slow down upstream stages when downstream stages cannot keep up
  • Testability: each stage can be tested with a small input channel
  • Readable concurrency: the data flow is visible from the stage composition

The pattern is not magic. It is mostly discipline: one stage, one responsibility.

How It Works#

A pipeline stage usually has this shape:

func stage(in <-chan Input) <-chan Output {
    out := make(chan Output)
    go func() {
        defer close(out)
        for value := range in {
            out <- transform(value)
        }
    }()
    return out
}

The stage receives a read-only channel, creates its own output channel, starts a goroutine, then closes the output channel when the input channel is exhausted.

This gives us a chain:

raw := source()
parsed := parse(raw)
filtered := filter(parsed)
enriched := enrich(filtered)
sink(enriched)

Simple Example#

Before looking at logs, let’s start with a tiny pipeline:

numbers -> square -> keep even -> print

package main

import "fmt"

func numbers(max int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 1; i <= max; i++ {
            out <- i
        }
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func keepEven(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            if n%2 == 0 {
                out <- n
            }
        }
    }()
    return out
}

func main() {
    values := numbers(10)
    squared := square(values)
    evenSquares := keepEven(squared)
    for n := range evenSquares {
        fmt.Println(n)
    }
}

Each function owns one small part of the work. numbers produces values, square transforms them, keepEven filters them, and main consumes the final stream.

That is the pipeline pattern in its smallest useful form.

Real-World Example: Log Processing Pipeline#

Now let’s use a more realistic example.

Imagine we receive raw log lines and want to turn them into useful information. We can model that as a pipeline:

raw log lines -> parse logs -> filter errors -> enrich logs -> print report

For a complete runnable version, this example needs:

import (
    "fmt"
    "strings"
    "time"
)

First, define the data we want to pass between stages:

type RawLog string

type LogEntry struct {
    Timestamp time.Time
    Level     string
    Service   string
    Message   string
    Alert     bool
}

The source stage sends raw log lines:

func logSource(lines []string) <-chan RawLog {
    out := make(chan RawLog)
    go func() {
        defer close(out)
        for _, line := range lines {
            out <- RawLog(line)
        }
    }()
    return out
}

The parser turns each raw line into a structured LogEntry:

func parseLogs(in <-chan RawLog) <-chan LogEntry {
    out := make(chan LogEntry)
    go func() {
        defer close(out)
        for raw := range in {
            parts := strings.SplitN(string(raw), "|", 4)
            if len(parts) != 4 {
                continue
            }
            timestamp, err := time.Parse(time.RFC3339, parts[0])
            if err != nil {
                continue
            }
            out <- LogEntry{
                Timestamp: timestamp,
                Level:     parts[1],
                Service:   parts[2],
                Message:   parts[3],
            }
        }
    }()
    return out
}

The filter stage keeps only errors:

func filterErrors(in <-chan LogEntry) <-chan LogEntry {
    out := make(chan LogEntry)
    go func() {
        defer close(out)
        for entry := range in {
            if entry.Level == "ERROR" {
                out <- entry
            }
        }
    }()
    return out
}

The enrichment stage adds a small piece of derived information:

func enrichLogs(in <-chan LogEntry) <-chan LogEntry {
    out := make(chan LogEntry)
    go func() {
        defer close(out)
        for entry := range in {
            entry.Alert = entry.Service == "payment" || entry.Service == "auth"
            out <- entry
        }
    }()
    return out
}

Finally, the sink consumes the enriched entries:

func printReport(in <-chan LogEntry) {
    for entry := range in {
        alert := ""
        if entry.Alert {
            alert = " [ALERT]"
        }
        fmt.Printf("%s %s: %s%s\n", entry.Service, entry.Level, entry.Message, alert)
    }
}

The full pipeline becomes very readable:

func main() {
    lines := []string{
        "2026-04-24T10:00:00Z|INFO|api|request completed",
        "2026-04-24T10:00:01Z|ERROR|payment|card authorization failed",
        "2026-04-24T10:00:02Z|ERROR|worker|job timeout",
        "2026-04-24T10:00:03Z|ERROR|auth|invalid token",
    }

    raw := logSource(lines)
    parsed := parseLogs(raw)
    errors := filterErrors(parsed)
    enriched := enrichLogs(errors)
    printReport(enriched)
}

The important part is not the log format. The important part is that the flow is explicit.

Each stage can be read, tested, and replaced independently.
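
To make the testability point concrete, here is a minimal sketch of a test for filterErrors, assuming the test lives in the same package; the test name and sample entries are illustrative:

import "testing"

// A small pre-filled, pre-closed channel stands in for the upstream stage.
func TestFilterErrors(t *testing.T) {
    in := make(chan LogEntry, 3)
    in <- LogEntry{Level: "INFO", Message: "request completed"}
    in <- LogEntry{Level: "ERROR", Message: "job timeout"}
    in <- LogEntry{Level: "DEBUG", Message: "cache miss"}
    close(in) // the stage exits once the input is drained

    var got []LogEntry
    for entry := range filterErrors(in) {
        got = append(got, entry)
    }

    if len(got) != 1 || got[0].Message != "job timeout" {
        t.Fatalf("expected one ERROR entry, got %+v", got)
    }
}

Because the stage owns no global state, the whole test is just: build a small channel, close it, and range over the output.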

Error Handling#

The log parser above silently skips invalid lines. That keeps the example small, but it is not always what you want in production.

Two common approaches are:

  1. Send errors to a separate error channel
  2. Pass a result type through the pipeline

For example:

type LogResult struct {
    Entry LogEntry
    Err   error
}

This makes failures explicit without panicking inside a goroutine. It also lets the final consumer decide whether to log, count, retry, or ignore invalid records.
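
As a sketch of the second approach, the parser could emit LogResult values instead of dropping bad lines. The function name parseLogsWithErrors and the error messages are illustrative, not part of the original code:

func parseLogsWithErrors(in <-chan RawLog) <-chan LogResult {
    out := make(chan LogResult)
    go func() {
        defer close(out)
        for raw := range in {
            parts := strings.SplitN(string(raw), "|", 4)
            if len(parts) != 4 {
                out <- LogResult{Err: fmt.Errorf("malformed line: %q", raw)}
                continue
            }
            timestamp, err := time.Parse(time.RFC3339, parts[0])
            if err != nil {
                out <- LogResult{Err: fmt.Errorf("bad timestamp: %w", err)}
                continue
            }
            out <- LogResult{Entry: LogEntry{
                Timestamp: timestamp,
                Level:     parts[1],
                Service:   parts[2],
                Message:   parts[3],
            }}
        }
    }()
    return out
}

The first approach usually means returning a second <-chan error from the stage and letting the consumer drain both channels.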

Cancellation#

The examples above work for finite inputs. For long-running pipelines, use context.Context so every stage can stop when the caller is done.

The shape usually looks like this:

func parseLogs(ctx context.Context, in <-chan RawLog) <-chan LogEntry {
    out := make(chan LogEntry)
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                return
            case raw, ok := <-in:
                if !ok {
                    return
                }
                // parseLog is the line-parsing logic from the earlier
                // parseLogs stage, factored out into a helper.
                entry, ok := parseLog(raw)
                if !ok {
                    continue
                }
                // The send must also respect cancellation, or this
                // goroutine can block forever once the consumer stops.
                select {
                case <-ctx.Done():
                    return
                case out <- entry:
                }
            }
        }
    }()
    return out
}

Without cancellation, a pipeline that reads from a never-ending source can leak goroutines when the consumer stops early.
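
For illustration, the wiring from main could look like this, assuming every stage takes the same context; the ctx-aware signatures of logSource and filterErrors are assumptions, since only parseLogs was shown above:

ctx, cancel := context.WithCancel(context.Background())
defer cancel() // releases every stage when the consumer returns

raw := logSource(ctx, lines)
parsed := parseLogs(ctx, raw)
errors := filterErrors(ctx, parsed)

// Stop the whole pipeline as soon as the first error entry arrives.
for entry := range errors {
    fmt.Println(entry.Message)
    cancel()
    break
}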

Best Practices and Pitfalls#

Best Practices:

  1. Keep each stage focused on one responsibility
  2. Return receive-only channels (<-chan T) from stages
  3. Close the output channel from the goroutine that writes to it
  4. Use context.Context for long-running or cancellable pipelines
  5. Test each stage independently with small input channels

Pitfalls:

  1. Forgetting to close output channels
  2. Stopping early without cancelling upstream stages
  3. Creating too many tiny stages that hide simple logic
  4. Mixing parsing, filtering, enrichment, and reporting in one function
  5. Assuming ordering will stay the same if you later parallelize a stage

Related Patterns#

  • Generator Pattern: Creates the initial stream of values
  • Producer-Consumer Pattern: Separates production from consumption
  • Worker Pool Pattern: Parallelizes expensive stages
  • Fan-Out/Fan-In Pattern: Distributes one stage across multiple workers and merges the results

Summary#

The Pipeline Pattern is one of the most readable ways to structure data processing in Go. It lets you split a flow into small stages, connect them with channels, and keep each responsibility isolated.

It works well when data naturally moves through a sequence: read, parse, filter, enrich, report.

The pattern is also a bridge to more advanced concurrency designs. Once one stage becomes too slow, you can combine a pipeline with a Worker Pool or Fan-Out/Fan-In to parallelize only that part of the flow.
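
As a sketch of that step, a merge helper (not part of the code above) can fan a stage out across several goroutines reading from the same input channel and fan the results back in. It assumes the LogEntry type from the log example and needs "sync" imported:

// merge fans several channels back into one, closing the output
// only after every input channel has been drained.
func merge(ins ...<-chan LogEntry) <-chan LogEntry {
    out := make(chan LogEntry)
    var wg sync.WaitGroup
    wg.Add(len(ins))
    for _, in := range ins {
        go func(in <-chan LogEntry) {
            defer wg.Done()
            for entry := range in {
                out <- entry
            }
        }(in)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

Inside main, the enrichment step would then become enriched := merge(enrichLogs(errors), enrichLogs(errors), enrichLogs(errors)): three copies of the stage pull from the same input channel, which is exactly why pitfall 5 above matters, because ordering is no longer guaranteed.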

Series Navigation#

This article is part of the Go Patterns series, alongside the Producer-Consumer, Generator, and Worker Pool patterns covered earlier.

If you want to experiment with the code examples, you can find them on my GitHub repository.
