Pipes 101 with Go

Aug 16, 2016

Linux supports different IPC mechanisms, including pipes and named pipes. At first glance they look fantastic, in fact, they are, and they seem to be easy to use, but actually, they can be very tricky and sometimes hard to synchronize and debug.

In Garden-RunC, Cloud Foundry container runtime for Linux platform, whenever it runs a process in a container it forwards stdin/stdout/stderr when the client defines these properties on garden.ProcessIO{}, and that’s where pipes come to play:

process, _ := container.Run(garden.ProcessSpec{
  Path: "echo",
  Args: []string{"hello from the container"},
}, garden.ProcessIO{
  Stdin:  bytes.NewBufferString("hello stdin")
  Stdout: buffer,
  Stderr: buffer,
})

Pipes

It provides a unidirectional interprocess communication channel, where two ends are involved: reader and writer. Data written to the write end of the pipe can be read from the read end.

So let’s start with something simple: two processes, reader and writer, and one pipe. When we run our reader it creates a pair of pipes, reader and writer, and passes it to another program, writer.Writer opens the other end of the pipe for writing. While reader keeps blocked (pipeR.Read()) until it receives data from the other end.

reader | main.go

 1 var execerPath string
 2 
 3 func main() {
 4   flag.StringVar(&execerPath, "execer", "./execer", "path to execer")
 5   flag.Parse()
 6 
 7   pipeR, pipeW, _ := os.Pipe()
 8 
 9   cmd := exec.Command(execerPath)
10   cmd.ExtraFiles = []*os.File{
11     pipeW,
12   }
13 
14   cmd.Run()
15   data := make([]byte, 5)
16   pipeR.Read(data)
17   fmt.Printf("Data: %s\n", string(data))
18   pipeR.Close()
19   pipeW.Close()
20 }

writer | main.go

1 func main() {
2   fd3 := os.NewFile(3, "/proc/self/fd/3")
3   fd3.Write([]byte("hello"))
4   fd3.Close()
5 }

Named pipes

Named pipes works much like “regular” pipes, but it’s slightly different. It uses the filesystem when creating a named pipe and the processes can open it by name. Since it creates a file in the filesystem, it’s possible to re-use the pipe later, when the original I/O is completed. It’s worth to mention that process from different ancestry can share data between each other by using named pipes.

Create a new named pipe

1 mkfifo my_named_pipe
2 ls -l my_named_pipe
3 prw-r--r--   1 aleal  staff    0 16 Aug 09:02 my_named_pipe

Open named pipe as blocking

The example below opens the pipe for reading and blocks until the other end opens the pipe for writing. It happens because it opens the flag with the flag os.O_RDONLY and do not specify any other flag indicating that’s a non-blocking operation - covered later. Every time we use xONLY it opens the pipe as a blocking operation.

named_pipes | main.go

 1 var execerPath string
 2 
 3 func main() {
 4   flag.StringVar(&execerPath, "execer", "./execer", "path to execer")
 5   flag.Parse()
 6 
 7   tmpDir, _ := ioutil.TempDir("", "named-pipes")
 8 
 9   // Create named pipe
10   namedPipe := filepath.Join(tmpDir, "stdout")
11   syscall.Mkfifo(namedPipe, 0600)
12 
13   go func() {
14     cmd := exec.Command(execerPath, namedPipe)
15     // Just to forward the stdout
16     cmd.Stdout = os.Stdout
17     cmd.Run()
18   }()
19 
20   // Open named pipe for reading
21   fmt.Println("Opening named pipe for reading")
22   stdout, _ := os.OpenFile(namedPipe, os.O_RDONLY, 0600)
23   fmt.Println("Reading")
24 
25   var buff bytes.Buffer
26   fmt.Println("Waiting for someone to write something")
27   io.Copy(&buff, stdout)
28   stdout.Close()
29   fmt.Printf("Data: %s\n", buff.String())
30 }

execer | main.go

 1 func main() {
 2 	flag.Parse()
 3 	namedPipe := flag.Args()[0]
 4 
 5 	fmt.Println("Opening named pipe for writing")
 6 	stdout, _ := os.OpenFile(namedPipe, os.O_RDWR, 0600)
 7 	fmt.Println("Writing")
 8 	stdout.Write([]byte("hello"))
 9 	stdout.Close()
10 }

By running named_pipes it returns the output below. It clearly opens the pipe for reading and then blocks waiting for the other end:

1 # ./named_pipe -execer ./execer
2 Opening named pipe for reading
3 Opening named pipe for writing
4 Writing
5 Waiting for someone to write something
6 Data: hello

Non-Blocking

Sometimes, due to some application requirements, we need to open the pipes as non-blocking and let it fails only when the other end is not available. In order to open a named pipe as non-blocking, we can use the flag syscall.O_NOBLOCK. If data is not available when we try to read that, the syscall will fail immediately.

named_pipes | main.go

1 stdout, _ := os.OpenFile(namedPipe, os.O_RDONLY|syscall.O_NONBLOCK, 0600)
1 cd bin && ./named_pipe -execer ./execer
2 Opening named pipe for reading
3 Reading
4 Waiting for someone to write something
5 Data:
6 Opening named pipe for writing
7 Writing

When the data is not ready and we try to read it the err will be set to EAGAIN, and to avoid that we can block the file descriptor again:

named_pipes | main.go

1 syscall.SetNonblock(int(stdout.Fd()), false)

And now we can make our execer be blocking again because now we know that it will be unblocked by itself when the reader opens the fd:

execer | main.go

1 func main() {
2 	flag.Parse()
3 	namedPipe := flag.Args()[0]
4 
5 	fmt.Println("Opening named pipe for writing")
6 	stdout, _ := os.OpenFile(namedPipe, os.O_WRONLY, 0600)
7 	fmt.Println("Writing")
8 	stdout.Write([]byte("hello"))
9 }
1 Opening named pipe for writing
2 Opening named pipe for reading
3 Reading
4 Waiting for someone to write something
5 Writing
6 Data: hello

Another way to open a non-blocking pipe is by using a different flag: os.O_RDWR, but we need to make sure if that’s what we want.

Cover Photo by: Pascal Müller