package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"io"
	"net"
	"net/http"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/html"
)

type Crawler struct {
	MaxDepth        int
	MaxPagesPerHost int
	Timeout         time.Duration
	UserAgent       string

	visited sync.Map
	feedsMu sync.Mutex
	client  *http.Client

	domainsCrawled int32 // feed_crawl: domains crawled for feed discovery
	domainsChecked int32 // domain_check: domains checked for liveness
	feedsChecked   int32 // feed_check: feeds checked for new items

	startTime       time.Time
	db              *DB
	domainsImported int32

	shutdownCh chan struct{} // closed on shutdown to signal goroutines
}

func NewCrawler(connString string) (*Crawler, error) {
	db, err := OpenDatabase(connString)
	if err != nil {
		return nil, fmt.Errorf("failed to open database: %w", err)
	}

	// Custom transport with longer timeouts (HTTP/2 disabled for compatibility)
	transport := &http.Transport{
		TLSClientConfig: &tls.Config{
			MinVersion: tls.VersionTLS12,
			NextProtos: []string{"http/1.1"}, // Force HTTP/1.1 for compatibility
		},
		DialContext: (&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 30 * time.Second,
		}).DialContext,
		ForceAttemptHTTP2:     false,
		MaxIdleConns:          100,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   30 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
		ResponseHeaderTimeout: 60 * time.Second,
	}

	return &Crawler{
		MaxDepth:        10,
		MaxPagesPerHost: 10,
		Timeout:         60 * time.Second,
		UserAgent:       "Mozilla/5.0 (compatible; FeedCrawler/1.0; +https://1440.news)",
		startTime:       time.Now(),
		db:              db,
		shutdownCh:      make(chan struct{}),
		client: &http.Client{
			Timeout:   60 * time.Second,
			Transport: transport,
			CheckRedirect: func(req *http.Request, via []*http.Request) error {
				if len(via) >= 10 {
					return fmt.Errorf("stopped after 10 redirects")
				}
				return nil
			},
		},
	}, nil
}

// IsShuttingDown returns true if shutdown has been initiated.
func (c *Crawler) IsShuttingDown() bool {
	select {
	case <-c.shutdownCh:
		return true
	default:
		return false
	}
}

func (c *Crawler) Close() error {
	// Signal all goroutines to stop
	close(c.shutdownCh)

	// Give goroutines time to finish current operations
	fmt.Println("Waiting for goroutines to finish...")
	time.Sleep(2 * time.Second)

	if c.db != nil {
		fmt.Println("Closing database...")
		return c.db.Close()
	}
	return nil
}

// StartCleanupLoop runs item cleanup once per week.
func (c *Crawler) StartCleanupLoop() {
	for {
		if c.IsShuttingDown() {
			return
		}
		deleted, err := c.CleanupOldItems()
		if err != nil {
			fmt.Printf("Cleanup error: %v\n", err)
		} else if deleted > 0 {
			fmt.Printf("Cleanup: removed %d old items\n", deleted)
		}
		time.Sleep(7 * 24 * time.Hour)
	}
}

// StartMaintenanceLoop performs periodic database maintenance.
func (c *Crawler) StartMaintenanceLoop() {
	vacuumTicker := time.NewTicker(24 * time.Hour)
	analyzeTicker := time.NewTicker(1 * time.Hour)
	defer vacuumTicker.Stop()
	defer analyzeTicker.Stop()

	for {
		select {
		case <-c.shutdownCh:
			// Stop maintenance when shutdown is signalled
			return
		case <-analyzeTicker.C:
			// Update statistics for the query planner
			if _, err := c.db.Exec("ANALYZE"); err != nil {
				fmt.Printf("ANALYZE error: %v\n", err)
			}
		case <-vacuumTicker.C:
			// Reclaim dead tuple space (VACUUM is lighter than VACUUM FULL)
			fmt.Println("Running VACUUM...")
			if _, err := c.db.Exec("VACUUM"); err != nil {
				fmt.Printf("VACUUM error: %v\n", err)
			} else {
				fmt.Println("VACUUM complete")
			}
		}
	}
}

// dnsResolver uses the local caching DNS server (infra-dns).
var dnsResolver = &net.Resolver{
	PreferGo: true,
	Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
		d := net.Dialer{Timeout: 5 * time.Second}
		return d.DialContext(ctx, "udp", "infra-dns:53")
	},
}
// domainCheck performs a DNS lookup to check if a domain resolves.
func (c *Crawler) domainCheck(host string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	_, err := dnsResolver.LookupHost(ctx, host)
	return err
}

// StartDomainCheckLoop runs the DNS check loop (domain_check).
// Checks unchecked domains to see if they resolve.
func (c *Crawler) StartDomainCheckLoop() {
	numWorkers := 1000
	workChan := make(chan *Domain, 1000)

	// Start workers
	for i := 0; i < numWorkers; i++ {
		go func() {
			for domain := range workChan {
				fh := domain.FullHost()
				err := c.domainCheck(fh)
				errStr := ""
				if err != nil {
					errStr = err.Error()
				}
				if err := c.markDomainChecked(domain.Host, domain.TLD, errStr); err != nil {
					fmt.Printf("Error marking domain %s as checked: %v\n", fh, err)
				}
				atomic.AddInt32(&c.domainsChecked, 1)
			}
		}()
	}

	const fetchSize = 1000
	for {
		if c.IsShuttingDown() {
			close(workChan)
			return
		}
		domains, err := c.GetDomainsToCheck(fetchSize)
		if err != nil {
			fmt.Printf("Error fetching domains to check: %v\n", err)
		}
		if len(domains) == 0 {
			time.Sleep(1 * time.Second)
			continue
		}
		fmt.Printf("%s domain_check: %d domains\n", time.Now().Format("15:04:05"), len(domains))
		for _, domain := range domains {
			workChan <- domain
		}
		time.Sleep(1 * time.Second)
	}
}

// StartFeedCrawlLoop runs the feed discovery loop (feed_crawl).
// Crawls DNS-verified domains to find RSS/Atom feeds.
func (c *Crawler) StartFeedCrawlLoop() {
	numWorkers := 100 // Fewer workers since crawling is heavier than DNS
	workChan := make(chan *Domain, 100)

	// Start workers
	for i := 0; i < numWorkers; i++ {
		go func() {
			for domain := range workChan {
				fh := domain.FullHost()
				feedsFound, crawlErr := c.feedCrawl(fh)
				errStr := ""
				if crawlErr != nil {
					errStr = crawlErr.Error()
				}
				if err := c.markDomainCrawled(domain.Host, domain.TLD, feedsFound, errStr); err != nil {
					fmt.Printf("Error marking domain %s as crawled: %v\n", fh, err)
				}
				atomic.AddInt32(&c.domainsCrawled, 1)
			}
		}()
	}

	const fetchSize = 100
	for {
		if c.IsShuttingDown() {
			close(workChan)
			return
		}
		domains, err := c.GetDomainsToCrawl(fetchSize)
		if err != nil {
			fmt.Printf("Error fetching domains to crawl: %v\n", err)
		}
		if len(domains) == 0 {
			time.Sleep(1 * time.Second)
			continue
		}
		fmt.Printf("%s feed_crawl: %d domains\n", time.Now().Format("15:04:05"), len(domains))
		for _, domain := range domains {
			workChan <- domain
		}
		time.Sleep(1 * time.Second)
	}
}

// StartFeedCheckLoop runs the feed_check loop (checking feeds for new items).
func (c *Crawler) StartFeedCheckLoop() {
	numWorkers := 1000
	// Buffered channel for feed work
	workChan := make(chan *Feed, 1000)

	// Start workers
	for i := 0; i < numWorkers; i++ {
		go func() {
			for feed := range workChan {
				c.CheckFeed(feed)
			}
		}()
	}

	const fetchSize = 100
	for {
		if c.IsShuttingDown() {
			close(workChan)
			return
		}
		feeds, err := c.GetFeedsDueForCheck(fetchSize)
		if err != nil {
			fmt.Printf("Error fetching feeds: %v\n", err)
		}
		if len(feeds) == 0 {
			time.Sleep(1 * time.Second)
			continue
		}
		fmt.Printf("%s feed_check: %d feeds\n", time.Now().Format("15:04:05"), len(feeds))
		for _, feed := range feeds {
			workChan <- feed
		}
		time.Sleep(1 * time.Second)
	}
}
// feedCrawl crawls a domain to discover RSS/Atom feeds.
// Note: c.domainsCrawled is incremented by the worker in StartFeedCrawlLoop.
func (c *Crawler) feedCrawl(host string) (feedsFound int, err error) {
	localVisited := make(map[string]bool)
	pagesVisited := 0

	// Try HTTPS first, fall back to HTTP if no pages were visited
	c.crawlPage("https://"+host, host, 0, localVisited, &pagesVisited)
	if pagesVisited == 0 {
		c.crawlPage("http://"+host, host, 0, localVisited, &pagesVisited)
	}

	// Count feeds found for this specific host
	feedsFound, _ = c.GetFeedCountByHost(host)

	if pagesVisited == 0 {
		return feedsFound, fmt.Errorf("could not connect")
	}
	return feedsFound, nil
}

func (c *Crawler) crawlPage(pageURL, sourceHost string, depth int, localVisited map[string]bool, pagesVisited *int) {
	// Enforce the per-host page budget and depth limit, and skip pages
	// already seen in this crawl or by any previous crawl.
	if *pagesVisited >= c.MaxPagesPerHost || depth > c.MaxDepth {
		return
	}
	if localVisited[pageURL] {
		return
	}
	if _, visited := c.visited.LoadOrStore(pageURL, true); visited {
		return
	}
	localVisited[pageURL] = true
	*pagesVisited++

	body, contentType, headers, err := c.fetchPage(pageURL)
	if err != nil {
		return
	}

	// If the URL itself serves a feed, process it directly.
	if c.isFeedContent(body, contentType) {
		c.processFeed(pageURL, sourceHost, body, headers)
		return
	}

	doc, err := html.Parse(strings.NewReader(body))
	if err != nil {
		return
	}

	// Feeds advertised by the page markup
	feedLinks := c.extractFeedLinks(doc, pageURL)
	for _, fl := range feedLinks {
		c.addFeed(fl.URL, fl.Type, sourceHost, pageURL)
	}

	// Anchors whose targets look like feeds
	anchorFeeds := c.extractAnchorFeeds(doc, pageURL)
	for _, fl := range anchorFeeds {
		c.addFeed(fl.URL, fl.Type, sourceHost, pageURL)
	}

	// Follow links from this page, filtered by shouldCrawl, staying within
	// the depth and per-host page limits.
	if depth < c.MaxDepth {
		links := c.extractLinks(doc, pageURL)
		for _, link := range links {
			if shouldCrawl(link, pageURL) {
				c.crawlPage(link, sourceHost, depth+1, localVisited, pagesVisited)
			}
		}
	}
}

func (c *Crawler) fetchPage(pageURL string) (string, string, http.Header, error) {
	req, err := http.NewRequest("GET", pageURL, nil)
	if err != nil {
		return "", "", nil, err
	}
	req.Header.Set("User-Agent", c.UserAgent)

	resp, err := c.client.Do(req)
	if err != nil {
		return "", "", nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", "", nil, fmt.Errorf("status code: %d", resp.StatusCode)
	}

	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", "", nil, err
	}
	contentType := resp.Header.Get("Content-Type")
	return string(bodyBytes), contentType, resp.Header, nil
}
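
// runCrawler is a usage sketch, not part of the original file: it shows how the
// loops above are expected to be wired together into one crawler process. The
// connection string is a hypothetical placeholder, and the real entry point
// (flag parsing and the signal handler that calls Close) is assumed to live
// elsewhere in the package.
func runCrawler() error {
	crawler, err := NewCrawler("postgres://crawler@localhost/feeds?sslmode=disable") // placeholder DSN
	if err != nil {
		return err
	}

	// Each pipeline stage runs as its own long-lived loop.
	go crawler.StartDomainCheckLoop() // domain_check: DNS liveness
	go crawler.StartFeedCrawlLoop()   // feed_crawl: discover RSS/Atom feeds on live domains
	go crawler.StartFeedCheckLoop()   // feed_check: poll known feeds for new items
	go crawler.StartCleanupLoop()     // weekly removal of old items
	go crawler.StartMaintenanceLoop() // periodic ANALYZE / VACUUM

	// Block until Close() is called elsewhere (e.g. by a signal handler);
	// Close() closes shutdownCh, which drains the loops and closes the database.
	<-crawler.shutdownCh
	return nil
}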