Move remaining Go-NEB code into gomatrix

This commit is contained in:
Kegan Dougal 2016-11-30 17:24:46 +00:00
parent 652dacf366
commit ca6f598808
3 changed files with 403 additions and 37 deletions

208
client.go
View File

@ -4,26 +4,46 @@
package gomatrix
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"
"strconv"
"sync"
"time"
)
// Client represents a Matrix client.
type Client struct {
HomeserverURL *url.URL // The base homeserver URL
Prefix string // The API prefix eg '/_matrix/client/r0'
UserID string // The user ID of the client. Used for forming HTTP paths which use the client's user ID.
AccessToken string // The access_token for the client.
syncingMutex sync.Mutex // protects syncingID
syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time.
Client *http.Client // The underlying HTTP client which will be used to make HTTP requests.
FilterStorer FilterStorer // Interface for saving and loading the filter ID for sync.
NextBatchStorer NextBatchStorer // Interface for saving and loading the "next_batch" sync token.
HomeserverURL *url.URL // The base homeserver URL
Prefix string // The API prefix eg '/_matrix/client/r0'
UserID string // The user ID of the client. Used for forming HTTP paths which use the client's user ID.
AccessToken string // The access_token for the client.
syncingMutex sync.Mutex // protects syncingID
syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time.
Client *http.Client // The underlying HTTP client which will be used to make HTTP requests.
Syncer Syncer // The thing which can process /sync responses
// TODO: Worker and Rooms
}
// HTTPError An HTTP Error response, which may wrap an underlying native Go Error.
type HTTPError struct {
WrappedError error
Message string
Code int
}
func (e HTTPError) Error() string {
var wrappedErrMsg string
if e.WrappedError != nil {
wrappedErrMsg = e.WrappedError.Error()
}
return fmt.Sprintf("%s: %d: %s", e.Message, e.Code, wrappedErrMsg)
}
// BuildURL builds a URL with the Client's homserver/prefix/access_token set already.
func (cli *Client) BuildURL(urlPath ...string) string {
ps := []string{cli.Prefix}
@ -58,6 +78,154 @@ func (cli *Client) BuildURLWithQuery(urlPath []string, urlQuery map[string]strin
return u.String()
}
// Sync starts syncing with the provided Homeserver. This function will block until a fatal /sync error occurs, so should
// almost always be started as a new goroutine. If Sync() is called twice then the first sync will be stopped.
func (cli *Client) Sync() error {
// Mark the client as syncing.
// We will keep syncing until the syncing state changes. Either because
// Sync is called or StopSync is called.
syncingID := cli.incrementSyncingID()
nextBatch := cli.Syncer.NextBatchStorer().LoadNextBatch(cli.UserID)
filterID := cli.Syncer.FilterStorer().LoadFilter(cli.UserID)
if filterID == "" {
filterJSON := cli.Syncer.FilterStorer().GetFilterJSON(cli.UserID)
resFilter, err := cli.CreateFilter(filterJSON)
if err != nil {
return err
}
filterID = resFilter.FilterID
cli.Syncer.FilterStorer().SaveFilter(cli.UserID, filterID)
}
for {
resSync, err := cli.SyncRequest(30000, nextBatch, filterID, false, "")
if err != nil {
duration, err2 := cli.Syncer.OnFailedSync(resSync, err)
if err2 != nil {
return err2
}
time.Sleep(duration)
continue
}
// Check that the syncing state hasn't changed
// Either because we've stopped syncing or another sync has been started.
// We discard the response from our sync.
if cli.getSyncingID() != syncingID {
return nil
}
// Save the token now *before* processing it. This means it's possible
// to not process some events, but it means that we won't get constantly stuck processing
// a malformed/buggy event which keeps making us panic.
cli.Syncer.NextBatchStorer().SaveNextBatch(cli.UserID, resSync.NextBatch)
if err = cli.Syncer.ProcessResponse(resSync, nextBatch); err != nil {
return err
}
nextBatch = resSync.NextBatch
}
}
func (cli *Client) incrementSyncingID() uint32 {
cli.syncingMutex.Lock()
defer cli.syncingMutex.Unlock()
cli.syncingID++
return cli.syncingID
}
func (cli *Client) getSyncingID() uint32 {
cli.syncingMutex.Lock()
defer cli.syncingMutex.Unlock()
return cli.syncingID
}
// StopSync stops the ongoing sync started by Sync.
func (cli *Client) StopSync() {
// Advance the syncing state so that any running Syncs will terminate.
cli.incrementSyncingID()
}
// SendJSON sends JSON to the given URL. Returns an error if the response is not 2xx.
func (cli *Client) SendJSON(method string, httpURL string, contentJSON interface{}) ([]byte, error) {
jsonStr, err := json.Marshal(contentJSON)
if err != nil {
return nil, err
}
req, err := http.NewRequest(method, httpURL, bytes.NewBuffer(jsonStr))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
res, err := cli.Client.Do(req)
if res != nil {
defer res.Body.Close()
}
if err != nil {
return nil, err
}
contents, err := ioutil.ReadAll(res.Body)
if res.StatusCode >= 300 || res.StatusCode < 200 {
return nil, HTTPError{
Code: res.StatusCode,
Message: "Failed to " + method + " JSON: HTTP " + strconv.Itoa(res.StatusCode),
}
}
if err != nil {
return nil, err
}
return contents, nil
}
// CreateFilter makes an HTTP request according to http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-user-userid-filter
func (cli *Client) CreateFilter(filter json.RawMessage) (*RespCreateFilter, error) {
urlPath := cli.BuildURL("user", cli.UserID, "filter")
resBytes, err := cli.SendJSON("POST", urlPath, &filter)
if err != nil {
return nil, err
}
var filterResponse RespCreateFilter
if err = json.Unmarshal(resBytes, &filterResponse); err != nil {
return nil, err
}
return &filterResponse, nil
}
// SyncRequest makes an HTTP request according to http://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
func (cli *Client) SyncRequest(timeout int, since, filterID string, fullState bool, setPresence string) (*RespSync, error) {
query := map[string]string{
"timeout": strconv.Itoa(timeout),
}
if since != "" {
query["since"] = since
}
if filterID != "" {
query["filter"] = filterID
}
if setPresence != "" {
query["set_presence"] = setPresence
}
if fullState {
query["full_state"] = "true"
}
urlPath := cli.BuildURLWithQuery([]string{"sync"}, query)
req, err := http.NewRequest("GET", urlPath, nil)
if err != nil {
return nil, err
}
res, err := cli.Client.Do(req)
if res != nil {
defer res.Body.Close()
}
if err != nil {
return nil, err
}
var syncResponse RespSync
err = json.NewDecoder(res.Body).Decode(&syncResponse)
return &syncResponse, err
}
// NewClient creates a new Matrix Client ready for syncing
func NewClient(homeserverURL, userID, accessToken string) (*Client, error) {
hsURL, err := url.Parse(homeserverURL)
@ -69,15 +237,21 @@ func NewClient(homeserverURL, userID, accessToken string) (*Client, error) {
HomeserverURL: hsURL,
UserID: userID,
Prefix: "/_matrix/client/r0",
Syncer: NewDefaultSyncer(
userID,
// By default, use an in-memory next_batch storer which will never save tokens to disk.
// The client will work with this storer: it just won't
// remember the token across restarts. In practice, a database backend should be used.
&InMemoryNextBatchStore{make(map[string]string)},
// By default, use an in-memory filter storer which will never save the filter ID to disk.
// The client will work with this storer: it just won't remember the filter
// ID across restarts and hence request a new one. In practice, a database backend should be used.
&InMemoryFilterStore{
Filter: json.RawMessage(`{"room":{"timeline":{"limit":50}}}`),
UserToFilter: make(map[string]string),
},
),
}
// By default, use a no-op next_batch storer which will never save tokens and always
// "load" the empty string as a token. The client will work with this storer: it just won't
// remember the token across restarts. In practice, a database backend should be used.
cli.NextBatchStorer = NopNextBatchStore{}
// By default, use a no-op filter storer which will never save the filter ID and always
// "load" nothing. The client will work with this storer: it just won't remember the filter
// ID across restarts and hence request a new one. In practice, a database backend should be used.
cli.FilterStorer = NopFilterStore{}
// By default, use the default HTTP client.
cli.Client = http.DefaultClient

View File

@ -39,3 +39,12 @@ func (room Room) GetMembershipState(userID string) string {
}
return state
}
// NewRoom creates a new Room with the given ID
func NewRoom(roomID string) *Room {
// Init the State map and return a pointer to the Room
return &Room{
ID: roomID,
State: make(map[string]map[string]*Event),
}
}

223
sync.go
View File

@ -1,35 +1,218 @@
package gomatrix
import (
"encoding/json"
"fmt"
"runtime/debug"
"time"
)
// Syncer represents an interface that must be satisfied in order to do /sync requests on a client.
type Syncer interface {
// Process the /sync response. The since parameter is the since= value that was used to produce the response.
// This is useful for detecting the very first sync (since=""). If an error is return, Syncing will be stopped
// permanently.
ProcessResponse(resp *RespSync, since string) error
// Interface for saving and loading the "next_batch" sync token.
NextBatchStorer() NextBatchStorer
// Interface for saving and loading the filter ID for sync.
FilterStorer() FilterStorer
// OnFailedSync returns either the time to wait before retrying or an error to stop syncing permanently.
OnFailedSync(res *RespSync, err error) (time.Duration, error)
}
// DefaultSyncer is the default syncing implementation. You can either write your own syncer, or selectively
// replace parts of this default syncer (e.g. the NextBatch/Filter storers, or the ProcessResponse method).
type DefaultSyncer struct {
UserID string
Rooms map[string]*Room
NextBatchStore NextBatchStorer
FilterStore FilterStorer
listeners map[string][]OnEventListener // event type to listeners array
}
// OnEventListener can be used with DefaultSyncer.OnEventType to be informed of incoming events.
type OnEventListener func(*Event)
// NewDefaultSyncer returns an instantiated DefaultSyncer
func NewDefaultSyncer(userID string, nextBatch NextBatchStorer, filterStore FilterStorer) *DefaultSyncer {
return &DefaultSyncer{
UserID: userID,
Rooms: make(map[string]*Room),
NextBatchStore: nextBatch,
FilterStore: filterStore,
listeners: make(map[string][]OnEventListener),
}
}
// ProcessResponse processes the /sync response in a way suitable for bots. "Suitable for bots" means a stream of
// unrepeating events.
func (s *DefaultSyncer) ProcessResponse(res *RespSync, since string) (err error) {
if !s.shouldProcessResponse(res, since) {
return
}
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("ProcessResponse panicked! userID=%s since=%s panic=%s\n%s", s.UserID, since, r, debug.Stack())
}
}()
for roomID, roomData := range res.Rooms.Join {
room := s.getOrCreateRoom(roomID)
for _, event := range roomData.State.Events {
event.RoomID = roomID
room.UpdateState(&event)
s.notifyListeners(&event)
}
for _, event := range roomData.Timeline.Events {
event.RoomID = roomID
s.notifyListeners(&event)
}
}
for roomID, roomData := range res.Rooms.Invite {
room := s.getOrCreateRoom(roomID)
for _, event := range roomData.State.Events {
event.RoomID = roomID
room.UpdateState(&event)
s.notifyListeners(&event)
}
}
return
}
// OnEventType allows callers to be notified when there are new events for the given event type.
// There are no duplicate checks.
func (s *DefaultSyncer) OnEventType(eventType string, callback OnEventListener) {
_, exists := s.listeners[eventType]
if !exists {
s.listeners[eventType] = []OnEventListener{}
}
s.listeners[eventType] = append(s.listeners[eventType], callback)
}
// shouldProcessResponse returns true if the response should be processed. May modify the response to remove
// stuff that shouldn't be processed.
func (s *DefaultSyncer) shouldProcessResponse(resp *RespSync, since string) bool {
if since == "" {
return false
}
// This is a horrible hack because /sync will return the most recent messages for a room
// as soon as you /join it. We do NOT want to process those events in that particular room
// because they may have already been processed (if you toggle the bot in/out of the room).
//
// Work around this by inspecting each room's timeline and seeing if an m.room.member event for us
// exists and is "join" and then discard processing that room entirely if so.
// TODO: We probably want to process messages from after the last join event in the timeline.
for roomID, roomData := range resp.Rooms.Join {
for i := len(roomData.Timeline.Events) - 1; i >= 0; i-- {
e := roomData.Timeline.Events[i]
if e.Type == "m.room.member" && e.StateKey == s.UserID {
m := e.Content["membership"]
mship, ok := m.(string)
if !ok {
continue
}
if mship == "join" {
_, ok := resp.Rooms.Join[roomID]
if !ok {
continue
}
delete(resp.Rooms.Join, roomID) // don't re-process messages
delete(resp.Rooms.Invite, roomID) // don't re-process invites
break
}
}
}
}
return true
}
// getOrCreateRoom must only be called by the Sync() goroutine which calls ProcessResponse()
func (s *DefaultSyncer) getOrCreateRoom(roomID string) *Room {
room := s.Rooms[roomID]
if room == nil { // create a new Room
room = NewRoom(roomID)
s.Rooms[roomID] = room
}
return room
}
func (s *DefaultSyncer) notifyListeners(event *Event) {
listeners, exists := s.listeners[event.Type]
if !exists {
return
}
for _, fn := range listeners {
fn(event)
}
}
// NextBatchStorer returns the provided NextBatchStorer
func (s *DefaultSyncer) NextBatchStorer() NextBatchStorer {
return s.NextBatchStore
}
// FilterStorer returns the provided FilterStorer
func (s *DefaultSyncer) FilterStorer() FilterStorer {
return s.FilterStore
}
// OnFailedSync always returns a 10 second wait period between failed /syncs.
func (s *DefaultSyncer) OnFailedSync(res *RespSync, err error) (time.Duration, error) {
return 10 * time.Second, nil
}
// NextBatchStorer controls loading/saving of next_batch tokens for users
type NextBatchStorer interface {
// Save a next_batch token for a given user. Best effort.
Save(userID, nextBatch string)
// Load a next_batch token for a given user. Return an empty string if no token exists.
Load(userID string) string
// SaveNextBatch saves a next_batch token for a given user. Best effort.
SaveNextBatch(userID, nextBatch string)
// LoadNextBatch loads a next_batch token for a given user. Return an empty string if no token exists.
LoadNextBatch(userID string) string
}
// NopNextBatchStore does not load or save next_batch tokens.
type NopNextBatchStore struct{}
// InMemoryNextBatchStore stores next batch tokens in memory.
type InMemoryNextBatchStore struct {
UserToNextBatch map[string]string
}
// Save does nothing
func (s NopNextBatchStore) Save(userID, nextBatch string) {}
// SaveNextBatch saves the mapping in-memory.
func (s *InMemoryNextBatchStore) SaveNextBatch(userID, nextBatch string) {
s.UserToNextBatch[userID] = nextBatch
}
// Load does nothing
func (s NopNextBatchStore) Load(userID string) string { return "" }
// LoadNextBatch loads an existing mapping. Returns an empty string if not found
func (s *InMemoryNextBatchStore) LoadNextBatch(userID string) string {
return s.UserToNextBatch[userID]
}
// FilterStorer controls loading/saving of filter IDs for users
type FilterStorer interface {
// Save a filter ID for a given user. Best effort.
Save(userID, filterID string)
// Load a filter ID for a given user. Return an empty string if no token exists.
Load(userID string) string
// SaveFilter saves a filter ID for a given user. Best effort.
SaveFilter(userID, filterID string)
// LoadFilter loads a filter ID for a given user. Return an empty string if no token exists.
LoadFilter(userID string) string
// GetFilterJSON for the given user ID.
GetFilterJSON(userID string) json.RawMessage
}
// NopFilterStore does not load or save filter IDs.
type NopFilterStore struct{}
// InMemoryFilterStore stores filter IDs in memory. It always returns the filter JSON given in the struct.
type InMemoryFilterStore struct {
Filter json.RawMessage
UserToFilter map[string]string
}
// Save does nothing
func (s NopFilterStore) Save(userID, filterID string) {}
// SaveFilter saves the user->filter ID mapping in memory
func (s *InMemoryFilterStore) SaveFilter(userID, filterID string) {
s.UserToFilter[userID] = filterID
}
// Load does nothing
func (s NopFilterStore) Load(userID string) string { return "" }
// LoadFilter loads a previously saved user->filter ID mapping from memory. Returns the empty string if not found.
func (s *InMemoryFilterStore) LoadFilter(userID string) string {
return s.UserToFilter[userID]
}
// GetFilterJSON returns InMemoryFilterStore.Filter
func (s *InMemoryFilterStore) GetFilterJSON(userID string) json.RawMessage {
return s.Filter
}