Use a channel close to kill goroutines.

This tidies things up nicely, especially in some of the tests.
This commit is contained in:
Alex Bramley 2013-04-08 18:02:34 +01:00
parent de94609749
commit 8ae6733942
2 changed files with 52 additions and 88 deletions

View File

@ -35,8 +35,8 @@ type Conn struct {
out chan string out chan string
connected bool connected bool
// Control channels to goroutines // Control channel to goroutines
cSend, cLoop, cPing chan bool die chan struct{}
// Internal counters for flood protection // Internal counters for flood protection
badness time.Duration badness time.Duration
@ -115,9 +115,6 @@ func Client(cfg *Config) *Conn {
cfg: cfg, cfg: cfg,
in: make(chan *Line, 32), in: make(chan *Line, 32),
out: make(chan string, 32), out: make(chan string, 32),
cSend: make(chan bool),
cLoop: make(chan bool),
cPing: make(chan bool),
handlers: handlerSet(), handlers: handlerSet(),
stRemovers: make([]Remover, 0, len(stHandlers)), stRemovers: make([]Remover, 0, len(stHandlers)),
lastsent: time.Now(), lastsent: time.Now(),
@ -166,6 +163,7 @@ func (conn *Conn) DisableStateTracking() {
func (conn *Conn) initialise() { func (conn *Conn) initialise() {
conn.io = nil conn.io = nil
conn.sock = nil conn.sock = nil
conn.die = make(chan struct{})
if conn.st != nil { if conn.st != nil {
conn.st.Wipe() conn.st.Wipe()
} }
@ -216,25 +214,24 @@ func (conn *Conn) Connect() error {
} }
} }
conn.connected = true conn.connected = true
conn.postConnect() conn.postConnect(true)
conn.dispatch(&Line{Cmd: REGISTER}) conn.dispatch(&Line{Cmd: REGISTER})
return nil return nil
} }
// Post-connection setup (for ease of testing) // Post-connection setup (for ease of testing)
func (conn *Conn) postConnect() { func (conn *Conn) postConnect(start bool) {
conn.io = bufio.NewReadWriter( conn.io = bufio.NewReadWriter(
bufio.NewReader(conn.sock), bufio.NewReader(conn.sock),
bufio.NewWriter(conn.sock)) bufio.NewWriter(conn.sock))
go conn.send() if start {
go conn.recv() go conn.send()
if conn.cfg.PingFreq > 0 { go conn.recv()
go conn.ping() go conn.runLoop()
} else { if conn.cfg.PingFreq > 0 {
// Otherwise the send in shutdown will hang :-/ go conn.ping()
go func() { <-conn.cPing }() }
} }
go conn.runLoop()
} }
// copied from http.client for great justice // copied from http.client for great justice
@ -248,8 +245,8 @@ func (conn *Conn) send() {
select { select {
case line := <-conn.out: case line := <-conn.out:
conn.write(line) conn.write(line)
case <-conn.cSend: case <-conn.die:
// strobe on control channel, bail out // control channel closed, bail out
return return
} }
} }
@ -285,7 +282,8 @@ func (conn *Conn) ping() {
select { select {
case <-tick.C: case <-tick.C:
conn.Ping(fmt.Sprintf("%d", time.Now().UnixNano())) conn.Ping(fmt.Sprintf("%d", time.Now().UnixNano()))
case <-conn.cPing: case <-conn.die:
// control channel closed, bail out
tick.Stop() tick.Stop()
return return
} }
@ -298,8 +296,8 @@ func (conn *Conn) runLoop() {
select { select {
case line := <-conn.in: case line := <-conn.in:
conn.dispatch(line) conn.dispatch(line)
case <-conn.cLoop: case <-conn.die:
// strobe on control channel, bail out // control channel closed, bail out
return return
} }
} }
@ -361,9 +359,7 @@ func (conn *Conn) shutdown() {
conn.dispatch(&Line{Cmd: DISCONNECTED}) conn.dispatch(&Line{Cmd: DISCONNECTED})
conn.connected = false conn.connected = false
conn.sock.Close() conn.sock.Close()
conn.cSend <- true close(conn.die)
conn.cLoop <- true
conn.cPing <- true
// reinit datastructures ready for next connection // reinit datastructures ready for next connection
// do this here rather than after runLoop()'s for due to race // do this here rather than after runLoop()'s for due to race
conn.initialise() conn.initialise()

View File

@ -1,7 +1,6 @@
package client package client
import ( import (
"bufio"
"code.google.com/p/gomock/gomock" "code.google.com/p/gomock/gomock"
"github.com/fluffle/goirc/state" "github.com/fluffle/goirc/state"
"github.com/fluffle/golog/logging" "github.com/fluffle/golog/logging"
@ -29,13 +28,11 @@ func setUp(t *testing.T, start ...bool) (*Conn, *testState) {
c.sock = nc c.sock = nc
c.cfg.Flood = true // Tests can take a while otherwise c.cfg.Flood = true // Tests can take a while otherwise
c.connected = true c.connected = true
if len(start) == 0 { // If a second argument is passed to setUp, we tell postConnect not to
// Hack to allow tests of send, recv, write etc. // start the various goroutines that shuttle data around.
// NOTE: the value of the boolean doesn't matter. c.postConnect(len(start) == 0)
c.postConnect() // Sleep 1ms to allow background routines to start.
// Sleep 1ms to allow background routines to start. <-time.After(1e6)
<-time.After(1e6)
}
return c, &testState{ctrl, st, nc, c} return c, &testState{ctrl, st, nc, c}
} }
@ -141,16 +138,9 @@ func TestClientAndStateTracking(t *testing.T) {
} }
func TestSend(t *testing.T) { func TestSend(t *testing.T) {
// Passing a second value to setUp inhibits postConnect() // Passing a second value to setUp stops goroutines from starting
c, s := setUp(t, false) c, s := setUp(t, false)
// We can't use tearDown here, as it will cause a deadlock in shutdown() defer s.tearDown()
// trying to send kill messages down channels to nonexistent goroutines.
defer s.ctrl.Finish()
// ... so we have to do some of it's work here.
c.io = bufio.NewReadWriter(
bufio.NewReader(c.sock),
bufio.NewWriter(c.sock))
// Assert that before send is running, nothing should be sent to the socket // Assert that before send is running, nothing should be sent to the socket
// but writes to the buffered channel "out" should not block. // but writes to the buffered channel "out" should not block.
@ -177,7 +167,11 @@ func TestSend(t *testing.T) {
if exited { if exited {
t.Errorf("Exited before signal sent.") t.Errorf("Exited before signal sent.")
} }
c.cSend <- true // This sneakily uses the fact that the other two goroutines that would
// normally be waiting for die to close are not running, so we only send
// to the goroutine started above. Normally shutdown() closes c.die and
// signals to all three goroutines (send, ping, runLoop) to exit.
c.die <- struct{}{}
// Allow propagation time... // Allow propagation time...
<-time.After(1e6) <-time.After(1e6)
if !exited { if !exited {
@ -191,17 +185,12 @@ func TestSend(t *testing.T) {
} }
func TestRecv(t *testing.T) { func TestRecv(t *testing.T) {
// Passing a second value to setUp inhibits postConnect() // Passing a second value to setUp stops goroutines from starting
c, s := setUp(t, false) c, s := setUp(t, false)
// We can't tearDown here as we need to explicitly test recv exiting. // We can't use tearDown here because we're testing shutdown conditions
// The same shutdown() caveat in TestSend above also applies. // (and so need to EXPECT() a call to st.Wipe() in the right place)
defer s.ctrl.Finish() defer s.ctrl.Finish()
// ... so we have to do some of it's work here.
c.io = bufio.NewReadWriter(
bufio.NewReader(c.sock),
bufio.NewWriter(c.sock))
// Send a line before recv is started up, to verify nothing appears on c.in // Send a line before recv is started up, to verify nothing appears on c.in
s.nc.Send(":irc.server.org 001 test :First test line.") s.nc.Send(":irc.server.org 001 test :First test line.")
@ -254,11 +243,6 @@ func TestRecv(t *testing.T) {
s.st.EXPECT().Wipe() s.st.EXPECT().Wipe()
s.nc.Close() s.nc.Close()
// Since send and runloop aren't actually running, we need to empty their
// channels manually for recv() to be able to call shutdown correctly.
<-c.cSend
<-c.cLoop
<-c.cPing
// Give things time to shake themselves out... // Give things time to shake themselves out...
<-time.After(time.Millisecond) <-time.After(time.Millisecond)
if !exited { if !exited {
@ -272,11 +256,9 @@ func TestRecv(t *testing.T) {
} }
func TestPing(t *testing.T) { func TestPing(t *testing.T) {
// Passing a second value to setUp inhibits postConnect() // Passing a second value to setUp stops goroutines from starting
c, s := setUp(t, false) c, s := setUp(t, false)
// We can't use tearDown here, as it will cause a deadlock in shutdown() defer s.tearDown()
// trying to send kill messages down channels to nonexistent goroutines.
defer s.ctrl.Finish()
// Set a low ping frequency for testing. // Set a low ping frequency for testing.
c.cfg.PingFreq = 50 * time.Millisecond c.cfg.PingFreq = 50 * time.Millisecond
@ -329,8 +311,11 @@ func TestPing(t *testing.T) {
if exited { if exited {
t.Errorf("Exited before signal sent.") t.Errorf("Exited before signal sent.")
} }
// This sneakily uses the fact that the other two goroutines that would
c.cPing <- true // normally be waiting for die to close are not running, so we only send
// to the goroutine started above. Normally shutdown() closes c.die and
// signals to all three goroutines (send, ping, runLoop) to exit.
c.die <- struct{}{}
// Make sure we're no longer pinging by waiting ~2x PingFreq // Make sure we're no longer pinging by waiting ~2x PingFreq
<-time.After(105 * time.Millisecond) <-time.After(105 * time.Millisecond)
if s := reader(); s != "" { if s := reader(); s != "" {
@ -343,16 +328,9 @@ func TestPing(t *testing.T) {
} }
func TestRunLoop(t *testing.T) { func TestRunLoop(t *testing.T) {
// Passing a second value to setUp inhibits postConnect() // Passing a second value to setUp stops goroutines from starting
c, s := setUp(t, false) c, s := setUp(t, false)
// We can't use tearDown here, as it will cause a deadlock in shutdown() defer s.tearDown()
// trying to send kill messages down channels to nonexistent goroutines.
defer s.ctrl.Finish()
// ... so we have to do some of it's work here.
c.io = bufio.NewReadWriter(
bufio.NewReader(c.sock),
bufio.NewWriter(c.sock))
// Set up a handler to detect whether 001 handler is called // Set up a handler to detect whether 001 handler is called
h001 := false h001 := false
@ -399,7 +377,11 @@ func TestRunLoop(t *testing.T) {
if exited { if exited {
t.Errorf("Exited before signal sent.") t.Errorf("Exited before signal sent.")
} }
c.cLoop <- true // This sneakily uses the fact that the other two goroutines that would
// normally be waiting for die to close are not running, so we only send
// to the goroutine started above. Normally shutdown() closes c.die and
// signals to all three goroutines (send, ping, runLoop) to exit.
c.die <- struct{}{}
// Allow propagation time... // Allow propagation time...
<-time.After(time.Millisecond) <-time.After(time.Millisecond)
if !exited { if !exited {
@ -415,17 +397,12 @@ func TestRunLoop(t *testing.T) {
} }
func TestWrite(t *testing.T) { func TestWrite(t *testing.T) {
// Passing a second value to setUp inhibits postConnect() // Passing a second value to setUp stops goroutines from starting
c, s := setUp(t, false) c, s := setUp(t, false)
// We can't use tearDown here, as it will cause a deadlock in shutdown() // We can't use tearDown here because we're testing shutdown conditions
// trying to send kill messages down channels to nonexistent goroutines. // (and so need to EXPECT() a call to st.Wipe() in the right place)
defer s.ctrl.Finish() defer s.ctrl.Finish()
// ... so we have to do some of it's work here.
c.io = bufio.NewReadWriter(
bufio.NewReader(c.sock),
bufio.NewWriter(c.sock))
// Write should just write a line to the socket. // Write should just write a line to the socket.
c.write("yo momma") c.write("yo momma")
s.nc.Expect("yo momma") s.nc.Expect("yo momma")
@ -446,17 +423,8 @@ func TestWrite(t *testing.T) {
} }
// Finally, test the error state by closing the socket then writing. // Finally, test the error state by closing the socket then writing.
// This little function makes sure that all the blocking channels that are
// written to during the course of s.nc.Close() and c.write() are read from
// again, to prevent deadlocks when these are both called synchronously.
// XXX: This may well be a horrible hack.
go func() {
<-c.cSend
<-c.cLoop
<-c.cPing
}()
s.nc.Close()
s.st.EXPECT().Wipe() s.st.EXPECT().Wipe()
s.nc.Close()
c.write("she can't pass unit tests") c.write("she can't pass unit tests")
} }