Fix for issues/6 (2/2): Move to using control channels and select.

This commit is contained in:
Alex Bramley 2011-07-22 01:08:42 +01:00
parent eb51558009
commit 0200b741dc
2 changed files with 88 additions and 44 deletions

View File

@ -15,6 +15,11 @@ func main() {
c.AddHandler("connected", c.AddHandler("connected",
func(conn *irc.Conn, line *irc.Line) { conn.Join("#go-nuts") }) func(conn *irc.Conn, line *irc.Line) { conn.Join("#go-nuts") })
// Set up a handler to notify of disconnect events.
quit := make(chan bool)
c.AddHandler("disconnected",
func(conn *irc.Conn, line *irc.Line) { quit <- true })
// connect to server // connect to server
if err := c.Connect("irc.freenode.net"); err != nil { if err := c.Connect("irc.freenode.net"); err != nil {
fmt.Printf("Connection error: %s\n", err) fmt.Printf("Connection error: %s\n", err)
@ -74,18 +79,18 @@ func main() {
} }
}() }()
// stall here waiting for asplode on error channel for !reallyquit {
for { select {
for err := range c.Err { case err := <-c.Err:
fmt.Printf("goirc error: %s\n", err) fmt.Printf("goirc error: %s\n", err)
} case <-quit:
if reallyquit { if !reallyquit {
break fmt.Println("Reconnecting...")
} if err := c.Connect("irc.freenode.net"); err != nil {
fmt.Println("Reconnecting...") fmt.Printf("Connection error: %s\n", err)
if err := c.Connect("irc.freenode.net"); err != nil { reallyquit = true
fmt.Printf("Connection error: %s\n", err) }
break }
} }
} }
} }

View File

@ -10,6 +10,10 @@ import (
"time" "time"
) )
const (
second = int64(1e9)
)
// An IRC connection is represented by this struct. Once connected, any errors // An IRC connection is represented by this struct. Once connected, any errors
// encountered are piped down *Conn.Err; this channel is closed on disconnect. // encountered are piped down *Conn.Err; this channel is closed on disconnect.
type Conn struct { type Conn struct {
@ -32,6 +36,9 @@ type Conn struct {
out chan string out chan string
connected bool connected bool
// Control channels to goroutines
cSend, cLoop chan bool
// Error channel to transmit any fail back to the user // Error channel to transmit any fail back to the user
Err chan os.Error Err chan os.Error
@ -46,6 +53,9 @@ type Conn struct {
// Set this to true to disable flood protection and false to re-enable // Set this to true to disable flood protection and false to re-enable
Flood bool Flood bool
// Internal counters for flood protection
badness, lastsent int64
// Function which returns a *time.Time for use as a timestamp // Function which returns a *time.Time for use as a timestamp
Timestamp func() *time.Time Timestamp func() *time.Time
@ -73,9 +83,14 @@ func New(nick, user, name string) *Conn {
in: make(chan *Line, 32), in: make(chan *Line, 32),
out: make(chan string, 32), out: make(chan string, 32),
Err: make(chan os.Error, 4), Err: make(chan os.Error, 4),
cSend: make(chan bool),
cLoop: make(chan bool),
SSL: false, SSL: false,
SSLConfig: nil, SSLConfig: nil,
Timeout: 300, Timeout: 300,
Flood: false,
badness: 0,
lastsent: 0,
Timestamp: time.LocalTime, Timestamp: time.LocalTime,
TSFormat: "15:04:05", TSFormat: "15:04:05",
} }
@ -159,38 +174,15 @@ func hasPort(s string) bool {
return strings.LastIndex(s, ":") > strings.LastIndex(s, "]") return strings.LastIndex(s, ":") > strings.LastIndex(s, "]")
} }
// dispatch input from channel as \r\n terminated line to peer // goroutine to pass data from output channel to write()
// flood controlled using hybrid's algorithm if conn.Flood is true
func (conn *Conn) send() { func (conn *Conn) send() {
lastsent := time.Nanoseconds() for {
var badness, linetime, second int64 = 0, 0, 1000000000 select {
for line := range conn.out { case line := <-conn.out:
// Hybrid's algorithm allows for 2 seconds per line and an additional conn.write(line)
// 1/120 of a second per character on that line. case <-conn.cSend:
linetime = 2*second + int64(len(line))*second/120 // strobe on control channel, bail out
if !conn.Flood && conn.connected { return
// No point in tallying up flood protection stuff until connected
if badness += linetime + lastsent - time.Nanoseconds(); badness < 0 {
// negative badness times are badness...
badness = int64(0)
}
}
lastsent = time.Nanoseconds()
// If we've sent more than 10 second's worth of lines according to the
// calculation above, then we're at risk of "Excess Flood".
if badness > 10*second && !conn.Flood {
// so sleep for the current line's time value before sending it
time.Sleep(linetime)
}
if _, err := conn.io.WriteString(line + "\r\n"); err != nil {
conn.error("irc.send(): %s", err.String())
conn.shutdown()
break
}
conn.io.Flush()
if conn.Debug {
fmt.Println(conn.Timestamp().Format(conn.TSFormat) + " -> " + line)
} }
} }
} }
@ -248,15 +240,62 @@ func (conn *Conn) recv() {
} }
} }
// goroutine to dispatch events for lines received on input channel
func (conn *Conn) runLoop() { func (conn *Conn) runLoop() {
for line := range conn.in { for {
conn.dispatchEvent(line) select {
case line := <-conn.in:
conn.dispatchEvent(line)
case <-conn.cLoop:
// strobe on control channel, bail out
return
}
}
}
// Write a \r\n terminated line of output to the connected server,
// using Hybrid's algorithm to rate limit if conn.Flood is false.
func (conn *Conn) write(line string) {
if !conn.Flood {
conn.rateLimit(int64(len(line)))
}
if _, err := conn.io.WriteString(line + "\r\n"); err != nil {
conn.error("irc.send(): %s", err.String())
conn.shutdown()
return
}
conn.io.Flush()
if conn.Debug {
fmt.Println(conn.Timestamp().Format(conn.TSFormat) + " -> " + line)
}
}
// Implement Hybrid's flood control algorithm to rate-limit outgoing lines.
func (conn *Conn) rateLimit(chars int64) {
// Hybrid's algorithm allows for 2 seconds per line and an additional
// 1/120 of a second per character on that line.
linetime := 2*second + chars*second/120
elapsed := time.Nanoseconds() - conn.lastsent
if conn.badness += linetime - elapsed; conn.badness < 0 {
// negative badness times are badness...
conn.badness = int64(0)
}
conn.lastsent = time.Nanoseconds()
// If we've sent more than 10 second's worth of lines according to the
// calculation above, then we're at risk of "Excess Flood".
if conn.badness > 10*second && !conn.Flood {
// so sleep for the current line's time value before sending it
time.Sleep(linetime)
} }
} }
func (conn *Conn) shutdown() { func (conn *Conn) shutdown() {
conn.connected = false conn.connected = false
conn.sock.Close() conn.sock.Close()
conn.cSend <- true
conn.cLoop <- true
conn.dispatchEvent(&Line{Cmd: "DISCONNECTED"})
// reinit datastructures ready for next connection // reinit datastructures ready for next connection
// do this here rather than after runLoop()'s for due to race // do this here rather than after runLoop()'s for due to race
conn.initialise() conn.initialise()