After working in Go for some time now, I learned how to use an unbuffered channel to build a pool of goroutines. I like this implementation better than what is implemented in this post. That being said, this post still has value in what it describes.
https://github.com/goinggo/work
Introduction
In my world of server development thread pooling has been the key to building robust code on the Microsoft stack. Microsoft has failed in .Net by giving each Process a single thread pool with thousands of threads and thinking they could manage the concurrency at runtime. Early on I realized this was never going to work. At least not for the servers I was developing.
When I was building servers in C/C++ using the Win32 API, I created a class that abstracted IOCP to give me thread pools I could post work into. This has always worked very well because I could define the number of threads in the pool and the concurrency level (the number of threads allowed to be active at any given time). I ported this code for all of my C# development. If you want to learn more about this, I wrote an article years ago (http://www.theukwebdesigncompany.com/articles/iocp-thread-pooling.php). Using IOCP gave me the performance and flexibility I needed. BTW, the .NET thread pool uses IOCP underneath.
The idea of the thread pool is fairly simple. Work comes into the server and needs to get processed. Most of this work is asynchronous in nature but it doesn’t have to be. Many times the work is coming off a socket or from an internal routine. The thread pool queues up the work and then a thread from the pool is assigned to perform the work. The work is processed in the order it was received. The pool provides a great pattern for performing work efficiently. Spawning a new thread everytime work needs to be processed can put heavy loads on the operating system and cause major performance problems.
So how is the thread pool performance tuned? You need to identify the number of threads each pool should contain to get the work done the quickest. When all the routines are busy processing work, new work stays queued. You want this because at some point having more routines processing work slow things down. This can be for a myriad of reasons such as, the number of cores you have in your machine to the ability of your database to handle requests. During testing you can find that happy number.
I always start with looking at how many cores I have and the type of work being processed. Does this work get blocked and for how long on average. On the Microsoft stack I found that three active threads per core seemed to yield the best performance for most tasks. I have no idea yet what the numbers will be in Go.
You can also create different thread pools for the different types of work the server will need to process. Because each thread pool can be configured, you can spend time performance tuning the server for maximum throughput. Having this type of command and control to maximize performance is crucial.
In Go we don’t create threads but routines. The routines function like multi-threaded functions but Go manages the actual use of OS level threading. To learn more about concurrency in Go check out this document: http://golang.org/doc/effective_go.html#concurrency.
The packages I have created are called workpool and jobpool. These use the channel and go routine constructs to implement pooling.
Workpool
This package creates a pool of go routines that are dedicated to processing work posted into the pool. A single Go routine is used to queue the work. The queue routine provides the safe queuing of work, keeps track of the amount of work in the queue and reports an error if the queue is full.
Posting work into the queue is a blocking call. This is so the caller can verify that the work is queued. Counts for the number of active worker routines are maintained.
Here is some sample code on how to use the workpool:
package main
import (
"bufio"
"fmt"
"os"
"runtime"
"strconv"
"time"
"github.com/goinggo/workpool"
)
type MyWork struct {
Name string
BirthYear int
WP *workpool.WorkPool
}
func (mw *MyWork) DoWork(workRoutine int) {
fmt.Printf("%s : %d\n", mw.Name, mw.BirthYear)
fmt.Printf("Q:%d R:%d\n", mw.WP.QueuedWork(), mw.WP.ActiveRoutines())
// Simulate some delay
time.Sleep(100 * time.Millisecond)
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
workPool := workpool.New(runtime.NumCPU(), 800)
shutdown := false // Race Condition, Sorry
go func() {
for i := 0; i < 1000; i++ {
work := MyWork {
Name: "A" + strconv.Itoa(i),
BirthYear: i,
WP: workPool,
}
if err := workPool.PostWork("routine", &work); err != nil {
fmt.Printf("ERROR: %s\n", err)
time.Sleep(100 * time.Millisecond)
}
if shutdown == true {
return
}
}
}()
fmt.Println("Hit any key to exit")
reader := bufio.NewReader(os.Stdin)
reader.ReadString('\n')
shutdown = true
fmt.Println("Shutting Down")
workPool.Shutdown("routine")
}
If we look at main, we create a thread pool where the number of routines to use is based on the number of cores we have on the machine. This means we have a routine for each core. You can't do any more work if each core is busy. Again, performance testing will determine what this number should be. The second parameter is the size of the queue. In this case I have made the queue large enough to handle all the requests coming in.
The MyWork type defines the state I need to perform the work. The member function DoWork is required because it implements an interface required by the PostWork call. To pass any work into the thread pool this method must be implement by the type.
The DoWork method is doing two things. First it is displaying the state of the object. Second it is reporting the number of items in queue and the active number of Go Routines. These numbers can be used to determining the health of the thread pool and for performance testing.
Finally I have a Go routine posting work into the work pool inside of a loop. At the same time this is happening, the work pool is executing DoWork for each object queued. Eventually the Go routine is done and the work pool keeps on doing its job. If we hit enter at anytime the programming shuts down gracefully.
The PostWork method could return an error in this sample program. This is because the PostWork method will guarantee work is placed in queue or it will fail. The only reason for this to fail is if the queue is full. Setting the queue length is an important consideration.
Jobpool
The jobpool package is similar to the workpool package except for one implementation detail. This package maintains two queues, one for normal processing and one for priority processing. Pending jobs in the priority queue always get processed before pending jobs in the normal queue.
The use of two queues makes jobpool a bit more complex than workpool. If you don't need priority processing, then using a workpool is going to be faster and more efficient.
Here is some sample code on how to use the jobpool:
package main
import (
"fmt"
"time"
"github.com/goinggo/jobpool"
)
type WorkProvider1 struct {
Name string
}
func (wp *WorkProvider1) RunJob(jobRoutine int) {
fmt.Printf("Perform Job : Provider 1 : Started: %s\n", wp.Name)
time.Sleep(2 * time.Second)
fmt.Printf("Perform Job : Provider 1 : DONE: %s\n", wp.Name)
}
type WorkProvider2 struct {
Name string
}
func (wp *WorkProvider2) RunJob(jobRoutine int) {
fmt.Printf("Perform Job : Provider 2 : Started: %s\n", wp.Name)
time.Sleep(5 * time.Second)
fmt.Printf("Perform Job : Provider 2 : DONE: %s\n", wp.Name)
}
func main() {
jobPool := jobpool.New(2, 1000)
jobPool.QueueJob("main", &WorkProvider1{"Normal Priority : 1"}, false)
fmt.Printf("*******> QW: %d AR: %d\n",
jobPool.QueuedJobs(),
jobPool.ActiveRoutines())
time.Sleep(1 * time.Second)
jobPool.QueueJob("main", &WorkProvider1{"Normal Priority : 2"}, false)
jobPool.QueueJob("main", &WorkProvider1{"Normal Priority : 3"}, false)
jobPool.QueueJob("main", &WorkProvider2{"High Priority : 4"}, true)
fmt.Printf("*******> QW: %d AR: %d\n",
jobPool.QueuedJobs(),
jobPool.ActiveRoutines())
time.Sleep(15 * time.Second)
jobPool.Shutdown("main")
}
In this sample code we create two worker type structs. It's best to think that each worker is some independent job in the system.
In main we create a job pool with 2 job routines and support for 1000 pending jobs. First we create 3 different WorkProvider1 objects and post them into the queue, setting the priority flag to false. Next we create a WorkProvider2 object and post that into the queue, setting the priority flag to true.
The first two jobs that are queued will be processed first since the job pool has 2 routines. As soon as one of those jobs are completed, the next job is retrieved from the queue. The WorkProvider2 job will be processed next because it was placed in the priority queue.
To get a copy of the workpool and jobpool packages, go to
github.com/goinggoAs always I hope this code can help you in some small way.