crawler/crawler.go
primal 9a43b69b4b Add profile refresh on startup to backfill feed URLs
Updates existing account profiles with the feed URL on startup.
This ensures all accounts have the source feed URL in their
profile description.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-28 21:03:10 -05:00

587 lines
15 KiB
Go

package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/html"
)
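
// Crawler discovers feeds by crawling domains, checks known feeds for new
// items, and publishes items to per-feed accounts on a PDS.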
type Crawler struct {
	MaxDepth        int
	MaxPagesPerHost int
	Timeout         time.Duration
	UserAgent       string

	visited        sync.Map
	feedsMu        sync.Mutex
	client         *http.Client
	hostsProcessed int32
	feedsChecked   int32
	startTime      time.Time
	db             *DB

	displayedCrawlRate int
	displayedCheckRate int
	domainsImported    int32

	cachedStats      *DashboardStats
	cachedAllDomains []DomainStat
	statsMu          sync.RWMutex
}
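
// NewCrawler opens the database at connString and returns a Crawler with
// default crawl limits and an HTTP client that stops after 10 redirects.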
func NewCrawler(connString string) (*Crawler, error) {
	db, err := OpenDatabase(connString)
	if err != nil {
		return nil, fmt.Errorf("failed to open database: %v", err)
	}
	return &Crawler{
		MaxDepth:        10,
		MaxPagesPerHost: 10,
		Timeout:         10 * time.Second,
		UserAgent:       "FeedCrawler/1.0",
		startTime:       time.Now(),
		db:              db,
		client: &http.Client{
			Timeout: 10 * time.Second,
			CheckRedirect: func(req *http.Request, via []*http.Request) error {
				if len(via) >= 10 {
					return fmt.Errorf("stopped after 10 redirects")
				}
				return nil
			},
		},
	}, nil
}
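
// Close closes the underlying database connection, if one was opened.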
func (c *Crawler) Close() error {
	if c.db != nil {
		fmt.Println("Closing database...")
		return c.db.Close()
	}
	return nil
}

// StartStatsLoop updates cached stats once per minute
func (c *Crawler) StartStatsLoop() {
	for {
		c.UpdateStats()
		time.Sleep(1 * time.Minute)
	}
}

// StartCleanupLoop runs item cleanup once per week
func (c *Crawler) StartCleanupLoop() {
	for {
		deleted, err := c.CleanupOldItems()
		if err != nil {
			fmt.Printf("Cleanup error: %v\n", err)
		} else if deleted > 0 {
			fmt.Printf("Cleanup: removed %d old items\n", deleted)
		}
		time.Sleep(7 * 24 * time.Hour)
	}
}

// StartMaintenanceLoop performs periodic database maintenance
func (c *Crawler) StartMaintenanceLoop() {
	vacuumTicker := time.NewTicker(24 * time.Hour)
	analyzeTicker := time.NewTicker(1 * time.Hour)
	defer vacuumTicker.Stop()
	defer analyzeTicker.Stop()
	for {
		select {
		case <-analyzeTicker.C:
			// Update statistics for the query planner
			if _, err := c.db.Exec("ANALYZE"); err != nil {
				fmt.Printf("ANALYZE error: %v\n", err)
			}
		case <-vacuumTicker.C:
			// Reclaim dead tuple space (VACUUM is lighter than VACUUM FULL)
			fmt.Println("Running VACUUM...")
			if _, err := c.db.Exec("VACUUM"); err != nil {
				fmt.Printf("VACUUM error: %v\n", err)
			} else {
				fmt.Println("VACUUM complete")
			}
		}
	}
}

// StartPublishLoop automatically publishes unpublished items for approved feeds.
// It grabs up to 50 items ordered by discovered_at, publishes one per second, then loops.
func (c *Crawler) StartPublishLoop() {
	// Load PDS credentials from environment or pds.env file
	pdsHost := os.Getenv("PDS_HOST")
	pdsAdminPassword := os.Getenv("PDS_ADMIN_PASSWORD")
	if pdsHost == "" || pdsAdminPassword == "" {
		if data, err := os.ReadFile("pds.env"); err == nil {
			for _, line := range strings.Split(string(data), "\n") {
				line = strings.TrimSpace(line)
				if strings.HasPrefix(line, "#") || line == "" {
					continue
				}
				parts := strings.SplitN(line, "=", 2)
				if len(parts) == 2 {
					key := strings.TrimSpace(parts[0])
					value := strings.TrimSpace(parts[1])
					switch key {
					case "PDS_HOST":
						pdsHost = value
					case "PDS_ADMIN_PASSWORD":
						pdsAdminPassword = value
					}
				}
			}
		}
	}
	if pdsHost == "" || pdsAdminPassword == "" {
		fmt.Println("Publish loop: PDS credentials not configured, skipping")
		return
	}
	fmt.Printf("Publish loop: starting with PDS %s\n", pdsHost)

	// Shared password used for every per-feed publish account
	feedPassword := "feed1440!"

	// Cache sessions per account
	sessions := make(map[string]*PDSSession)
	publisher := NewPublisher(pdsHost)

	// Refresh existing account profiles on startup
	c.RefreshAllProfiles(publisher, feedPassword)

	for {
		// Get up to 50 unpublished items from approved feeds, sorted by discovered_at ASC
		items, err := c.GetAllUnpublishedItems(50)
		if err != nil {
			fmt.Printf("Publish loop error: %v\n", err)
			time.Sleep(1 * time.Second)
			continue
		}
		if len(items) == 0 {
			time.Sleep(1 * time.Second)
			continue
		}
		// Publish one item per second
		for _, item := range items {
			// Get or create session for this feed's account
			account := c.getAccountForFeed(item.FeedURL)
			if account == "" {
				time.Sleep(1 * time.Second)
				continue
			}
			session, ok := sessions[account]
			if !ok {
				// Try to log in
				session, err = publisher.CreateSession(account, feedPassword)
				if err != nil {
					// Account might not exist - try to create it
					inviteCode, err := publisher.CreateInviteCode(pdsAdminPassword, 1)
					if err != nil {
						fmt.Printf("Publish: failed to create invite for %s: %v\n", account, err)
						time.Sleep(1 * time.Second)
						continue
					}
					email := account + "@1440.news"
					session, err = publisher.CreateAccount(account, email, feedPassword, inviteCode)
					if err != nil {
						fmt.Printf("Publish: failed to create account %s: %v\n", account, err)
						time.Sleep(1 * time.Second)
						continue
					}
					fmt.Printf("Publish: created account %s\n", account)
					c.db.Exec("UPDATE feeds SET publish_account = $1 WHERE url = $2", account, item.FeedURL)

					// Set up profile for new account
					feedInfo := c.getFeedInfo(item.FeedURL)
					if feedInfo != nil {
						displayName := feedInfo.Title
						if displayName == "" {
							displayName = account
						}
						// Build description with feed URL
						description := feedInfo.Description
						if description == "" {
							description = "News feed via 1440.news"
						}
						// Add feed URL to description
						feedURLFull := "https://" + item.FeedURL
						description = description + "\n\n" + feedURLFull
						// Truncate if needed
						if len(displayName) > 64 {
							displayName = displayName[:61] + "..."
						}
						if len(description) > 256 {
							description = description[:253] + "..."
						}
						if err := publisher.UpdateProfile(session, displayName, description, nil); err != nil {
							fmt.Printf("Publish: failed to set profile for %s: %v\n", account, err)
						} else {
							fmt.Printf("Publish: set profile for %s\n", account)
						}
					}
				}
				sessions[account] = session
			}
			// Publish the item
			uri, err := publisher.PublishItem(session, &item)
			if err != nil {
				fmt.Printf("Publish: failed item %d: %v\n", item.ID, err)
				// Clear session cache on auth errors
				if strings.Contains(err.Error(), "401") || strings.Contains(err.Error(), "auth") {
					delete(sessions, account)
				}
			} else {
				c.MarkItemPublished(item.ID, uri)
				fmt.Printf("Publish: %s -> %s\n", item.Title[:min(40, len(item.Title))], account)
			}
			time.Sleep(1 * time.Second)
		}
		time.Sleep(1 * time.Second)
	}
}

// getAccountForFeed returns the publish account for a feed URL
func (c *Crawler) getAccountForFeed(feedURL string) string {
	var account *string
	err := c.db.QueryRow(`
		SELECT publish_account FROM feeds
		WHERE url = $1 AND publish_status = 'pass' AND status = 'active'
	`, feedURL).Scan(&account)
	if err != nil || account == nil || *account == "" {
		// Derive handle from feed URL
		return DeriveHandleFromFeed(feedURL)
	}
	return *account
}

// FeedInfo holds basic feed metadata for profile setup
type FeedInfo struct {
	Title       string
	Description string
	SiteURL     string
}

// getFeedInfo returns feed metadata for profile setup
func (c *Crawler) getFeedInfo(feedURL string) *FeedInfo {
	var title, description, siteURL *string
	err := c.db.QueryRow(`
		SELECT title, description, site_url FROM feeds WHERE url = $1
	`, feedURL).Scan(&title, &description, &siteURL)
	if err != nil {
		return nil
	}
	return &FeedInfo{
		Title:       StringValue(title),
		Description: StringValue(description),
		SiteURL:     StringValue(siteURL),
	}
}

// RefreshAllProfiles updates profiles for all existing accounts with feed URLs
func (c *Crawler) RefreshAllProfiles(publisher *Publisher, feedPassword string) {
	rows, err := c.db.Query(`
		SELECT url, title, description, publish_account
		FROM feeds
		WHERE publish_account IS NOT NULL AND publish_account <> ''
	`)
	if err != nil {
		fmt.Printf("RefreshProfiles: query error: %v\n", err)
		return
	}
	defer rows.Close()
	for rows.Next() {
		var feedURL, account string
		var title, description *string
		if err := rows.Scan(&feedURL, &title, &description, &account); err != nil {
			continue
		}
		// Log in to the account
		session, err := publisher.CreateSession(account, feedPassword)
		if err != nil {
			fmt.Printf("RefreshProfiles: login failed for %s: %v\n", account, err)
			continue
		}
		// Build the profile
		displayName := StringValue(title)
		if displayName == "" {
			displayName = account
		}
		desc := StringValue(description)
		if desc == "" {
			desc = "News feed via 1440.news"
		}
		// Add the feed URL
		feedURLFull := "https://" + feedURL
		desc = desc + "\n\n" + feedURLFull
		// Truncate if needed
		if len(displayName) > 64 {
			displayName = displayName[:61] + "..."
		}
		if len(desc) > 256 {
			desc = desc[:253] + "..."
		}
		if err := publisher.UpdateProfile(session, displayName, desc, nil); err != nil {
			fmt.Printf("RefreshProfiles: update failed for %s: %v\n", account, err)
		} else {
			fmt.Printf("RefreshProfiles: updated %s\n", account)
		}
	}
}

// GetAllUnpublishedItems returns unpublished items from all approved feeds
func (c *Crawler) GetAllUnpublishedItems(limit int) ([]Item, error) {
	rows, err := c.db.Query(`
		SELECT i.id, i.feed_url, i.guid, i.title, i.link, i.description, i.content,
		       i.author, i.pub_date, i.discovered_at
		FROM items i
		JOIN feeds f ON i.feed_url = f.url
		WHERE f.publish_status = 'pass'
		  AND f.status = 'active'
		  AND i.published_at IS NULL
		ORDER BY i.discovered_at ASC
		LIMIT $1
	`, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	var items []Item
	for rows.Next() {
		var item Item
		var guid, title, link, description, content, author *string
		var pubDate, discoveredAt *time.Time
		err := rows.Scan(&item.ID, &item.FeedURL, &guid, &title, &link, &description,
			&content, &author, &pubDate, &discoveredAt)
		if err != nil {
			continue
		}
		item.GUID = StringValue(guid)
		item.Title = StringValue(title)
		item.Link = StringValue(link)
		item.Description = StringValue(description)
		item.Content = StringValue(content)
		item.Author = StringValue(author)
		item.PubDate = TimeValue(pubDate)
		item.DiscoveredAt = TimeValue(discoveredAt)
		items = append(items, item)
	}
	return items, nil
}

// StartCrawlLoop runs the domain crawling loop independently
func (c *Crawler) StartCrawlLoop() {
	numWorkers := runtime.NumCPU()
	if numWorkers < 1 {
		numWorkers = 1
	}
	// Buffered channel for domain work
	workChan := make(chan *Domain, 256)
	// Start workers
	for i := 0; i < numWorkers; i++ {
		go func() {
			for domain := range workChan {
				feedsFound, crawlErr := c.crawlHost(domain.Host)
				errStr := ""
				if crawlErr != nil {
					errStr = crawlErr.Error()
				}
				if err := c.markDomainCrawled(domain.Host, feedsFound, errStr); err != nil {
					fmt.Printf("Error marking domain %s as crawled: %v\n", domain.Host, err)
				}
			}
		}()
	}
	const fetchSize = 1000
	for {
		domains, err := c.GetUncheckedDomains(fetchSize)
		if err != nil {
			fmt.Printf("Error fetching domains: %v\n", err)
		}
		if len(domains) == 0 {
			c.displayedCrawlRate = 0
			time.Sleep(1 * time.Second)
			continue
		}
		fmt.Printf("%s crawl: %d domains to check\n", time.Now().Format("15:04:05"), len(domains))
		for _, domain := range domains {
			workChan <- domain
		}
		time.Sleep(1 * time.Second)
	}
}

// StartCheckLoop runs the feed checking loop independently
func (c *Crawler) StartCheckLoop() {
	numWorkers := runtime.NumCPU()
	if numWorkers < 1 {
		numWorkers = 1
	}
	// Buffered channel for feed work
	workChan := make(chan *Feed, 256)
	// Start workers
	for i := 0; i < numWorkers; i++ {
		go func() {
			for feed := range workChan {
				c.CheckFeed(feed)
			}
		}()
	}
	const fetchSize = 1000
	for {
		feeds, err := c.GetFeedsDueForCheck(fetchSize)
		if err != nil {
			fmt.Printf("Error fetching feeds: %v\n", err)
		}
		if len(feeds) == 0 {
			c.displayedCheckRate = 0
			time.Sleep(1 * time.Second)
			continue
		}
		fmt.Printf("%s check: %d feeds to check\n", time.Now().Format("15:04:05"), len(feeds))
		for _, feed := range feeds {
			workChan <- feed
		}
		time.Sleep(1 * time.Second)
	}
}
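
// crawlHost crawls a single host, trying HTTPS first and falling back to HTTP,
// and returns the number of feeds recorded for that host.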
func (c *Crawler) crawlHost(host string) (feedsFound int, err error) {
	atomic.AddInt32(&c.hostsProcessed, 1)
	localVisited := make(map[string]bool)
	pagesVisited := 0
	// Try HTTPS first, fall back to HTTP if no pages were visited
	c.crawlPage("https://"+host, host, 0, localVisited, &pagesVisited)
	if pagesVisited == 0 {
		c.crawlPage("http://"+host, host, 0, localVisited, &pagesVisited)
	}
	// Count feeds found for this specific host
	feedsFound, _ = c.GetFeedCountByHost(host)
	if pagesVisited == 0 {
		return feedsFound, fmt.Errorf("could not connect")
	}
	return feedsFound, nil
}
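
// crawlPage fetches one page; if the response looks like feed content it is
// processed directly, otherwise feed links are extracted and links that pass
// shouldCrawl are crawled recursively, bounded by MaxDepth and MaxPagesPerHost.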
func (c *Crawler) crawlPage(pageURL, sourceHost string, depth int, localVisited map[string]bool, pagesVisited *int) {
	if *pagesVisited >= c.MaxPagesPerHost || depth > c.MaxDepth {
		return
	}
	if localVisited[pageURL] {
		return
	}
	if _, visited := c.visited.LoadOrStore(pageURL, true); visited {
		return
	}
	localVisited[pageURL] = true
	*pagesVisited++
	body, contentType, headers, err := c.fetchPage(pageURL)
	if err != nil {
		return
	}
	if c.isFeedContent(body, contentType) {
		c.processFeed(pageURL, sourceHost, body, headers)
		return
	}
	doc, err := html.Parse(strings.NewReader(body))
	if err != nil {
		return
	}
	feedLinks := c.extractFeedLinks(doc, pageURL)
	for _, fl := range feedLinks {
		c.addFeed(fl.URL, fl.Type, sourceHost, pageURL)
	}
	anchorFeeds := c.extractAnchorFeeds(doc, pageURL)
	for _, fl := range anchorFeeds {
		c.addFeed(fl.URL, fl.Type, sourceHost, pageURL)
	}
	if depth < c.MaxDepth {
		links := c.extractLinks(doc, pageURL)
		for _, link := range links {
			if shouldCrawl(link, pageURL) {
				c.crawlPage(link, sourceHost, depth+1, localVisited, pagesVisited)
			}
		}
	}
}
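
// fetchPage performs a GET with the crawler's User-Agent and returns the body,
// Content-Type, and response headers; non-200 responses are returned as errors.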
func (c *Crawler) fetchPage(pageURL string) (string, string, http.Header, error) {
	req, err := http.NewRequest("GET", pageURL, nil)
	if err != nil {
		return "", "", nil, err
	}
	req.Header.Set("User-Agent", c.UserAgent)
	resp, err := c.client.Do(req)
	if err != nil {
		return "", "", nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", "", nil, fmt.Errorf("status code: %d", resp.StatusCode)
	}
	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", "", nil, err
	}
	contentType := resp.Header.Get("Content-Type")
	return string(bodyBytes), contentType, resp.Header, nil
}