Go Pipeline Pattern: Turning Streams into Useful Data
Introduction
Sometimes, the hard part of concurrent programming is not making things run in parallel. The hard part is keeping the flow of data understandable.
A pipeline is a simple way to do that. Instead of putting every responsibility inside one large loop, you split the work into small stages. Each stage receives values from a channel, does one transformation, and sends the result to the next stage.
source -> parse -> filter -> enrich -> sink

In Go, this pattern feels natural because goroutines and channels already give us the building blocks. A generator can create the initial stream, each pipeline stage can transform it, and a final consumer can collect or print the result.
This article continues the Go Patterns series after Producer-Consumer, Generator, and Worker Pool. The goal is not to build a framework. The goal is to learn how to structure data processing without turning one function into a pile of responsibilities.
When to Use
Use the Pipeline Pattern when data moves through multiple steps:
- Processing log streams
- Reading, validating, and transforming CSV rows
- Cleaning API responses before storing them
- Building small ETL-like flows
- Splitting parsing, filtering, enrichment, and reporting into separate responsibilities
The pattern is especially useful when each step can be described as:
take a stream of values, transform or filter it, and return another stream of values.
Why Use It
Pipelines are useful because they keep each stage focused.
- Separation of concerns: parsing, filtering, and reporting do not live in the same loop
- Composability: stages can be reused and reordered
- Natural backpressure: channels slow down upstream stages when downstream stages cannot keep up
- Testability: each stage can be tested with a small input channel
- Readable concurrency: the data flow is visible from the stage composition
The pattern is not magic. It is mostly discipline: one stage, one responsibility.
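The testability point is worth making concrete: because a stage is just a function from one channel to another, it can be exercised with a small, hand-built input channel. A minimal sketch, using a hypothetical `double` stage (not part of the examples below):

```go
package main

import "fmt"

// double is a hypothetical stage: it reads ints and emits each value doubled.
func double(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * 2
		}
	}()
	return out
}

func main() {
	// Feed the stage a small, fixed input, exactly as a unit test would.
	in := make(chan int, 3)
	in <- 1
	in <- 2
	in <- 3
	close(in)

	for n := range double(in) {
		fmt.Println(n) // prints 2, 4, 6
	}
}
```

No mocks, no synchronization helpers: a buffered channel plus `close` is enough to drive a stage end to end.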
How It Works
A pipeline stage usually has this shape:
```go
func stage(in <-chan Input) <-chan Output {
	out := make(chan Output)
	go func() {
		defer close(out)
		for value := range in {
			out <- transform(value)
		}
	}()
	return out
}
```

The stage receives a read-only channel, creates its own output channel, starts a goroutine, then closes the output channel when the input channel is exhausted.
This gives us a chain:
```go
raw := source()
parsed := parse(raw)
filtered := filter(parsed)
enriched := enrich(filtered)
sink(enriched)
```

Simple Example
Before looking at logs, let’s start with a tiny pipeline:
numbers -> square -> keep even -> print

```go
package main

import "fmt"

func numbers(max int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= max; i++ {
			out <- i
		}
	}()
	return out
}

func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func keepEven(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			if n%2 == 0 {
				out <- n
			}
		}
	}()
	return out
}

func main() {
	values := numbers(10)
	squared := square(values)
	evenSquares := keepEven(squared)

	for n := range evenSquares {
		fmt.Println(n)
	}
}
```

Each function owns one small part of the work.
numbers produces values, square transforms them, keepEven filters them, and main consumes the final stream.
That is the pipeline pattern in its smallest useful form.
Real-World Example: Log Processing Pipeline
Now let’s use a more realistic example.
Imagine we receive raw log lines and want to turn them into useful information. We can model that as a pipeline:
raw log lines -> parse logs -> filter errors -> enrich logs -> print report

For a complete runnable version, this example needs:

```go
import (
	"fmt"
	"strings"
	"time"
)
```

First, define the data we want to pass between stages:
```go
type RawLog string

type LogEntry struct {
	Timestamp time.Time
	Level     string
	Service   string
	Message   string
	Alert     bool
}
```

The source stage sends raw log lines:
```go
func logSource(lines []string) <-chan RawLog {
	out := make(chan RawLog)
	go func() {
		defer close(out)
		for _, line := range lines {
			out <- RawLog(line)
		}
	}()
	return out
}
```

The parser turns each raw line into a structured LogEntry:
```go
func parseLogs(in <-chan RawLog) <-chan LogEntry {
	out := make(chan LogEntry)
	go func() {
		defer close(out)
		for raw := range in {
			parts := strings.SplitN(string(raw), "|", 4)
			if len(parts) != 4 {
				continue
			}

			timestamp, err := time.Parse(time.RFC3339, parts[0])
			if err != nil {
				continue
			}

			out <- LogEntry{
				Timestamp: timestamp,
				Level:     parts[1],
				Service:   parts[2],
				Message:   parts[3],
			}
		}
	}()
	return out
}
```

The filter stage keeps only errors:
```go
func filterErrors(in <-chan LogEntry) <-chan LogEntry {
	out := make(chan LogEntry)
	go func() {
		defer close(out)
		for entry := range in {
			if entry.Level == "ERROR" {
				out <- entry
			}
		}
	}()
	return out
}
```

The enrichment stage adds a small piece of derived information:
```go
func enrichLogs(in <-chan LogEntry) <-chan LogEntry {
	out := make(chan LogEntry)
	go func() {
		defer close(out)
		for entry := range in {
			entry.Alert = entry.Service == "payment" || entry.Service == "auth"
			out <- entry
		}
	}()
	return out
}
```

Finally, the sink consumes the enriched entries:
```go
func printReport(in <-chan LogEntry) {
	for entry := range in {
		alert := ""
		if entry.Alert {
			alert = " [ALERT]"
		}

		fmt.Printf("%s %s: %s%s\n", entry.Service, entry.Level, entry.Message, alert)
	}
}
```

The full pipeline becomes very readable:
```go
func main() {
	lines := []string{
		"2026-04-24T10:00:00Z|INFO|api|request completed",
		"2026-04-24T10:00:01Z|ERROR|payment|card authorization failed",
		"2026-04-24T10:00:02Z|ERROR|worker|job timeout",
		"2026-04-24T10:00:03Z|ERROR|auth|invalid token",
	}

	raw := logSource(lines)
	parsed := parseLogs(raw)
	errors := filterErrors(parsed)
	enriched := enrichLogs(errors)

	printReport(enriched)
}
```

The important part is not the log format. The important part is that the flow is explicit.
Each stage can be read, tested, and replaced independently.
Error Handling
The log parser above silently skips invalid lines. That keeps the example small, but it is not always what you want in production.
Two common approaches are:
- Send errors to a separate error channel
- Pass a result type through the pipeline
For example:
```go
type LogResult struct {
	Entry LogEntry
	Err   error
}
```

This makes failures explicit without panicking inside a goroutine. It also lets the final consumer decide whether to log, count, retry, or ignore invalid records.
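The first approach, a separate error channel, can be sketched as follows. This is a minimal illustration rather than the article's code: the `parseOne` helper and the line format are made up for the example.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// parseOne is a hypothetical parser that fails on lines without a "|".
func parseOne(line string) (string, error) {
	parts := strings.SplitN(line, "|", 2)
	if len(parts) != 2 {
		return "", errors.New("malformed line: " + line)
	}
	return parts[1], nil
}

// parseStage returns two channels: parsed values and parse errors.
func parseStage(in <-chan string) (<-chan string, <-chan error) {
	out := make(chan string)
	errs := make(chan error)
	go func() {
		defer close(out)
		defer close(errs)
		for line := range in {
			msg, err := parseOne(line)
			if err != nil {
				errs <- err
				continue
			}
			out <- msg
		}
	}()
	return out, errs
}

func main() {
	in := make(chan string, 3)
	in <- "INFO|ok"
	in <- "broken"
	in <- "ERROR|bad"
	close(in)

	out, errs := parseStage(in)
	// Drain both channels; a nil channel blocks forever, which lets the
	// select ignore a channel once it has been closed.
	for out != nil || errs != nil {
		select {
		case msg, ok := <-out:
			if !ok {
				out = nil
				continue
			}
			fmt.Println("value:", msg)
		case err, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			fmt.Println("error:", err)
		}
	}
}
```

The trade-off versus the `LogResult` approach is that the consumer must drain both channels; in exchange, the happy path stays a plain stream of values.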
Cancellation
The examples above work for finite inputs.
For long-running pipelines, use context.Context so every stage can stop when the caller is done.
The shape usually looks like this:
```go
func parseLogs(ctx context.Context, in <-chan RawLog) <-chan LogEntry {
	out := make(chan LogEntry)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				return
			case raw, ok := <-in:
				if !ok {
					return
				}
				entry, ok := parseLog(raw)
				if !ok {
					continue
				}
				// Guard the send as well: if the consumer cancels while
				// this stage is blocked sending, the goroutine still exits.
				select {
				case out <- entry:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}
```

Note that the send is wrapped in its own select. Checking ctx.Done() only on the receive side is not enough: a stage blocked on out <- entry after the consumer has gone away would never wake up. Without cancellation, a pipeline that reads from a never-ending source can leak goroutines when the consumer stops early.
Best Practices and Pitfalls
Best Practices:
- Keep each stage focused on one responsibility
- Return receive-only channels (`<-chan T`) from stages
- Close the output channel from the goroutine that writes to it
- Use `context.Context` for long-running or cancellable pipelines
- Test each stage independently with small input channels
Pitfalls:
- Forgetting to close output channels
- Stopping early without cancelling upstream stages
- Creating too many tiny stages that hide simple logic
- Mixing parsing, filtering, enrichment, and reporting in one function
- Assuming ordering will stay the same if you later parallelize a stage
Related Patterns
- Generator Pattern: Creates the initial stream of values
- Producer-Consumer Pattern: Separates production from consumption
- Worker Pool Pattern: Parallelizes expensive stages
- Fan-Out/Fan-In Pattern: Distributes one stage across multiple workers and merges the results
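A rough sketch of that last idea, fanning a stage out across several goroutines and merging the results, might look like this (hypothetical names; note that merged output is unordered):

```go
package main

import (
	"fmt"
	"sync"
)

// slowSquare stands in for an expensive stage worth parallelizing.
func slowSquare(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// merge fans several channels back into one (fan-in).
func merge(chans ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	for _, c := range chans {
		wg.Add(1)
		go func(c <-chan int) {
			defer wg.Done()
			for n := range c {
				out <- n
			}
		}(c)
	}
	// Close the merged channel only after every source is drained.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	in := make(chan int, 4)
	for i := 1; i <= 4; i++ {
		in <- i
	}
	close(in)

	// Fan out: three copies of the stage share the same input channel,
	// so each value is processed by exactly one of them.
	merged := merge(slowSquare(in), slowSquare(in), slowSquare(in))

	sum := 0
	for n := range merged {
		sum += n
	}
	fmt.Println(sum) // 1 + 4 + 9 + 16 = 30
}
```

The rest of the pipeline does not change: upstream and downstream stages still see a single channel on each side.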
Summary
The Pipeline Pattern is one of the most readable ways to structure data processing in Go. It lets you split a flow into small stages, connect them with channels, and keep each responsibility isolated.
It works well when data naturally moves through a sequence: read, parse, filter, enrich, report.
The pattern is also a bridge to more advanced concurrency designs. Once one stage becomes too slow, you can combine a pipeline with a Worker Pool or Fan-Out/Fan-In to parallelize only that part of the flow.
Series Navigation
This article is part of the Go Patterns series:
- Previous: Mastering the Worker Pool Pattern in Go
- Series: Go Patterns
If you want to experiment with the code examples, you can find them on my GitHub repository.