Pocket Gophers

Expanding Convenience Functions

I don’t know about you, but I frequently find myself trying to use an unfamiliar library to solve some problem. Because I am not familiar with the library, it is often not immediately obvious how to use it to solve the problem.

While reading "howto kill pipe process using gopkg.in/pipe.v2" on golang-nuts, I recognized this situation and thought I would write down my approach. Before working on this article, I had not used gopkg.in/pipe.v2.

A decent problem description was given:

Im using pipe.v2 and works fine, but I need cancel the process running, How can I do it?

The process is running, but in certain circumstances I need to cancel.

Along with some code:

p := pipe.Line(
	pipe.Exec("command1", args_command1...),
	pipe.Exec("command2", args_command2...),
)
output, pipe_err := pipe.CombinedOutput(p)

if pipe_err != nil {
	// ERROR
} else {
	// OK
}

Let's get started.

Unpacking the example

Because I am unfamiliar with gopkg.in/pipe.v2, my first step is finding the documentation for the library. Going to gopkg.in/pipe.v2 in my web browser provides links to the source code and API documentation. The API documentation links to labix.org/pipe, which includes examples and a link to an introductory blog post.

I learned that gopkg.in/pipe.v2 "implements unix-like pipelines for Go." Since I am familiar with unix pipelines, I see that the example code is trying to replicate something like this shell script:

command1 args_command1 | command2 args_command2

Where args_command1 and args_command2 are lists of arguments, properly escaped.

CombinedOutput merges the stdout and stderr output from the pipeline into output, and then some error handling occurs.

Thanks to the documentation, I have a basic understanding of what the example intends to do.

A Working Example

This code looks like it came from a working code base but was cut down and anonymized for posting. This is a common pattern and is useful in prototyping a solution that does whatever you need the code to do:

  1. Extract a working example of the existing state (e.g., "Im using pipe.v2 and work fine")
  2. Modify the example to do what you need it to (e.g., "In certain circumstances I need to cancel")
  3. Take what you learned (how to cancel a running pipe) and implement it in your working code base

Because you are building a working example, you can make it easy to see whether your modifications work. Since I don't have a motivating example, such as a user story, I have to make up my own.

One reason that a running process might need to be cancelled is that it runs for a long time and the user decides to stop it. Therefore, an example with some time component would be useful.

I would like to see the following output from the pipeline:

starting
done

Implemented on the command line as:

echo "starting" | awk '{print $0; system("sleep 5"); print "done"}'

And in Go:

p := pipe.Line(
	pipe.Exec("echo", "starting"),
	pipe.Exec("awk", "{print $0; system(\"sleep 5\"); print \"done\"}"),
)

The awk statement will first print its input, sleep for 5 seconds, and then print done.

While this pipeline is a bit artificial, it has the following benefits:

  1. Shows that starting is piped from echo to awk
  2. starting is output immediately
  3. Some lengthy process is simulated by sleep
  4. If the pipeline is cancelled during sleep, we should see starting and not see done
  5. If the pipeline completes, we see both starting and done

Here is the working example:

pipeline.go
package main

import (
	"log"
	"os"

	"gopkg.in/pipe.v2"
)

func main() {
	log.SetFlags(log.Lshortfile)

	// echo "starting" | awk '{print $0; system("sleep 5"); print "done"}'
	p := pipe.Line(
		pipe.Exec("echo", "starting"),
		pipe.Exec("awk", "{print $0; system(\"sleep 5\"); print \"done\"}"),
	)
	output, pipe_err := pipe.CombinedOutput(p)

	if pipe_err != nil {
		log.Println(pipe_err)
	}

	os.Stdout.Write(output)
}

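A few notes on this implementation:

  1. log.SetFlags(log.Lshortfile) prefixes each log message with the file name and line number that produced it
  2. An error from the pipeline is logged rather than treated as fatal, so any output collected before the error is still written to stdout

For example, when the pipeline fails partway through (here with a timeout), the error is logged first and the partial output follows: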

pipeline.go:22: timeout
starting

Getting and Running the Example Code

You can use go get to download the source code:

go get -d pocketgophers.com/expanding-convenience-functions

Each example is in a separate file and can be run like this:

go run pipeline.go

replacing pipeline.go with the example you want to run.

Requirements and Planning

Now that I have a working example, I like to figure out the difference between what I have and what I want to have. This allows me to have a general way forward that will guide my actions while I am figuring out the details of the solution. This is an informal process for me and usually ends up with a simple list of phases the work could progress in.

Starting with the goal:

The process is running, but in certain circumstances I need to cancel.

I see the main goal is to cancel a running process. However, "In certain circumstances I need to cancel" implies there is some program flow outside of the pipeline execution that I control. The working example does not do this.

This thought process produces a plan with two phases:

  1. Concurrent process execution
  2. Cancel the running pipeline

These phases must be done in this order because there is no way to tell the process to stop if you can’t run code while the process is running.

Finding a Concurrent API

I start on the first phase by looking at the documentation for a concurrent interface for running the pipeline. Sadly, there is no Start/Wait or channel-based interface.

Since there is no obvious interface for running a pipeline concurrently and then cancelling it, I need to figure out whether there is a way to do so. I start by looking at the function the example uses to run the pipeline, CombinedOutput:

// CombinedOutput runs the p pipe and returns its stdout and
// stderr outputs merged together.
//
// See functions Run, Output, and DividedOutput.
func CombinedOutput(p Pipe) ([]byte, error) {
	outb := &OutputBuffer{}
	s := NewState(outb, outb)
	err := p(s)
	if err == nil {
		err = s.RunTasks()
	}
	return outb.Bytes(), err
}

The first step in understanding what CombinedOutput does is to understand what it works with.

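Scanning the function for library-specific types and functions results in:

  1. Pipe, the type of the p argument
  2. OutputBuffer
  3. NewState, which creates the State that p is applied to
  4. State.RunTasks
  5. OutputBuffer.Bytes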

Notice that none of the types or functions are private to the library. I call this a convenience function because the library could be used without it (i.e., the user of the library writes the code themselves), but because it is a frequent or expected use for the library, the library author wrote it for us. The problem comes when none of the convenience functions fit your situation; in this case cancelling a running process.

Since this is a convenience function, I can use it as the basis for my own solution. To write that solution, I start by reading the documentation for each of these types and functions.

Before implementing anything, it would be nice to know whether cancelling the pipeline is possible at all. Since State controls the running environment, it is the natural place to look.

Does State have a cancel function? Yes:

Kill sends a kill notice to all pending tasks.

And from there, looking at Task.Kill:

Kill abruptly interrupts in-progress activities being done by Run.

Therefore, calling s.Kill() should cancel the running pipeline.

I also noticed that State supports a Timeout. However, the goal is to cancel "in certain circumstances". The prototype uses time only to simulate a long-running process; elapsed time is not what should trigger the cancellation, so a timeout is not appropriate.

Expanding CombinedOutput

Now that I am more confident I will not be wasting my time, I will expand CombinedOutput into my working example. The initial goal of this expansion is to replicate the example calling CombinedOutput by replacing the function call with the contents of the function.

When expanding a convenience function I do so as mechanically as possible:

  1. Adding the package name to any package functions/variables
  2. Renaming any of the function's variables that clash with mine
  3. Replacing variables that were from the arguments with my variables
  4. Replacing return statements by setting the variables as if they were returned

In effect, I try to replace the function call with the contents of the function without modifying any other part of my prototype. You can always clean it up after you know that it still works.

expanded.go
package main

import (
	"log"
	"os"

	"gopkg.in/pipe.v2"
)

func main() {
	log.SetFlags(log.Lshortfile)

	// echo "starting" | awk '{print $0; system("sleep 5"); print "done"}'
	p := pipe.Line(
		pipe.Exec("echo", "starting"),
		pipe.Exec("awk", "{print $0; system(\"sleep 5\"); print \"done\"}"),
	)

	outb := &pipe.OutputBuffer{}
	s := pipe.NewState(outb, outb)
	err := p(s)
	if err == nil {
		err = s.RunTasks()
	}
	output := outb.Bytes()
	pipe_err := err

	if pipe_err != nil {
		log.Println(pipe_err)
	}

	os.Stdout.Write(output)
}

Running diff pipeline.go expanded.go shows that I only replaced the function call:

18c18,26
< 	output, pipe_err := pipe.CombinedOutput(p)
---
>
> 	outb := &pipe.OutputBuffer{}
> 	s := pipe.NewState(outb, outb)
> 	err := p(s)
> 	if err == nil {
> 		err = s.RunTasks()
> 	}
> 	output := outb.Bytes()
> 	pipe_err := err

Cleanup the expansion

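Now that I know the expansion works, I clean it up by:

  1. Renaming outb to output and calling output.Bytes() only when writing the result
  2. Dropping the pipe_err alias and using err directly
  3. Checking the error from p(s), which sets up the pipeline, on its own and exiting with log.Fatalln, instead of calling RunTasks inside if err == nil
  4. Adding comments that separate setting up the pipeline from running it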

expanded_cleanup.go
package main

import (
	"log"
	"os"

	"gopkg.in/pipe.v2"
)

func main() {
	log.SetFlags(log.Lshortfile)

	// echo "starting" | awk '{print $0; system("sleep 5"); print "done"}'
	p := pipe.Line(
		pipe.Exec("echo", "starting"),
		pipe.Exec("awk", "{print $0; system(\"sleep 5\"); print \"done\"}"),
	)

	// setup the pipeline
	output := &pipe.OutputBuffer{}
	s := pipe.NewState(output, output)
	err := p(s)
	if err != nil {
		log.Fatalln(err)
	}

	// run the pipeline
	err = s.RunTasks()
	if err != nil {
		log.Println(err)
	}

	os.Stdout.Write(output.Bytes())
}

A quick run shows that it is still working as expected.

Make it Concurrent

As I previously figured out, the pipeline needs to be running when it is cancelled. Therefore starting and finishing need to be separated. Earlier I figured out that err = s.RunTasks() actually runs the pipeline. Once again, I will only replace a single line.

concurrent.go
package main

import (
	"log"
	"os"

	"gopkg.in/pipe.v2"
)

func main() {
	log.SetFlags(log.Lshortfile)

	// echo "starting" | awk '{print $0; system("sleep 5"); print "done"}'
	p := pipe.Line(
		pipe.Exec("echo", "starting"),
		pipe.Exec("awk", "{print $0; system(\"sleep 5\"); print \"done\"}"),
	)

	// setup the pipeline
	output := &pipe.OutputBuffer{}
	s := pipe.NewState(output, output)
	err := p(s)
	if err != nil {
		log.Fatalln(err)
	}

	// start the pipeline
	done := make(chan error)
	go func() {
		done <- s.RunTasks()
	}()

	// wait for the pipeline to finish
	err = <-done
	if err != nil {
		log.Println(err)
	}

	os.Stdout.Write(output.Bytes())
}

The idea here is to separate running the pipeline (s.RunTasks) from receiving its result. I create a channel that will deliver the result, start the function in a goroutine, and then wait for the result to arrive over the channel.

The space between launching the goroutine and receiving the result is where I will try to cancel the pipeline.

Cancelling the Pipeline

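Now we can actually try to cancel the pipeline. There are three points at which the pipeline could be cancelled, and each needs to be tried:

  1. Immediately, before awk has printed anything
  2. During the sleep, after starting has been printed
  3. After the pipeline has finished

I will try using time.Sleep to control these three options: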

cancel.go
package main

import (
	"log"
	"os"
	"time"

	"gopkg.in/pipe.v2"
)

func main() {
	log.SetFlags(log.Lshortfile)

	// echo "starting" | awk '{print $0; system("sleep 5"); print "done"}'
	p := pipe.Line(
		pipe.Exec("echo", "starting"),
		pipe.Exec("awk", "{print $0; system(\"sleep 5\"); print \"done\"}"),
	)

	// setup the pipeline
	output := &pipe.OutputBuffer{}
	s := pipe.NewState(output, output)
	err := p(s)
	if err != nil {
		log.Fatalln(err)
	}

	// start the pipeline
	done := make(chan error)
	go func() {
		done <- s.RunTasks()
	}()

	// cancel the pipeline
	time.Sleep(2 * time.Second)
	s.Kill()

	// wait for the pipeline to finish
	err = <-done
	if err != nil {
		log.Println(err)
	}

	os.Stdout.Write(output.Bytes())
}

By manipulating the time.Sleep call we can try all three scenarios.

Not sleeping gives (testing immediate):

cancel.go:40: explicitly killed

Sleeping for 2 seconds gives (testing during):

cancel.go:41: explicitly killed
starting

Sleeping for 6 seconds gives (testing after):

starting
done

Running the three scenarios gives me confidence that I can cancel a running pipeline in response to whatever circumstances need to be handled.
