Merge pull request #43 from JalfResi/database-handlers

Database and handler types
This commit is contained in:
Jim Teeuwen 2014-10-04 15:24:39 +02:00
commit c753ba0962
4 changed files with 226 additions and 31 deletions

57
databaseHandler.go Normal file
View File

@ -0,0 +1,57 @@
package feeder
// databaseHandler wraps a key database around optional item and channel
// handlers, forwarding only entries whose keys the database has not seen
// before. It acts as a de-duplication stage in a handler chain.
type databaseHandler struct {
	// db answers "have I seen this key?" via its request/response channels.
	db *database
	// itemhandler receives only previously-unseen items; may be nil.
	itemhandler ItemHandler
	// chanhandler receives only previously-unseen channels; may be nil.
	chanhandler ChannelHandler
}
// ProcessItems filters items through the backing database and passes only
// the previously-unseen ones on to the wrapped ItemHandler, if there is one.
func (d *databaseHandler) ProcessItems(f *Feed, ch *Channel, items []*Item) {
	var unseen []*Item
	for _, it := range items {
		// Ask the database whether this key is already known; keep the
		// item only when the answer is "no".
		d.db.request <- it.Key()
		if known := <-d.db.response; !known {
			unseen = append(unseen, it)
		}
	}
	// No new items, or nobody to notify — nothing more to do.
	if d.itemhandler == nil || len(unseen) == 0 {
		return
	}
	d.itemhandler.ProcessItems(f, ch, unseen)
}
// ProcessChannels filters channels through the backing database and passes
// only the previously-unseen ones on to the wrapped ChannelHandler, if any.
func (d *databaseHandler) ProcessChannels(f *Feed, ch []*Channel) {
	var unseen []*Channel
	for _, c := range ch {
		// The database answers false for keys it has not recorded yet.
		d.db.request <- c.Key()
		if known := <-d.db.response; !known {
			unseen = append(unseen, c)
		}
	}
	// No new channels, or nobody to notify — nothing more to do.
	if d.chanhandler == nil || len(unseen) == 0 {
		return
	}
	d.chanhandler.ProcessChannels(f, unseen)
}
// NewDatabaseHandler wraps handler in a de-duplicating database stage: the
// returned Handler forwards only channels and items not seen before.
func NewDatabaseHandler(handler Handler) Handler {
	return &databaseHandler{
		db:          NewDatabase(),
		itemhandler: handler,
		chanhandler: handler,
	}
}
// NewDatabaseItemHandler wraps itemhandler in a de-duplicating database
// stage that forwards only items not seen before. Channels are not handled.
func NewDatabaseItemHandler(itemhandler ItemHandler) ItemHandler {
	return &databaseHandler{
		db:          NewDatabase(),
		itemhandler: itemhandler,
	}
}
// NewDatabaseChannelHandler wraps chanhandler in a de-duplicating database
// stage that forwards only channels not seen before. Items are not handled.
func NewDatabaseChannelHandler(chanhandler ChannelHandler) ChannelHandler {
	return &databaseHandler{
		db:          NewDatabase(),
		chanhandler: chanhandler,
	}
}

93
feed.go
View File

@ -43,8 +43,50 @@ func (err *UnsupportedFeedError) Error() string {
return fmt.Sprintf("Unsupported feed: %s, version: %+v", err.Type, err.Version)
}
type ChannelHandler func(f *Feed, newchannels []*Channel)
type ItemHandler func(f *Feed, ch *Channel, newitems []*Item)
// ChannelHandlerFunc is an adapter that allows an ordinary function to be
// used as a ChannelHandler, in the spirit of http.HandlerFunc.
type ChannelHandlerFunc func(f *Feed, newchannels []*Channel)

// ProcessChannels calls the underlying function.
func (fn ChannelHandlerFunc) ProcessChannels(f *Feed, newchannels []*Channel) {
	fn(f, newchannels)
}
// ItemHandlerFunc is an adapter that allows an ordinary function to be
// used as an ItemHandler, in the spirit of http.HandlerFunc.
type ItemHandlerFunc func(f *Feed, ch *Channel, newitems []*Item)

// ProcessItems calls the underlying function.
func (fn ItemHandlerFunc) ProcessItems(f *Feed, ch *Channel, newitems []*Item) {
	fn(f, ch, newitems)
}
// Handler processes both new channels and new items discovered in a feed.
// It is the union of ChannelHandler and ItemHandler.
type Handler interface {
	ChannelHandler
	ItemHandler
}
// ChannelHandler is notified of channels found while processing a feed.
type ChannelHandler interface {
	ProcessChannels(f *Feed, newchannels []*Channel)
}
// ItemHandler is notified of items found for a channel while processing a feed.
type ItemHandler interface {
	ProcessItems(f *Feed, ch *Channel, newitems []*Item)
}
// HandlerBonder bonds a separate ChannelHandler and ItemHandler together so
// the pair can be passed around as a single Handler.
type HandlerBonder struct {
	itemhandler ItemHandler
	chanhandler ChannelHandler
}
// ProcessChannels delegates to the bonded ChannelHandler.
func (hb *HandlerBonder) ProcessChannels(f *Feed, newchannels []*Channel) {
	hb.chanhandler.ProcessChannels(f, newchannels)
}
// ProcessItems delegates to the bonded ItemHandler.
func (hb *HandlerBonder) ProcessItems(f *Feed, ch *Channel, newitems []*Item) {
	hb.itemhandler.ProcessItems(f, ch, newitems)
}
// NewHandlerBonder combines a ChannelHandler and an ItemHandler into one
// Handler that dispatches each notification to the matching half.
func NewHandlerBonder(chanhandler ChannelHandler, itemhandler ItemHandler) Handler {
	hb := new(HandlerBonder)
	hb.chanhandler = chanhandler
	hb.itemhandler = itemhandler
	return hb
}
type Feed struct {
// Custom cache timeout in minutes.
@ -66,30 +108,30 @@ type Feed struct {
// Url from which this feed was created.
Url string
// Database containing a list of known Items and Channels for this instance
database *database
// A notification function, used to notify the host when a new channel
// has been found.
chanhandler ChannelHandler
// A notification function, used to notify the host when a new item
// has been found for a given channel.
itemhandler ItemHandler
// The channel and item handler
handler Handler
// Last time content was fetched. Used in conjunction with CacheTimeout
// to ensure we don't get content too often.
lastupdate int64
}
func New(cachetimeout int, enforcecachelimit bool, ch ChannelHandler, ih ItemHandler) *Feed {
// New is a helper function to stay semi-compatible with
// the old code. It includes the database handler to ensure
// that this approach is functionally identical to the
// old database/handlers version.
func New(cachetimeout int, enforcecachelimit bool, ch ChannelHandlerFunc, ih ItemHandlerFunc) *Feed {
	return NewWithHandler(cachetimeout, enforcecachelimit, NewDatabaseHandler(NewHandlerBonder(ch, ih)))
}
// NewWithHandler creates a new feed with a handler.
// People should use this approach from now on.
func NewWithHandler(cachetimeout int, enforcecachelimit bool, h Handler) *Feed {
v := new(Feed)
v.CacheTimeout = cachetimeout
v.EnforceCacheLimit = enforcecachelimit
v.Type = "none"
v.database = NewDatabase()
v.chanhandler = ch
v.itemhandler = ih
v.handler = h
return v
}
@ -176,25 +218,14 @@ func (this *Feed) makeFeed(doc *xmlx.Document) (err error) {
}
func (this *Feed) notifyListeners() {
var newchannels []*Channel
for _, channel := range this.Channels {
if this.database.request <- channel.Key(); !<-this.database.response {
newchannels = append(newchannels, channel)
}
var newitems []*Item
for _, item := range channel.Items {
if this.database.request <- item.Key(); !<-this.database.response {
newitems = append(newitems, item)
}
}
if len(newitems) > 0 && this.itemhandler != nil {
this.itemhandler(this, channel, newitems)
if len(channel.Items) > 0 && this.handler != nil {
this.handler.ProcessItems(this, channel, channel.Items)
}
}
if len(newchannels) > 0 && this.chanhandler != nil {
this.chanhandler(this, newchannels)
if len(this.Channels) > 0 && this.handler != nil {
this.handler.ProcessChannels(this, this.Channels)
}
}

48
notes.md Normal file
View File

@ -0,0 +1,48 @@
## The problem with the current database solution
The purpose of the database is to ensure that the channel and item handlers are only called once for each new channel and each new item. It is clear that many users of go-pkg-rss are having problems with their channel and item handlers being called multiple times for the same items.
The current solution makes writing handlers very clean and safe as the user can be sure that their handlers are only executed once for each new item/channel.
### Use cases ###
The use cases where the database shines involve long-running goroutines that watch a specific feed url and, over the lifetime of the program, periodically check that feed url for updates. In that situation, having a database prevents the duplication of items, as there is a high likelihood that the refetch of the feed url will contain items already processed by the item handler.
The benefits of this include:
1) Batteries included: If the user is creating a program that processes a set of feed urls that it repeatedly polls on an existing feed then the built in database provides a "batteries included" solution to prevent calling the users channel or item handlers unnecessarily, greatly simplifying development of those handlers.
2) Zero maintenance: The database is maintenance free. There is nothing the developer needs to do to take advantage of it. At the same time the developer doesn't need to query it at any time, nor do they need to purge items from it. They need not even know it exists.
### Problems
The problem with the current solution is that it doesn't scale. Time for an example. I'm writing a program that will pull feed urls from a queue system to be processed. In order to execute several feed fetches at once it may run several hundred goroutines, each fetching a feed url from the queue, processing it via go-pkg-rss and then putting the item urls into another queue for processing by another program. The feed url job is then released to the queue, which will delay the feed url from being processed again for a set amount of time (usually the lastupdate/cacheTimeout). As there are several thousand feed urls to get through, I will be running my program on several servers, each fetching feed urls from the queue via its several hundred goroutines.
In order to prevent duplication of effort, as across several thousand feed urls there is a very high likelihood that items will be duplicated across feeds, I record a hash for each item in memcached. This provides a very quick and lightweight way of determining if I have already fetched that article before, and therefore do not need to fetch it again. This has the added benefit that the cache can be shared across several servers all collecting feed urls and article urls, as a centralised "database" (although I am also leaning on the caching features of memcache to store the entry for a limited time, allowing me to fetch an article again in the future, in case of updates to the article).
In addition to this, I also check and catch errors raised by network issues such as timeouts, unparsable urls, http error codes and unparsable documents. For these I also store a hash in memcache for each feed url; however, this is an incrementing value, allowing me to keep track of the number of retry attempts made to fetch that feed url. After a certain threshold is met, and the feed url is still failing, I mark the job as bad in the queue system, which prevents me from constantly refetching a bad feed url.
The current database solution contributes the following issues:
1) The database is too simple: In the above example I need to track (and prevent) the number of fetches I make for article items. I also need to allow a number of retry attempts before preventing refetches. The current database is not sophisticated enough to handle these two different cases at once.
2) The database does not expire or clean up entries: I expect my program to run for a very long time, processing many thousand feeds and even more article urls. The current implementation of the database is simple in that it continues to grow indefinitely, consuming lots of memory.
3) The database replaces a job that is trivial to implement in a handler: The current database doesn't provide anything that couldn't be developed by a user of the package with ease.
4) The database doesn't provide a fine-grained enough key for duplicates: The current version uses a multitude of options for item keys and channels, all of which could very easily be falsely marked as duplicates. For example, the item title is not a very unique key, especially when each item and channel has a unique key in the form of the url.
### Proposed solution
Looking across the stdlib provided with Go we can see an example where similar concerns have been met with elegance. The net/http package uses handlers, much like go-pkg-rss does, to offload implementation complexity to outside of the core http package. It also provides batteries-included solutions to common problems that developers may have, with built-in handlers such as a FileServer, a NotFoundHandler, RedirectHandler, StripPrefix and TimeoutHandler. I propose that the current database implementation be stripped from the package and moved to a set of built-in handlers.
The developer will then be provided with two powerful options:
1) Use the built in database handlers as part of a chain along with their own existing handlers to get the same functionality as currently provided by the existing database implementation.
2) Roll their own custom handlers which may be inserted into the handler chain. They even have the option of using the provided database handlers if they want.
This opens up exciting possibilities for developers and the future of the go-pkg-rss package. We could add handlers for caching via memcache, a retry count handler for channels, a whitelist/blacklist handler based on item title, a filter handler that strips out items that have a date older than 3 hours, etc.
What's nice about this approach is that we can recreate a functionally identical approach, which means this is backwards compatible with older code that implements the old way of dealing with handlers and duplicate channels and items.

59
testdata/handlers/handlerexample.go vendored Normal file
View File

@ -0,0 +1,59 @@
package main
/*
This is a minimal sample application, demonstrating how to set up an RSS feed
for regular polling of new channels/items.
Build & run with:
$ go run handlerexample.go
*/
import (
"fmt"
rss "github.com/JalfResi/go-pkg-rss"
"os"
"time"
)
// main sets up a new feed and polls it for new channels/items.
func main() {
	// This sets up a new feed and polls it for new channels/items.
	// Invoke it with 'go PollFeed(...)' to have the polling performed in a
	// separate goroutine, so you can continue with the rest of your program.
	// The second argument is the cache timeout in minutes.
	PollFeed("http://blog.case.edu/news/feed.atom", 5)
}
// PollFeed fetches the feed at uri in an endless loop, pausing ten seconds
// between attempts. timeout is the feed's cache timeout in minutes.
// The function returns on the first fetch error.
func PollFeed(uri string, timeout int) {
	// Chain the database de-duplication handler in front of our own handler
	// so ProcessChannels/ProcessItems only ever see new entries.
	feed := rss.NewWithHandler(timeout, true, rss.NewDatabaseHandler(NewMyHandler()))
	for {
		if err := feed.Fetch(uri, nil); err != nil {
			// Added the trailing newline the original message was missing.
			fmt.Fprintf(os.Stderr, "[e] %s: %s\n", uri, err)
			return
		}
		// 10 * time.Second is already a time.Duration; the original's extra
		// time.Duration(...) conversion was redundant, and time.Sleep reads
		// more directly than draining a time.After channel.
		time.Sleep(10 * time.Second)
	}
}
/*
func itemHandler(feed *rss.Feed, ch *rss.Channel, newitems []*rss.Item) {
fmt.Printf("%d new item(s) in %s\n", len(newitems), feed.Url)
}
*/
// MyHandler reports newly discovered channels and items on stdout.
type MyHandler struct{}

// NewMyHandler returns a MyHandler, typed as an rss.Handler so it can be
// dropped straight into a handler chain.
func NewMyHandler() rss.Handler {
	return new(MyHandler)
}

// ProcessChannels prints how many new channels were found in the feed.
func (h *MyHandler) ProcessChannels(feed *rss.Feed, newchannels []*rss.Channel) {
	fmt.Printf("%d new channel(s) in %s\n", len(newchannels), feed.Url)
}

// ProcessItems prints how many new items were found in the feed.
func (h *MyHandler) ProcessItems(feed *rss.Feed, ch *rss.Channel, newitems []*rss.Item) {
	fmt.Printf("%d new rad item(s) in %s\n", len(newitems), feed.Url)
}