crawler/feed.go
primal 2386d551fc Auto-deny all-digit domains, whitelist 1440.news
- Deny domains where hostname is all digits (e.g., 0000114.com)
- Never auto-deny 1440.news or subdomains
- Auto-pass feeds from 1440.news sources
- Updated 554,085 domains and 3,213 feeds in database

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 13:27:48 -05:00

package main
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"regexp"
"strings"
"sync/atomic"
"time"
"github.com/jackc/pgx/v5"
)
// classifyFeed determines the category of a feed based on URL patterns
// Returns: "main", "comments", "category", "author", "article", "podcast"
// Note: podcast detection is also done in parseRSSMetadata based on content
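// Examples (illustrative URLs):
//
//	https://example.com/feed                   -> "main"
//	https://example.com/comments/feed          -> "comments"
//	https://example.com/category/tech/feed     -> "category"
//	https://example.com/author/jane/feed       -> "author"
//	https://example.com/2024/05/big-story/feed -> "article"
//	https://example.com/podcasts/feed.xml      -> "podcast"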
func classifyFeed(feedURL string) string {
lower := strings.ToLower(feedURL)
// Comment feeds
if strings.Contains(lower, "/comment") {
return "comments"
}
// Podcast URL patterns
podcastPatterns := []string{"/podcast", "/podcasts", "/episode", "/episodes", "/show/", "/shows/"}
for _, pattern := range podcastPatterns {
if strings.Contains(lower, pattern) {
return "podcast"
}
}
u, err := url.Parse(feedURL)
if err != nil {
return "main"
}
path := strings.ToLower(strings.TrimSuffix(u.Path, "/"))
// Author feeds
if strings.Contains(path, "/author/") {
return "author"
}
// Category/tag feeds
categoryPatterns := []string{"/category/", "/tag/", "/tags/", "/categories/", "/topic/", "/topics/"}
for _, pattern := range categoryPatterns {
if strings.Contains(path, pattern) {
return "category"
}
}
// Check for article feeds (path ending in /feed with content before it)
if strings.HasSuffix(path, "/feed") {
basePath := strings.TrimSuffix(path, "/feed")
basePath = strings.Trim(basePath, "/")
if basePath == "" {
return "main" // Just /feed - main feed
}
// Article if path contains date patterns
if matched, _ := regexp.MatchString(`/\d{4}/\d{2}`, basePath); matched {
return "article"
}
// Article if path has multiple segments (nested content)
segments := strings.Split(basePath, "/")
if len(segments) >= 2 {
return "article"
}
// Article if single segment looks like an article slug
if len(segments) == 1 && strings.Contains(segments[0], "-") && len(segments[0]) > 20 {
return "article"
}
}
return "main"
}
// classifyFeedByTitle refines category based on feed title (called after parsing)
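// For example, a feed still classified "main" whose parsed title is
// "Comments on: Some Post" is reclassified as "comments"; feeds already
// classified by URL are returned unchanged.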
func classifyFeedByTitle(title string, currentCategory string) string {
if currentCategory != "main" {
return currentCategory // Already classified by URL
}
lower := strings.ToLower(title)
if strings.HasPrefix(lower, "comments on:") || strings.HasPrefix(lower, "comments for:") {
return "comments"
}
return currentCategory
}
// Enclosure represents a media attachment (audio, video, image)
type Enclosure struct {
URL string `json:"url"`
Type string `json:"type"` // MIME type (audio/mpeg, image/jpeg, etc.)
Length int64 `json:"length"` // Size in bytes
}
// Item represents an individual entry/article from a feed
type Item struct {
ID int64 `json:"id,omitempty"`
FeedURL string `json:"feed_url"`
GUID string `json:"guid,omitempty"`
Title string `json:"title,omitempty"`
Link string `json:"link,omitempty"`
Description string `json:"description,omitempty"`
Content string `json:"content,omitempty"`
Author string `json:"author,omitempty"`
PubDate time.Time `json:"pub_date,omitempty"`
DiscoveredAt time.Time `json:"discovered_at"`
UpdatedAt time.Time `json:"updated_at,omitempty"`
// Media attachments
Enclosure *Enclosure `json:"enclosure,omitempty"` // Primary enclosure (podcast audio, etc.)
ImageURLs []string `json:"image_urls,omitempty"` // Image URLs extracted from content
// Publishing to PDS
PublishedAt time.Time `json:"published_at,omitempty"`
PublishedUri string `json:"published_uri,omitempty"`
}
// Feed represents a discovered RSS/Atom feed with metadata
type Feed struct {
URL string `json:"url"`
Type string `json:"type"` // "rss", "atom", or "unknown"
Category string `json:"category"` // "main", "comments", "category", "author", "article", "podcast"
Title string `json:"title,omitempty"`
Description string `json:"description,omitempty"`
Language string `json:"language,omitempty"`
SiteURL string `json:"site_url,omitempty"` // The website the feed belongs to
// Timing
DiscoveredAt time.Time `json:"discovered_at"`
LastCrawledAt time.Time `json:"last_crawled_at,omitempty"`
NextCrawlAt time.Time `json:"next_crawl_at,omitempty"`
LastBuildDate time.Time `json:"last_build_date,omitempty"` // From feed's lastBuildDate/updated
// Cache headers for conditional requests
ETag string `json:"etag,omitempty"`
LastModified string `json:"last_modified,omitempty"`
// Feed hints for crawl scheduling
TTLMinutes int `json:"ttl_minutes,omitempty"` // From RSS <ttl> element
UpdatePeriod string `json:"update_period,omitempty"` // From sy:updatePeriod (hourly, daily, weekly, monthly, yearly)
UpdateFreq int `json:"update_freq,omitempty"` // From sy:updateFrequency
// Health tracking
Status string `json:"status"` // "active", "dead", "redirect", "error"
ErrorCount int `json:"error_count"`
LastError string `json:"last_error,omitempty"`
LastErrorAt time.Time `json:"last_error_at,omitempty"`
// Discovery source
SourceURL string `json:"source_url,omitempty"`
SourceHost string `json:"source_host,omitempty"`
TLD string `json:"tld,omitempty"`
// Content stats
ItemCount int `json:"item_count,omitempty"` // Number of items in last crawl
AvgPostFreqHrs float64 `json:"avg_post_freq_hrs,omitempty"` // Average hours between posts
OldestItemDate time.Time `json:"oldest_item_date,omitempty"`
NewestItemDate time.Time `json:"newest_item_date,omitempty"`
// Adaptive check interval
NoUpdate int `json:"no_update"` // Consecutive checks with no change
// Publishing to PDS
PublishStatus string `json:"publish_status"` // "held", "pass", "deny"
PublishAccount string `json:"publish_account,omitempty"` // e.g., "news.ycombinator.com.1440.news"
}
// saveFeed stores a feed in PostgreSQL
func (c *Crawler) saveFeed(feed *Feed) error {
// Default publishStatus to "held" if not set
// Auto-deny feeds with no language or unsupported type
// Auto-pass feeds from our own domain
publishStatus := feed.PublishStatus
if publishStatus == "" {
		if feed.SourceHost == "1440.news" || strings.HasSuffix(feed.SourceHost, ".1440.news") {
publishStatus = "pass"
} else if feed.Language == "" {
publishStatus = "deny"
} else if feed.Type != "rss" && feed.Type != "atom" && feed.Type != "json" {
publishStatus = "deny"
} else {
publishStatus = "held"
}
}
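	// Defaulting examples when PublishStatus arrives empty (hosts are illustrative):
	//
	//	SourceHost "blog.1440.news", any language/type       -> "pass"
	//	SourceHost "example.com", Language ""                 -> "deny" (no language)
	//	SourceHost "example.com", Language "en", Type "html"  -> "deny" (unsupported type)
	//	SourceHost "example.com", Language "en", Type "rss"   -> "held" (awaits review)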
_, err := c.db.Exec(`
INSERT INTO feeds (
url, type, category, title, description, language, site_url,
discovered_at, last_crawled_at, next_crawl_at, last_build_date,
etag, last_modified,
ttl_minutes, update_period, update_freq,
status, error_count, last_error, last_error_at,
source_url, source_host, tld,
item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date,
no_update,
publish_status, publish_account
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30)
ON CONFLICT(url) DO UPDATE SET
type = EXCLUDED.type,
category = EXCLUDED.category,
title = EXCLUDED.title,
description = EXCLUDED.description,
language = EXCLUDED.language,
site_url = EXCLUDED.site_url,
last_crawled_at = EXCLUDED.last_crawled_at,
next_crawl_at = EXCLUDED.next_crawl_at,
last_build_date = EXCLUDED.last_build_date,
etag = EXCLUDED.etag,
last_modified = EXCLUDED.last_modified,
ttl_minutes = EXCLUDED.ttl_minutes,
update_period = EXCLUDED.update_period,
update_freq = EXCLUDED.update_freq,
status = EXCLUDED.status,
error_count = EXCLUDED.error_count,
last_error = EXCLUDED.last_error,
last_error_at = EXCLUDED.last_error_at,
item_count = EXCLUDED.item_count,
avg_post_freq_hrs = EXCLUDED.avg_post_freq_hrs,
oldest_item_date = EXCLUDED.oldest_item_date,
newest_item_date = EXCLUDED.newest_item_date,
no_update = EXCLUDED.no_update,
publish_status = EXCLUDED.publish_status,
publish_account = EXCLUDED.publish_account
`,
feed.URL, feed.Type, feed.Category, NullableString(feed.Title), NullableString(feed.Description),
NullableString(feed.Language), NullableString(feed.SiteURL),
feed.DiscoveredAt, NullableTime(feed.LastCrawledAt), NullableTime(feed.NextCrawlAt), NullableTime(feed.LastBuildDate),
NullableString(feed.ETag), NullableString(feed.LastModified),
feed.TTLMinutes, NullableString(feed.UpdatePeriod), feed.UpdateFreq,
feed.Status, feed.ErrorCount, NullableString(feed.LastError), NullableTime(feed.LastErrorAt),
NullableString(feed.SourceURL), NullableString(feed.SourceHost), NullableString(feed.TLD),
feed.ItemCount, feed.AvgPostFreqHrs, NullableTime(feed.OldestItemDate), NullableTime(feed.NewestItemDate),
feed.NoUpdate,
publishStatus, NullableString(feed.PublishAccount),
)
return err
}
// getFeed retrieves a feed from PostgreSQL
func (c *Crawler) getFeed(feedURL string) (*Feed, error) {
feed := &Feed{}
var category, title, description, language, siteURL *string
var lastCrawledAt, nextCrawlAt, lastBuildDate, lastErrorAt, oldestItemDate, newestItemDate *time.Time
var etag, lastModified, updatePeriod, lastError, sourceURL, sourceHost, tld *string
var avgPostFreqHrs *float64
var publishStatus, publishAccount *string
var ttlMinutes, updateFreq, errorCount, itemCount, noUpdate *int
err := c.db.QueryRow(`
SELECT url, type, category, title, description, language, site_url,
discovered_at, last_crawled_at, next_crawl_at, last_build_date,
etag, last_modified,
ttl_minutes, update_period, update_freq,
status, error_count, last_error, last_error_at,
source_url, source_host, tld,
item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date,
no_update,
publish_status, publish_account
FROM feeds WHERE url = $1
`, normalizeURL(feedURL)).Scan(
&feed.URL, &feed.Type, &category, &title, &description, &language, &siteURL,
&feed.DiscoveredAt, &lastCrawledAt, &nextCrawlAt, &lastBuildDate,
&etag, &lastModified,
&ttlMinutes, &updatePeriod, &updateFreq,
&feed.Status, &errorCount, &lastError, &lastErrorAt,
&sourceURL, &sourceHost, &tld,
&itemCount, &avgPostFreqHrs, &oldestItemDate, &newestItemDate,
&noUpdate,
&publishStatus, &publishAccount,
)
if err == pgx.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
// Handle nullable fields
if category != nil {
feed.Category = *category
} else {
feed.Category = "main"
}
feed.Title = StringValue(title)
feed.Description = StringValue(description)
feed.Language = StringValue(language)
feed.SiteURL = StringValue(siteURL)
feed.LastCrawledAt = TimeValue(lastCrawledAt)
feed.NextCrawlAt = TimeValue(nextCrawlAt)
feed.LastBuildDate = TimeValue(lastBuildDate)
feed.ETag = StringValue(etag)
feed.LastModified = StringValue(lastModified)
if ttlMinutes != nil {
feed.TTLMinutes = *ttlMinutes
}
feed.UpdatePeriod = StringValue(updatePeriod)
if updateFreq != nil {
feed.UpdateFreq = *updateFreq
}
if errorCount != nil {
feed.ErrorCount = *errorCount
}
feed.LastError = StringValue(lastError)
feed.LastErrorAt = TimeValue(lastErrorAt)
feed.SourceURL = StringValue(sourceURL)
feed.SourceHost = StringValue(sourceHost)
feed.TLD = StringValue(tld)
if itemCount != nil {
feed.ItemCount = *itemCount
}
if avgPostFreqHrs != nil {
feed.AvgPostFreqHrs = *avgPostFreqHrs
}
feed.OldestItemDate = TimeValue(oldestItemDate)
feed.NewestItemDate = TimeValue(newestItemDate)
if noUpdate != nil {
feed.NoUpdate = *noUpdate
}
if publishStatus != nil {
feed.PublishStatus = *publishStatus
} else {
feed.PublishStatus = "held"
}
feed.PublishAccount = StringValue(publishAccount)
return feed, nil
}
// feedExists checks if a feed URL already exists in the database
func (c *Crawler) feedExists(feedURL string) bool {
var exists bool
err := c.db.QueryRow("SELECT EXISTS(SELECT 1 FROM feeds WHERE url = $1)", normalizeURL(feedURL)).Scan(&exists)
return err == nil && exists
}
// GetAllFeeds returns all feeds from the database
func (c *Crawler) GetAllFeeds() ([]*Feed, error) {
rows, err := c.db.Query(`
SELECT url, type, category, title, description, language, site_url,
discovered_at, last_crawled_at, next_crawl_at, last_build_date,
etag, last_modified,
ttl_minutes, update_period, update_freq,
status, error_count, last_error, last_error_at,
source_url, source_host, tld,
item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date,
no_update,
publish_status, publish_account
FROM feeds
`)
if err != nil {
return nil, err
}
defer rows.Close()
return scanFeeds(rows)
}
// GetFeedCount returns the total number of feeds in the database
func (c *Crawler) GetFeedCount() (int, error) {
var count int
err := c.db.QueryRow("SELECT COUNT(*) FROM feeds").Scan(&count)
return count, err
}
// GetFeedCountByHost returns the number of feeds for a specific host
func (c *Crawler) GetFeedCountByHost(host string) (int, error) {
var count int
err := c.db.QueryRow("SELECT COUNT(*) FROM feeds WHERE source_host = $1", host).Scan(&count)
return count, err
}
// GetFeedsDueForCheck returns up to limit non-dead feeds where next_crawl_at <= NOW(), in random order
func (c *Crawler) GetFeedsDueForCheck(limit int) ([]*Feed, error) {
rows, err := c.db.Query(`
SELECT url, type, category, title, description, language, site_url,
discovered_at, last_crawled_at, next_crawl_at, last_build_date,
etag, last_modified,
ttl_minutes, update_period, update_freq,
status, error_count, last_error, last_error_at,
source_url, source_host, tld,
item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date,
no_update,
publish_status, publish_account
FROM feeds
WHERE next_crawl_at <= NOW() AND status != 'dead'
ORDER BY RANDOM()
LIMIT $1
`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
return scanFeeds(rows)
}
// GetFeedsByHost returns all feeds from a specific host
func (c *Crawler) GetFeedsByHost(host string) ([]*Feed, error) {
rows, err := c.db.Query(`
SELECT url, type, category, title, description, language, site_url,
discovered_at, last_crawled_at, next_crawl_at, last_build_date,
etag, last_modified,
ttl_minutes, update_period, update_freq,
status, error_count, last_error, last_error_at,
source_url, source_host, tld,
item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date,
no_update,
publish_status, publish_account
FROM feeds WHERE source_host = $1
`, host)
if err != nil {
return nil, err
}
defer rows.Close()
return scanFeeds(rows)
}
// SearchFeeds performs a full-text search on feeds
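// The raw query is converted by ToSearchQuery (defined elsewhere) into to_tsquery
// syntax (presumably AND-ing terms, e.g. "solar power" -> "solar & power"), and
// results are ordered by ts_rank relevance against the feed's search_vector.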
func (c *Crawler) SearchFeeds(query string) ([]*Feed, error) {
tsquery := ToSearchQuery(query)
rows, err := c.db.Query(`
SELECT url, type, category, title, description, language, site_url,
discovered_at, last_crawled_at, next_crawl_at, last_build_date,
etag, last_modified,
ttl_minutes, update_period, update_freq,
status, error_count, last_error, last_error_at,
source_url, source_host, tld,
item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date,
no_update,
publish_status, publish_account
FROM feeds
WHERE search_vector @@ to_tsquery('english', $1)
ORDER BY ts_rank(search_vector, to_tsquery('english', $1)) DESC
`, tsquery)
if err != nil {
return nil, err
}
defer rows.Close()
return scanFeeds(rows)
}
// scanFeeds is a helper to scan multiple feed rows
func scanFeeds(rows pgx.Rows) ([]*Feed, error) {
var feeds []*Feed
for rows.Next() {
feed := &Feed{}
var feedType, category, title, description, language, siteURL *string
var lastCrawledAt, nextCrawlAt, lastBuildDate, lastErrorAt, oldestItemDate, newestItemDate *time.Time
var etag, lastModified, updatePeriod, lastError, sourceURL, sourceHost, tld *string
var ttlMinutes, updateFreq, errorCount, itemCount, noUpdate *int
var avgPostFreqHrs *float64
var status *string
var publishStatus, publishAccount *string
if err := rows.Scan(
&feed.URL, &feedType, &category, &title, &description, &language, &siteURL,
&feed.DiscoveredAt, &lastCrawledAt, &nextCrawlAt, &lastBuildDate,
&etag, &lastModified,
&ttlMinutes, &updatePeriod, &updateFreq,
&status, &errorCount, &lastError, &lastErrorAt,
&sourceURL, &sourceHost, &tld,
&itemCount, &avgPostFreqHrs, &oldestItemDate, &newestItemDate,
&noUpdate,
&publishStatus, &publishAccount,
); err != nil {
return nil, err
}
// Handle nullable fields
feed.Type = StringValue(feedType)
if category != nil && *category != "" {
feed.Category = *category
} else {
feed.Category = "main"
}
feed.Title = StringValue(title)
feed.Description = StringValue(description)
feed.Language = StringValue(language)
feed.SiteURL = StringValue(siteURL)
feed.LastCrawledAt = TimeValue(lastCrawledAt)
feed.NextCrawlAt = TimeValue(nextCrawlAt)
feed.LastBuildDate = TimeValue(lastBuildDate)
feed.ETag = StringValue(etag)
feed.LastModified = StringValue(lastModified)
if ttlMinutes != nil {
feed.TTLMinutes = *ttlMinutes
}
feed.UpdatePeriod = StringValue(updatePeriod)
if updateFreq != nil {
feed.UpdateFreq = *updateFreq
}
feed.Status = StringValue(status)
if errorCount != nil {
feed.ErrorCount = *errorCount
}
feed.LastError = StringValue(lastError)
feed.LastErrorAt = TimeValue(lastErrorAt)
feed.SourceURL = StringValue(sourceURL)
feed.SourceHost = StringValue(sourceHost)
feed.TLD = StringValue(tld)
if itemCount != nil {
feed.ItemCount = *itemCount
}
if avgPostFreqHrs != nil {
feed.AvgPostFreqHrs = *avgPostFreqHrs
}
feed.OldestItemDate = TimeValue(oldestItemDate)
feed.NewestItemDate = TimeValue(newestItemDate)
if noUpdate != nil {
feed.NoUpdate = *noUpdate
}
if publishStatus != nil {
feed.PublishStatus = *publishStatus
} else {
feed.PublishStatus = "held"
}
feed.PublishAccount = StringValue(publishAccount)
feeds = append(feeds, feed)
}
return feeds, rows.Err()
}
// saveItem stores an item in PostgreSQL (upsert by feed_url + guid)
func (c *Crawler) saveItem(item *Item) error {
// Serialize enclosure fields
var enclosureUrl, enclosureType *string
var enclosureLength *int64
if item.Enclosure != nil {
enclosureUrl = NullableString(item.Enclosure.URL)
enclosureType = NullableString(item.Enclosure.Type)
if item.Enclosure.Length > 0 {
enclosureLength = &item.Enclosure.Length
}
}
// Serialize imageUrls as JSON
var imageUrlsJSON *string
if len(item.ImageURLs) > 0 {
if data, err := json.Marshal(item.ImageURLs); err == nil {
s := string(data)
imageUrlsJSON = &s
}
}
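	// The stored column value is a JSON array string,
	// e.g. ["https://example.com/a.jpg","https://example.com/b.png"]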
_, err := c.db.Exec(`
INSERT INTO items (feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at,
enclosure_url, enclosure_type, enclosure_length, image_urls)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
ON CONFLICT(feed_url, guid) DO UPDATE SET
title = EXCLUDED.title,
link = EXCLUDED.link,
description = EXCLUDED.description,
content = EXCLUDED.content,
author = EXCLUDED.author,
pub_date = EXCLUDED.pub_date,
updated_at = EXCLUDED.updated_at,
enclosure_url = EXCLUDED.enclosure_url,
enclosure_type = EXCLUDED.enclosure_type,
enclosure_length = EXCLUDED.enclosure_length,
image_urls = EXCLUDED.image_urls
`,
item.FeedURL, item.GUID, NullableString(item.Title), NullableString(item.Link),
NullableString(item.Description), NullableString(item.Content), NullableString(item.Author),
NullableTime(item.PubDate), item.DiscoveredAt, NullableTime(item.UpdatedAt),
enclosureUrl, enclosureType, enclosureLength, imageUrlsJSON,
)
return err
}
// saveItems stores multiple items in a single transaction (upsert by feed_url + guid); items without a GUID are skipped
func (c *Crawler) saveItems(items []*Item) error {
if len(items) == 0 {
return nil
}
tx, err := c.db.Begin()
if err != nil {
return err
}
defer tx.Rollback(context.Background())
for _, item := range items {
if item == nil || item.GUID == "" {
continue // Skip nil items or items without GUID
}
// Serialize enclosure fields
var enclosureUrl, enclosureType *string
var enclosureLength *int64
if item.Enclosure != nil {
enclosureUrl = NullableString(item.Enclosure.URL)
enclosureType = NullableString(item.Enclosure.Type)
if item.Enclosure.Length > 0 {
enclosureLength = &item.Enclosure.Length
}
}
// Serialize imageUrls as JSON
var imageUrlsJSON *string
if len(item.ImageURLs) > 0 {
if data, err := json.Marshal(item.ImageURLs); err == nil {
s := string(data)
imageUrlsJSON = &s
}
}
		// Use a savepoint so one failed insert doesn't abort the whole transaction
		if _, spErr := tx.Exec(context.Background(), "SAVEPOINT save_item"); spErr != nil {
			return spErr
		}
		_, err := tx.Exec(context.Background(), `
INSERT INTO items (feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at,
enclosure_url, enclosure_type, enclosure_length, image_urls)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
ON CONFLICT(feed_url, guid) DO UPDATE SET
title = EXCLUDED.title,
link = EXCLUDED.link,
description = EXCLUDED.description,
content = EXCLUDED.content,
author = EXCLUDED.author,
pub_date = EXCLUDED.pub_date,
updated_at = EXCLUDED.updated_at,
enclosure_url = EXCLUDED.enclosure_url,
enclosure_type = EXCLUDED.enclosure_type,
enclosure_length = EXCLUDED.enclosure_length,
image_urls = EXCLUDED.image_urls
`,
item.FeedURL, item.GUID, NullableString(item.Title), NullableString(item.Link),
NullableString(item.Description), NullableString(item.Content), NullableString(item.Author),
NullableTime(item.PubDate), item.DiscoveredAt, NullableTime(item.UpdatedAt),
enclosureUrl, enclosureType, enclosureLength, imageUrlsJSON,
)
		if err != nil {
			// Roll back to the savepoint and skip this item; the rest of the batch still commits
			tx.Exec(context.Background(), "ROLLBACK TO SAVEPOINT save_item")
			continue
		}
}
return tx.Commit(context.Background())
}
// GetItemsByFeed returns all items for a specific feed
func (c *Crawler) GetItemsByFeed(feedURL string, limit int) ([]*Item, error) {
rows, err := c.db.Query(`
SELECT id, feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at,
enclosure_url, enclosure_type, enclosure_length, image_urls,
published_at, published_uri
FROM items
WHERE feed_url = $1
ORDER BY pub_date DESC
LIMIT $2
`, feedURL, limit)
if err != nil {
return nil, err
}
defer rows.Close()
return scanItems(rows)
}
// SearchItems performs a full-text search on items
func (c *Crawler) SearchItems(query string, limit int) ([]*Item, error) {
tsquery := ToSearchQuery(query)
rows, err := c.db.Query(`
SELECT id, feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at,
enclosure_url, enclosure_type, enclosure_length, image_urls,
published_at, published_uri
FROM items
WHERE search_vector @@ to_tsquery('english', $1)
ORDER BY ts_rank(search_vector, to_tsquery('english', $1)) DESC, pub_date DESC
LIMIT $2
`, tsquery, limit)
if err != nil {
return nil, err
}
defer rows.Close()
return scanItems(rows)
}
// scanItems is a helper to scan multiple item rows
func scanItems(rows pgx.Rows) ([]*Item, error) {
var items []*Item
for rows.Next() {
item := &Item{}
var guid, title, link, description, content, author *string
var pubDate, updatedAt, publishedAt *time.Time
var enclosureUrl, enclosureType *string
var enclosureLength *int64
var imageUrlsJSON *string
var publishedUri *string
if err := rows.Scan(
&item.ID, &item.FeedURL, &guid, &title, &link,
&description, &content, &author, &pubDate,
&item.DiscoveredAt, &updatedAt,
&enclosureUrl, &enclosureType, &enclosureLength, &imageUrlsJSON,
&publishedAt, &publishedUri,
); err != nil {
continue
}
item.GUID = StringValue(guid)
item.Title = StringValue(title)
item.Link = StringValue(link)
item.Description = StringValue(description)
item.Content = StringValue(content)
item.Author = StringValue(author)
item.PubDate = TimeValue(pubDate)
item.UpdatedAt = TimeValue(updatedAt)
// Parse enclosure
if enclosureUrl != nil && *enclosureUrl != "" {
item.Enclosure = &Enclosure{
URL: *enclosureUrl,
Type: StringValue(enclosureType),
}
if enclosureLength != nil {
item.Enclosure.Length = *enclosureLength
}
}
// Parse imageUrls JSON
if imageUrlsJSON != nil && *imageUrlsJSON != "" {
var urls []string
if err := json.Unmarshal([]byte(*imageUrlsJSON), &urls); err == nil {
item.ImageURLs = urls
}
}
item.PublishedAt = TimeValue(publishedAt)
item.PublishedUri = StringValue(publishedUri)
items = append(items, item)
}
return items, rows.Err()
}
// CleanupOldItems removes items older than 12 months
func (c *Crawler) CleanupOldItems() (int64, error) {
cutoff := time.Now().AddDate(-1, 0, 0) // 12 months ago
result, err := c.db.Exec(`
DELETE FROM items WHERE pub_date < $1 AND pub_date IS NOT NULL
`, cutoff)
if err != nil {
return 0, err
}
return result, nil
}
// processFeed parses and stores a feed with full metadata
func (c *Crawler) processFeed(feedURL, sourceHost, body string, headers http.Header) {
// Fast path: check without lock
if c.feedExists(feedURL) {
return
}
c.feedsMu.Lock()
defer c.feedsMu.Unlock()
// Double-check after acquiring lock
if c.feedExists(feedURL) {
return
}
feedType := c.detectFeedType(body)
now := time.Now()
feed := &Feed{
URL: normalizeURL(feedURL),
Type: feedType,
Category: classifyFeed(feedURL),
DiscoveredAt: now,
LastCrawledAt: now,
Status: "active",
SourceHost: sourceHost,
TLD: getTLD(sourceHost),
ETag: headers.Get("ETag"),
LastModified: headers.Get("Last-Modified"),
}
// Parse feed-specific metadata and items
var items []*Item
switch feedType {
case "rss":
items = c.parseRSSMetadata(body, feed)
case "atom":
items = c.parseAtomMetadata(body, feed)
case "json":
items = c.parseJSONFeedMetadata(body, feed)
}
// Refine category based on parsed title (e.g., "Comments on:")
feed.Category = classifyFeedByTitle(feed.Title, feed.Category)
// Calculate next crawl time
feed.NextCrawlAt = c.calculateNextCrawl(feed)
if err := c.saveFeed(feed); err != nil {
return
}
// Save items
if len(items) > 0 {
c.saveItems(items)
}
}
// addFeed adds a discovered feed URL (not yet fetched)
func (c *Crawler) addFeed(feedURL, feedType, sourceHost, sourceURL string) {
// Fast path: check without lock
if c.feedExists(feedURL) {
return
}
c.feedsMu.Lock()
defer c.feedsMu.Unlock()
// Double-check after acquiring lock
if c.feedExists(feedURL) {
return
}
now := time.Now()
normalizedURL := normalizeURL(feedURL)
feed := &Feed{
URL: normalizedURL,
Type: feedType,
Category: classifyFeed(feedURL),
DiscoveredAt: now,
Status: "active",
SourceURL: normalizeURL(sourceURL),
SourceHost: sourceHost,
TLD: getTLD(sourceHost),
NextCrawlAt: now, // Should be crawled immediately
}
if err := c.saveFeed(feed); err != nil {
return
}
}
// CheckFeed performs a conditional request to check if a feed has been updated
// Returns: changed (bool), error
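// Scheduling (derived from the constants below): every unchanged or failed check
// increments NoUpdate and pushes the next check out by 100+100*NoUpdate seconds,
// while a change resets NoUpdate and schedules a recheck in 100s. For example:
//
//	changed (NoUpdate reset to 0) -> next check in 100s
//	NoUpdate = 1                  -> next check in 200s
//	NoUpdate = 5                  -> next check in 600s
//	NoUpdate = 30                 -> next check in 3100s (~52 min)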
func (c *Crawler) CheckFeed(feed *Feed) (bool, error) {
atomic.AddInt32(&c.feedsChecked, 1)
// Try different scheme/www combinations since we store URLs without scheme
urlVariants := []string{
"https://" + feed.URL,
"http://" + feed.URL,
"https://www." + feed.URL,
"http://www." + feed.URL,
}
var resp *http.Response
var err error
var successURL string
for _, tryURL := range urlVariants {
req, reqErr := http.NewRequest("GET", tryURL, nil)
if reqErr != nil {
continue
}
req.Header.Set("User-Agent", c.UserAgent)
// Add conditional headers if we have them
if feed.ETag != "" {
req.Header.Set("If-None-Match", feed.ETag)
}
if feed.LastModified != "" {
req.Header.Set("If-Modified-Since", feed.LastModified)
}
resp, err = c.client.Do(req)
if err == nil {
successURL = tryURL
break
}
}
_ = successURL // May be used later for logging/debugging
// If no request succeeded, resp will be nil
if resp == nil {
if err == nil {
err = fmt.Errorf("all URL variants failed")
}
now := time.Now()
feed.LastCrawledAt = now
feed.ErrorCount++
feed.NoUpdate++
feed.NextCrawlAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
feed.LastError = err.Error()
feed.LastErrorAt = now
feed.Status = "error"
c.saveFeed(feed)
return false, err
}
defer resp.Body.Close()
now := time.Now()
feed.LastCrawledAt = now
// 304 Not Modified - feed hasn't changed
if resp.StatusCode == http.StatusNotModified {
feed.NoUpdate++
// Adaptive backoff: 100s base + 100s per consecutive no-change
feed.NextCrawlAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
feed.ErrorCount = 0
feed.LastError = ""
feed.Status = "active"
c.saveFeed(feed)
return false, nil
}
// Non-200 response
if resp.StatusCode != http.StatusOK {
feed.ErrorCount++
feed.NoUpdate++
feed.NextCrawlAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
feed.LastError = resp.Status
feed.LastErrorAt = now
if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusGone {
feed.Status = "dead"
} else {
feed.Status = "error"
}
c.saveFeed(feed)
return false, nil
}
// 200 OK - feed has new content
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
feed.ErrorCount++
feed.NoUpdate++
feed.NextCrawlAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
feed.LastError = err.Error()
feed.LastErrorAt = now
feed.Status = "error"
c.saveFeed(feed)
return false, err
}
body := string(bodyBytes)
// Update cache headers
feed.ETag = resp.Header.Get("ETag")
feed.LastModified = resp.Header.Get("Last-Modified")
// Re-detect type and parse metadata
feedType := c.detectFeedType(body)
feed.Type = feedType
var items []*Item
switch feedType {
case "rss":
items = c.parseRSSMetadata(body, feed)
case "atom":
items = c.parseAtomMetadata(body, feed)
case "json":
items = c.parseJSONFeedMetadata(body, feed)
}
// Content changed - reset backoff
feed.NoUpdate = 0
feed.NextCrawlAt = now.Add(100 * time.Second)
feed.ErrorCount = 0
feed.LastError = ""
feed.Status = "active"
c.saveFeed(feed)
// Save items
if len(items) > 0 {
c.saveItems(items)
}
return true, nil
}
// SetPublishStatus sets the publish status for a feed ('held', 'pass', 'deny')
// If status is 'pass', the account handle is also set (auto-derived if empty)
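// Example (illustrative; DeriveHandleFromFeed is defined elsewhere, and the handle
// format follows the Feed.PublishAccount comment above):
//
//	c.SetPublishStatus("news.ycombinator.com/rss", "pass", "")
//	// publish_account is auto-derived, e.g. "news.ycombinator.com.1440.news"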
func (c *Crawler) SetPublishStatus(feedURL, status, account string) error {
feedURL = normalizeURL(feedURL)
// Auto-derive account if passing and not provided
if status == "pass" && account == "" {
account = DeriveHandleFromFeed(feedURL)
}
_, err := c.db.Exec(`
UPDATE feeds SET publish_status = $1, publish_account = $2 WHERE url = $3
`, status, NullableString(account), feedURL)
return err
}
// GetFeedsByPublishStatus returns all feeds with a specific publish status
func (c *Crawler) GetFeedsByPublishStatus(status string) ([]*Feed, error) {
rows, err := c.db.Query(`
SELECT url, type, category, title, description, language, site_url,
discovered_at, last_crawled_at, next_crawl_at, last_build_date,
etag, last_modified,
ttl_minutes, update_period, update_freq,
status, error_count, last_error, last_error_at,
source_url, source_host, tld,
item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date,
no_update,
publish_status, publish_account
FROM feeds
WHERE publish_status = $1
`, status)
if err != nil {
return nil, err
}
defer rows.Close()
return scanFeeds(rows)
}
// GetPublishCandidates returns active feeds that are held for review and have items, ordered by item count (highest first)
func (c *Crawler) GetPublishCandidates(limit int) ([]*Feed, error) {
rows, err := c.db.Query(`
SELECT url, type, category, title, description, language, site_url,
discovered_at, last_crawled_at, next_crawl_at, last_build_date,
etag, last_modified,
ttl_minutes, update_period, update_freq,
status, error_count, last_error, last_error_at,
source_url, source_host, tld,
item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date,
no_update,
publish_status, publish_account
FROM feeds
WHERE publish_status = 'held' AND item_count > 0 AND status = 'active'
ORDER BY item_count DESC
LIMIT $1
`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
return scanFeeds(rows)
}
// GetUnpublishedItems returns items for a feed that haven't been published yet
func (c *Crawler) GetUnpublishedItems(feedURL string, limit int) ([]*Item, error) {
rows, err := c.db.Query(`
SELECT id, feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at,
enclosure_url, enclosure_type, enclosure_length, image_urls,
published_at, published_uri
FROM items
WHERE feed_url = $1 AND published_at IS NULL
ORDER BY pub_date ASC
LIMIT $2
`, feedURL, limit)
if err != nil {
return nil, err
}
defer rows.Close()
return scanItems(rows)
}
// MarkItemPublished marks an item as published with the given URI
func (c *Crawler) MarkItemPublished(itemID int64, uri string) error {
_, err := c.db.Exec(`
UPDATE items SET published_at = NOW(), published_uri = $1 WHERE id = $2
`, uri, itemID)
return err
}
// GetUnpublishedItemCount returns the count of unpublished items for a feed
func (c *Crawler) GetUnpublishedItemCount(feedURL string) (int, error) {
var count int
err := c.db.QueryRow(`
SELECT COUNT(*) FROM items WHERE feed_url = $1 AND published_at IS NULL
`, feedURL).Scan(&count)
return count, err
}
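// Publishing flow sketch (the actual publisher lives outside this file; publishToPDS
// below is a hypothetical stand-in for the PDS publishing step):
//
//	feeds, _ := c.GetFeedsByPublishStatus("pass")
//	for _, f := range feeds {
//		items, _ := c.GetUnpublishedItems(f.URL, 50)
//		for _, it := range items {
//			uri := publishToPDS(f, it) // hypothetical
//			c.MarkItemPublished(it.ID, uri)
//		}
//	}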