Set a maximum duration for fg handlers to run

This commit is contained in:
James McGuire 2017-05-31 14:35:14 -07:00
parent 329a62d7d9
commit f72772b72a
2 changed files with 59 additions and 14 deletions

View File

@ -110,6 +110,10 @@ type Config struct {
// Split PRIVMSGs, NOTICEs and CTCPs longer than SplitLen characters // Split PRIVMSGs, NOTICEs and CTCPs longer than SplitLen characters
// over multiple lines. Default to 450 if not set. // over multiple lines. Default to 450 if not set.
SplitLen int SplitLen int
// The duration before a handler timeout is triggered. Defaults to 1m.
// Set to 0 to wait indefinitely.
HandlerTimeout time.Duration
} }
// NewConfig creates a Config struct containing sensible defaults. // NewConfig creates a Config struct containing sensible defaults.
@ -118,12 +122,13 @@ type Config struct {
// name, but these are optional. // name, but these are optional.
func NewConfig(nick string, args ...string) *Config { func NewConfig(nick string, args ...string) *Config {
cfg := &Config{ cfg := &Config{
Me: &state.Nick{Nick: nick}, Me: &state.Nick{Nick: nick},
PingFreq: 3 * time.Minute, PingFreq: 2 * time.Minute,
NewNick: func(s string) string { return s + "_" }, NewNick: func(s string) string { return s + "_" },
Recover: (*Conn).LogPanic, // in dispatch.go Recover: (*Conn).LogPanic, // in dispatch.go
SplitLen: defaultSplit, SplitLen: defaultSplit,
Timeout: 60 * time.Second, Timeout: 60 * time.Second,
HandlerTimeout: 60 * time.Second,
} }
cfg.Me.Ident = "goirc" cfg.Me.Ident = "goirc"
if len(args) > 0 && args[0] != "" { if len(args) > 0 && args[0] != "" {
@ -458,13 +463,19 @@ func (conn *Conn) ping() {
// It pulls Lines from the input channel and dispatches them to any // It pulls Lines from the input channel and dispatches them to any
// handlers that have been registered for that IRC verb. // handlers that have been registered for that IRC verb.
func (conn *Conn) runLoop() { func (conn *Conn) runLoop() {
defer conn.wg.Done()
for { for {
select { select {
case line := <-conn.in: case line := <-conn.in:
conn.dispatch(line) err := conn.dispatch(line)
if err != nil {
// Close() will wait on this, can't defer it.
conn.wg.Done()
conn.Close()
return
}
case <-conn.die: case <-conn.die:
// control channel closed, bail out // control channel closed, bail out
conn.wg.Done()
return return
} }
} }

View File

@ -1,10 +1,13 @@
package client package client
import ( import (
"errors"
"github.com/fluffle/goirc/logging" "github.com/fluffle/goirc/logging"
"reflect"
"runtime" "runtime"
"strings" "strings"
"sync" "sync"
"time"
) )
// Handlers are triggered on incoming Lines from the server, with the handler // Handlers are triggered on incoming Lines from the server, with the handler
@ -59,6 +62,7 @@ type hNode struct {
set *hSet set *hSet
event string event string
handler Handler handler Handler
name string
} }
// A hNode implements both Handler (with configurable panic recovery)... // A hNode implements both Handler (with configurable panic recovery)...
@ -90,6 +94,7 @@ func (hs *hSet) add(ev string, h Handler) Remover {
set: hs, set: hs,
event: ev, event: ev,
handler: h, handler: h,
name: runtime.FuncForPC(reflect.ValueOf(h).Pointer()).Name(),
} }
if !ok { if !ok {
l.start = hn l.start = hn
@ -128,23 +133,45 @@ func (hs *hSet) remove(hn *hNode) {
} }
} }
func (hs *hSet) dispatch(conn *Conn, line *Line) { func (hs *hSet) dispatch(conn *Conn, line *Line) error {
hs.RLock() hs.RLock()
defer hs.RUnlock() defer hs.RUnlock()
ev := strings.ToLower(line.Cmd) ev := strings.ToLower(line.Cmd)
list, ok := hs.set[ev] list, ok := hs.set[ev]
if !ok { if !ok {
return return nil
} }
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for hn := list.start; hn != nil; hn = hn.next { for hn := list.start; hn != nil; hn = hn.next {
wg.Add(1) wg.Add(1)
go func(hn *hNode) { go func(hn *hNode) {
logging.Debug("Starting %s handler for event %s", hn.name, line.Cmd)
hn.Handle(conn, line.Copy()) hn.Handle(conn, line.Copy())
logging.Debug("Finished %s handler for event %s", hn.name, line.Cmd)
wg.Done() wg.Done()
}(hn) }(hn)
} }
wg.Wait()
// If we don't care about how long handlers run, wait and bail out early
if conn.cfg.HandlerTimeout == 0 {
wg.Wait()
return nil
}
// Limit the amount of time we wait.
endChan := make(chan struct{})
go func() {
defer close(endChan)
wg.Wait()
}()
select {
case <-endChan:
return nil
case <-time.After(conn.cfg.HandlerTimeout):
msg := "Timeout waiting for handlers to complete"
logging.Error(msg)
return errors.New(msg)
}
} }
// Handle adds the provided handler to the foreground set for the named event. // Handle adds the provided handler to the foreground set for the named event.
@ -171,13 +198,20 @@ func (conn *Conn) HandleFunc(name string, hf HandlerFunc) Remover {
return conn.Handle(name, hf) return conn.Handle(name, hf)
} }
func (conn *Conn) dispatch(line *Line) { func (conn *Conn) dispatch(line *Line) error {
// We run the internal handlers first, including all state tracking ones. // We run the internal handlers first, including all state tracking ones.
// This ensures that user-supplied handlers that use the tracker have a // This ensures that user-supplied handlers that use the tracker have a
// consistent view of the connection state in handlers that mutate it. // consistent view of the connection state in handlers that mutate it.
conn.intHandlers.dispatch(conn, line) err := conn.intHandlers.dispatch(conn, line)
if err != nil {
return err
}
go conn.bgHandlers.dispatch(conn, line) go conn.bgHandlers.dispatch(conn, line)
conn.fgHandlers.dispatch(conn, line) err = conn.fgHandlers.dispatch(conn, line)
if err != nil {
return err
}
return nil
} }
// LogPanic is used as the default panic catcher for the client. If, like me, // LogPanic is used as the default panic catcher for the client. If, like me,