From 0200b741dcef3886567edf72bfe56efb4458c70d Mon Sep 17 00:00:00 2001 From: Alex Bramley Date: Fri, 22 Jul 2011 01:08:42 +0100 Subject: [PATCH] Fix for issues/6 (2/2): Move to using control channels and select. --- client.go | 27 ++++++----- client/connection.go | 105 +++++++++++++++++++++++++++++-------------- 2 files changed, 88 insertions(+), 44 deletions(-) diff --git a/client.go b/client.go index 7e16004..d2040d1 100644 --- a/client.go +++ b/client.go @@ -15,6 +15,11 @@ func main() { c.AddHandler("connected", func(conn *irc.Conn, line *irc.Line) { conn.Join("#go-nuts") }) + // Set up a handler to notify of disconnect events. + quit := make(chan bool) + c.AddHandler("disconnected", + func(conn *irc.Conn, line *irc.Line) { quit <- true }) + // connect to server if err := c.Connect("irc.freenode.net"); err != nil { fmt.Printf("Connection error: %s\n", err) @@ -74,18 +79,18 @@ func main() { } }() - // stall here waiting for asplode on error channel - for { - for err := range c.Err { + for !reallyquit { + select { + case err := <-c.Err: fmt.Printf("goirc error: %s\n", err) - } - if reallyquit { - break - } - fmt.Println("Reconnecting...") - if err := c.Connect("irc.freenode.net"); err != nil { - fmt.Printf("Connection error: %s\n", err) - break + case <-quit: + if !reallyquit { + fmt.Println("Reconnecting...") + if err := c.Connect("irc.freenode.net"); err != nil { + fmt.Printf("Connection error: %s\n", err) + reallyquit = true + } + } } } } diff --git a/client/connection.go b/client/connection.go index 2e27798..81af227 100644 --- a/client/connection.go +++ b/client/connection.go @@ -10,6 +10,10 @@ import ( "time" ) +const ( + second = int64(1e9) +) + // An IRC connection is represented by this struct. Once connected, any errors // encountered are piped down *Conn.Err; this channel is closed on disconnect. type Conn struct { @@ -32,6 +36,9 @@ type Conn struct { out chan string connected bool + // Control channels to goroutines + cSend, cLoop chan bool + // Error channel to transmit any fail back to the user Err chan os.Error @@ -46,6 +53,9 @@ type Conn struct { // Set this to true to disable flood protection and false to re-enable Flood bool + // Internal counters for flood protection + badness, lastsent int64 + // Function which returns a *time.Time for use as a timestamp Timestamp func() *time.Time @@ -73,9 +83,14 @@ func New(nick, user, name string) *Conn { in: make(chan *Line, 32), out: make(chan string, 32), Err: make(chan os.Error, 4), + cSend: make(chan bool), + cLoop: make(chan bool), SSL: false, SSLConfig: nil, Timeout: 300, + Flood: false, + badness: 0, + lastsent: 0, Timestamp: time.LocalTime, TSFormat: "15:04:05", } @@ -159,38 +174,15 @@ func hasPort(s string) bool { return strings.LastIndex(s, ":") > strings.LastIndex(s, "]") } -// dispatch input from channel as \r\n terminated line to peer -// flood controlled using hybrid's algorithm if conn.Flood is true +// goroutine to pass data from output channel to write() func (conn *Conn) send() { - lastsent := time.Nanoseconds() - var badness, linetime, second int64 = 0, 0, 1000000000 - for line := range conn.out { - // Hybrid's algorithm allows for 2 seconds per line and an additional - // 1/120 of a second per character on that line. - linetime = 2*second + int64(len(line))*second/120 - if !conn.Flood && conn.connected { - // No point in tallying up flood protection stuff until connected - if badness += linetime + lastsent - time.Nanoseconds(); badness < 0 { - // negative badness times are badness... - badness = int64(0) - } - } - lastsent = time.Nanoseconds() - - // If we've sent more than 10 second's worth of lines according to the - // calculation above, then we're at risk of "Excess Flood". - if badness > 10*second && !conn.Flood { - // so sleep for the current line's time value before sending it - time.Sleep(linetime) - } - if _, err := conn.io.WriteString(line + "\r\n"); err != nil { - conn.error("irc.send(): %s", err.String()) - conn.shutdown() - break - } - conn.io.Flush() - if conn.Debug { - fmt.Println(conn.Timestamp().Format(conn.TSFormat) + " -> " + line) + for { + select { + case line := <-conn.out: + conn.write(line) + case <-conn.cSend: + // strobe on control channel, bail out + return } } } @@ -248,15 +240,62 @@ func (conn *Conn) recv() { } } +// goroutine to dispatch events for lines received on input channel func (conn *Conn) runLoop() { - for line := range conn.in { - conn.dispatchEvent(line) + for { + select { + case line := <-conn.in: + conn.dispatchEvent(line) + case <-conn.cLoop: + // strobe on control channel, bail out + return + } + } +} + +// Write a \r\n terminated line of output to the connected server, +// using Hybrid's algorithm to rate limit if conn.Flood is false. +func (conn *Conn) write(line string) { + if !conn.Flood { + conn.rateLimit(int64(len(line))) + } + + if _, err := conn.io.WriteString(line + "\r\n"); err != nil { + conn.error("irc.send(): %s", err.String()) + conn.shutdown() + return + } + conn.io.Flush() + if conn.Debug { + fmt.Println(conn.Timestamp().Format(conn.TSFormat) + " -> " + line) + } +} + +// Implement Hybrid's flood control algorithm to rate-limit outgoing lines. +func (conn *Conn) rateLimit(chars int64) { + // Hybrid's algorithm allows for 2 seconds per line and an additional + // 1/120 of a second per character on that line. + linetime := 2*second + chars*second/120 + elapsed := time.Nanoseconds() - conn.lastsent + if conn.badness += linetime - elapsed; conn.badness < 0 { + // negative badness times are badness... + conn.badness = int64(0) + } + conn.lastsent = time.Nanoseconds() + // If we've sent more than 10 second's worth of lines according to the + // calculation above, then we're at risk of "Excess Flood". + if conn.badness > 10*second && !conn.Flood { + // so sleep for the current line's time value before sending it + time.Sleep(linetime) } } func (conn *Conn) shutdown() { conn.connected = false conn.sock.Close() + conn.cSend <- true + conn.cLoop <- true + conn.dispatchEvent(&Line{Cmd: "DISCONNECTED"}) // reinit datastructures ready for next connection // do this here rather than after runLoop()'s for due to race conn.initialise()