Go forth and

Multiply

Concurrency and Communication in Go

Toby DiPasquale

cbcg.net

Go makes concurrency simple

Supports parallelism, too...

They are not the same

Concurrency: "several independent activities, each of which executes at its own pace" [1]

[1] Concepts, Techniques, and Models of Computer Programming

Parallelism: Tasks actually happening simultaneously

Concurrency is a conceptual property of a program

Parallelism is a runtime detail

We'll talk mostly about concurrency

...let the runtime deal with parallelism

Examples

Fork-Join

(aka Scatter/Gather)

Bunch of items to process

Idempotent work; can be done concurrently

 // ForkJoin runs ProcessJob on every spec in work concurrently and returns
 // once one result has been received for each spec.
 //
 // It starts runtime.NumCPU() worker goroutines that pull specs from a shared
 // jobs channel and push each outcome onto a results channel. Results are
 // received in whatever order the workers deliver them, not in input order.
 func ForkJoin(work []*JobSpec) {
   // jobs can hold every spec, so loading it below never blocks
   jobs := make(chan *JobSpec, len(work))
   results := make(chan *JobResult)
   //...
   // we're going to work on up to NumCPU items at once
   for i := 0; i < runtime.NumCPU(); i++ {
     go func() {
       // see how easy it is to iterate a channel?
       // the range ends once jobs is closed and drained, stopping the worker
       for job := range jobs {
         results <- ProcessJob(job)
       }
     }()
   }
   // load all work into the input channel
   for _, spec := range work {
     jobs <- spec
   }

   // indicate when to shutdown by closing channel
   close(jobs)
   // one result is produced per work item, so receive exactly len(work) times
   for range work {
     result := <-results
     // handle result here...
     _ = result // keeps the placeholder compiling until real handling is added
   }
 }
 // ForkJoin runs ProcessJob on every spec in work using runtime.NumCPU()
 // worker goroutines and returns after receiving one result per spec.
 // Results are received in the order the workers deliver them.
 func ForkJoin(work []*JobSpec) {
   jobs := make(chan *JobSpec, len(work)) // sized so the load loop never blocks
   results := make(chan *JobResult)
   for i := 0; i < runtime.NumCPU(); i++ {
     go func() {
       // each worker exits when jobs is closed and empty
       for job := range jobs {
         results <- ProcessJob(job)
       }
     }()
   }
   for _, spec := range work { jobs <- spec }
   close(jobs)
   // one receive per submitted spec
   for range work {
     result := <-results
     // handle result here...
     _ = result // keeps the placeholder compiling until real handling is added
   }
 }

SMTP Service

Multiple clients connecting over TCP

Sending requests concurrently

Expecting responses concurrently

 // TCPService is the set of hooks RunTCP needs in order to serve a protocol
 // over TCP.
 type TCPService interface {
   // Addr returns the address RunTCP listens on.
   Addr() *net.TCPAddr

   // SetClientOptions is called on every accepted connection before it is
   // handled. When it returns an error RunTCP closes the connection and
   // never passes it to Handle.
   SetClientOptions(conn *net.TCPConn) error

   // Handle serves a single accepted connection. RunTCP starts it in its
   // own goroutine.
   Handle(conn *net.TCPConn)

   // Shutdown is called by RunTCP when it cannot listen on Addr.
   Shutdown()
 }
 // RunTCP listens on t.Addr() and hands every accepted connection to t.
 //
 // If the listener cannot be created, t.Shutdown() is called and RunTCP
 // returns. Otherwise it loops forever: each accepted connection is first
 // passed to t.SetClientOptions and then served by t.Handle in a new
 // goroutine. Accept errors are skipped; a connection whose options cannot be
 // set is closed without being handled.
 func RunTCP(t TCPService) {
   listener, listenErr := net.ListenTCP("tcp", t.Addr())
   if listenErr != nil {
     t.Shutdown()
     return
   }
   defer listener.Close()

   for {
     client, acceptErr := listener.AcceptTCP()
     // cases are tried in order, so client is only touched after a
     // successful accept
     switch {
     case acceptErr != nil:
       // nothing was accepted; try again
     case t.SetClientOptions(client) != nil:
       client.Close()
     default:
       go t.Handle(client)
     }
   }
 }
 // Verdict is the outcome of one step of a session; the connection handler
 // compares it against Terminate to decide whether to stop.
 type Verdict int

 // The constants are declared with type Verdict so they cannot be silently
 // mixed with unrelated integers.
 const (
   // Continue is the non-terminating verdict (value 0).
   Continue Verdict = iota
   // Terminate makes the connection handler return (value 1).
   Terminate
 )

 // SMTPService holds the state of the SMTP server: its configuration, its
 // listen address, and the pieces used to signal and observe shutdown.
 type SMTPService struct {
   // cfg is the configuration the service was created with.
   cfg      Config
   // addr is the listen address, taken from the configuration at
   // construction time and returned by Addr.
   addr     *net.TCPAddr
   // exited receives a value from Shutdown; the channel is supplied by the
   // caller of NewSMTPService.
   exited   chan int
   // draining is set by Shutdown; while true, Handle refuses new
   // connections with a 421 reply.
   draining bool
 }
 // NewSMTPService returns a service configured from c that listens on
 // c.GetListenAddr() and reports shutdown on exited. The service starts out
 // not draining.
 func NewSMTPService(c Config, exited chan int) *SMTPService {
   svc := new(SMTPService) // draining is false by zero value
   svc.cfg = c
   svc.addr = c.GetListenAddr()
   svc.exited = exited
   return svc
 }
 // Shutdown marks the service as draining and then sends 1 on the exited
 // channel. The send blocks until something receives from that channel.
 func (s *SMTPService) Shutdown() {
   // set the flag first so it is already visible when the receiver wakes up
   s.draining = true
   s.exited <- 1
 }
 // Addr returns the TCP address the service was configured to listen on.
 func (s *SMTPService) Addr() *net.TCPAddr {
   return s.addr
 }
 // SetClientOptions prepares a freshly accepted connection: it turns TCP
 // keep-alive off and then calls SetLinger(-1). The first error encountered
 // is returned; nil means both options were applied.
 func (s *SMTPService) SetClientOptions(conn *net.TCPConn) error {
   keepAliveErr := conn.SetKeepAlive(false)
   if keepAliveErr != nil {
     return keepAliveErr
   }
   // the last step's error (or nil) is the overall result
   return conn.SetLinger(-1)
 }
 // Handle serves one client connection and closes it before returning.
 //
 // While the service is draining the client is sent a 421 reply and the
 // connection is dropped. Otherwise a new SMTP session is created from the
 // service configuration, the client is greeted, and the session is processed
 // repeatedly until a step returns Terminate.
 func (s *SMTPService) Handle(conn *net.TCPConn) {
   defer conn.Close()
   if s.draining {
     // best-effort notice: the connection is closed right after either way
     conn.Write([]byte("421 Service not available\r\n"))
     return
   }
   // the configuration lives on the receiver s
   session := NewSMTPSession(conn, s.cfg)
   if verdict := session.Greet(); verdict == Terminate {
     return
   }
   for {
     if verdict := session.Process(); verdict == Terminate {
       return
     }
   }
 }
 func main() {
   //...
   exitChan := make(chan int)
   signalChan := make(chan os.Signal, 1)
   go func() {
     <-signalChan
     exitChan <- 1
   }()
   signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

   go RunTCP(NewSMTPService(cfg, exitChan))
   <-exitChan
 }

Robust requests

Multiple replicas from which to choose

Send request to all

Use first one to respond

Enforce timeout on all of them

 // Replica is a single backend that can answer a Request.
 type Replica interface {
   // GetResponse returns this replica's Response to req.
   GetResponse(req Request) Response
 }

 // Client sends each request to a set of replicas and uses the first
 // response that comes back.
 type Client struct {
   // Replicas are the backends every request is sent to.
   Replicas []Replica
   // Timeout is how long Get waits for a response before returning an error.
   Timeout  time.Duration
 }
 // getFirst sends req to every replica concurrently and returns the channel
 // on which their responses are delivered.
 //
 // The channel has one buffer slot per replica, so every goroutine can deliver
 // its response and exit even when the caller reads only the first value or
 // stops waiting altogether.
 func (c *Client) getFirst(req Request) chan Response {
   r := make(chan Response, len(c.Replicas))
   for _, replica := range c.Replicas {
     // pass the replica as an argument so each goroutine queries its own
     // replica instead of sharing the loop variable (shared before Go 1.22)
     go func(rep Replica) {
       r <- rep.GetResponse(req)
     }(replica)
   }
   return r
 }
 // Get sends req to all replicas and returns whichever response arrives
 // first. If nothing arrives within c.Timeout it returns a nil Response and a
 // "timed out!" error.
 func (c *Client) Get(req Request) (Response, error) {
   responses := c.getFirst(req)
   deadline := time.After(c.Timeout)
   // both branches return, so a single select is enough
   select {
   case first := <-responses:
     return first, nil
   case <-deadline:
     return nil, fmt.Errorf("timed out!")
   }
 }

Interesting

GoCircuit - distributed goroutines and cross-machine channels (Erlang-style)

www.GoCircuit.org

Donut - library for building clustered services in Go, patterned after Ordasity

github.com/dforsyth/donut github.com/boundary/ordasity

NSQ - distributed pub/sub, used by bit.ly

word.bitly.com/post/33232969144/nsq github.com/bitly/nsq

FIN