crawler/crawler.go
primal 6eaa39f9db Remove publishing code - now handled by publish service
Publishing functionality has been moved to the standalone publish service.
Removed:
- publisher.go, pds_auth.go, pds_records.go, image.go, handle.go
- StartPublishLoop and related functions from crawler.go
- Publish loop invocation from main.go

Updated CLAUDE.md to reflect the new architecture.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 15:40:49 -05:00

package main

import (
    "context"
    "crypto/tls"
    "fmt"
    "io"
    "net"
    "net/http"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "golang.org/x/net/html"
)
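
// Crawler discovers RSS/Atom feeds by crawling domains and checks known feeds
// for new items. The counter fields are updated atomically by worker goroutines.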
type Crawler struct {
    MaxDepth        int
    MaxPagesPerHost int
    Timeout         time.Duration
    UserAgent       string
    visited         sync.Map
    feedsMu         sync.Mutex
    client          *http.Client
    domainsCrawled  int32 // feed_crawl: domains crawled for feed discovery
    domainsChecked  int32 // domain_check: domains checked for liveness
    feedsChecked    int32 // feed_check: feeds checked for new items
    startTime       time.Time
    db              *DB
    domainsImported int32
    shutdownCh      chan struct{} // closed on shutdown to signal goroutines
}
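
// NewCrawler opens the database and returns a Crawler configured with an
// HTTP/1.1-only transport and conservative timeouts.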
func NewCrawler(connString string) (*Crawler, error) {
    db, err := OpenDatabase(connString)
    if err != nil {
        return nil, fmt.Errorf("failed to open database: %v", err)
    }
    // Custom transport with longer timeouts (HTTP/2 disabled for compatibility)
    transport := &http.Transport{
        TLSClientConfig: &tls.Config{
            MinVersion: tls.VersionTLS12,
            NextProtos: []string{"http/1.1"}, // Force HTTP/1.1 for compatibility
        },
        DialContext: (&net.Dialer{
            Timeout:   30 * time.Second,
            KeepAlive: 30 * time.Second,
        }).DialContext,
        ForceAttemptHTTP2:     false,
        MaxIdleConns:          100,
        IdleConnTimeout:       90 * time.Second,
        TLSHandshakeTimeout:   30 * time.Second,
        ExpectContinueTimeout: 1 * time.Second,
        ResponseHeaderTimeout: 60 * time.Second,
    }
    return &Crawler{
        MaxDepth:        10,
        MaxPagesPerHost: 10,
        Timeout:         60 * time.Second,
        UserAgent:       "Mozilla/5.0 (compatible; FeedCrawler/1.0; +https://1440.news)",
        startTime:       time.Now(),
        db:              db,
        shutdownCh:      make(chan struct{}),
        client: &http.Client{
            Timeout:   60 * time.Second,
            Transport: transport,
            CheckRedirect: func(req *http.Request, via []*http.Request) error {
                if len(via) >= 10 {
                    return fmt.Errorf("stopped after 10 redirects")
                }
                return nil
            },
        },
    }, nil
}
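
// A minimal wiring sketch, not taken from this repository: main.go is assumed
// to construct the crawler and run each loop in its own goroutine, and the
// DATABASE_URL environment variable name is illustrative.
//
//    c, err := NewCrawler(os.Getenv("DATABASE_URL"))
//    if err != nil {
//        log.Fatal(err)
//    }
//    defer c.Close()
//    go c.StartDomainLoop()
//    go c.StartFeedCheckLoop()
//    go c.StartCleanupLoop()
//    go c.StartMaintenanceLoop()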

// IsShuttingDown returns true if shutdown has been initiated
func (c *Crawler) IsShuttingDown() bool {
    select {
    case <-c.shutdownCh:
        return true
    default:
        return false
    }
}
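
// Close signals shutdown to the running loops, waits briefly for in-flight
// work, and closes the database connection.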
func (c *Crawler) Close() error {
    // Signal all goroutines to stop
    close(c.shutdownCh)
    // Give goroutines time to finish current operations
    fmt.Println("Waiting for goroutines to finish...")
    time.Sleep(2 * time.Second)
    if c.db != nil {
        fmt.Println("Closing database...")
        return c.db.Close()
    }
    return nil
}

// StartCleanupLoop runs item cleanup once per week
func (c *Crawler) StartCleanupLoop() {
    for {
        if c.IsShuttingDown() {
            return
        }
        deleted, err := c.CleanupOldItems()
        if err != nil {
            fmt.Printf("Cleanup error: %v\n", err)
        } else if deleted > 0 {
            fmt.Printf("Cleanup: removed %d old items\n", deleted)
        }
        time.Sleep(7 * 24 * time.Hour)
    }
}

// StartMaintenanceLoop performs periodic database maintenance
func (c *Crawler) StartMaintenanceLoop() {
    vacuumTicker := time.NewTicker(24 * time.Hour)
    analyzeTicker := time.NewTicker(1 * time.Hour)
    defer vacuumTicker.Stop()
    defer analyzeTicker.Stop()
    for {
        select {
        case <-analyzeTicker.C:
            // Update statistics for query planner
            if _, err := c.db.Exec("ANALYZE"); err != nil {
                fmt.Printf("ANALYZE error: %v\n", err)
            }
        case <-vacuumTicker.C:
            // Reclaim dead tuple space (VACUUM is lighter than VACUUM FULL)
            fmt.Println("Running VACUUM...")
            if _, err := c.db.Exec("VACUUM"); err != nil {
                fmt.Printf("VACUUM error: %v\n", err)
            } else {
                fmt.Println("VACUUM complete")
            }
        }
    }
}

// dnsResolver uses local caching DNS (infra-dns) with fallback to system
var dnsResolver = &net.Resolver{
    PreferGo: true,
    Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
        d := net.Dialer{Timeout: 2 * time.Second}
        // Try local caching DNS first (CoreDNS on proxy network)
        conn, err := d.DialContext(ctx, "udp", "infra-dns:53")
        if err == nil {
            return conn, nil
        }
        // Fallback to system DNS (only reached when infra-dns itself cannot
        // be dialed, e.g. its name does not resolve; a failed lookup through
        // infra-dns does not trigger this fallback)
        return d.DialContext(ctx, network, address)
    },
}

// domainCheck performs a DNS lookup to check if a domain resolves
func (c *Crawler) domainCheck(host string) error {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    _, err := dnsResolver.LookupHost(ctx, host)
    return err
}

// StartDomainLoop runs the domain processing loop (domain_check + feed_crawl)
func (c *Crawler) StartDomainLoop() {
    numWorkers := 1000
    // Buffered channel for domain work
    workChan := make(chan *Domain, 1000)
    // Start workers
    for i := 0; i < numWorkers; i++ {
        go func() {
            for domain := range workChan {
                fh := domain.FullHost()
                if domain.CrawledAt.Equal(DomainStateUnchecked) {
                    // domain_check: DNS lookup for liveness
                    err := c.domainCheck(fh)
                    errStr := ""
                    if err != nil {
                        errStr = err.Error()
                    }
                    if err := c.markDomainChecked(domain.Host, domain.TLD, errStr); err != nil {
                        fmt.Printf("Error marking domain %s as checked: %v\n", fh, err)
                    }
                    atomic.AddInt32(&c.domainsChecked, 1)
                } else {
                    // feed_crawl: crawl domain to discover feeds
                    feedsFound, crawlErr := c.feedCrawl(fh)
                    errStr := ""
                    if crawlErr != nil {
                        errStr = crawlErr.Error()
                    }
                    if err := c.markDomainCrawled(domain.Host, domain.TLD, feedsFound, errStr); err != nil {
                        fmt.Printf("Error marking domain %s as crawled: %v\n", fh, err)
                    }
                    atomic.AddInt32(&c.domainsCrawled, 1)
                }
            }
        }()
    }
    const fetchSize = 1000
    for {
        if c.IsShuttingDown() {
            close(workChan)
            return
        }
        domains, err := c.GetDomainsToProcess(fetchSize)
        if err != nil {
            fmt.Printf("Error fetching domains to process: %v\n", err)
        }
        if len(domains) == 0 {
            time.Sleep(1 * time.Second)
            continue
        }
        // Count unchecked vs checked for logging
        unchecked := 0
        for _, d := range domains {
            if d.CrawledAt.Equal(DomainStateUnchecked) {
                unchecked++
            }
        }
        checked := len(domains) - unchecked
        if unchecked > 0 || checked > 0 {
            fmt.Printf("%s domain: %d domain_check, %d feed_crawl\n", time.Now().Format("15:04:05"), unchecked, checked)
        }
        for _, domain := range domains {
            workChan <- domain
        }
        time.Sleep(1 * time.Second)
    }
}

// StartFeedCheckLoop runs the feed_check loop (checking feeds for new items)
func (c *Crawler) StartFeedCheckLoop() {
    numWorkers := 1000
    // Buffered channel for feed work
    workChan := make(chan *Feed, 1000)
    // Start workers
    for i := 0; i < numWorkers; i++ {
        go func() {
            for feed := range workChan {
                c.CheckFeed(feed)
            }
        }()
    }
    const fetchSize = 100
    for {
        if c.IsShuttingDown() {
            close(workChan)
            return
        }
        feeds, err := c.GetFeedsDueForCheck(fetchSize)
        if err != nil {
            fmt.Printf("Error fetching feeds: %v\n", err)
        }
        if len(feeds) == 0 {
            time.Sleep(1 * time.Second)
            continue
        }
        fmt.Printf("%s feed_check: %d feeds\n", time.Now().Format("15:04:05"), len(feeds))
        for _, feed := range feeds {
            workChan <- feed
        }
        time.Sleep(1 * time.Second)
    }
}

// feedCrawl crawls a domain to discover RSS/Atom feeds
func (c *Crawler) feedCrawl(host string) (feedsFound int, err error) {
    atomic.AddInt32(&c.domainsCrawled, 1)
    localVisited := make(map[string]bool)
    pagesVisited := 0
    // Try HTTPS first, fall back to HTTP if no pages were visited
    c.crawlPage("https://"+host, host, 0, localVisited, &pagesVisited)
    if pagesVisited == 0 {
        c.crawlPage("http://"+host, host, 0, localVisited, &pagesVisited)
    }
    // Count feeds found for this specific host
    feedsFound, _ = c.GetFeedCountByHost(host)
    if pagesVisited == 0 {
        return feedsFound, fmt.Errorf("could not connect")
    }
    return feedsFound, nil
}
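
// crawlPage fetches a single page, records any feeds it discovers (either the
// page itself or linked feeds), and recursively follows on-site links up to
// MaxDepth and MaxPagesPerHost.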
func (c *Crawler) crawlPage(pageURL, sourceHost string, depth int, localVisited map[string]bool, pagesVisited *int) {
    if *pagesVisited >= c.MaxPagesPerHost || depth > c.MaxDepth {
        return
    }
    if localVisited[pageURL] {
        return
    }
    if _, visited := c.visited.LoadOrStore(pageURL, true); visited {
        return
    }
    localVisited[pageURL] = true
    *pagesVisited++
    body, contentType, headers, err := c.fetchPage(pageURL)
    if err != nil {
        return
    }
    if c.isFeedContent(body, contentType) {
        c.processFeed(pageURL, sourceHost, body, headers)
        return
    }
    doc, err := html.Parse(strings.NewReader(body))
    if err != nil {
        return
    }
    feedLinks := c.extractFeedLinks(doc, pageURL)
    for _, fl := range feedLinks {
        c.addFeed(fl.URL, fl.Type, sourceHost, pageURL)
    }
    anchorFeeds := c.extractAnchorFeeds(doc, pageURL)
    for _, fl := range anchorFeeds {
        c.addFeed(fl.URL, fl.Type, sourceHost, pageURL)
    }
    if depth < c.MaxDepth {
        links := c.extractLinks(doc, pageURL)
        for _, link := range links {
            if shouldCrawl(link, pageURL) {
                c.crawlPage(link, sourceHost, depth+1, localVisited, pagesVisited)
            }
        }
    }
}
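
// fetchPage performs a GET with the crawler's User-Agent and, for a 200
// response, returns the body, the Content-Type value, and the full headers.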
func (c *Crawler) fetchPage(pageURL string) (string, string, http.Header, error) {
    req, err := http.NewRequest("GET", pageURL, nil)
    if err != nil {
        return "", "", nil, err
    }
    req.Header.Set("User-Agent", c.UserAgent)
    resp, err := c.client.Do(req)
    if err != nil {
        return "", "", nil, err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return "", "", nil, fmt.Errorf("status code: %d", resp.StatusCode)
    }
    bodyBytes, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", "", nil, err
    }
    contentType := resp.Header.Get("Content-Type")
    return string(bodyBytes), contentType, resp.Header, nil
}