Go forth and
Multiply
Concurrency and Communication in Go
Go makes concurrency simple
Supports parallelism, too...
They are not the same
Concurrency: "several independent activities, each of which executes at its own pace"1
Parallelism: Tasks actually happening simultaneously
Concurrency is a conceptual property of a program
Parallelism is a runtime detail
We'll talk mostly concurrency
...let the runtime deal with parallelism
Fork-Join
(aka Scatter/Gather)
Bunch of items to process
Idempotent work; can be done concurrently
func ForkJoin(work []*JobSpec) {
jobs := make(chan *JobSpec, len(work))
results := make(chan *JobResult)
//...
// we're going to work on up to NumCPU items at once
for i := 0; i < runtime.NumCPU(); i++ {
go func() {
// see how easy it is to iterate a channel?
for job := range jobs {
results <- ProcessJob(job)
}
}()
}
// load all work into the input channel
for _, spec := range work {
jobs <- spec
}
// indicate when to shutdown by closing channel
close(jobs)
// now iterate each work item and handle the result
for i := 0; i < len(work); i++ {
result := <-results
// handle result here...
}
}
func ForkJoin(work []*JobSpec) {
jobs := make(chan *JobSpec, len(work))
results := make(chan *JobResult)
for i := 0; i < runtime.NumCPU(); i++ {
go func() {
for job := range jobs {
results <- ProcessJob(job)
}
}()
}
for _, spec := range work { jobs <- spec }
close(jobs)
for i := 0; i < len(work); i++ {
result := <-results
// handle result here...
}
}
Multiple clients connecting over TCP
Sending requests concurrently
Expecting responses concurrently
type TCPService interface {
Addr() *net.TCPAddr
SetClientOptions(*net.TCPConn) error
Handle(*net.TCPConn)
Shutdown()
}
func RunTCP(t TCPService) {
l, err := net.ListenTCP("tcp", t.Addr())
if err != nil {
t.Shutdown()
return
}
defer l.Close()
for {
conn, err := l.AcceptTCP()
if err != nil { continue }
if err := t.SetClientOptions(conn); err != nil {
conn.Close()
continue
}
go t.Handle(conn)
}
}
type Verdict int
const (
Continue = iota
Terminate
)
type SMTPService struct {
cfg Config
addr *net.TCPAddr
exited chan int
draining bool
}
func NewSMTPService(c Config, exited chan int) *SMTPService {
return &SMTPService{
cfg: c,
addr: c.GetListenAddr(),
exited: exited,
draining: false,
}
}
func (s *SMTPService) Shutdown() {
s.draining = true
s.exited <- 1
}
func (s *SMTPService) Addr() *net.TCPAddr {
return s.addr
}
func (s *SMTPService) SetClientOptions(conn *net.TCPConn) error {
if err := conn.SetKeepAlive(false); err != nil {
return err
}
if err := conn.SetLinger(-1); err != nil {
return err
}
return nil
}
func (s *SMTPService) Handle(conn *net.TCPConn) {
defer conn.Close()
if s.draining {
conn.Write([]byte("421 Service not available\r\n"))
return
}
session := NewSMTPSession(conn, c.cfg)
if verdict := session.Greet(); verdict == Terminate {
return
}
for {
if verdict := session.Process(); verdict == Terminate {
return
}
}
}
func main() {
//...
exitChan := make(chan int)
signalChan := make(chan os.Signal, 1)
go func() {
<-signalChan
exitChan <- 1
}()
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
go RunTCP(NewSMTPService(cfg, exitChan))
<-exitChan
}
Multiple replicas from which to choose
Send request to all
Use first one to respond
Enforce timeout on all of them
type Replica interface {
GetResponse(Request) Response
}
type Client struct {
Replicas []Replica
Timeout time.Duration
}
func (c *Client) getFirst(req Request) chan Response {
r := make(chan Response)
for _, replica := range c.Replicas {
go func() {
r <- replica.GetResponse(req)
}()
}
return r
}
func (c *Client) Get(req Request) (Response, error) {
r := c.getFirst(req)
timeout := time.After(c.Timeout)
for {
select {
case resp := <-r:
return resp, nil
case <-timeout:
return nil, fmt.Errorf("timed out!")
}
}
}
GoCircuit - distributed goroutines and cross-machine channels (Erlang-style)
Donut - library for building clustered services in Go, patterned after Ordasity
NSQ - distributed pub/sub, used by bit.ly