mirror of https://github.com/matrix-org/gomatrix
Remove NextBatchStorer/FilterStorer. Replace with Storer
This commit is contained in:
parent
5391ef3078
commit
70aad48fbf
31
client.go
31
client.go
|
@ -38,6 +38,7 @@ type Client struct {
|
||||||
syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time.
|
syncingID uint32 // Identifies the current Sync. Only one Sync can be active at any given time.
|
||||||
Client *http.Client // The underlying HTTP client which will be used to make HTTP requests.
|
Client *http.Client // The underlying HTTP client which will be used to make HTTP requests.
|
||||||
Syncer Syncer // The thing which can process /sync responses
|
Syncer Syncer // The thing which can process /sync responses
|
||||||
|
Store Storer // The thing which can store rooms/tokens/ids
|
||||||
}
|
}
|
||||||
|
|
||||||
// HTTPError An HTTP Error response, which may wrap an underlying native Go Error.
|
// HTTPError An HTTP Error response, which may wrap an underlying native Go Error.
|
||||||
|
@ -96,16 +97,16 @@ func (cli *Client) Sync() error {
|
||||||
// We will keep syncing until the syncing state changes. Either because
|
// We will keep syncing until the syncing state changes. Either because
|
||||||
// Sync is called or StopSync is called.
|
// Sync is called or StopSync is called.
|
||||||
syncingID := cli.incrementSyncingID()
|
syncingID := cli.incrementSyncingID()
|
||||||
nextBatch := cli.Syncer.NextBatchStorer().LoadNextBatch(cli.UserID)
|
nextBatch := cli.Store.LoadNextBatch(cli.UserID)
|
||||||
filterID := cli.Syncer.FilterStorer().LoadFilter(cli.UserID)
|
filterID := cli.Store.LoadFilterID(cli.UserID)
|
||||||
if filterID == "" {
|
if filterID == "" {
|
||||||
filterJSON := cli.Syncer.FilterStorer().GetFilterJSON(cli.UserID)
|
filterJSON := cli.Syncer.GetFilterJSON(cli.UserID)
|
||||||
resFilter, err := cli.CreateFilter(filterJSON)
|
resFilter, err := cli.CreateFilter(filterJSON)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
filterID = resFilter.FilterID
|
filterID = resFilter.FilterID
|
||||||
cli.Syncer.FilterStorer().SaveFilter(cli.UserID, filterID)
|
cli.Store.SaveFilterID(cli.UserID, filterID)
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
@ -129,7 +130,7 @@ func (cli *Client) Sync() error {
|
||||||
// Save the token now *before* processing it. This means it's possible
|
// Save the token now *before* processing it. This means it's possible
|
||||||
// to not process some events, but it means that we won't get constantly stuck processing
|
// to not process some events, but it means that we won't get constantly stuck processing
|
||||||
// a malformed/buggy event which keeps making us panic.
|
// a malformed/buggy event which keeps making us panic.
|
||||||
cli.Syncer.NextBatchStorer().SaveNextBatch(cli.UserID, resSync.NextBatch)
|
cli.Store.SaveNextBatch(cli.UserID, resSync.NextBatch)
|
||||||
if err = cli.Syncer.ProcessResponse(resSync, nextBatch); err != nil {
|
if err = cli.Syncer.ProcessResponse(resSync, nextBatch); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -243,25 +244,17 @@ func NewClient(homeserverURL, userID, accessToken string) (*Client, error) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
// By default, use an in-memory store which will never save filter ids / next batch tokens to disk.
|
||||||
|
// The client will work with this storer: it just won't remember across restarts.
|
||||||
|
// In practice, a database backend should be used.
|
||||||
|
store := NewInMemoryStore()
|
||||||
cli := Client{
|
cli := Client{
|
||||||
AccessToken: accessToken,
|
AccessToken: accessToken,
|
||||||
HomeserverURL: hsURL,
|
HomeserverURL: hsURL,
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
Prefix: "/_matrix/client/r0",
|
Prefix: "/_matrix/client/r0",
|
||||||
Syncer: NewDefaultSyncer(
|
Syncer: NewDefaultSyncer(userID, store),
|
||||||
userID,
|
Store: store,
|
||||||
// By default, use an in-memory next_batch storer which will never save tokens to disk.
|
|
||||||
// The client will work with this storer: it just won't
|
|
||||||
// remember the token across restarts. In practice, a database backend should be used.
|
|
||||||
&InMemoryNextBatchStore{make(map[string]string)},
|
|
||||||
// By default, use an in-memory filter storer which will never save the filter ID to disk.
|
|
||||||
// The client will work with this storer: it just won't remember the filter
|
|
||||||
// ID across restarts and hence request a new one. In practice, a database backend should be used.
|
|
||||||
&InMemoryFilterStore{
|
|
||||||
Filter: json.RawMessage(`{"room":{"timeline":{"limit":50}}}`),
|
|
||||||
UserToFilter: make(map[string]string),
|
|
||||||
},
|
|
||||||
),
|
|
||||||
}
|
}
|
||||||
// By default, use the default HTTP client.
|
// By default, use the default HTTP client.
|
||||||
cli.Client = http.DefaultClient
|
cli.Client = http.DefaultClient
|
||||||
|
|
|
@ -0,0 +1,51 @@
|
||||||
|
package gomatrix
|
||||||
|
|
||||||
|
// Storer is an interface which must be satisfied to store client data.
|
||||||
|
//
|
||||||
|
// You can either write a struct which persists this data to disk, or you can use the
|
||||||
|
// provided "InMemoryStore" which just keeps data around in-memory which is lost on
|
||||||
|
// restarts.
|
||||||
|
type Storer interface {
|
||||||
|
SaveFilterID(userID, filterID string)
|
||||||
|
LoadFilterID(userID string) string
|
||||||
|
SaveNextBatch(userID, nextBatchToken string)
|
||||||
|
LoadNextBatch(userID string) string
|
||||||
|
}
|
||||||
|
|
||||||
|
// InMemoryStore implements the Storer interface.
|
||||||
|
//
|
||||||
|
// Everything is persisted in-memory as maps. It is not safe to load/save filter IDs
|
||||||
|
// or next batch tokens on any goroutine other than the syncing goroutine: the one
|
||||||
|
// which called Client.Sync().
|
||||||
|
type InMemoryStore struct {
|
||||||
|
Filters map[string]string
|
||||||
|
NextBatch map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
// SaveFilterID to memory.
|
||||||
|
func (s *InMemoryStore) SaveFilterID(userID, filterID string) {
|
||||||
|
s.Filters[userID] = filterID
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadFilterID from memory.
|
||||||
|
func (s *InMemoryStore) LoadFilterID(userID string) string {
|
||||||
|
return s.Filters[userID]
|
||||||
|
}
|
||||||
|
|
||||||
|
// SaveNextBatch to memory.
|
||||||
|
func (s *InMemoryStore) SaveNextBatch(userID, nextBatchToken string) {
|
||||||
|
s.NextBatch[userID] = nextBatchToken
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadNextBatch from memory.
|
||||||
|
func (s *InMemoryStore) LoadNextBatch(userID string) string {
|
||||||
|
return s.NextBatch[userID]
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewInMemoryStore constructs a new InMemoryStore.
|
||||||
|
func NewInMemoryStore() *InMemoryStore {
|
||||||
|
return &InMemoryStore{
|
||||||
|
Filters: make(map[string]string),
|
||||||
|
NextBatch: make(map[string]string),
|
||||||
|
}
|
||||||
|
}
|
91
sync.go
91
sync.go
|
@ -13,35 +13,31 @@ type Syncer interface {
|
||||||
// This is useful for detecting the very first sync (since=""). If an error is return, Syncing will be stopped
|
// This is useful for detecting the very first sync (since=""). If an error is return, Syncing will be stopped
|
||||||
// permanently.
|
// permanently.
|
||||||
ProcessResponse(resp *RespSync, since string) error
|
ProcessResponse(resp *RespSync, since string) error
|
||||||
// Interface for saving and loading the "next_batch" sync token.
|
|
||||||
NextBatchStorer() NextBatchStorer
|
|
||||||
// Interface for saving and loading the filter ID for sync.
|
|
||||||
FilterStorer() FilterStorer
|
|
||||||
// OnFailedSync returns either the time to wait before retrying or an error to stop syncing permanently.
|
// OnFailedSync returns either the time to wait before retrying or an error to stop syncing permanently.
|
||||||
OnFailedSync(res *RespSync, err error) (time.Duration, error)
|
OnFailedSync(res *RespSync, err error) (time.Duration, error)
|
||||||
|
// GetFilterJSON for the given user ID. NOT the filter ID.
|
||||||
|
GetFilterJSON(userID string) json.RawMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultSyncer is the default syncing implementation. You can either write your own syncer, or selectively
|
// DefaultSyncer is the default syncing implementation. You can either write your own syncer, or selectively
|
||||||
// replace parts of this default syncer (e.g. the NextBatch/Filter storers, or the ProcessResponse method).
|
// replace parts of this default syncer (e.g. the NextBatch/Filter storers, or the ProcessResponse method).
|
||||||
type DefaultSyncer struct {
|
type DefaultSyncer struct {
|
||||||
UserID string
|
UserID string
|
||||||
Rooms map[string]*Room
|
Rooms map[string]*Room
|
||||||
NextBatchStore NextBatchStorer
|
Store Storer
|
||||||
FilterStore FilterStorer
|
listeners map[string][]OnEventListener // event type to listeners array
|
||||||
listeners map[string][]OnEventListener // event type to listeners array
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnEventListener can be used with DefaultSyncer.OnEventType to be informed of incoming events.
|
// OnEventListener can be used with DefaultSyncer.OnEventType to be informed of incoming events.
|
||||||
type OnEventListener func(*Event)
|
type OnEventListener func(*Event)
|
||||||
|
|
||||||
// NewDefaultSyncer returns an instantiated DefaultSyncer
|
// NewDefaultSyncer returns an instantiated DefaultSyncer
|
||||||
func NewDefaultSyncer(userID string, nextBatch NextBatchStorer, filterStore FilterStorer) *DefaultSyncer {
|
func NewDefaultSyncer(userID string, store Storer) *DefaultSyncer {
|
||||||
return &DefaultSyncer{
|
return &DefaultSyncer{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
Rooms: make(map[string]*Room),
|
Rooms: make(map[string]*Room),
|
||||||
NextBatchStore: nextBatch,
|
Store: store,
|
||||||
FilterStore: filterStore,
|
listeners: make(map[string][]OnEventListener),
|
||||||
listeners: make(map[string][]OnEventListener),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -148,71 +144,12 @@ func (s *DefaultSyncer) notifyListeners(event *Event) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NextBatchStorer returns the provided NextBatchStorer
|
|
||||||
func (s *DefaultSyncer) NextBatchStorer() NextBatchStorer {
|
|
||||||
return s.NextBatchStore
|
|
||||||
}
|
|
||||||
|
|
||||||
// FilterStorer returns the provided FilterStorer
|
|
||||||
func (s *DefaultSyncer) FilterStorer() FilterStorer {
|
|
||||||
return s.FilterStore
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnFailedSync always returns a 10 second wait period between failed /syncs.
|
// OnFailedSync always returns a 10 second wait period between failed /syncs.
|
||||||
func (s *DefaultSyncer) OnFailedSync(res *RespSync, err error) (time.Duration, error) {
|
func (s *DefaultSyncer) OnFailedSync(res *RespSync, err error) (time.Duration, error) {
|
||||||
return 10 * time.Second, nil
|
return 10 * time.Second, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NextBatchStorer controls loading/saving of next_batch tokens for users
|
// GetFilterJSON returns a filter with a timeline limit of 50.
|
||||||
type NextBatchStorer interface {
|
func (s *DefaultSyncer) GetFilterJSON(userID string) json.RawMessage {
|
||||||
// SaveNextBatch saves a next_batch token for a given user. Best effort.
|
return json.RawMessage(`{"room":{"timeline":{"limit":50}}}`)
|
||||||
SaveNextBatch(userID, nextBatch string)
|
|
||||||
// LoadNextBatch loads a next_batch token for a given user. Return an empty string if no token exists.
|
|
||||||
LoadNextBatch(userID string) string
|
|
||||||
}
|
|
||||||
|
|
||||||
// InMemoryNextBatchStore stores next batch tokens in memory.
|
|
||||||
type InMemoryNextBatchStore struct {
|
|
||||||
UserToNextBatch map[string]string
|
|
||||||
}
|
|
||||||
|
|
||||||
// SaveNextBatch saves the mapping in-memory.
|
|
||||||
func (s *InMemoryNextBatchStore) SaveNextBatch(userID, nextBatch string) {
|
|
||||||
s.UserToNextBatch[userID] = nextBatch
|
|
||||||
}
|
|
||||||
|
|
||||||
// LoadNextBatch loads an existing mapping. Returns an empty string if not found
|
|
||||||
func (s *InMemoryNextBatchStore) LoadNextBatch(userID string) string {
|
|
||||||
return s.UserToNextBatch[userID]
|
|
||||||
}
|
|
||||||
|
|
||||||
// FilterStorer controls loading/saving of filter IDs for users
|
|
||||||
type FilterStorer interface {
|
|
||||||
// SaveFilter saves a filter ID for a given user. Best effort.
|
|
||||||
SaveFilter(userID, filterID string)
|
|
||||||
// LoadFilter loads a filter ID for a given user. Return an empty string if no token exists.
|
|
||||||
LoadFilter(userID string) string
|
|
||||||
// GetFilterJSON for the given user ID.
|
|
||||||
GetFilterJSON(userID string) json.RawMessage
|
|
||||||
}
|
|
||||||
|
|
||||||
// InMemoryFilterStore stores filter IDs in memory. It always returns the filter JSON given in the struct.
|
|
||||||
type InMemoryFilterStore struct {
|
|
||||||
Filter json.RawMessage
|
|
||||||
UserToFilter map[string]string
|
|
||||||
}
|
|
||||||
|
|
||||||
// SaveFilter saves the user->filter ID mapping in memory
|
|
||||||
func (s *InMemoryFilterStore) SaveFilter(userID, filterID string) {
|
|
||||||
s.UserToFilter[userID] = filterID
|
|
||||||
}
|
|
||||||
|
|
||||||
// LoadFilter loads a previously saved user->filter ID mapping from memory. Returns the empty string if not found.
|
|
||||||
func (s *InMemoryFilterStore) LoadFilter(userID string) string {
|
|
||||||
return s.UserToFilter[userID]
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetFilterJSON returns InMemoryFilterStore.Filter
|
|
||||||
func (s *InMemoryFilterStore) GetFilterJSON(userID string) json.RawMessage {
|
|
||||||
return s.Filter
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue