From 9e6de5ac3770f0cef7a0927b56d7377226d13b72 Mon Sep 17 00:00:00 2001 From: Andreas Neue Date: Fri, 12 Aug 2016 08:08:38 +0200 Subject: [PATCH] Implemented NATS cluster connector --- cluster.go | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ nats.go | 30 ----------------------------- 2 files changed, 56 insertions(+), 30 deletions(-) create mode 100644 cluster.go delete mode 100644 nats.go diff --git a/cluster.go b/cluster.go new file mode 100644 index 0000000..b83f006 --- /dev/null +++ b/cluster.go @@ -0,0 +1,56 @@ +could not parse "irc.go": /home/an/src/iqcomm_/src/irc/irc.go:128:20: expected boolean expression, found simple statement (missing parentheses around composite literal?) +// vim:ts=4:sts=4:sw=4:noet:tw=72 + +package ircd + +import ( + "strings" + + "code.dnix.de/an/irc" + + "github.com/nats-io/nats" +) + +type ClusterConnector struct { + conn *nats.Conn + subs map[string]*nats.Subscription +} + +func NewClusterConnector(servers string, ssl bool) *NatsConnector { + opts := nats.DefaultOptions + opts.Servers = strings.Split(servers, ",") + for i, s := range opts.Servers { + opts.Servers[i] = strings.Trim(s, " ") + } + opts.Secure = ssl + conn, err := opts.Connect() + if err != nil { + // foo + } + subs := make(map[string]*nats.Subscription) + return &NatsConnector{conn: conn, subs: subs} +} + +func (cc *ClusterConnector) Subscribe(subj string, ch chan *irc.Message) { + if _, exists := c.subs[subj]; exists { + return + } + sub, err := c.natsConn.Subscribe(subj, func(n *nats.Msg) { + m := irc.Parse(string(n.Data)) + ch <- m + }) + if err != nil { + c.subs[subj] = sub + } + +} + +func (cc *ClusterConnector) Unsubscribe(subj string) { + cc.conn.Unsubscribe(subj) + delete(cc.subs, subj) +} + +func (cc *ClusterConnector) Publish(msg *irc.Message) { + subj := strings.ToLower(msg.Pre) + cc.conn.Publish(subj, []bytes(msg.String())) +} diff --git a/nats.go b/nats.go deleted file mode 100644 index 8542677..0000000 --- a/nats.go +++ /dev/null @@ -1,30 +0,0 @@ -// vim:ts=4:sts=4:sw=4:noet:tw=72 - -package ircd - -import ( - "strings" - - "github.com/nats-io/nats" -) - -type NatsConnector struct { - natsConn *nats.Conn - subscriptions map[string]*nats.Subscription -} - -func NewNatsConnector(servers *string) *NatsConnector { - opts := nats.DefaultOptions - opts.Servers = strings.Split(*servers, ",") - for i, s := range opts.Servers { - opts.Servers[i] = strings.Trim(s, " ") - } - //opts.Secure = *ssl - opts.Secure = false - conn, err := opts.Connect() - if err != nil { - // foo - } - subs := make(map[string]*nats.Subscription) - return &NatsConnector{natsConn: conn, subscriptions: subs} -}