Split domain processing into separate check and crawl loops
- StartDomainCheckLoop: DNS verification for unchecked domains (1000 workers) - StartFeedCrawlLoop: Feed discovery on DNS-verified domains (100 workers) This fixes starvation where 104M unchecked domains blocked 1.2M DNS-verified domains from ever being crawled for feeds. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
+56
-29
@@ -168,11 +168,11 @@ func (c *Crawler) domainCheck(host string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// StartDomainLoop runs the domain processing loop (domain_check + feed_crawl)
|
||||
func (c *Crawler) StartDomainLoop() {
|
||||
// StartDomainCheckLoop runs the DNS check loop (domain_check)
|
||||
// Checks unchecked domains to see if they resolve
|
||||
func (c *Crawler) StartDomainCheckLoop() {
|
||||
numWorkers := 1000
|
||||
|
||||
// Buffered channel for domain work
|
||||
workChan := make(chan *Domain, 1000)
|
||||
|
||||
// Start workers
|
||||
@@ -180,8 +180,6 @@ func (c *Crawler) StartDomainLoop() {
|
||||
go func() {
|
||||
for domain := range workChan {
|
||||
fh := domain.FullHost()
|
||||
if domain.CrawledAt.Equal(DomainStateUnchecked) {
|
||||
// domain_check: DNS lookup for liveness
|
||||
err := c.domainCheck(fh)
|
||||
errStr := ""
|
||||
if err != nil {
|
||||
@@ -191,18 +189,6 @@ func (c *Crawler) StartDomainLoop() {
|
||||
fmt.Printf("Error marking domain %s as checked: %v\n", fh, err)
|
||||
}
|
||||
atomic.AddInt32(&c.domainsChecked, 1)
|
||||
} else {
|
||||
// feed_crawl: crawl domain to discover feeds
|
||||
feedsFound, crawlErr := c.feedCrawl(fh)
|
||||
errStr := ""
|
||||
if crawlErr != nil {
|
||||
errStr = crawlErr.Error()
|
||||
}
|
||||
if err := c.markDomainCrawled(domain.Host, domain.TLD, feedsFound, errStr); err != nil {
|
||||
fmt.Printf("Error marking domain %s as crawled: %v\n", fh, err)
|
||||
}
|
||||
atomic.AddInt32(&c.domainsCrawled, 1)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -214,9 +200,9 @@ func (c *Crawler) StartDomainLoop() {
|
||||
return
|
||||
}
|
||||
|
||||
domains, err := c.GetDomainsToProcess(fetchSize)
|
||||
domains, err := c.GetDomainsToCheck(fetchSize)
|
||||
if err != nil {
|
||||
fmt.Printf("Error fetching domains to process: %v\n", err)
|
||||
fmt.Printf("Error fetching domains to check: %v\n", err)
|
||||
}
|
||||
|
||||
if len(domains) == 0 {
|
||||
@@ -224,19 +210,60 @@ func (c *Crawler) StartDomainLoop() {
|
||||
continue
|
||||
}
|
||||
|
||||
// Count unchecked vs checked for logging
|
||||
unchecked := 0
|
||||
for _, d := range domains {
|
||||
if d.CrawledAt.Equal(DomainStateUnchecked) {
|
||||
unchecked++
|
||||
}
|
||||
}
|
||||
checked := len(domains) - unchecked
|
||||
fmt.Printf("%s domain_check: %d domains\n", time.Now().Format("15:04:05"), len(domains))
|
||||
|
||||
if unchecked > 0 || checked > 0 {
|
||||
fmt.Printf("%s domain: %d domain_check, %d feed_crawl\n", time.Now().Format("15:04:05"), unchecked, checked)
|
||||
for _, domain := range domains {
|
||||
workChan <- domain
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// StartFeedCrawlLoop runs the feed discovery loop (feed_crawl)
|
||||
// Crawls DNS-verified domains to find RSS/Atom feeds
|
||||
func (c *Crawler) StartFeedCrawlLoop() {
|
||||
numWorkers := 100 // Fewer workers since crawling is heavier than DNS
|
||||
|
||||
workChan := make(chan *Domain, 100)
|
||||
|
||||
// Start workers
|
||||
for i := 0; i < numWorkers; i++ {
|
||||
go func() {
|
||||
for domain := range workChan {
|
||||
fh := domain.FullHost()
|
||||
feedsFound, crawlErr := c.feedCrawl(fh)
|
||||
errStr := ""
|
||||
if crawlErr != nil {
|
||||
errStr = crawlErr.Error()
|
||||
}
|
||||
if err := c.markDomainCrawled(domain.Host, domain.TLD, feedsFound, errStr); err != nil {
|
||||
fmt.Printf("Error marking domain %s as crawled: %v\n", fh, err)
|
||||
}
|
||||
atomic.AddInt32(&c.domainsCrawled, 1)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
const fetchSize = 100
|
||||
for {
|
||||
if c.IsShuttingDown() {
|
||||
close(workChan)
|
||||
return
|
||||
}
|
||||
|
||||
domains, err := c.GetDomainsToCrawl(fetchSize)
|
||||
if err != nil {
|
||||
fmt.Printf("Error fetching domains to crawl: %v\n", err)
|
||||
}
|
||||
|
||||
if len(domains) == 0 {
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
fmt.Printf("%s feed_crawl: %d domains\n", time.Now().Format("15:04:05"), len(domains))
|
||||
|
||||
for _, domain := range domains {
|
||||
workChan <- domain
|
||||
}
|
||||
|
||||
@@ -141,19 +141,38 @@ func (c *Crawler) getDomain(host string) (*Domain, error) {
|
||||
return domain, nil
|
||||
}
|
||||
|
||||
// GetDomainsToProcess returns domains needing processing (domain_check or feed_crawl)
|
||||
// crawled_at = zero time means needs domain_check, +1 sec means needs feed_crawl
|
||||
// Domains with errors are retried when crawled_at < now (scheduled by ErrorRetryDelay)
|
||||
func (c *Crawler) GetDomainsToProcess(limit int) ([]*Domain, error) {
|
||||
// GetDomainsToCheck returns unchecked domains needing DNS lookup (domain_check)
|
||||
// crawled_at = zero time means needs domain_check
|
||||
func (c *Crawler) GetDomainsToCheck(limit int) ([]*Domain, error) {
|
||||
rows, err := c.db.Query(`
|
||||
SELECT host, status, crawled_at, feeds_found, last_error, tld
|
||||
FROM domains
|
||||
WHERE status = 'pass'
|
||||
AND crawled_at = '0001-01-01 00:00:00'
|
||||
AND last_error IS NULL
|
||||
LIMIT $1
|
||||
`, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
return c.scanDomains(rows)
|
||||
}
|
||||
|
||||
// GetDomainsToCrawl returns DNS-verified domains needing feed discovery (feed_crawl)
|
||||
// crawled_at = +1 sec means passed DNS check, ready for crawl
|
||||
// Also includes domains with errors that are due for retry
|
||||
func (c *Crawler) GetDomainsToCrawl(limit int) ([]*Domain, error) {
|
||||
now := time.Now()
|
||||
rows, err := c.db.Query(`
|
||||
SELECT host, status, crawled_at, feeds_found, last_error, tld
|
||||
FROM domains
|
||||
WHERE status = 'pass' AND (
|
||||
(crawled_at < '0001-01-02' AND last_error IS NULL) -- new domains
|
||||
OR (crawled_at < $1 AND last_error IS NOT NULL) -- retry errors after delay
|
||||
(crawled_at = '0001-01-01 00:00:01' AND last_error IS NULL) -- passed DNS, ready to crawl
|
||||
OR (crawled_at < $1 AND crawled_at > '0001-01-01 00:00:01' AND last_error IS NOT NULL) -- retry errors
|
||||
)
|
||||
ORDER BY crawled_at ASC
|
||||
ORDER BY last_error IS NULL DESC, crawled_at ASC
|
||||
LIMIT $2
|
||||
`, now, limit)
|
||||
if err != nil {
|
||||
|
||||
@@ -43,8 +43,11 @@ func main() {
|
||||
// TLD sync loop (background) - syncs with IANA, marks dead TLDs, adds new ones
|
||||
go crawler.startTLDSyncLoop()
|
||||
|
||||
// Domain loop (background) - domain_check + feed_crawl
|
||||
go crawler.StartDomainLoop()
|
||||
// Domain check loop (background) - DNS verification
|
||||
go crawler.StartDomainCheckLoop()
|
||||
|
||||
// Feed crawl loop (background) - feed discovery on DNS-verified domains
|
||||
go crawler.StartFeedCrawlLoop()
|
||||
|
||||
// Wait for shutdown signal
|
||||
sig := <-sigChan
|
||||
|
||||
Reference in New Issue
Block a user