change implementation of handler dispatching

This implementation has lockless handler lists, which reduces the time
during which the handler map lock must be held.  It doesn't unlink empty
lists from the map, but as long as the set of events to handle is fixed
and finite it's arguably better that way.
This commit is contained in:
Louis Bettens 2017-03-04 18:04:22 +01:00
parent 329a62d7d9
commit b294365fc1
No known key found for this signature in database
GPG Key ID: DCA09F2AF16E1A51
2 changed files with 134 additions and 73 deletions

View File

@ -39,110 +39,133 @@ func (hf HandlerFunc) Handle(conn *Conn, line *Line) {
hf(conn, line) hf(conn, line)
} }
// Handlers are organised using a map of linked-lists, with each map // Handlers are organised using a map of lockless singly linked lists, with
// key representing an IRC verb or numeric, and the linked list values // each map key representing an IRC verb or numeric, and the linked list
// being handlers that are executed in parallel when a Line from the // values being handlers that are executed in parallel when a Line from the
// server with that verb or numeric arrives. // server with that verb or numeric arrives.
type hSet struct { type hSet struct {
set map[string]*hList set map[string]*hList
sync.RWMutex sync.RWMutex
} }
type hList struct { func (hs *hSet) getList(ev string) (hl *hList, ok bool) {
start, end *hNode ev = strings.ToLower(ev)
hs.RLock()
defer hs.RUnlock()
hl, ok = hs.set[ev]
return
} }
// Storing the forward and backward links in the node allows O(1) removal. func (hs *hSet) getOrMakeList(ev string) (hl *hList) {
// This probably isn't strictly necessary but I think it's kinda nice. ev = strings.ToLower(ev)
type hNode struct { hs.Lock()
next, prev *hNode defer hs.Unlock()
set *hSet hl, ok := hs.set[ev]
event string if !ok {
handler Handler hl = makeHList()
hs.set[ev] = hl
}
return hl
} }
// Lists are lockless thanks to atomic pointers. (which hNodePtr wraps)
type hList struct {
first, last hNodePtr
}
// In order for the whole thing to be goroutine-safe, each list must contain a
// zero-valued node at any given time as its last element. You'll see why
// later down.
func makeHList() (hl *hList) {
hl, hn0 := &hList{}, &hNode{}
hl.first.store(hn0)
hl.last.store(hn0)
return
}
// (hNodeState is also an atomic wrapper.)
type hNode struct {
next hNodePtr
state hNodeState
handler Handler
}
// Nodes progress through these three stages in order as the program runs.
const (
unready hNodeState = iota
active
unlinkable
)
// A hNode implements both Handler (with configurable panic recovery)... // A hNode implements both Handler (with configurable panic recovery)...
func (hn *hNode) Handle(conn *Conn, line *Line) { func (hn *hNode) Handle(conn *Conn, line *Line) {
defer conn.cfg.Recover(conn, line) defer conn.cfg.Recover(conn, line)
hn.handler.Handle(conn, line) hn.handler.Handle(conn, line)
} }
// ... and Remover. // ... and Remover, which works by flagging the node so the goroutines running
// hSet.dispatch know to ignore its handler and to dispose of it.
func (hn *hNode) Remove() { func (hn *hNode) Remove() {
hn.set.remove(hn) hn.state.store(unlinkable)
} }
func handlerSet() *hSet { func handlerSet() *hSet {
return &hSet{set: make(map[string]*hList)} return &hSet{set: make(map[string]*hList)}
} }
// When a new Handler is added for an event, it is wrapped in a hNode and // When a new Handler is added for an event, it is assigned into a hNode,
// returned as a Remover so the caller can remove it at a later time. // which is returned as a Remover so the caller can remove it at a later time.
//
// Concerning goroutine-safety, the point is that the atomic swap there
// reserves the previous last node for this handler and puts up a new one.
// The former node has the desirable property that the rest of the list points
// to it, and the latter inherits this property once the former becomes part
// of the list. It's also the case that handler should't be read by
// hSet.dispatch before the node is marked as ready via state.
func (hs *hSet) add(ev string, h Handler) Remover { func (hs *hSet) add(ev string, h Handler) Remover {
hs.Lock() hl := hs.getOrMakeList(ev)
defer hs.Unlock() hn0 := &hNode{}
ev = strings.ToLower(ev) hn := hl.last.swap(hn0)
l, ok := hs.set[ev] hn.next.store(hn0)
if !ok { hn.handler = h
l = &hList{} hn.state.compareAndSwap(unready, active)
}
hn := &hNode{
set: hs,
event: ev,
handler: h,
}
if !ok {
l.start = hn
} else {
hn.prev = l.end
l.end.next = hn
}
l.end = hn
hs.set[ev] = l
return hn return hn
} }
func (hs *hSet) remove(hn *hNode) { // And finally, dispatch works like so: it goes through the whole list while
hs.Lock() // remembering the adress of the pointer that led it to the current node,
defer hs.Unlock() // which allows it to unlink it if it must be. Since the pointers are atomic,
l, ok := hs.set[hn.event] // if many goroutine enter the same unlinkable node at the same time, they
if !ok { // will all end up writing the same value to the pointer anyway. Even in
logging.Error("Removing node for unknown event '%s'", hn.event) // cases where consecutive nodes are flagged and unlinking node n revives node
return // n+1 which had been unlinked by making node n point to n+2 without the
} // unlinker of n+1 noticing, all dead nodes are unmistakable and will
if hn.next == nil { // eventually be definitely unlinked and garbage-collected. Also note that
l.end = hn.prev // the fact that the last node is always a zero node, as well as letting the
} else { // list grow concurrently, allows the next-to-last node to be unlinked safely.
hn.next.prev = hn.prev
}
if hn.prev == nil {
l.start = hn.next
} else {
hn.prev.next = hn.next
}
hn.next = nil
hn.prev = nil
hn.set = nil
if l.start == nil || l.end == nil {
delete(hs.set, hn.event)
}
}
func (hs *hSet) dispatch(conn *Conn, line *Line) { func (hs *hSet) dispatch(conn *Conn, line *Line) {
hs.RLock() hl, ok := hs.getList(line.Cmd)
defer hs.RUnlock()
ev := strings.ToLower(line.Cmd)
list, ok := hs.set[ev]
if !ok { if !ok {
return return // nothing to do
} }
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for hn := list.start; hn != nil; hn = hn.next { hn, hnptr := hl.first.load(), &hl.first
wg.Add(1) for hn != nil {
go func(hn *hNode) { switch hn.state.load() {
hn.Handle(conn, line.Copy()) case active:
wg.Done() wg.Add(1)
}(hn) go func(hn *hNode) {
hn.Handle(conn, line.Copy())
wg.Done()
}(hn)
fallthrough
case unready:
hnptr = &hn.next
hn = hnptr.load()
case unlinkable:
hn = hn.next.load()
hnptr.store(hn)
}
} }
wg.Wait() wg.Wait()
} }

38
client/dispatch_unsafe.go Normal file
View File

@ -0,0 +1,38 @@
package client
import (
"sync/atomic"
"unsafe"
)
type hNodePtr struct {
ptr unsafe.Pointer
}
func (p *hNodePtr) load() *hNode {
return (*hNode)(atomic.LoadPointer(&p.ptr))
}
func (p *hNodePtr) store(new *hNode) {
atomic.StorePointer(&p.ptr, unsafe.Pointer(new))
}
func (p *hNodePtr) swap(new *hNode) (old *hNode) {
return (*hNode)(atomic.SwapPointer(&p.ptr, unsafe.Pointer(new)))
}
func (p *hNodePtr) compareAndSwap(old, new *hNode) (swapped bool) {
return atomic.CompareAndSwapPointer(&p.ptr, unsafe.Pointer(old), unsafe.Pointer(new))
}
type hNodeState uintptr
func (s *hNodeState) load() hNodeState {
return hNodeState(atomic.LoadUintptr((*uintptr)(s)))
}
func (s *hNodeState) store(new hNodeState) {
atomic.StoreUintptr((*uintptr)(s), uintptr(new))
}
func (s *hNodeState) swap(new hNodeState) (old hNodeState) {
return hNodeState(atomic.SwapUintptr((*uintptr)(s), uintptr(new)))
}
func (s *hNodeState) compareAndSwap(old, new hNodeState) (swapped bool) {
return atomic.CompareAndSwapUintptr((*uintptr)(s), uintptr(old), uintptr(new))
}