Implemented NATS cluster connector
This commit is contained in:
		
							parent
							
								
									3c30adab5c
								
							
						
					
					
						commit
						9e6de5ac37
					
				
					 2 changed files with 56 additions and 30 deletions
				
			
		
							
								
								
									
										56
									
								
								cluster.go
									
										
									
									
									
										Normal file
									
								
							
							
						
						
									
										56
									
								
								cluster.go
									
										
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,56 @@ | ||||||
|  | could not parse "irc.go": /home/an/src/iqcomm_/src/irc/irc.go:128:20: expected boolean expression, found simple statement (missing parentheses around composite literal?) | ||||||
|  | // vim:ts=4:sts=4:sw=4:noet:tw=72 | ||||||
|  | 
 | ||||||
|  | package ircd | ||||||
|  | 
 | ||||||
|  | import ( | ||||||
|  | 	"strings" | ||||||
|  | 
 | ||||||
|  | 	"code.dnix.de/an/irc" | ||||||
|  | 
 | ||||||
|  | 	"github.com/nats-io/nats" | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type ClusterConnector struct { | ||||||
|  | 	conn *nats.Conn | ||||||
|  | 	subs map[string]*nats.Subscription | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func NewClusterConnector(servers string, ssl bool) *NatsConnector { | ||||||
|  | 	opts := nats.DefaultOptions | ||||||
|  | 	opts.Servers = strings.Split(servers, ",") | ||||||
|  | 	for i, s := range opts.Servers { | ||||||
|  | 		opts.Servers[i] = strings.Trim(s, " ") | ||||||
|  | 	} | ||||||
|  | 	opts.Secure = ssl | ||||||
|  | 	conn, err := opts.Connect() | ||||||
|  | 	if err != nil { | ||||||
|  | 		// foo | ||||||
|  | 	} | ||||||
|  | 	subs := make(map[string]*nats.Subscription) | ||||||
|  | 	return &NatsConnector{conn: conn, subs: subs} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (cc *ClusterConnector) Subscribe(subj string, ch chan *irc.Message) { | ||||||
|  | 	if _, exists := c.subs[subj]; exists { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	sub, err := c.natsConn.Subscribe(subj, func(n *nats.Msg) { | ||||||
|  | 		m := irc.Parse(string(n.Data)) | ||||||
|  | 		ch <- m | ||||||
|  | 	}) | ||||||
|  | 	if err != nil { | ||||||
|  | 		c.subs[subj] = sub | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (cc *ClusterConnector) Unsubscribe(subj string) { | ||||||
|  | 	cc.conn.Unsubscribe(subj) | ||||||
|  | 	delete(cc.subs, subj) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (cc *ClusterConnector) Publish(msg *irc.Message) { | ||||||
|  | 	subj := strings.ToLower(msg.Pre) | ||||||
|  | 	cc.conn.Publish(subj, []bytes(msg.String())) | ||||||
|  | } | ||||||
							
								
								
									
										30
									
								
								nats.go
									
										
									
									
									
								
							
							
						
						
									
										30
									
								
								nats.go
									
										
									
									
									
								
							|  | @ -1,30 +0,0 @@ | ||||||
| // vim:ts=4:sts=4:sw=4:noet:tw=72 |  | ||||||
| 
 |  | ||||||
| package ircd |  | ||||||
| 
 |  | ||||||
| import ( |  | ||||||
| 	"strings" |  | ||||||
| 
 |  | ||||||
| 	"github.com/nats-io/nats" |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| type NatsConnector struct { |  | ||||||
| 	natsConn      *nats.Conn |  | ||||||
| 	subscriptions map[string]*nats.Subscription |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| func NewNatsConnector(servers *string) *NatsConnector { |  | ||||||
| 	opts := nats.DefaultOptions |  | ||||||
| 	opts.Servers = strings.Split(*servers, ",") |  | ||||||
| 	for i, s := range opts.Servers { |  | ||||||
| 		opts.Servers[i] = strings.Trim(s, " ") |  | ||||||
| 	} |  | ||||||
| 	//opts.Secure = *ssl |  | ||||||
| 	opts.Secure = false |  | ||||||
| 	conn, err := opts.Connect() |  | ||||||
| 	if err != nil { |  | ||||||
| 		// foo |  | ||||||
| 	} |  | ||||||
| 	subs := make(map[string]*nats.Subscription) |  | ||||||
| 	return &NatsConnector{natsConn: conn, subscriptions: subs} |  | ||||||
| } |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue