crawler/feed.go
primal 798f79bfe9 Auto-deny feeds that are not RSS or Atom type
Feeds with type other than 'rss' or 'atom' (e.g., 'unknown') are now
automatically denied on discovery. Also updated 164 existing feeds.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 13:13:22 -05:00

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"regexp"
	"strings"
	"sync/atomic"
	"time"

	"github.com/jackc/pgx/v5"
)
// classifyFeed determines the category of a feed based on URL patterns
// Returns: "main", "comments", "category", "author", "article", "podcast"
// Note: podcast detection is also done in parseRSSMetadata based on content
func classifyFeed(feedURL string) string {
lower := strings.ToLower(feedURL)
// Comment feeds
if strings.Contains(lower, "/comment") {
return "comments"
}
// Podcast URL patterns
podcastPatterns := []string{"/podcast", "/podcasts", "/episode", "/episodes", "/show/", "/shows/"}
for _, pattern := range podcastPatterns {
if strings.Contains(lower, pattern) {
return "podcast"
}
}
u, err := url.Parse(feedURL)
if err != nil {
return "main"
}
path := strings.ToLower(strings.TrimSuffix(u.Path, "/"))
// Author feeds
if strings.Contains(path, "/author/") {
return "author"
}
// Category/tag feeds
categoryPatterns := []string{"/category/", "/tag/", "/tags/", "/categories/", "/topic/", "/topics/"}
for _, pattern := range categoryPatterns {
if strings.Contains(path, pattern) {
return "category"
}
}
// Check for article feeds (path ending in /feed with content before it)
if strings.HasSuffix(path, "/feed") {
basePath := strings.TrimSuffix(path, "/feed")
basePath = strings.Trim(basePath, "/")
if basePath == "" {
return "main" // Just /feed - main feed
}
// Article if path contains date patterns
if matched, _ := regexp.MatchString(`/\d{4}/\d{2}`, basePath); matched {
return "article"
}
// Article if path has multiple segments (nested content)
segments := strings.Split(basePath, "/")
if len(segments) >= 2 {
return "article"
}
// Article if single segment looks like an article slug
if len(segments) == 1 && strings.Contains(segments[0], "-") && len(segments[0]) > 20 {
return "article"
}
}
return "main"
}
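
// Illustrative examples of how the rules above classify different URL shapes
// (the URLs are invented for illustration):
//
//	classifyFeed("https://example.com/feed")                       // "main"
//	classifyFeed("https://example.com/comments/feed")              // "comments"
//	classifyFeed("https://example.com/category/tech/feed")         // "category"
//	classifyFeed("https://example.com/author/jane/feed")           // "author"
//	classifyFeed("https://example.com/2024/01/my-first-post/feed") // "article"
//	classifyFeed("https://example.com/podcast/episode-1/feed")     // "podcast"
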
// classifyFeedByTitle refines category based on feed title (called after parsing)
func classifyFeedByTitle(title string, currentCategory string) string {
if currentCategory != "main" {
return currentCategory // Already classified by URL
}
lower := strings.ToLower(title)
if strings.HasPrefix(lower, "comments on:") || strings.HasPrefix(lower, "comments for:") {
return "comments"
}
return currentCategory
}
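
// For example (illustrative titles):
//
//	classifyFeedByTitle("Comments on: My Post", "main") // "comments"
//	classifyFeedByTitle("Example Blog", "main")          // "main"
//	classifyFeedByTitle("Anything", "podcast")           // "podcast" (URL-based category wins)
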
// Enclosure represents a media attachment (audio, video, image)
type Enclosure struct {
URL string `json:"url"`
Type string `json:"type"` // MIME type (audio/mpeg, image/jpeg, etc.)
Length int64 `json:"length"` // Size in bytes
}
// Item represents an individual entry/article from a feed
type Item struct {
ID int64 `json:"id,omitempty"`
FeedURL string `json:"feed_url"`
GUID string `json:"guid,omitempty"`
Title string `json:"title,omitempty"`
Link string `json:"link,omitempty"`
Description string `json:"description,omitempty"`
Content string `json:"content,omitempty"`
Author string `json:"author,omitempty"`
PubDate time.Time `json:"pub_date,omitempty"`
DiscoveredAt time.Time `json:"discovered_at"`
UpdatedAt time.Time `json:"updated_at,omitempty"`
// Media attachments
Enclosure *Enclosure `json:"enclosure,omitempty"` // Primary enclosure (podcast audio, etc.)
ImageURLs []string `json:"image_urls,omitempty"` // Image URLs extracted from content
// Publishing to PDS
PublishedAt time.Time `json:"published_at,omitempty"`
PublishedUri string `json:"published_uri,omitempty"`
}
// Feed represents a discovered RSS/Atom feed with metadata
type Feed struct {
URL string `json:"url"`
Type string `json:"type"` // "rss", "atom", or "unknown"
Category string `json:"category"` // "main", "comments", "category", "author", "article", "podcast"
Title string `json:"title,omitempty"`
Description string `json:"description,omitempty"`
Language string `json:"language,omitempty"`
SiteURL string `json:"site_url,omitempty"` // The website the feed belongs to
// Timing
DiscoveredAt time.Time `json:"discovered_at"`
LastCrawledAt time.Time `json:"last_crawled_at,omitempty"`
NextCrawlAt time.Time `json:"next_crawl_at,omitempty"`
LastBuildDate time.Time `json:"last_build_date,omitempty"` // From feed's lastBuildDate/updated
// Cache headers for conditional requests
ETag string `json:"etag,omitempty"`
LastModified string `json:"last_modified,omitempty"`
// Feed hints for crawl scheduling
TTLMinutes int `json:"ttl_minutes,omitempty"` // From RSS <ttl> element
UpdatePeriod string `json:"update_period,omitempty"` // From sy:updatePeriod (hourly, daily, weekly, monthly, yearly)
UpdateFreq int `json:"update_freq,omitempty"` // From sy:updateFrequency
// Health tracking
Status string `json:"status"` // "active", "dead", "redirect", "error"
ErrorCount int `json:"error_count"`
LastError string `json:"last_error,omitempty"`
LastErrorAt time.Time `json:"last_error_at,omitempty"`
// Discovery source
SourceURL string `json:"source_url,omitempty"`
SourceHost string `json:"source_host,omitempty"`
TLD string `json:"tld,omitempty"`
// Content stats
ItemCount int `json:"item_count,omitempty"` // Number of items in last crawl
AvgPostFreqHrs float64 `json:"avg_post_freq_hrs,omitempty"` // Average hours between posts
OldestItemDate time.Time `json:"oldest_item_date,omitempty"`
NewestItemDate time.Time `json:"newest_item_date,omitempty"`
// Adaptive check interval
NoUpdate int `json:"no_update"` // Consecutive checks with no change
// Publishing to PDS
PublishStatus string `json:"publish_status"` // "held", "pass", "deny"
PublishAccount string `json:"publish_account,omitempty"` // e.g., "news.ycombinator.com.1440.news"
}
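
// A newly discovered feed record typically looks something like this sketch
// (field values invented for illustration; URLs are stored without a scheme):
//
//	feed := &Feed{
//		URL:           "example.com/feed",
//		Type:          "rss",
//		Category:      "main",
//		Status:        "active",
//		PublishStatus: "held",
//		DiscoveredAt:  time.Now(),
//	}
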
// saveFeed stores a feed in PostgreSQL
func (c *Crawler) saveFeed(feed *Feed) error {
// Default publishStatus to "held" if not set
// Auto-deny feeds with no language or non-RSS/Atom type
publishStatus := feed.PublishStatus
if publishStatus == "" {
if feed.Language == "" {
publishStatus = "deny"
} else if feed.Type != "rss" && feed.Type != "atom" {
publishStatus = "deny"
} else {
publishStatus = "held"
}
}
_, err := c.db.Exec(`
INSERT INTO feeds (
url, type, category, title, description, language, site_url,
discovered_at, last_crawled_at, next_crawl_at, last_build_date,
etag, last_modified,
ttl_minutes, update_period, update_freq,
status, error_count, last_error, last_error_at,
source_url, source_host, tld,
item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date,
no_update,
publish_status, publish_account
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30)
ON CONFLICT(url) DO UPDATE SET
type = EXCLUDED.type,
category = EXCLUDED.category,
title = EXCLUDED.title,
description = EXCLUDED.description,
language = EXCLUDED.language,
site_url = EXCLUDED.site_url,
last_crawled_at = EXCLUDED.last_crawled_at,
next_crawl_at = EXCLUDED.next_crawl_at,
last_build_date = EXCLUDED.last_build_date,
etag = EXCLUDED.etag,
last_modified = EXCLUDED.last_modified,
ttl_minutes = EXCLUDED.ttl_minutes,
update_period = EXCLUDED.update_period,
update_freq = EXCLUDED.update_freq,
status = EXCLUDED.status,
error_count = EXCLUDED.error_count,
last_error = EXCLUDED.last_error,
last_error_at = EXCLUDED.last_error_at,
item_count = EXCLUDED.item_count,
avg_post_freq_hrs = EXCLUDED.avg_post_freq_hrs,
oldest_item_date = EXCLUDED.oldest_item_date,
newest_item_date = EXCLUDED.newest_item_date,
no_update = EXCLUDED.no_update,
publish_status = EXCLUDED.publish_status,
publish_account = EXCLUDED.publish_account
`,
feed.URL, feed.Type, feed.Category, NullableString(feed.Title), NullableString(feed.Description),
NullableString(feed.Language), NullableString(feed.SiteURL),
feed.DiscoveredAt, NullableTime(feed.LastCrawledAt), NullableTime(feed.NextCrawlAt), NullableTime(feed.LastBuildDate),
NullableString(feed.ETag), NullableString(feed.LastModified),
feed.TTLMinutes, NullableString(feed.UpdatePeriod), feed.UpdateFreq,
feed.Status, feed.ErrorCount, NullableString(feed.LastError), NullableTime(feed.LastErrorAt),
NullableString(feed.SourceURL), NullableString(feed.SourceHost), NullableString(feed.TLD),
feed.ItemCount, feed.AvgPostFreqHrs, NullableTime(feed.OldestItemDate), NullableTime(feed.NewestItemDate),
feed.NoUpdate,
publishStatus, NullableString(feed.PublishAccount),
)
return err
}
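
// The defaulting above means a feed saved without an explicit PublishStatus is
// only held for review when it looks publishable; for example (illustrative values):
//
//	&Feed{Type: "rss", Language: "en"}     // publish_status becomes "held"
//	&Feed{Type: "rss", Language: ""}       // "deny" (no language)
//	&Feed{Type: "unknown", Language: "en"} // "deny" (not RSS or Atom)
//	&Feed{PublishStatus: "pass"}           // kept as "pass" (explicit value wins)
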
// getFeed retrieves a feed from PostgreSQL
func (c *Crawler) getFeed(feedURL string) (*Feed, error) {
feed := &Feed{}
var category, title, description, language, siteURL *string
var lastCrawledAt, nextCrawlAt, lastBuildDate, lastErrorAt, oldestItemDate, newestItemDate *time.Time
var etag, lastModified, updatePeriod, lastError, sourceURL, sourceHost, tld *string
var avgPostFreqHrs *float64
var publishStatus, publishAccount *string
err := c.db.QueryRow(`
SELECT url, type, category, title, description, language, site_url,
discovered_at, last_crawled_at, next_crawl_at, last_build_date,
etag, last_modified,
ttl_minutes, update_period, update_freq,
status, error_count, last_error, last_error_at,
source_url, source_host, tld,
item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date,
no_update,
publish_status, publish_account
FROM feeds WHERE url = $1
`, normalizeURL(feedURL)).Scan(
&feed.URL, &feed.Type, &category, &title, &description, &language, &siteURL,
&feed.DiscoveredAt, &lastCrawledAt, &nextCrawlAt, &lastBuildDate,
&etag, &lastModified,
&feed.TTLMinutes, &updatePeriod, &feed.UpdateFreq,
&feed.Status, &feed.ErrorCount, &lastError, &lastErrorAt,
&sourceURL, &sourceHost, &tld,
&feed.ItemCount, &avgPostFreqHrs, &oldestItemDate, &newestItemDate,
&feed.NoUpdate,
&publishStatus, &publishAccount,
)
if err == pgx.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
// Handle nullable fields
if category != nil {
feed.Category = *category
} else {
feed.Category = "main"
}
feed.Title = StringValue(title)
feed.Description = StringValue(description)
feed.Language = StringValue(language)
feed.SiteURL = StringValue(siteURL)
feed.LastCrawledAt = TimeValue(lastCrawledAt)
feed.NextCrawlAt = TimeValue(nextCrawlAt)
feed.LastBuildDate = TimeValue(lastBuildDate)
feed.ETag = StringValue(etag)
feed.LastModified = StringValue(lastModified)
feed.UpdatePeriod = StringValue(updatePeriod)
feed.LastError = StringValue(lastError)
feed.LastErrorAt = TimeValue(lastErrorAt)
feed.SourceURL = StringValue(sourceURL)
feed.SourceHost = StringValue(sourceHost)
feed.TLD = StringValue(tld)
if avgPostFreqHrs != nil {
feed.AvgPostFreqHrs = *avgPostFreqHrs
}
feed.OldestItemDate = TimeValue(oldestItemDate)
feed.NewestItemDate = TimeValue(newestItemDate)
if publishStatus != nil {
feed.PublishStatus = *publishStatus
} else {
feed.PublishStatus = "held"
}
feed.PublishAccount = StringValue(publishAccount)
return feed, nil
}
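
// Callers can distinguish "not found" from a real error because getFeed returns
// (nil, nil) when no row matches:
//
//	feed, err := c.getFeed("example.com/feed")
//	if err != nil {
//		// query failed
//	} else if feed == nil {
//		// feed is not in the database
//	}
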
// feedExists checks if a feed URL already exists in the database
func (c *Crawler) feedExists(feedURL string) bool {
var exists bool
err := c.db.QueryRow("SELECT EXISTS(SELECT 1 FROM feeds WHERE url = $1)", normalizeURL(feedURL)).Scan(&exists)
return err == nil && exists
}
// GetAllFeeds returns all feeds from the database
func (c *Crawler) GetAllFeeds() ([]*Feed, error) {
rows, err := c.db.Query(`
SELECT url, type, category, title, description, language, site_url,
discovered_at, last_crawled_at, next_crawl_at, last_build_date,
etag, last_modified,
ttl_minutes, update_period, update_freq,
status, error_count, last_error, last_error_at,
source_url, source_host, tld,
item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date,
no_update,
publish_status, publish_account
FROM feeds
`)
if err != nil {
return nil, err
}
defer rows.Close()
return scanFeeds(rows)
}
// GetFeedCount returns the total number of feeds in the database
func (c *Crawler) GetFeedCount() (int, error) {
var count int
err := c.db.QueryRow("SELECT COUNT(*) FROM feeds").Scan(&count)
return count, err
}
// GetFeedCountByHost returns the number of feeds for a specific host
func (c *Crawler) GetFeedCountByHost(host string) (int, error) {
var count int
err := c.db.QueryRow("SELECT COUNT(*) FROM feeds WHERE source_host = $1", host).Scan(&count)
return count, err
}
// GetFeedsDueForCheck returns up to limit feeds with next_crawl_at <= NOW() and status != 'dead', in random order
func (c *Crawler) GetFeedsDueForCheck(limit int) ([]*Feed, error) {
rows, err := c.db.Query(`
SELECT url, type, category, title, description, language, site_url,
discovered_at, last_crawled_at, next_crawl_at, last_build_date,
etag, last_modified,
ttl_minutes, update_period, update_freq,
status, error_count, last_error, last_error_at,
source_url, source_host, tld,
item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date,
no_update,
publish_status, publish_account
FROM feeds
WHERE next_crawl_at <= NOW() AND status != 'dead'
ORDER BY RANDOM()
LIMIT $1
`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
return scanFeeds(rows)
}
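
// A minimal polling loop built on this query might look like the sketch below;
// the one-minute interval and batch size of 500 are assumptions, not values
// taken from this file:
//
//	for range time.Tick(time.Minute) {
//		feeds, err := c.GetFeedsDueForCheck(500)
//		if err != nil {
//			continue
//		}
//		for _, feed := range feeds {
//			c.CheckFeed(feed) // updates next_crawl_at and persists the feed
//		}
//	}
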
// GetFeedsByHost returns all feeds from a specific host
func (c *Crawler) GetFeedsByHost(host string) ([]*Feed, error) {
rows, err := c.db.Query(`
SELECT url, type, category, title, description, language, site_url,
discovered_at, last_crawled_at, next_crawl_at, last_build_date,
etag, last_modified,
ttl_minutes, update_period, update_freq,
status, error_count, last_error, last_error_at,
source_url, source_host, tld,
item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date,
no_update,
publish_status, publish_account
FROM feeds WHERE source_host = $1
`, host)
if err != nil {
return nil, err
}
defer rows.Close()
return scanFeeds(rows)
}
// SearchFeeds performs a full-text search on feeds
func (c *Crawler) SearchFeeds(query string) ([]*Feed, error) {
tsquery := ToSearchQuery(query)
rows, err := c.db.Query(`
SELECT url, type, category, title, description, language, site_url,
discovered_at, last_crawled_at, next_crawl_at, last_build_date,
etag, last_modified,
ttl_minutes, update_period, update_freq,
status, error_count, last_error, last_error_at,
source_url, source_host, tld,
item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date,
no_update,
publish_status, publish_account
FROM feeds
WHERE search_vector @@ to_tsquery('english', $1)
ORDER BY ts_rank(search_vector, to_tsquery('english', $1)) DESC
`, tsquery)
if err != nil {
return nil, err
}
defer rows.Close()
return scanFeeds(rows)
}
// scanFeeds is a helper to scan multiple feed rows
func scanFeeds(rows pgx.Rows) ([]*Feed, error) {
var feeds []*Feed
for rows.Next() {
feed := &Feed{}
var feedType, category, title, description, language, siteURL *string
var lastCrawledAt, nextCrawlAt, lastBuildDate, lastErrorAt, oldestItemDate, newestItemDate *time.Time
var etag, lastModified, updatePeriod, lastError, sourceURL, sourceHost, tld *string
var ttlMinutes, updateFreq, errorCount, itemCount, noUpdate *int
var avgPostFreqHrs *float64
var status *string
var publishStatus, publishAccount *string
if err := rows.Scan(
&feed.URL, &feedType, &category, &title, &description, &language, &siteURL,
&feed.DiscoveredAt, &lastCrawledAt, &nextCrawlAt, &lastBuildDate,
&etag, &lastModified,
&ttlMinutes, &updatePeriod, &updateFreq,
&status, &errorCount, &lastError, &lastErrorAt,
&sourceURL, &sourceHost, &tld,
&itemCount, &avgPostFreqHrs, &oldestItemDate, &newestItemDate,
&noUpdate,
&publishStatus, &publishAccount,
); err != nil {
return nil, err
}
// Handle nullable fields
feed.Type = StringValue(feedType)
if category != nil && *category != "" {
feed.Category = *category
} else {
feed.Category = "main"
}
feed.Title = StringValue(title)
feed.Description = StringValue(description)
feed.Language = StringValue(language)
feed.SiteURL = StringValue(siteURL)
feed.LastCrawledAt = TimeValue(lastCrawledAt)
feed.NextCrawlAt = TimeValue(nextCrawlAt)
feed.LastBuildDate = TimeValue(lastBuildDate)
feed.ETag = StringValue(etag)
feed.LastModified = StringValue(lastModified)
if ttlMinutes != nil {
feed.TTLMinutes = *ttlMinutes
}
feed.UpdatePeriod = StringValue(updatePeriod)
if updateFreq != nil {
feed.UpdateFreq = *updateFreq
}
feed.Status = StringValue(status)
if errorCount != nil {
feed.ErrorCount = *errorCount
}
feed.LastError = StringValue(lastError)
feed.LastErrorAt = TimeValue(lastErrorAt)
feed.SourceURL = StringValue(sourceURL)
feed.SourceHost = StringValue(sourceHost)
feed.TLD = StringValue(tld)
if itemCount != nil {
feed.ItemCount = *itemCount
}
if avgPostFreqHrs != nil {
feed.AvgPostFreqHrs = *avgPostFreqHrs
}
feed.OldestItemDate = TimeValue(oldestItemDate)
feed.NewestItemDate = TimeValue(newestItemDate)
if noUpdate != nil {
feed.NoUpdate = *noUpdate
}
if publishStatus != nil {
feed.PublishStatus = *publishStatus
} else {
feed.PublishStatus = "held"
}
feed.PublishAccount = StringValue(publishAccount)
feeds = append(feeds, feed)
}
return feeds, rows.Err()
}
// saveItem stores an item in PostgreSQL (upsert by feed_url + guid)
func (c *Crawler) saveItem(item *Item) error {
// Serialize enclosure fields
var enclosureUrl, enclosureType *string
var enclosureLength *int64
if item.Enclosure != nil {
enclosureUrl = NullableString(item.Enclosure.URL)
enclosureType = NullableString(item.Enclosure.Type)
if item.Enclosure.Length > 0 {
enclosureLength = &item.Enclosure.Length
}
}
// Serialize imageUrls as JSON
var imageUrlsJSON *string
if len(item.ImageURLs) > 0 {
if data, err := json.Marshal(item.ImageURLs); err == nil {
s := string(data)
imageUrlsJSON = &s
}
}
_, err := c.db.Exec(`
INSERT INTO items (feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at,
enclosure_url, enclosure_type, enclosure_length, image_urls)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
ON CONFLICT(feed_url, guid) DO UPDATE SET
title = EXCLUDED.title,
link = EXCLUDED.link,
description = EXCLUDED.description,
content = EXCLUDED.content,
author = EXCLUDED.author,
pub_date = EXCLUDED.pub_date,
updated_at = EXCLUDED.updated_at,
enclosure_url = EXCLUDED.enclosure_url,
enclosure_type = EXCLUDED.enclosure_type,
enclosure_length = EXCLUDED.enclosure_length,
image_urls = EXCLUDED.image_urls
`,
item.FeedURL, item.GUID, NullableString(item.Title), NullableString(item.Link),
NullableString(item.Description), NullableString(item.Content), NullableString(item.Author),
NullableTime(item.PubDate), item.DiscoveredAt, NullableTime(item.UpdatedAt),
enclosureUrl, enclosureType, enclosureLength, imageUrlsJSON,
)
return err
}
// saveItems stores multiple items efficiently
func (c *Crawler) saveItems(items []*Item) error {
if len(items) == 0 {
return nil
}
tx, err := c.db.Begin()
if err != nil {
return err
}
defer tx.Rollback(context.Background())
for _, item := range items {
if item == nil || item.GUID == "" {
continue // Skip nil items or items without GUID
}
// Serialize enclosure fields
var enclosureUrl, enclosureType *string
var enclosureLength *int64
if item.Enclosure != nil {
enclosureUrl = NullableString(item.Enclosure.URL)
enclosureType = NullableString(item.Enclosure.Type)
if item.Enclosure.Length > 0 {
enclosureLength = &item.Enclosure.Length
}
}
// Serialize imageUrls as JSON
var imageUrlsJSON *string
if len(item.ImageURLs) > 0 {
if data, err := json.Marshal(item.ImageURLs); err == nil {
s := string(data)
imageUrlsJSON = &s
}
}
_, err := tx.Exec(context.Background(), `
INSERT INTO items (feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at,
enclosure_url, enclosure_type, enclosure_length, image_urls)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
ON CONFLICT(feed_url, guid) DO UPDATE SET
title = EXCLUDED.title,
link = EXCLUDED.link,
description = EXCLUDED.description,
content = EXCLUDED.content,
author = EXCLUDED.author,
pub_date = EXCLUDED.pub_date,
updated_at = EXCLUDED.updated_at,
enclosure_url = EXCLUDED.enclosure_url,
enclosure_type = EXCLUDED.enclosure_type,
enclosure_length = EXCLUDED.enclosure_length,
image_urls = EXCLUDED.image_urls
`,
item.FeedURL, item.GUID, NullableString(item.Title), NullableString(item.Link),
NullableString(item.Description), NullableString(item.Content), NullableString(item.Author),
NullableTime(item.PubDate), item.DiscoveredAt, NullableTime(item.UpdatedAt),
enclosureUrl, enclosureType, enclosureLength, imageUrlsJSON,
)
if err != nil {
continue // Skip failed items
}
}
return tx.Commit(context.Background())
}
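
// If insert volume ever becomes a bottleneck, the same upsert could be queued
// through pgx's batch API instead of issuing one round trip per item. A sketch,
// assuming tx is a pgx.Tx as above and upsertSQL holds the INSERT ... ON CONFLICT
// statement (this file does not currently do this):
//
//	batch := &pgx.Batch{}
//	for _, item := range items {
//		batch.Queue(upsertSQL, item.FeedURL, item.GUID /* , ...remaining args */)
//	}
//	results := tx.SendBatch(context.Background(), batch)
//	if err := results.Close(); err != nil {
//		return err
//	}
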
// GetItemsByFeed returns all items for a specific feed
func (c *Crawler) GetItemsByFeed(feedURL string, limit int) ([]*Item, error) {
rows, err := c.db.Query(`
SELECT id, feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at,
enclosure_url, enclosure_type, enclosure_length, image_urls,
published_at, published_uri
FROM items
WHERE feed_url = $1
ORDER BY pub_date DESC
LIMIT $2
`, feedURL, limit)
if err != nil {
return nil, err
}
defer rows.Close()
return scanItems(rows)
}
// SearchItems performs a full-text search on items
func (c *Crawler) SearchItems(query string, limit int) ([]*Item, error) {
tsquery := ToSearchQuery(query)
rows, err := c.db.Query(`
SELECT id, feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at,
enclosure_url, enclosure_type, enclosure_length, image_urls,
published_at, published_uri
FROM items
WHERE search_vector @@ to_tsquery('english', $1)
ORDER BY ts_rank(search_vector, to_tsquery('english', $1)) DESC, pub_date DESC
LIMIT $2
`, tsquery, limit)
if err != nil {
return nil, err
}
defer rows.Close()
return scanItems(rows)
}
// scanItems is a helper to scan multiple item rows
func scanItems(rows pgx.Rows) ([]*Item, error) {
var items []*Item
for rows.Next() {
item := &Item{}
var guid, title, link, description, content, author *string
var pubDate, updatedAt, publishedAt *time.Time
var enclosureUrl, enclosureType *string
var enclosureLength *int64
var imageUrlsJSON *string
var publishedUri *string
if err := rows.Scan(
&item.ID, &item.FeedURL, &guid, &title, &link,
&description, &content, &author, &pubDate,
&item.DiscoveredAt, &updatedAt,
&enclosureUrl, &enclosureType, &enclosureLength, &imageUrlsJSON,
&publishedAt, &publishedUri,
); err != nil {
continue
}
item.GUID = StringValue(guid)
item.Title = StringValue(title)
item.Link = StringValue(link)
item.Description = StringValue(description)
item.Content = StringValue(content)
item.Author = StringValue(author)
item.PubDate = TimeValue(pubDate)
item.UpdatedAt = TimeValue(updatedAt)
// Parse enclosure
if enclosureUrl != nil && *enclosureUrl != "" {
item.Enclosure = &Enclosure{
URL: *enclosureUrl,
Type: StringValue(enclosureType),
}
if enclosureLength != nil {
item.Enclosure.Length = *enclosureLength
}
}
// Parse imageUrls JSON
if imageUrlsJSON != nil && *imageUrlsJSON != "" {
var urls []string
if err := json.Unmarshal([]byte(*imageUrlsJSON), &urls); err == nil {
item.ImageURLs = urls
}
}
item.PublishedAt = TimeValue(publishedAt)
item.PublishedUri = StringValue(publishedUri)
items = append(items, item)
}
return items, rows.Err()
}
// CleanupOldItems removes items older than 12 months
func (c *Crawler) CleanupOldItems() (int64, error) {
cutoff := time.Now().AddDate(-1, 0, 0) // 12 months ago
result, err := c.db.Exec(`
DELETE FROM items WHERE pub_date < $1 AND pub_date IS NOT NULL
`, cutoff)
if err != nil {
return 0, err
}
return result, nil
}
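
// A retention pass like this would typically run on a timer; a minimal sketch,
// with the daily interval being an assumption:
//
//	go func() {
//		for range time.Tick(24 * time.Hour) {
//			c.CleanupOldItems()
//		}
//	}()
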
// processFeed parses and stores a feed with full metadata
func (c *Crawler) processFeed(feedURL, sourceHost, body string, headers http.Header) {
// Fast path: check without lock
if c.feedExists(feedURL) {
return
}
c.feedsMu.Lock()
defer c.feedsMu.Unlock()
// Double-check after acquiring lock
if c.feedExists(feedURL) {
return
}
feedType := c.detectFeedType(body)
now := time.Now()
feed := &Feed{
URL: normalizeURL(feedURL),
Type: feedType,
Category: classifyFeed(feedURL),
DiscoveredAt: now,
LastCrawledAt: now,
Status: "active",
SourceHost: sourceHost,
TLD: getTLD(sourceHost),
ETag: headers.Get("ETag"),
LastModified: headers.Get("Last-Modified"),
}
// Parse feed-specific metadata and items
var items []*Item
switch feedType {
case "rss":
items = c.parseRSSMetadata(body, feed)
case "atom":
items = c.parseAtomMetadata(body, feed)
}
// Refine category based on parsed title (e.g., "Comments on:")
feed.Category = classifyFeedByTitle(feed.Title, feed.Category)
// Calculate next crawl time
feed.NextCrawlAt = c.calculateNextCrawl(feed)
if err := c.saveFeed(feed); err != nil {
return
}
// Save items
if len(items) > 0 {
c.saveItems(items)
}
}
// addFeed adds a discovered feed URL (not yet fetched)
func (c *Crawler) addFeed(feedURL, feedType, sourceHost, sourceURL string) {
// Fast path: check without lock
if c.feedExists(feedURL) {
return
}
c.feedsMu.Lock()
defer c.feedsMu.Unlock()
// Double-check after acquiring lock
if c.feedExists(feedURL) {
return
}
now := time.Now()
normalizedURL := normalizeURL(feedURL)
feed := &Feed{
URL: normalizedURL,
Type: feedType,
Category: classifyFeed(feedURL),
DiscoveredAt: now,
Status: "active",
SourceURL: normalizeURL(sourceURL),
SourceHost: sourceHost,
TLD: getTLD(sourceHost),
NextCrawlAt: now, // Should be crawled immediately
}
if err := c.saveFeed(feed); err != nil {
return
}
}
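
// Note the difference from processFeed above: addFeed only records the URL and
// sets NextCrawlAt to now, so the feed body is fetched and parsed by the next
// CheckFeed pass rather than at discovery time.
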
// CheckFeed performs a conditional request to check if a feed has been updated
// Returns: changed (bool), error
func (c *Crawler) CheckFeed(feed *Feed) (bool, error) {
atomic.AddInt32(&c.feedsChecked, 1)
// Try different scheme/www combinations since we store URLs without scheme
urlVariants := []string{
"https://" + feed.URL,
"http://" + feed.URL,
"https://www." + feed.URL,
"http://www." + feed.URL,
}
var resp *http.Response
var err error
var successURL string
for _, tryURL := range urlVariants {
req, reqErr := http.NewRequest("GET", tryURL, nil)
if reqErr != nil {
continue
}
req.Header.Set("User-Agent", c.UserAgent)
// Add conditional headers if we have them
if feed.ETag != "" {
req.Header.Set("If-None-Match", feed.ETag)
}
if feed.LastModified != "" {
req.Header.Set("If-Modified-Since", feed.LastModified)
}
resp, err = c.client.Do(req)
if err == nil {
successURL = tryURL
break
}
}
_ = successURL // May be used later for logging/debugging
// If no request succeeded, resp will be nil
if resp == nil {
if err == nil {
err = fmt.Errorf("all URL variants failed")
}
now := time.Now()
feed.LastCrawledAt = now
feed.ErrorCount++
feed.NoUpdate++
feed.NextCrawlAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
feed.LastError = err.Error()
feed.LastErrorAt = now
feed.Status = "error"
c.saveFeed(feed)
return false, err
}
defer resp.Body.Close()
now := time.Now()
feed.LastCrawledAt = now
// 304 Not Modified - feed hasn't changed
if resp.StatusCode == http.StatusNotModified {
feed.NoUpdate++
// Adaptive backoff: 100s base + 100s per consecutive no-change
feed.NextCrawlAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
feed.ErrorCount = 0
feed.LastError = ""
feed.Status = "active"
c.saveFeed(feed)
return false, nil
}
// Non-200 response
if resp.StatusCode != http.StatusOK {
feed.ErrorCount++
feed.NoUpdate++
feed.NextCrawlAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
feed.LastError = resp.Status
feed.LastErrorAt = now
if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusGone {
feed.Status = "dead"
} else {
feed.Status = "error"
}
c.saveFeed(feed)
return false, nil
}
// 200 OK - feed has new content
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
feed.ErrorCount++
feed.NoUpdate++
feed.NextCrawlAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
feed.LastError = err.Error()
feed.LastErrorAt = now
feed.Status = "error"
c.saveFeed(feed)
return false, err
}
body := string(bodyBytes)
// Update cache headers
feed.ETag = resp.Header.Get("ETag")
feed.LastModified = resp.Header.Get("Last-Modified")
// Re-detect type and parse metadata
feedType := c.detectFeedType(body)
feed.Type = feedType
var items []*Item
switch feedType {
case "rss":
items = c.parseRSSMetadata(body, feed)
case "atom":
items = c.parseAtomMetadata(body, feed)
}
// Content changed - reset backoff
feed.NoUpdate = 0
feed.NextCrawlAt = now.Add(100 * time.Second)
feed.ErrorCount = 0
feed.LastError = ""
feed.Status = "active"
c.saveFeed(feed)
// Save items
if len(items) > 0 {
c.saveItems(items)
}
return true, nil
}
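
// The adaptive schedule above grows linearly with consecutive unchanged checks:
// a changed feed (NoUpdate reset to 0) is rechecked after 100s, then each 304 or
// error pushes the next check out by another 100s. For example:
//
//	feed.NoUpdate = 5
//	delay := time.Duration(100+100*feed.NoUpdate) * time.Second // 600s (10 minutes)
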
// SetPublishStatus sets the publish status for a feed ('held', 'pass', 'deny')
// If status is 'pass', the account handle is also set (auto-derived if empty)
func (c *Crawler) SetPublishStatus(feedURL, status, account string) error {
feedURL = normalizeURL(feedURL)
// Auto-derive account if passing and not provided
if status == "pass" && account == "" {
account = DeriveHandleFromFeed(feedURL)
}
_, err := c.db.Exec(`
UPDATE feeds SET publish_status = $1, publish_account = $2 WHERE url = $3
`, status, NullableString(account), feedURL)
return err
}
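
// Example: approving a feed without specifying a handle lets DeriveHandleFromFeed
// choose one, while a denial needs no handle at all (illustrative URLs):
//
//	c.SetPublishStatus("example.com/feed", "pass", "")      // handle auto-derived
//	c.SetPublishStatus("example.com/spam/feed", "deny", "") // denied; no handle stored
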
// GetFeedsByPublishStatus returns all feeds with a specific publish status
func (c *Crawler) GetFeedsByPublishStatus(status string) ([]*Feed, error) {
rows, err := c.db.Query(`
SELECT url, type, category, title, description, language, site_url,
discovered_at, last_crawled_at, next_crawl_at, last_build_date,
etag, last_modified,
ttl_minutes, update_period, update_freq,
status, error_count, last_error, last_error_at,
source_url, source_host, tld,
item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date,
no_update,
publish_status, publish_account
FROM feeds
WHERE publish_status = $1
`, status)
if err != nil {
return nil, err
}
defer rows.Close()
return scanFeeds(rows)
}
// GetPublishCandidates returns active feeds that are held for review and have items, ordered by item count descending
func (c *Crawler) GetPublishCandidates(limit int) ([]*Feed, error) {
rows, err := c.db.Query(`
SELECT url, type, category, title, description, language, site_url,
discovered_at, last_crawled_at, next_crawl_at, last_build_date,
etag, last_modified,
ttl_minutes, update_period, update_freq,
status, error_count, last_error, last_error_at,
source_url, source_host, tld,
item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date,
no_update,
publish_status, publish_account
FROM feeds
WHERE publish_status = 'held' AND item_count > 0 AND status = 'active'
ORDER BY item_count DESC
LIMIT $1
`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
return scanFeeds(rows)
}
// GetUnpublishedItems returns items for a feed that haven't been published yet
func (c *Crawler) GetUnpublishedItems(feedURL string, limit int) ([]*Item, error) {
rows, err := c.db.Query(`
SELECT id, feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at,
enclosure_url, enclosure_type, enclosure_length, image_urls,
published_at, published_uri
FROM items
WHERE feed_url = $1 AND published_at IS NULL
ORDER BY pub_date ASC
LIMIT $2
`, feedURL, limit)
if err != nil {
return nil, err
}
defer rows.Close()
return scanItems(rows)
}
// MarkItemPublished marks an item as published with the given URI
func (c *Crawler) MarkItemPublished(itemID int64, uri string) error {
_, err := c.db.Exec(`
UPDATE items SET published_at = NOW(), published_uri = $1 WHERE id = $2
`, uri, itemID)
return err
}
// GetUnpublishedItemCount returns the count of unpublished items for a feed
func (c *Crawler) GetUnpublishedItemCount(feedURL string) (int, error) {
var count int
err := c.db.QueryRow(`
SELECT COUNT(*) FROM items WHERE feed_url = $1 AND published_at IS NULL
`, feedURL).Scan(&count)
return count, err
}
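
// How the publishing helpers above fit together, as an illustrative sketch only;
// publishToPDS is a hypothetical function standing in for whatever actually
// writes records to the PDS, and the batch size of 50 is an assumption:
//
//	feeds, err := c.GetFeedsByPublishStatus("pass")
//	if err != nil {
//		return
//	}
//	for _, feed := range feeds {
//		items, err := c.GetUnpublishedItems(feed.URL, 50)
//		if err != nil {
//			continue
//		}
//		for _, item := range items {
//			uri, err := publishToPDS(feed.PublishAccount, item)
//			if err != nil {
//				continue
//			}
//			c.MarkItemPublished(item.ID, uri)
//		}
//	}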