Replace source_host column with proper FK to domains table using composite
key (domain_host, domain_tld). This enables JOIN queries instead of string
concatenation for domain lookups.

Changes:
- Update Feed struct: SourceHost/TLD → DomainHost/DomainTLD
- Update all SQL queries to use domain_host/domain_tld columns
- Add column aliases (as source_host) for API backwards compatibility
- Update trigram index from source_host to domain_host
- Add getDomainHost() helper for extracting the host from a domain

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
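For context, here is the kind of JOIN lookup the composite key enables, with the "as source_host" alias preserving the old API field name. This is a minimal standalone sketch, not a query from this change: the feeds table name, the lookupFeedDomain helper, and the Postgres-style $1 placeholder are all assumptions (Postgres is inferred from the trigram index).

package example

import "database/sql"

// lookupFeedDomain is a hypothetical example of joining feeds to domains on
// the composite key (domain_host, domain_tld) instead of reassembling the
// domain via string concatenation. The alias keeps API responses exposing
// source_host as before.
func lookupFeedDomain(db *sql.DB, feedURL string) (sourceHost string, err error) {
	err = db.QueryRow(`
		SELECT f.domain_host AS source_host
		FROM feeds f
		JOIN domains d ON d.domain_host = f.domain_host
		             AND d.domain_tld = f.domain_tld
		WHERE f.url = $1`, feedURL).Scan(&sourceHost)
	return sourceHost, err
}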
253 lines · 6.2 KiB · Go
package main

import (
	"fmt"
	"io"
	"net/http"
	"sync/atomic"
	"time"
)

// processFeed parses and stores a feed with full metadata
func (c *Crawler) processFeed(feedURL, sourceHost, body string, headers http.Header) {
	// Fast path: check without lock
	if c.feedExists(feedURL) {
		return
	}

	c.feedsMu.Lock()
	defer c.feedsMu.Unlock()

	// Double-check after acquiring lock
	if c.feedExists(feedURL) {
		return
	}

	feedType := c.detectFeedType(body)
	now := time.Now()

	feed := &Feed{
		URL:           normalizeURL(feedURL),
		Type:          feedType,
		Category:      classifyFeed(feedURL),
		DiscoveredAt:  now,
		LastCheckedAt: now,
		Status:        "pass",
		DomainHost:    getDomainHost(sourceHost),
		DomainTLD:     getTLD(sourceHost),
		ETag:          headers.Get("ETag"),
		LastModified:  headers.Get("Last-Modified"),
	}

	// Parse feed-specific metadata and items
	var items []*Item
	switch feedType {
	case "rss":
		items = c.parseRSSMetadata(body, feed)
	case "atom":
		items = c.parseAtomMetadata(body, feed)
	case "json":
		items = c.parseJSONFeedMetadata(body, feed)
	}

	// Refine category based on parsed title (e.g., "Comments on:")
	feed.Category = classifyFeedByTitle(feed.Title, feed.Category)

	// Calculate next feed_check time
	feed.NextCheckAt = c.calculateNextCheck(feed)

	if err := c.saveFeed(feed); err != nil {
		return
	}

	// Save items
	if len(items) > 0 {
		c.saveItems(items)
	}
}
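
// getDomainHostSketch illustrates what the getDomainHost helper used above
// (added in this commit but defined elsewhere in the package) is expected to
// do: strip the TLD label so host and TLD can be stored in the composite key
// columns. A minimal sketch assuming a single-label TLD; the real helper may
// instead consult a public-suffix list.
func getDomainHostSketch(domain string) string {
	for i := len(domain) - 1; i >= 0; i-- {
		if domain[i] == '.' {
			return domain[:i] // e.g. "example.com" -> "example"
		}
	}
	return domain // no dot: nothing to strip
}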

// addFeed adds a discovered feed URL (not yet fetched)
func (c *Crawler) addFeed(feedURL, feedType, sourceHost, sourceURL string) {
	// Fast path: check without lock
	if c.feedExists(feedURL) {
		return
	}

	c.feedsMu.Lock()
	defer c.feedsMu.Unlock()

	// Double-check after acquiring lock
	if c.feedExists(feedURL) {
		return
	}

	now := time.Now()
	normalizedURL := normalizeURL(feedURL)
	feed := &Feed{
		URL:          normalizedURL,
		Type:         feedType,
		Category:     classifyFeed(feedURL),
		DiscoveredAt: now,
		Status:       "pass",
		SourceURL:    normalizeURL(sourceURL),
		DomainHost:   getDomainHost(sourceHost),
		DomainTLD:    getTLD(sourceHost),
		NextCheckAt:  now, // Should be crawled immediately
	}

	if err := c.saveFeed(feed); err != nil {
		return
	}
}

// CheckFeed performs a conditional request to check if a feed has been updated
// Returns: changed (bool), error
func (c *Crawler) CheckFeed(feed *Feed) (bool, error) {
	atomic.AddInt32(&c.feedsChecked, 1)

	// Try different scheme/www combinations since we store URLs without scheme
	urlVariants := []string{
		"https://" + feed.URL,
		"http://" + feed.URL,
		"https://www." + feed.URL,
		"http://www." + feed.URL,
	}
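
	// e.g. a stored URL like "example.com/feed.xml" is tried as
	// https://example.com/feed.xml, then http://, then the www. variants,
	// stopping at the first request that completes without a transport error.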

	var resp *http.Response
	var err error
	var successURL string

	for _, tryURL := range urlVariants {
		req, reqErr := http.NewRequest("GET", tryURL, nil)
		if reqErr != nil {
			continue
		}

		req.Header.Set("User-Agent", c.UserAgent)

		// Add conditional headers if we have them
		if feed.ETag != "" {
			req.Header.Set("If-None-Match", feed.ETag)
		}
		if feed.LastModified != "" {
			req.Header.Set("If-Modified-Since", feed.LastModified)
		}

		resp, err = c.client.Do(req)
		if err == nil {
			successURL = tryURL
			break
		}
	}

	_ = successURL // May be used later for logging/debugging

	// If no request succeeded, resp will be nil
	if resp == nil {
		if err == nil {
			err = fmt.Errorf("all URL variants failed")
		}
		now := time.Now()
		feed.LastCheckedAt = now
		feed.NoUpdate++
		feed.NextCheckAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
		feed.LastError = err.Error()
		feed.LastErrorAt = now
		feed.Status = "hold"
		// Auto-hold feeds after 1000 consecutive failures/no-changes
		if feed.NoUpdate >= 1000 && feed.PublishStatus == "pass" {
			feed.PublishStatus = "hold"
			fmt.Printf("Feed auto-held after %d no-updates: %s\n", feed.NoUpdate, feed.URL)
		}
		c.saveFeed(feed)
		return false, err
	}
	defer resp.Body.Close()

	now := time.Now()
	feed.LastCheckedAt = now

	// 304 Not Modified - feed hasn't changed
	if resp.StatusCode == http.StatusNotModified {
		feed.NoUpdate++
		// Adaptive backoff: 100s base + 100s per consecutive no-change
		feed.NextCheckAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
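		// That works out to 200s after the first no-change and 1,100s after
		// the tenth; at the 1,000-no-update auto-hold threshold below, the
		// interval is roughly 28 hours.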
		feed.LastError = ""
		feed.Status = "pass"
		// Auto-hold feeds after 1000 consecutive no-changes
		if feed.NoUpdate >= 1000 && feed.PublishStatus == "pass" {
			feed.PublishStatus = "hold"
			fmt.Printf("Feed auto-held after %d no-updates: %s\n", feed.NoUpdate, feed.URL)
		}
		c.saveFeed(feed)
		return false, nil
	}

	// Non-200 response
	if resp.StatusCode != http.StatusOK {
		feed.NoUpdate++
		feed.NextCheckAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
		feed.LastError = resp.Status
		feed.LastErrorAt = now
		feed.Status = "hold"
		// Auto-hold feeds after 1000 consecutive failures/no-changes
		if feed.NoUpdate >= 1000 && feed.PublishStatus == "pass" {
			feed.PublishStatus = "hold"
			fmt.Printf("Feed auto-held after %d no-updates: %s\n", feed.NoUpdate, feed.URL)
		}
		c.saveFeed(feed)
		return false, nil
	}

	// 200 OK - feed has new content
	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		feed.NoUpdate++
		feed.NextCheckAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
		feed.LastError = err.Error()
		feed.LastErrorAt = now
		feed.Status = "hold"
		// Auto-hold feeds after 1000 consecutive failures/no-changes
		if feed.NoUpdate >= 1000 && feed.PublishStatus == "pass" {
			feed.PublishStatus = "hold"
			fmt.Printf("Feed auto-held after %d no-updates: %s\n", feed.NoUpdate, feed.URL)
		}
		c.saveFeed(feed)
		return false, err
	}

	body := string(bodyBytes)

	// Update cache headers
	feed.ETag = resp.Header.Get("ETag")
	feed.LastModified = resp.Header.Get("Last-Modified")

	// Re-detect type and parse metadata
	feedType := c.detectFeedType(body)
	feed.Type = feedType

	var items []*Item
	switch feedType {
	case "rss":
		items = c.parseRSSMetadata(body, feed)
	case "atom":
		items = c.parseAtomMetadata(body, feed)
	case "json":
		items = c.parseJSONFeedMetadata(body, feed)
	}

	// Content changed - reset backoff
	feed.NoUpdate = 0
	feed.NextCheckAt = now.Add(100 * time.Second)
	feed.LastError = ""
	feed.Status = "pass"
	c.saveFeed(feed)

	// Save items
	if len(items) > 0 {
		c.saveItems(items)
	}

	return true, nil
}
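
// Illustrative use of CheckFeed from a scheduler loop; dueFeeds and the loop
// itself are hypothetical, only the (changed, error) contract comes from
// this file:
//
//	for _, f := range dueFeeds {
//		if changed, err := crawler.CheckFeed(f); err == nil && changed {
//			fmt.Printf("feed updated: %s\n", f.URL)
//		}
//	}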