package main

import (
	"fmt"
	"io"
	"net/http"
	"sync/atomic"
	"time"
)

// processFeed parses and stores a feed with full metadata
func (c *Crawler) processFeed(feedURL, sourceHost, body string, headers http.Header) {
	// Fast path: check without lock
	if c.feedExists(feedURL) {
		return
	}

	c.feedsMu.Lock()
	defer c.feedsMu.Unlock()

	// Double-check after acquiring lock
	if c.feedExists(feedURL) {
		return
	}

	feedType := c.detectFeedType(body)
	now := time.Now()

	feed := &Feed{
		URL:           normalizeURL(feedURL),
		Type:          feedType,
		Category:      classifyFeed(feedURL),
		DiscoveredAt:  now,
		LastCheckedAt: now,
		Status:        "pass",
		DomainHost:    getDomainHost(sourceHost),
		DomainTLD:     getTLD(sourceHost),
		ETag:          headers.Get("ETag"),
		LastModified:  headers.Get("Last-Modified"),
	}

	// Parse feed-specific metadata and items
	var items []*Item
	switch feedType {
	case "rss":
		items = c.parseRSSMetadata(body, feed)
	case "atom":
		items = c.parseAtomMetadata(body, feed)
	case "json":
		items = c.parseJSONFeedMetadata(body, feed)
	}

	// Refine category based on parsed title (e.g., "Comments on:")
	feed.Category = classifyFeedByTitle(feed.Title, feed.Category)

	// Calculate next feed_check time
	feed.NextCheckAt = c.calculateNextCheck(feed)

	if err := c.saveFeed(feed); err != nil {
		return
	}

	// Save items
	if len(items) > 0 {
		c.saveItems(items)
	}
}

// addFeed adds a discovered feed URL (not yet fetched)
func (c *Crawler) addFeed(feedURL, feedType, sourceHost, sourceURL string) {
	// Fast path: check without lock
	if c.feedExists(feedURL) {
		return
	}

	c.feedsMu.Lock()
	defer c.feedsMu.Unlock()

	// Double-check after acquiring lock
	if c.feedExists(feedURL) {
		return
	}

	now := time.Now()
	normalizedURL := normalizeURL(feedURL)

	feed := &Feed{
		URL:          normalizedURL,
		Type:         feedType,
		Category:     classifyFeed(feedURL),
		DiscoveredAt: now,
		Status:       "pass",
		SourceURL:    normalizeURL(sourceURL),
		DomainHost:   getDomainHost(sourceHost),
		DomainTLD:    getTLD(sourceHost),
		NextCheckAt:  now, // Should be crawled immediately
	}

	if err := c.saveFeed(feed); err != nil {
		return
	}
}

// CheckFeed performs a conditional request to check if a feed has been updated
// Returns: changed (bool), error
func (c *Crawler) CheckFeed(feed *Feed) (bool, error) {
	atomic.AddInt32(&c.feedsChecked, 1)

	// Try different scheme/www combinations since we store URLs without scheme
	urlVariants := []string{
		"https://" + feed.URL,
		"http://" + feed.URL,
		"https://www." + feed.URL,
		"http://www." + feed.URL,
	}

	var resp *http.Response
	var err error
	var successURL string

	for _, tryURL := range urlVariants {
		req, reqErr := http.NewRequest("GET", tryURL, nil)
		if reqErr != nil {
			continue
		}
		req.Header.Set("User-Agent", c.UserAgent)

		// Add conditional headers if we have them
		if feed.ETag != "" {
			req.Header.Set("If-None-Match", feed.ETag)
		}
		if feed.LastModified != "" {
			req.Header.Set("If-Modified-Since", feed.LastModified)
		}

		resp, err = c.client.Do(req)
		if err == nil {
			successURL = tryURL
			break
		}
	}
	_ = successURL // May be used later for logging/debugging

	// If no request succeeded, resp will be nil
	if resp == nil {
		if err == nil {
			err = fmt.Errorf("all URL variants failed")
		}
		now := time.Now()
		feed.LastCheckedAt = now
		feed.NoUpdate++
		feed.NextCheckAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
		feed.LastError = err.Error()
		feed.LastErrorAt = now
		feed.Status = "hold"
		// Auto-hold feeds after 1000 consecutive failures/no-changes
		if feed.NoUpdate >= 1000 && feed.PublishStatus == "pass" {
			feed.PublishStatus = "hold"
			fmt.Printf("Feed auto-held after %d no-updates: %s\n", feed.NoUpdate, feed.URL)
		}
		c.saveFeed(feed)
		return false, err
	}
	defer resp.Body.Close()

	now := time.Now()
	feed.LastCheckedAt = now

	// 304 Not Modified - feed hasn't changed
	if resp.StatusCode == http.StatusNotModified {
		feed.NoUpdate++
		// Adaptive backoff: 100s base + 100s per consecutive no-change
		feed.NextCheckAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
		feed.LastError = ""
		feed.Status = "pass"
		// Auto-hold feeds after 1000 consecutive no-changes
		if feed.NoUpdate >= 1000 && feed.PublishStatus == "pass" {
			feed.PublishStatus = "hold"
			fmt.Printf("Feed auto-held after %d no-updates: %s\n", feed.NoUpdate, feed.URL)
		}
		c.saveFeed(feed)
		return false, nil
	}

	// Non-200 response
	if resp.StatusCode != http.StatusOK {
		feed.NoUpdate++
		feed.NextCheckAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
		feed.LastError = resp.Status
		feed.LastErrorAt = now
		feed.Status = "hold"
		// Auto-hold feeds after 1000 consecutive failures/no-changes
		if feed.NoUpdate >= 1000 && feed.PublishStatus == "pass" {
			feed.PublishStatus = "hold"
			fmt.Printf("Feed auto-held after %d no-updates: %s\n", feed.NoUpdate, feed.URL)
		}
		c.saveFeed(feed)
		return false, nil
	}

	// 200 OK - feed has new content
	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		feed.NoUpdate++
		feed.NextCheckAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
		feed.LastError = err.Error()
		feed.LastErrorAt = now
		feed.Status = "hold"
		// Auto-hold feeds after 1000 consecutive failures/no-changes
		if feed.NoUpdate >= 1000 && feed.PublishStatus == "pass" {
			feed.PublishStatus = "hold"
			fmt.Printf("Feed auto-held after %d no-updates: %s\n", feed.NoUpdate, feed.URL)
		}
		c.saveFeed(feed)
		return false, err
	}
	body := string(bodyBytes)

	// Update cache headers
	feed.ETag = resp.Header.Get("ETag")
	feed.LastModified = resp.Header.Get("Last-Modified")

	// Re-detect type and parse metadata
	feedType := c.detectFeedType(body)
	feed.Type = feedType

	var items []*Item
	switch feedType {
	case "rss":
		items = c.parseRSSMetadata(body, feed)
	case "atom":
		items = c.parseAtomMetadata(body, feed)
	case "json":
		items = c.parseJSONFeedMetadata(body, feed)
	}

	// Content changed - reset backoff
	feed.NoUpdate = 0
	feed.NextCheckAt = now.Add(100 * time.Second)
	feed.LastError = ""
	feed.Status = "pass"

	c.saveFeed(feed)

	// Save items
	if len(items) > 0 {
		c.saveItems(items)
	}

	return true, nil
}
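
// The no-response, 304, non-200, and read-error branches in CheckFeed above
// all repeat the same bookkeeping: bump NoUpdate, extend the backoff, record
// the outcome, auto-hold at 1000, and save. A minimal sketch of how that
// could be consolidated into one helper; markCheckFailure is hypothetical
// and not part of the original crawler API. It assumes Feed.NoUpdate is an
// integer counter, as the backoff arithmetic above implies.
func (c *Crawler) markCheckFailure(feed *Feed, now time.Time, errMsg string) {
	feed.NoUpdate++
	// Adaptive backoff: 100s base + 100s per consecutive failure/no-change,
	// so e.g. a 5th consecutive no-update schedules the next check 600s out
	feed.NextCheckAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
	if errMsg != "" {
		// Real failure: record it and hold the feed
		feed.LastError = errMsg
		feed.LastErrorAt = now
		feed.Status = "hold"
	} else {
		// 304 Not Modified: a healthy response, just no new content
		feed.LastError = ""
		feed.Status = "pass"
	}
	// Auto-hold feeds after 1000 consecutive failures/no-changes
	if feed.NoUpdate >= 1000 && feed.PublishStatus == "pass" {
		feed.PublishStatus = "hold"
		fmt.Printf("Feed auto-held after %d no-updates: %s\n", feed.NoUpdate, feed.URL)
	}
	c.saveFeed(feed)
}

// With the helper, e.g. the 304 branch would reduce to:
//
//	c.markCheckFailure(feed, now, "")
//	return false, nil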