diff --git a/client.go b/client.go
index 15bf202..db0eccf 100644
--- a/client.go
+++ b/client.go
@@ -4,26 +4,46 @@ package gomatrix
 import (
+    "bytes"
+    "encoding/json"
+    "fmt"
+    "io/ioutil"
     "net/http"
     "net/url"
     "path"
+    "strconv"
     "sync"
+    "time"
 )
 
 // Client represents a Matrix client.
 type Client struct {
-    HomeserverURL   *url.URL        // The base homeserver URL
-    Prefix          string          // The API prefix eg '/_matrix/client/r0'
-    UserID          string          // The user ID of the client. Used for forming HTTP paths which use the client's user ID.
-    AccessToken     string          // The access_token for the client.
-    syncingMutex    sync.Mutex      // protects syncingID
-    syncingID       uint32          // Identifies the current Sync. Only one Sync can be active at any given time.
-    Client          *http.Client    // The underlying HTTP client which will be used to make HTTP requests.
-    FilterStorer    FilterStorer    // Interface for saving and loading the filter ID for sync.
-    NextBatchStorer NextBatchStorer // Interface for saving and loading the "next_batch" sync token.
+    HomeserverURL *url.URL     // The base homeserver URL
+    Prefix        string       // The API prefix eg '/_matrix/client/r0'
+    UserID        string       // The user ID of the client. Used for forming HTTP paths which use the client's user ID.
+    AccessToken   string       // The access_token for the client.
+    syncingMutex  sync.Mutex   // protects syncingID
+    syncingID     uint32       // Identifies the current Sync. Only one Sync can be active at any given time.
+    Client        *http.Client // The underlying HTTP client which will be used to make HTTP requests.
+    Syncer        Syncer       // The thing which can process /sync responses
     // TODO: Worker and Rooms
 }
 
+// HTTPError An HTTP Error response, which may wrap an underlying native Go Error.
+type HTTPError struct {
+    WrappedError error
+    Message      string
+    Code         int
+}
+
+func (e HTTPError) Error() string {
+    var wrappedErrMsg string
+    if e.WrappedError != nil {
+        wrappedErrMsg = e.WrappedError.Error()
+    }
+    return fmt.Sprintf("%s: %d: %s", e.Message, e.Code, wrappedErrMsg)
+}
+
 // BuildURL builds a URL with the Client's homserver/prefix/access_token set already.
 func (cli *Client) BuildURL(urlPath ...string) string {
     ps := []string{cli.Prefix}
@@ -58,6 +78,154 @@ func (cli *Client) BuildURLWithQuery(urlPath []string, urlQuery map[string]strin
     return u.String()
 }
 
+// Sync starts syncing with the provided Homeserver. This function will block until a fatal /sync error occurs, so should
+// almost always be started as a new goroutine. If Sync() is called twice then the first sync will be stopped.
+func (cli *Client) Sync() error {
+    // Mark the client as syncing.
+    // We will keep syncing until the syncing state changes. Either because
+    // Sync is called or StopSync is called.
+    syncingID := cli.incrementSyncingID()
+    nextBatch := cli.Syncer.NextBatchStorer().LoadNextBatch(cli.UserID)
+    filterID := cli.Syncer.FilterStorer().LoadFilter(cli.UserID)
+    if filterID == "" {
+        filterJSON := cli.Syncer.FilterStorer().GetFilterJSON(cli.UserID)
+        resFilter, err := cli.CreateFilter(filterJSON)
+        if err != nil {
+            return err
+        }
+        filterID = resFilter.FilterID
+        cli.Syncer.FilterStorer().SaveFilter(cli.UserID, filterID)
+    }
+
+    for {
+        resSync, err := cli.SyncRequest(30000, nextBatch, filterID, false, "")
+        if err != nil {
+            duration, err2 := cli.Syncer.OnFailedSync(resSync, err)
+            if err2 != nil {
+                return err2
+            }
+            time.Sleep(duration)
+            continue
+        }
+
+        // Check that the syncing state hasn't changed
+        // Either because we've stopped syncing or another sync has been started.
+        // We discard the response from our sync.
+        if cli.getSyncingID() != syncingID {
+            return nil
+        }
+
+        // Save the token now *before* processing it. This means it's possible
+        // to not process some events, but it means that we won't get constantly stuck processing
+        // a malformed/buggy event which keeps making us panic.
+        cli.Syncer.NextBatchStorer().SaveNextBatch(cli.UserID, resSync.NextBatch)
+        if err = cli.Syncer.ProcessResponse(resSync, nextBatch); err != nil {
+            return err
+        }
+
+        nextBatch = resSync.NextBatch
+    }
+}
+
+func (cli *Client) incrementSyncingID() uint32 {
+    cli.syncingMutex.Lock()
+    defer cli.syncingMutex.Unlock()
+    cli.syncingID++
+    return cli.syncingID
+}
+
+func (cli *Client) getSyncingID() uint32 {
+    cli.syncingMutex.Lock()
+    defer cli.syncingMutex.Unlock()
+    return cli.syncingID
+}
+
+// StopSync stops the ongoing sync started by Sync.
+func (cli *Client) StopSync() {
+    // Advance the syncing state so that any running Syncs will terminate.
+    cli.incrementSyncingID()
+}
+
+// SendJSON sends JSON to the given URL. Returns an error if the response is not 2xx.
+func (cli *Client) SendJSON(method string, httpURL string, contentJSON interface{}) ([]byte, error) {
+    jsonStr, err := json.Marshal(contentJSON)
+    if err != nil {
+        return nil, err
+    }
+    req, err := http.NewRequest(method, httpURL, bytes.NewBuffer(jsonStr))
+    if err != nil {
+        return nil, err
+    }
+    req.Header.Set("Content-Type", "application/json")
+    res, err := cli.Client.Do(req)
+    if res != nil {
+        defer res.Body.Close()
+    }
+    if err != nil {
+        return nil, err
+    }
+    contents, err := ioutil.ReadAll(res.Body)
+    if res.StatusCode >= 300 || res.StatusCode < 200 {
+        return nil, HTTPError{
+            Code:    res.StatusCode,
+            Message: "Failed to " + method + " JSON: HTTP " + strconv.Itoa(res.StatusCode),
+        }
+    }
+    if err != nil {
+        return nil, err
+    }
+    return contents, nil
+}
+
+// CreateFilter makes an HTTP request according to http://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-user-userid-filter
+func (cli *Client) CreateFilter(filter json.RawMessage) (*RespCreateFilter, error) {
+    urlPath := cli.BuildURL("user", cli.UserID, "filter")
+    resBytes, err := cli.SendJSON("POST", urlPath, &filter)
+    if err != nil {
+        return nil, err
+    }
+    var filterResponse RespCreateFilter
+    if err = json.Unmarshal(resBytes, &filterResponse); err != nil {
+        return nil, err
+    }
+    return &filterResponse, nil
+}
+
+// SyncRequest makes an HTTP request according to http://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
+func (cli *Client) SyncRequest(timeout int, since, filterID string, fullState bool, setPresence string) (*RespSync, error) {
+    query := map[string]string{
+        "timeout": strconv.Itoa(timeout),
+    }
+    if since != "" {
+        query["since"] = since
+    }
+    if filterID != "" {
+        query["filter"] = filterID
+    }
+    if setPresence != "" {
+        query["set_presence"] = setPresence
+    }
+    if fullState {
+        query["full_state"] = "true"
+    }
+    urlPath := cli.BuildURLWithQuery([]string{"sync"}, query)
+    req, err := http.NewRequest("GET", urlPath, nil)
+    if err != nil {
+        return nil, err
+    }
+    res, err := cli.Client.Do(req)
+    if res != nil {
+        defer res.Body.Close()
+    }
+    if err != nil {
+        return nil, err
+    }
+
+    var syncResponse RespSync
+    err = json.NewDecoder(res.Body).Decode(&syncResponse)
+    return &syncResponse, err
+}
+
 // NewClient creates a new Matrix Client ready for syncing
 func NewClient(homeserverURL, userID, accessToken string) (*Client, error) {
     hsURL, err := url.Parse(homeserverURL)
@@ -69,15 +237,21 @@ func NewClient(homeserverURL, userID, accessToken string) (*Client, error) {
         HomeserverURL: hsURL,
         UserID:        userID,
         Prefix:        "/_matrix/client/r0",
+        Syncer: NewDefaultSyncer(
+            userID,
+            // By default, use an in-memory next_batch storer which will never save tokens to disk.
+            // The client will work with this storer: it just won't
+            // remember the token across restarts. In practice, a database backend should be used.
+            &InMemoryNextBatchStore{make(map[string]string)},
+            // By default, use an in-memory filter storer which will never save the filter ID to disk.
+            // The client will work with this storer: it just won't remember the filter
+            // ID across restarts and hence request a new one. In practice, a database backend should be used.
+            &InMemoryFilterStore{
+                Filter:       json.RawMessage(`{"room":{"timeline":{"limit":50}}}`),
+                UserToFilter: make(map[string]string),
+            },
+        ),
     }
-    // By default, use a no-op next_batch storer which will never save tokens and always
-    // "load" the empty string as a token. The client will work with this storer: it just won't
-    // remember the token across restarts. In practice, a database backend should be used.
-    cli.NextBatchStorer = NopNextBatchStore{}
-    // By default, use a no-op filter storer which will never save the filter ID and always
-    // "load" nothing. The client will work with this storer: it just won't remember the filter
-    // ID across restarts and hence request a new one. In practice, a database backend should be used.
-    cli.FilterStorer = NopFilterStore{}
     // By default, use the default HTTP client.
     cli.Client = http.DefaultClient
 
diff --git a/room.go b/room.go
index dde6aba..0533b3e 100644
--- a/room.go
+++ b/room.go
@@ -39,3 +39,12 @@ func (room Room) GetMembershipState(userID string) string {
     }
     return state
 }
+
+// NewRoom creates a new Room with the given ID
+func NewRoom(roomID string) *Room {
+    // Init the State map and return a pointer to the Room
+    return &Room{
+        ID:    roomID,
+        State: make(map[string]map[string]*Event),
+    }
+}
diff --git a/sync.go b/sync.go
index 03c2a2e..ca21be0 100644
--- a/sync.go
+++ b/sync.go
@@ -1,35 +1,218 @@
 package gomatrix
 
+import (
+    "encoding/json"
+    "fmt"
+    "runtime/debug"
+    "time"
+)
+
+// Syncer represents an interface that must be satisfied in order to do /sync requests on a client.
+type Syncer interface {
+    // Process the /sync response. The since parameter is the since= value that was used to produce the response.
+    // This is useful for detecting the very first sync (since=""). If an error is returned, syncing will be stopped
+    // permanently.
+    ProcessResponse(resp *RespSync, since string) error
+    // Interface for saving and loading the "next_batch" sync token.
+    NextBatchStorer() NextBatchStorer
+    // Interface for saving and loading the filter ID for sync.
+    FilterStorer() FilterStorer
+    // OnFailedSync returns either the time to wait before retrying or an error to stop syncing permanently.
+    OnFailedSync(res *RespSync, err error) (time.Duration, error)
+}
+
+// DefaultSyncer is the default syncing implementation. You can either write your own syncer, or selectively
+// replace parts of this default syncer (e.g. the NextBatch/Filter storers, or the ProcessResponse method).
+type DefaultSyncer struct {
+    UserID         string
+    Rooms          map[string]*Room
+    NextBatchStore NextBatchStorer
+    FilterStore    FilterStorer
+    listeners      map[string][]OnEventListener // event type to listeners array
+}
+
+// OnEventListener can be used with DefaultSyncer.OnEventType to be informed of incoming events.
+type OnEventListener func(*Event)
+
+// NewDefaultSyncer returns an instantiated DefaultSyncer
+func NewDefaultSyncer(userID string, nextBatch NextBatchStorer, filterStore FilterStorer) *DefaultSyncer {
+    return &DefaultSyncer{
+        UserID:         userID,
+        Rooms:          make(map[string]*Room),
+        NextBatchStore: nextBatch,
+        FilterStore:    filterStore,
+        listeners:      make(map[string][]OnEventListener),
+    }
+}
+
+// ProcessResponse processes the /sync response in a way suitable for bots. "Suitable for bots" means a stream of
+// unrepeating events.
+func (s *DefaultSyncer) ProcessResponse(res *RespSync, since string) (err error) {
+    if !s.shouldProcessResponse(res, since) {
+        return
+    }
+
+    defer func() {
+        if r := recover(); r != nil {
+            err = fmt.Errorf("ProcessResponse panicked! userID=%s since=%s panic=%s\n%s", s.UserID, since, r, debug.Stack())
+        }
+    }()
+
+    for roomID, roomData := range res.Rooms.Join {
+        room := s.getOrCreateRoom(roomID)
+        for _, event := range roomData.State.Events {
+            event.RoomID = roomID
+            room.UpdateState(&event)
+            s.notifyListeners(&event)
+        }
+        for _, event := range roomData.Timeline.Events {
+            event.RoomID = roomID
+            s.notifyListeners(&event)
+        }
+    }
+    for roomID, roomData := range res.Rooms.Invite {
+        room := s.getOrCreateRoom(roomID)
+        for _, event := range roomData.State.Events {
+            event.RoomID = roomID
+            room.UpdateState(&event)
+            s.notifyListeners(&event)
+        }
+    }
+    return
+}
+
+// OnEventType allows callers to be notified when there are new events for the given event type.
+// There are no duplicate checks.
+func (s *DefaultSyncer) OnEventType(eventType string, callback OnEventListener) {
+    _, exists := s.listeners[eventType]
+    if !exists {
+        s.listeners[eventType] = []OnEventListener{}
+    }
+    s.listeners[eventType] = append(s.listeners[eventType], callback)
+}
+
+// shouldProcessResponse returns true if the response should be processed. May modify the response to remove
+// stuff that shouldn't be processed.
+func (s *DefaultSyncer) shouldProcessResponse(resp *RespSync, since string) bool {
+    if since == "" {
+        return false
+    }
+    // This is a horrible hack because /sync will return the most recent messages for a room
+    // as soon as you /join it. We do NOT want to process those events in that particular room
+    // because they may have already been processed (if you toggle the bot in/out of the room).
+    //
+    // Work around this by inspecting each room's timeline and seeing if an m.room.member event for us
+    // exists and is "join" and then discard processing that room entirely if so.
+    // TODO: We probably want to process messages from after the last join event in the timeline.
+    for roomID, roomData := range resp.Rooms.Join {
+        for i := len(roomData.Timeline.Events) - 1; i >= 0; i-- {
+            e := roomData.Timeline.Events[i]
+            if e.Type == "m.room.member" && e.StateKey == s.UserID {
+                m := e.Content["membership"]
+                mship, ok := m.(string)
+                if !ok {
+                    continue
+                }
+                if mship == "join" {
+                    _, ok := resp.Rooms.Join[roomID]
+                    if !ok {
+                        continue
+                    }
+                    delete(resp.Rooms.Join, roomID)   // don't re-process messages
+                    delete(resp.Rooms.Invite, roomID) // don't re-process invites
+                    break
+                }
+            }
+        }
+    }
+    return true
+}
+
+// getOrCreateRoom must only be called by the Sync() goroutine which calls ProcessResponse()
+func (s *DefaultSyncer) getOrCreateRoom(roomID string) *Room {
+    room := s.Rooms[roomID]
+    if room == nil { // create a new Room
+        room = NewRoom(roomID)
+        s.Rooms[roomID] = room
+    }
+    return room
+}
+
+func (s *DefaultSyncer) notifyListeners(event *Event) {
+    listeners, exists := s.listeners[event.Type]
+    if !exists {
+        return
+    }
+    for _, fn := range listeners {
+        fn(event)
+    }
+}
+
+// NextBatchStorer returns the provided NextBatchStorer
+func (s *DefaultSyncer) NextBatchStorer() NextBatchStorer {
+    return s.NextBatchStore
+}
+
+// FilterStorer returns the provided FilterStorer
+func (s *DefaultSyncer) FilterStorer() FilterStorer {
+    return s.FilterStore
+}
+
+// OnFailedSync always returns a 10 second wait period between failed /syncs.
+func (s *DefaultSyncer) OnFailedSync(res *RespSync, err error) (time.Duration, error) {
+    return 10 * time.Second, nil
+}
+
 // NextBatchStorer controls loading/saving of next_batch tokens for users
 type NextBatchStorer interface {
-    // Save a next_batch token for a given user. Best effort.
-    Save(userID, nextBatch string)
-    // Load a next_batch token for a given user. Return an empty string if no token exists.
-    Load(userID string) string
+    // SaveNextBatch saves a next_batch token for a given user. Best effort.
+    SaveNextBatch(userID, nextBatch string)
+    // LoadNextBatch loads a next_batch token for a given user. Returns an empty string if no token exists.
+    LoadNextBatch(userID string) string
 }
 
-// NopNextBatchStore does not load or save next_batch tokens.
-type NopNextBatchStore struct{}
+// InMemoryNextBatchStore stores next batch tokens in memory.
+type InMemoryNextBatchStore struct {
+    UserToNextBatch map[string]string
+}
 
-// Save does nothing
-func (s NopNextBatchStore) Save(userID, nextBatch string) {}
+// SaveNextBatch saves the mapping in-memory.
+func (s *InMemoryNextBatchStore) SaveNextBatch(userID, nextBatch string) {
+    s.UserToNextBatch[userID] = nextBatch
+}
 
-// Load does nothing
-func (s NopNextBatchStore) Load(userID string) string { return "" }
+// LoadNextBatch loads an existing mapping. Returns an empty string if not found.
+func (s *InMemoryNextBatchStore) LoadNextBatch(userID string) string {
+    return s.UserToNextBatch[userID]
+}
 
 // FilterStorer controls loading/saving of filter IDs for users
 type FilterStorer interface {
-    // Save a filter ID for a given user. Best effort.
-    Save(userID, filterID string)
-    // Load a filter ID for a given user. Return an empty string if no token exists.
-    Load(userID string) string
+    // SaveFilter saves a filter ID for a given user. Best effort.
+    SaveFilter(userID, filterID string)
+    // LoadFilter loads a filter ID for a given user. Returns an empty string if no filter ID exists.
+    LoadFilter(userID string) string
+    // GetFilterJSON returns the filter JSON to use for the given user ID.
+    GetFilterJSON(userID string) json.RawMessage
 }
 
-// NopFilterStore does not load or save filter IDs.
-type NopFilterStore struct{}
+// InMemoryFilterStore stores filter IDs in memory. It always returns the filter JSON given in the struct.
+type InMemoryFilterStore struct {
+    Filter       json.RawMessage
+    UserToFilter map[string]string
+}
 
-// Save does nothing
-func (s NopFilterStore) Save(userID, filterID string) {}
+// SaveFilter saves the user->filter ID mapping in memory
+func (s *InMemoryFilterStore) SaveFilter(userID, filterID string) {
+    s.UserToFilter[userID] = filterID
+}
 
-// Load does nothing
-func (s NopFilterStore) Load(userID string) string { return "" }
+// LoadFilter loads a previously saved user->filter ID mapping from memory. Returns the empty string if not found.
+func (s *InMemoryFilterStore) LoadFilter(userID string) string {
+    return s.UserToFilter[userID]
+}
+
+// GetFilterJSON returns InMemoryFilterStore.Filter
+func (s *InMemoryFilterStore) GetFilterJSON(userID string) json.RawMessage {
+    return s.Filter
+}
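
A minimal usage sketch of the Syncer API introduced by this diff (not part of the patch itself). It assumes the package is imported as github.com/matrix-org/gomatrix and uses placeholder homeserver, user ID and access token values; only identifiers that appear in the diff (NewClient, Client.Syncer, DefaultSyncer.OnEventType, Event, Sync, StopSync) are relied on.

package main

import (
    "fmt"
    "time"

    "github.com/matrix-org/gomatrix" // import path assumed for this sketch
)

func main() {
    // Placeholder credentials for illustration only.
    cli, err := gomatrix.NewClient("https://matrix.org", "@example:matrix.org", "SECRET_TOKEN")
    if err != nil {
        panic(err)
    }

    // NewClient installs a *DefaultSyncer by default, so it can be asked to
    // notify us of incoming events of a given type.
    syncer := cli.Syncer.(*gomatrix.DefaultSyncer)
    syncer.OnEventType("m.room.message", func(ev *gomatrix.Event) {
        fmt.Println("message in", ev.RoomID, ":", ev.Content["body"])
    })

    // Sync blocks until a fatal /sync error occurs, so run it on its own goroutine.
    go func() {
        if err := cli.Sync(); err != nil {
            fmt.Println("sync stopped:", err)
        }
    }()

    // Let the sync run for a while, then terminate it.
    time.Sleep(time.Minute)
    cli.StopSync()
}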