crawler/crawler.go
primal 522233c4a2 Tune concurrency settings: 100 workers, 100 batch size, 100 buffer
- Import batch size: 100 (was 100k)
- Domain check/crawl fetch size: 100 (was 1000)
- Feed check fetch size: 100 (was 1000)
- Worker count: 100 fixed (was NumCPU)
- Channel buffers: 100 (was 256)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 23:33:57 -05:00
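The settings above appear in the file below as the literal value 100 at each site (worker counts, fetch sizes, channel buffers). A minimal sketch of the same knobs gathered as named constants, purely illustrative; none of these identifiers exist in the repository:

// Hypothetical grouping of the tuning knobs described in this commit.
const (
importBatchSize  = 100 // rows per import batch (was 100k)
domainFetchSize  = 100 // domains per check/crawl fetch (was 1000)
feedFetchSize    = 100 // feeds per check fetch (was 1000)
workerCount      = 100 // goroutines per worker pool (was runtime.NumCPU())
channelBufferLen = 100 // buffered-channel capacity (was 256)
)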

730 lines · 19 KiB · Go

package main

import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"sync"
"sync/atomic"
"time"

"golang.org/x/net/html"
)
type Crawler struct {
MaxDepth int
MaxPagesPerHost int
Timeout time.Duration
UserAgent string
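// visited deduplicates page URLs across crawl goroutines (see crawlPage)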
visited sync.Map
feedsMu sync.Mutex
client *http.Client
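// Progress counters, incremented atomically by workers (e.g. crawlHost bumps hostsProcessed)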
hostsProcessed int32
feedsChecked int32
startTime time.Time
db *DB
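// Most recently displayed crawl/check rates; the work loops reset them to zero when idle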
displayedCrawlRate int
displayedCheckRate int
domainsImported int32
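// Cached dashboard data refreshed by the stats loop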
cachedStats *DashboardStats
cachedAllDomains []DomainStat
statsMu sync.RWMutex
}
func NewCrawler(connString string) (*Crawler, error) {
db, err := OpenDatabase(connString)
if err != nil {
return nil, fmt.Errorf("failed to open database: %w", err)
}
return &Crawler{
MaxDepth: 10,
MaxPagesPerHost: 10,
Timeout: 10 * time.Second,
UserAgent: "FeedCrawler/1.0",
startTime: time.Now(),
db: db,
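// Client timeout mirrors the Timeout field above; redirects are capped at 10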
client: &http.Client{
Timeout: 10 * time.Second,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
if len(via) >= 10 {
return fmt.Errorf("stopped after 10 redirects")
}
return nil
},
},
}, nil
}
func (c *Crawler) Close() error {
if c.db != nil {
fmt.Println("Closing database...")
return c.db.Close()
}
return nil
}
// StartStatsLoop updates cached stats every 10 seconds
func (c *Crawler) StartStatsLoop() {
for {
c.UpdateStats()
time.Sleep(10 * time.Second)
}
}
// StartCleanupLoop runs item cleanup once per week
func (c *Crawler) StartCleanupLoop() {
for {
deleted, err := c.CleanupOldItems()
if err != nil {
fmt.Printf("Cleanup error: %v\n", err)
} else if deleted > 0 {
fmt.Printf("Cleanup: removed %d old items\n", deleted)
}
time.Sleep(7 * 24 * time.Hour)
}
}
// StartMaintenanceLoop performs periodic database maintenance
func (c *Crawler) StartMaintenanceLoop() {
vacuumTicker := time.NewTicker(24 * time.Hour)
analyzeTicker := time.NewTicker(1 * time.Hour)
defer vacuumTicker.Stop()
defer analyzeTicker.Stop()
for {
select {
case <-analyzeTicker.C:
// Update statistics for query planner
if _, err := c.db.Exec("ANALYZE"); err != nil {
fmt.Printf("ANALYZE error: %v\n", err)
}
case <-vacuumTicker.C:
// Reclaim dead tuple space (VACUUM is lighter than VACUUM FULL)
fmt.Println("Running VACUUM...")
if _, err := c.db.Exec("VACUUM"); err != nil {
fmt.Printf("VACUUM error: %v\n", err)
} else {
fmt.Println("VACUUM complete")
}
}
}
}
// StartPublishLoop automatically publishes unpublished items for approved feeds.
// It grabs up to 50 items ordered by discovered_at, publishes one per second, then repeats.
func (c *Crawler) StartPublishLoop() {
// Load PDS credentials from environment or pds.env file
pdsHost := os.Getenv("PDS_HOST")
pdsAdminPassword := os.Getenv("PDS_ADMIN_PASSWORD")
if pdsHost == "" || pdsAdminPassword == "" {
if data, err := os.ReadFile("pds.env"); err == nil {
for _, line := range strings.Split(string(data), "\n") {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "#") || line == "" {
continue
}
parts := strings.SplitN(line, "=", 2)
if len(parts) == 2 {
key := strings.TrimSpace(parts[0])
value := strings.TrimSpace(parts[1])
switch key {
case "PDS_HOST":
pdsHost = value
case "PDS_ADMIN_PASSWORD":
pdsAdminPassword = value
}
}
}
}
}
if pdsHost == "" || pdsAdminPassword == "" {
fmt.Println("Publish loop: PDS credentials not configured, skipping")
return
}
fmt.Printf("Publish loop: starting with PDS %s\n", pdsHost)
feedPassword := "feed1440!"
// Cache sessions per account
sessions := make(map[string]*PDSSession)
publisher := NewPublisher(pdsHost)
// Refresh existing account profiles on startup
c.RefreshAllProfiles(publisher, feedPassword)
for {
// Get up to 50 unpublished items from approved feeds, sorted by discovered_at ASC
items, err := c.GetAllUnpublishedItems(50)
if err != nil {
fmt.Printf("Publish loop error: %v\n", err)
time.Sleep(1 * time.Second)
continue
}
if len(items) == 0 {
time.Sleep(1 * time.Second)
continue
}
// Publish one item per second
for _, item := range items {
// Get or create session for this feed's account
account := c.getAccountForFeed(item.FeedURL)
if account == "" {
time.Sleep(1 * time.Second)
continue
}
session, ok := sessions[account]
if !ok {
// Try to log in
session, err = publisher.CreateSession(account, feedPassword)
if err != nil {
// Account might not exist - try to create it
inviteCode, err := publisher.CreateInviteCode(pdsAdminPassword, 1)
if err != nil {
fmt.Printf("Publish: failed to create invite for %s: %v\n", account, err)
time.Sleep(1 * time.Second)
continue
}
email := account + "@1440.news"
session, err = publisher.CreateAccount(account, email, feedPassword, inviteCode)
if err != nil {
fmt.Printf("Publish: failed to create account %s: %v\n", account, err)
time.Sleep(1 * time.Second)
continue
}
fmt.Printf("Publish: created account %s\n", account)
c.db.Exec("UPDATE feeds SET publish_account = $1 WHERE url = $2", account, item.FeedURL)
// Set up profile for new account
feedInfo := c.getFeedInfo(item.FeedURL)
if feedInfo != nil {
displayName := feedInfo.Title
if displayName == "" {
displayName = account
}
// Build description with feed URL (strip HTML tags)
description := stripHTML(feedInfo.Description)
if description == "" {
description = "News feed via 1440.news"
}
// Add feed URL as first line of description
feedURLFull := "https://" + item.FeedURL
description = feedURLFull + "\n\n" + description
// Truncate to the profile limits (rune-safe so multi-byte characters are not split)
if r := []rune(displayName); len(r) > 64 {
displayName = string(r[:61]) + "..."
}
if r := []rune(description); len(r) > 256 {
description = string(r[:253]) + "..."
}
// Fetch and upload favicon as avatar
var avatar *BlobRef
faviconSource := feedInfo.SiteURL
if faviconSource == "" {
// Fallback to deriving from feed URL
faviconSource = feedInfo.SourceHost
}
if faviconSource != "" {
faviconURL := publisher.FetchFavicon(faviconSource)
if faviconURL != "" {
avatar = publisher.fetchAndUploadImage(session, faviconURL)
}
}
if err := publisher.UpdateProfile(session, displayName, description, avatar); err != nil {
fmt.Printf("Publish: failed to set profile for %s: %v\n", account, err)
} else {
fmt.Printf("Publish: set profile for %s\n", account)
}
// Have directory account follow this new account
if err := publisher.FollowAsDirectory(session.DID); err != nil {
fmt.Printf("Publish: directory follow failed for %s: %v\n", account, err)
} else {
fmt.Printf("Publish: directory now following %s\n", account)
}
}
}
sessions[account] = session
}
// Shorten URLs before publishing
itemToPublish := item
if item.Link != "" {
if shortURL, err := c.GetShortURLForPost(item.Link, &item.ID, item.FeedURL); err == nil {
fmt.Printf("Publish: shortened %s -> %s\n", item.Link[:min(40, len(item.Link))], shortURL)
itemToPublish.Link = shortURL
} else {
fmt.Printf("Publish: short URL failed for %s: %v\n", item.Link[:min(40, len(item.Link))], err)
}
}
// Publish the item
uri, err := publisher.PublishItem(session, &itemToPublish)
if err != nil {
fmt.Printf("Publish: failed item %d: %v\n", item.ID, err)
// Clear session cache on auth errors
if strings.Contains(err.Error(), "401") || strings.Contains(err.Error(), "auth") {
delete(sessions, account)
}
} else {
c.MarkItemPublished(item.ID, uri)
fmt.Printf("Publish: %s -> %s\n", item.Title[:min(40, len(item.Title))], account)
}
time.Sleep(1 * time.Second)
}
time.Sleep(1 * time.Second)
}
}
// getAccountForFeed returns the publish account for a feed URL
func (c *Crawler) getAccountForFeed(feedURL string) string {
var account *string
err := c.db.QueryRow(`
SELECT publish_account FROM feeds
WHERE url = $1 AND publish_status = 'pass' AND status = 'active'
`, feedURL).Scan(&account)
if err != nil || account == nil || *account == "" {
// Derive handle from feed URL
return DeriveHandleFromFeed(feedURL)
}
return *account
}
// FeedInfo holds basic feed metadata for profile setup
type FeedInfo struct {
Title string
Description string
SiteURL string
SourceHost string
}
// getFeedInfo returns feed metadata for profile setup
func (c *Crawler) getFeedInfo(feedURL string) *FeedInfo {
var title, description, siteURL, sourceHost *string
err := c.db.QueryRow(`
SELECT title, description, site_url, source_host FROM feeds WHERE url = $1
`, feedURL).Scan(&title, &description, &siteURL, &sourceHost)
if err != nil {
return nil
}
return &FeedInfo{
Title: StringValue(title),
Description: StringValue(description),
SiteURL: StringValue(siteURL),
SourceHost: StringValue(sourceHost),
}
}
// RefreshAllProfiles updates profiles for all existing accounts with feed URLs
func (c *Crawler) RefreshAllProfiles(publisher *Publisher, feedPassword string) {
rows, err := c.db.Query(`
SELECT url, title, description, site_url, source_host, publish_account
FROM feeds
WHERE publish_account IS NOT NULL AND publish_account <> ''
`)
if err != nil {
fmt.Printf("RefreshProfiles: query error: %v\n", err)
return
}
defer rows.Close()
for rows.Next() {
var feedURL, account string
var title, description, siteURL, sourceHost *string
if err := rows.Scan(&feedURL, &title, &description, &siteURL, &sourceHost, &account); err != nil {
continue
}
// Login to account
session, err := publisher.CreateSession(account, feedPassword)
if err != nil {
fmt.Printf("RefreshProfiles: login failed for %s: %v\n", account, err)
continue
}
// Build profile
displayName := StringValue(title)
if displayName == "" {
displayName = account
}
desc := stripHTML(StringValue(description))
if desc == "" {
desc = "News feed via 1440.news"
}
// Add feed URL as first line
feedURLFull := "https://" + feedURL
desc = feedURLFull + "\n\n" + desc
// Truncate to the profile limits (rune-safe so multi-byte characters are not split)
if r := []rune(displayName); len(r) > 64 {
displayName = string(r[:61]) + "..."
}
if r := []rune(desc); len(r) > 256 {
desc = string(r[:253]) + "..."
}
// Fetch and upload favicon as avatar
var avatar *BlobRef
faviconSource := StringValue(siteURL)
if faviconSource == "" {
// Fallback to source host
faviconSource = StringValue(sourceHost)
}
if faviconSource != "" {
faviconURL := publisher.FetchFavicon(faviconSource)
if faviconURL != "" {
avatar = publisher.fetchAndUploadImage(session, faviconURL)
}
}
if err := publisher.UpdateProfile(session, displayName, desc, avatar); err != nil {
fmt.Printf("RefreshProfiles: update failed for %s: %v\n", account, err)
} else {
fmt.Printf("RefreshProfiles: updated %s\n", account)
}
}
}
// GetAllUnpublishedItems returns unpublished items from all approved feeds
func (c *Crawler) GetAllUnpublishedItems(limit int) ([]Item, error) {
rows, err := c.db.Query(`
SELECT i.id, i.feed_url, i.guid, i.title, i.link, i.description, i.content,
i.author, i.pub_date, i.discovered_at, i.image_urls, i.tags,
i.enclosure_url, i.enclosure_type, i.enclosure_length
FROM items i
JOIN feeds f ON i.feed_url = f.url
WHERE f.publish_status = 'pass'
AND f.status = 'active'
AND i.published_at IS NULL
ORDER BY i.discovered_at ASC
LIMIT $1
`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Item
for rows.Next() {
var item Item
var guid, title, link, description, content, author, imageURLsJSON, tagsJSON *string
var pubDate, discoveredAt *time.Time
var enclosureURL, enclosureType *string
var enclosureLength *int64
err := rows.Scan(&item.ID, &item.FeedURL, &guid, &title, &link, &description,
&content, &author, &pubDate, &discoveredAt, &imageURLsJSON, &tagsJSON,
&enclosureURL, &enclosureType, &enclosureLength)
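// Skip rows that fail to scan instead of aborting the whole batch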
if err != nil {
continue
}
item.GUID = StringValue(guid)
item.Title = StringValue(title)
item.Link = StringValue(link)
item.Description = StringValue(description)
item.Content = StringValue(content)
item.Author = StringValue(author)
item.PubDate = TimeValue(pubDate)
item.DiscoveredAt = TimeValue(discoveredAt)
// Parse image URLs from JSON array
if imageURLsJSON != nil && *imageURLsJSON != "" {
json.Unmarshal([]byte(*imageURLsJSON), &item.ImageURLs)
}
// Parse tags from JSON array
if tagsJSON != nil && *tagsJSON != "" {
json.Unmarshal([]byte(*tagsJSON), &item.Tags)
}
// Parse enclosure
if enclosureURL != nil && *enclosureURL != "" {
item.Enclosure = &Enclosure{
URL: *enclosureURL,
Type: StringValue(enclosureType),
}
if enclosureLength != nil {
item.Enclosure.Length = *enclosureLength
}
}
items = append(items, item)
}
return items, nil
}
// StartDomainCheckLoop runs HEAD requests on approved domains to verify they're reachable
func (c *Crawler) StartDomainCheckLoop() {
numWorkers := 100
// Buffered channel for domain work
workChan := make(chan *Domain, 100)
// Start workers
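// Workers run for the life of the process; workChan is never closed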
for i := 0; i < numWorkers; i++ {
go func() {
for domain := range workChan {
// Do HEAD request to verify domain is reachable
checkErr := c.checkDomain(domain.Host)
errStr := ""
if checkErr != nil {
errStr = checkErr.Error()
}
if err := c.markDomainChecked(domain.Host, errStr); err != nil {
fmt.Printf("Error marking domain %s as checked: %v\n", domain.Host, err)
}
}
}()
}
const fetchSize = 100
for {
domains, err := c.GetDomainsToCheck(fetchSize)
if err != nil {
fmt.Printf("Error fetching domains to check: %v\n", err)
}
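// A failed fetch leaves domains empty, so the length check below just sleeps and retries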
if len(domains) == 0 {
time.Sleep(1 * time.Second)
continue
}
fmt.Printf("%s domain-check: %d domains to verify\n", time.Now().Format("15:04:05"), len(domains))
for _, domain := range domains {
workChan <- domain
}
time.Sleep(1 * time.Second)
}
}
// checkDomain performs a HEAD request to verify a domain is reachable
func (c *Crawler) checkDomain(host string) error {
url := "https://" + host
req, err := http.NewRequest("HEAD", url, nil)
if err != nil {
return err
}
req.Header.Set("User-Agent", c.UserAgent)
resp, err := c.client.Do(req)
if err != nil {
// Try HTTP fallback
url = "http://" + host
req, err = http.NewRequest("HEAD", url, nil)
if err != nil {
return err
}
req.Header.Set("User-Agent", c.UserAgent)
resp, err = c.client.Do(req)
if err != nil {
return err
}
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
return fmt.Errorf("HTTP %d", resp.StatusCode)
}
return nil
}
// StartCrawlLoop runs the domain crawling loop independently (crawls checked domains)
func (c *Crawler) StartCrawlLoop() {
numWorkers := 100
// Buffered channel for domain work
workChan := make(chan *Domain, 100)
// Start workers
for i := 0; i < numWorkers; i++ {
go func() {
for domain := range workChan {
feedsFound, crawlErr := c.crawlHost(domain.Host)
errStr := ""
if crawlErr != nil {
errStr = crawlErr.Error()
}
if err := c.markDomainCrawled(domain.Host, feedsFound, errStr); err != nil {
fmt.Printf("Error marking domain %s as crawled: %v\n", domain.Host, err)
}
}
}()
}
const fetchSize = 100
for {
domains, err := c.GetDomainsToCrawl(fetchSize)
if err != nil {
fmt.Printf("Error fetching domains to crawl: %v\n", err)
}
if len(domains) == 0 {
c.displayedCrawlRate = 0
time.Sleep(1 * time.Second)
continue
}
fmt.Printf("%s crawl: %d domains to crawl\n", time.Now().Format("15:04:05"), len(domains))
for _, domain := range domains {
workChan <- domain
}
time.Sleep(1 * time.Second)
}
}
// StartCheckLoop runs the feed checking loop independently
func (c *Crawler) StartCheckLoop() {
numWorkers := 100
// Buffered channel for feed work
workChan := make(chan *Feed, 100)
// Start workers
for i := 0; i < numWorkers; i++ {
go func() {
for feed := range workChan {
c.CheckFeed(feed)
}
}()
}
const fetchSize = 100
for {
feeds, err := c.GetFeedsDueForCheck(fetchSize)
if err != nil {
fmt.Printf("Error fetching feeds: %v\n", err)
}
if len(feeds) == 0 {
c.displayedCheckRate = 0
time.Sleep(1 * time.Second)
continue
}
fmt.Printf("%s check: %d feeds to check\n", time.Now().Format("15:04:05"), len(feeds))
for _, feed := range feeds {
workChan <- feed
}
time.Sleep(1 * time.Second)
}
}
func (c *Crawler) crawlHost(host string) (feedsFound int, err error) {
atomic.AddInt32(&c.hostsProcessed, 1)
localVisited := make(map[string]bool)
pagesVisited := 0
// Try HTTPS first, fall back to HTTP if no pages were visited
c.crawlPage("https://"+host, host, 0, localVisited, &pagesVisited)
if pagesVisited == 0 {
c.crawlPage("http://"+host, host, 0, localVisited, &pagesVisited)
}
// Count feeds found for this specific host
feedsFound, _ = c.GetFeedCountByHost(host)
if pagesVisited == 0 {
return feedsFound, fmt.Errorf("could not connect")
}
return feedsFound, nil
}
func (c *Crawler) crawlPage(pageURL, sourceHost string, depth int, localVisited map[string]bool, pagesVisited *int) {
if *pagesVisited >= c.MaxPagesPerHost || depth > c.MaxDepth {
return
}
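// Skip pages already seen in this host crawl (localVisited) or anywhere by this process (the shared visited map)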
if localVisited[pageURL] {
return
}
if _, visited := c.visited.LoadOrStore(pageURL, true); visited {
return
}
localVisited[pageURL] = true
*pagesVisited++
body, contentType, headers, err := c.fetchPage(pageURL)
if err != nil {
return
}
if c.isFeedContent(body, contentType) {
c.processFeed(pageURL, sourceHost, body, headers)
return
}
doc, err := html.Parse(strings.NewReader(body))
if err != nil {
return
}
feedLinks := c.extractFeedLinks(doc, pageURL)
for _, fl := range feedLinks {
c.addFeed(fl.URL, fl.Type, sourceHost, pageURL)
}
anchorFeeds := c.extractAnchorFeeds(doc, pageURL)
for _, fl := range anchorFeeds {
c.addFeed(fl.URL, fl.Type, sourceHost, pageURL)
}
if depth < c.MaxDepth {
links := c.extractLinks(doc, pageURL)
for _, link := range links {
if shouldCrawl(link, pageURL) {
c.crawlPage(link, sourceHost, depth+1, localVisited, pagesVisited)
}
}
}
}
func (c *Crawler) fetchPage(pageURL string) (string, string, http.Header, error) {
req, err := http.NewRequest("GET", pageURL, nil)
if err != nil {
return "", "", nil, err
}
req.Header.Set("User-Agent", c.UserAgent)
resp, err := c.client.Do(req)
if err != nil {
return "", "", nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", "", nil, fmt.Errorf("status code: %d", resp.StatusCode)
}
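// Read the entire response body into memory (no size limit is applied)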
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return "", "", nil, err
}
contentType := resp.Header.Get("Content-Type")
return string(bodyBytes), contentType, resp.Header, nil
}