diff --git a/cluster.go b/cluster.go index d13ca36..76bdb2b 100644 --- a/cluster.go +++ b/cluster.go @@ -6,6 +6,7 @@ import ( "strings" "code.dnix.de/an/irc" + "code.dnix.de/an/xlog" "github.com/nats-io/nats" ) @@ -24,7 +25,7 @@ func NewClusterConnector(servers string, ssl bool) *ClusterConnector { opts.Secure = ssl conn, err := opts.Connect() if err != nil { - // foo + xlog.Error(err.Error()) } subs := make(map[string]*nats.Subscription) return &ClusterConnector{conn: conn, subs: subs} @@ -39,9 +40,10 @@ func (cc *ClusterConnector) Subscribe(subj string, ch chan *irc.Message) { ch <- m }) if err != nil { - cc.subs[subj] = sub + xlog.Error(err.Error()) + return } - + cc.subs[subj] = sub } func (cc *ClusterConnector) Unsubscribe(subj string) { @@ -52,6 +54,6 @@ func (cc *ClusterConnector) Unsubscribe(subj string) { } func (cc *ClusterConnector) Publish(msg *irc.Message) { - subj := strings.ToLower(msg.Pre) + subj := strings.ToLower(msg.Args[0]) cc.conn.Publish(subj, []byte(msg.String())) } diff --git a/handlers.go b/handlers.go index c8e35b7..f9a25b6 100644 --- a/handlers.go +++ b/handlers.go @@ -90,6 +90,7 @@ func handleCmdJoin(sv *Server, msg *irc.Message) { if !sv.localOrigin(msg) { return } + sv.cluster.Subscribe(chid, sv.remoteq) sv.sendReply(msg.Pre, RPL_TOPIC, msg.Args[0], sv.chTopics[msg.Args[0]]) sv.channelNames(msg.Pre, msg.Args[0]) m, isoper := sv.opers[clid] @@ -124,17 +125,7 @@ func handleCmdPart(sv *Server, msg *irc.Message) { return } sv.sendMsg(msg) - delete(sv.chUsers[chid], clid) - //delete(sv.chModes[chid], "q "+clid) - //delete(sv.chModes[chid], "a "+clid) - delete(sv.chModes[chid], "o "+clid) - delete(sv.chModes[chid], "h "+clid) - delete(sv.chModes[chid], "v "+clid) - if len(sv.chUsers[chid]) == 0 { - delete(sv.chUsers, chid) - delete(sv.chTopics, chid) - delete(sv.chModes, chid) - } + sv.channelRemoveUser(chid, clid) } func handleCmdQuit(sv *Server, msg *irc.Message) { @@ -233,12 +224,7 @@ func handleCmdKick(sv *Server, msg *irc.Message) { sv.sendReply(clid, ERR_CHANOPRIVSNEEDED, chid, "You're not channel operator") } sv.sendMsg(msg) - delete(sv.chUsers[chid], target) - //delete(sv.chModes[chid], "q "+target) - //delete(sv.chModes[chid], "a "+target) - delete(sv.chModes[chid], "o "+target) - delete(sv.chModes[chid], "h "+target) - delete(sv.chModes[chid], "v "+target) + sv.channelRemoveUser(chid, target) } func handleCmdNames(sv *Server, msg *irc.Message) { diff --git a/monitoring.go b/monitoring.go index 0d99aa4..a32a506 100644 --- a/monitoring.go +++ b/monitoring.go @@ -4,19 +4,26 @@ package ircd import ( "net/http" + "runtime" "time" "github.com/prometheus/client_golang/prometheus" ) var ( + gaugeGoroutinesRunning prometheus.Gauge gaugePacketsTransferred prometheus.Gauge gaugeConnectionsCurrent prometheus.Gauge gaugeConnectionsCount prometheus.Gauge - gaugeQueueLen prometheus.Gauge + gaugeLocalQueueLen prometheus.Gauge + gaugeRemoteQueueLen prometheus.Gauge ) func monitoringRun(sv *Server) { + gaugeGoroutinesRunning = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "ircd_goroutines_running", + Help: "Goroutines runnning", + }) gaugePacketsTransferred = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "ircd_packets_transferred", Help: "Packets handled", @@ -29,14 +36,20 @@ func monitoringRun(sv *Server) { Name: "ircd_connections_count", Help: "Client connections", }) - gaugeQueueLen = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "ircd_queue_len", - Help: "Unhandled msgs in dispatcher queue", + gaugeLocalQueueLen = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "ircd_local_queue_len", + Help: "Unhandled msgs in dispatcher local queue", }) + gaugeRemoteQueueLen = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "ircd_remote_queue_len", + Help: "Unhandled msgs in dispatcher remote queue", + }) + prometheus.MustRegister(gaugeGoroutinesRunning) prometheus.MustRegister(gaugePacketsTransferred) prometheus.MustRegister(gaugeConnectionsCurrent) prometheus.MustRegister(gaugeConnectionsCount) - prometheus.MustRegister(gaugeQueueLen) + prometheus.MustRegister(gaugeLocalQueueLen) + prometheus.MustRegister(gaugeRemoteQueueLen) go monitoringUpdater(sv) http.Handle("/metrics", prometheus.Handler()) laddr, _ := sv.config.GetString("net", "listen_prom") @@ -46,9 +59,11 @@ func monitoringRun(sv *Server) { func monitoringUpdater(sv *Server) { for { time.Sleep(5 * time.Second) + gaugeGoroutinesRunning.Set(float64(runtime.NumGoroutine())) gaugePacketsTransferred.Set(sv.packetsTransferred) - gaugeConnectionsCurrent.Set(sv.connectionsCurrent) + gaugeConnectionsCurrent.Set(float64(len(sv.clients))) gaugeConnectionsCount.Set(sv.connectionsCount) - gaugeQueueLen.Set(sv.queueLen) + gaugeLocalQueueLen.Set(float64(len(sv.localq))) + gaugeRemoteQueueLen.Set(float64(len(sv.remoteq))) } } diff --git a/server.go b/server.go index 321e895..f3086e2 100644 --- a/server.go +++ b/server.go @@ -8,7 +8,6 @@ import ( "fmt" "net" "os" - "runtime" "strings" "time" @@ -25,13 +24,14 @@ const ( var myinfo string = "%s %s/%s * *" -//var isupport string = "CASEMAPPING=rfc1459 CHANTYPES=# NICKLEN=32 MODES=1 PREFIX=(qaohv)~&@%+" var isupport string = "CASEMAPPING=rfc1459 CHANTYPES=# NICKLEN=32 MODES=1 PREFIX=(ohv)@%+" type Server struct { - queue chan *irc.Message - addq chan Client - delq chan Client + localq chan *irc.Message + remoteq chan *irc.Message + + addq chan Client + delq chan Client host string info string @@ -40,6 +40,8 @@ type Server struct { created string motd string + cluster *ClusterConnector + opers map[string]string clients map[string]Client @@ -53,19 +55,18 @@ type Server struct { configPath string packetsTransferred float64 - connectionsCurrent float64 connectionsCount float64 - queueLen float64 authCallback func(name, pass string) bool } -// Create a new server instance. func NewServer(configPath, software, version string) *Server { sv := &Server{software: software, version: version, created: time.Now().String()} - sv.queue = make(chan *irc.Message, 1024) + sv.localq = make(chan *irc.Message, 65536) + sv.remoteq = make(chan *irc.Message, 65536) + sv.addq = make(chan Client, 128) sv.delq = make(chan Client, 128) @@ -88,6 +89,9 @@ func NewServer(configPath, software, version string) *Server { sv.info, _ = sv.config.GetString("server", "info") sv.motd, _ = sv.config.GetString("server", "motd") + urls, _ := sv.config.GetString("cluster", "urls") + sv.cluster = NewClusterConnector(urls, false) + nicks, _ := sv.config.GetString("control", "a") for _, nick := range strings.Split(nicks, ",") { sv.opers[nick] = "a" @@ -98,9 +102,7 @@ func NewServer(configPath, software, version string) *Server { } sv.packetsTransferred = 0 - sv.connectionsCurrent = 0 sv.connectionsCount = 0 - sv.queueLen = 0 return sv } @@ -109,7 +111,6 @@ func (sv *Server) SetAuthCallback(authCB func(name, pass string) bool) { sv.authCallback = authCB } -// Open the listening port and start the main server loop. func (sv *Server) Run() { xlog.Info("%s/%s", sv.software, sv.version) go monitoringRun(sv) @@ -139,7 +140,7 @@ func (sv *Server) Run() { } func (sv *Server) Dispatch(msg *irc.Message) { - sv.queue <- msg + sv.localq <- msg } func (sv *Server) AddClient(cl Client) { @@ -202,9 +203,14 @@ func (sv *Server) dispatcher() (err error) { } }() for { - sv.queueLen = float64(len(sv.queue)) select { - case msg := <-sv.queue: + case msg := <-sv.localq: + sv.recvMsg(msg) + sv.packetsTransferred++ + case msg := <-sv.remoteq: + if sv.localOrigin(msg) { + continue + } sv.recvMsg(msg) sv.packetsTransferred++ case cl := <-sv.addq: @@ -249,27 +255,25 @@ func (sv *Server) addClient(cl Client) { } sv.clients[clid] = cl sv.sendLogon(cl.Name()) - sv.connectionsCurrent = float64(len(sv.clients)) cl.Register(true) + sv.cluster.Subscribe(clid, sv.remoteq) xlog.Info("Client registered: '%s'", clid) xlog.Info("Server has %d client(s)", len(sv.clients)) - xlog.Debug("Goroutines running: %d", runtime.NumGoroutine()) } func (sv *Server) delClient(cl Client) { clid := strings.ToLower(cl.Name()) + sv.cluster.Unsubscribe(clid) cl.Destroy() - for chname, ch := range sv.chUsers { - if _, exists := ch[clid]; exists { - delete(ch, clid) - sv.sendMsg(irc.M(cl.Name(), "PART", chname, "quit")) + for chid, _ := range sv.chUsers { + if _, exists := sv.chUsers[chid][clid]; exists { + sv.channelRemoveUser(chid, clid) + sv.sendMsg(irc.M(cl.Name(), "PART", chid, "quit")) } } delete(sv.clients, clid) - sv.connectionsCurrent = float64(len(sv.clients)) xlog.Info("Client deleted: '%s'", clid) xlog.Info("Server has %d client(s)", len(sv.clients)) - xlog.Debug("Goroutines running: %d", runtime.NumGoroutine()) } func (sv *Server) recvMsg(msg *irc.Message) { @@ -304,13 +308,11 @@ func (sv *Server) recvMsg(msg *irc.Message) { hook.HookFn(sv, msg) } -// Forward an irc message to cluster and deliver locally func (sv *Server) sendMsg(msg *irc.Message) { sv.deliverMsg(msg) sv.forwardMsg(msg) } -// Local delivery of an irc message to channel or client func (sv *Server) deliverMsg(msg *irc.Message) { if strings.HasPrefix(msg.Args[0], "#") { chid := strings.ToLower(msg.Args[0]) @@ -337,16 +339,12 @@ func (sv *Server) deliverMsg(msg *irc.Message) { } } -// Forward to cluster func (sv *Server) forwardMsg(msg *irc.Message) { if sv.localOrigin(msg) { - // TODO: forward to cluster - println("forward:", msg.String()) + sv.cluster.Publish(msg) } } -// Send irc reply to local client and drop, if client doesnt -// exist locally func (sv *Server) sendReply(nick, cmd, args, trail string) { clid := strings.ToLower(nick) if _, exists := sv.clients[clid]; !exists { @@ -361,7 +359,6 @@ func (sv *Server) sendReply(nick, cmd, args, trail string) { cl.Receive(irc.M(sv.host, cmd, args, trail)) } -// Check if message's origin is local func (sv *Server) localOrigin(msg *irc.Message) bool { _, localClient := sv.clients[strings.ToLower(msg.Pre)] localServer := (msg.Pre == sv.host) @@ -395,7 +392,6 @@ func (sv *Server) sendLogon(nick string) { } } -// Send channel name list to client func (sv *Server) channelNames(nick, ch string) { chid := strings.ToLower(ch) if _, exists := sv.chUsers[chid]; !exists { @@ -425,7 +421,6 @@ func (sv *Server) channelNames(nick, ch string) { sv.sendReply(nick, RPL_ENDOFNAMES, ch, "End of /NAMES list") } -// Check if mode exists for the specified channel func (sv *Server) channelCheckMode(chid, mode string) bool { if _, exists := sv.chModes[chid]; !exists { return false @@ -444,3 +439,19 @@ func (sv *Server) channelCheckPerm(chid, clid, modes string) bool { } return false } + +func (sv *Server) channelRemoveUser(chid, clid string) { + if _, exists := sv.chUsers[chid]; !exists { + return + } + delete(sv.chUsers[chid], clid) + delete(sv.chModes[chid], "o "+clid) + delete(sv.chModes[chid], "h "+clid) + delete(sv.chModes[chid], "v "+clid) + if len(sv.chUsers[chid]) == 0 { + sv.cluster.Unsubscribe(chid) + delete(sv.chUsers, chid) + delete(sv.chTopics, chid) + delete(sv.chModes, chid) + } +}