diff --git a/crawler.go b/crawler.go
index fe0dad6..85fa706 100644
--- a/crawler.go
+++ b/crawler.go
@@ -168,11 +168,11 @@ func (c *Crawler) domainCheck(host string) error {
 	return err
 }
 
-// StartDomainLoop runs the domain processing loop (domain_check + feed_crawl)
-func (c *Crawler) StartDomainLoop() {
+// StartDomainCheckLoop runs the DNS check loop (domain_check)
+// Checks unchecked domains to see if they resolve
+func (c *Crawler) StartDomainCheckLoop() {
 	numWorkers := 1000
 
-	// Buffered channel for domain work
 	workChan := make(chan *Domain, 1000)
 
 	// Start workers
@@ -180,29 +180,15 @@ func (c *Crawler) StartDomainLoop() {
 		go func() {
 			for domain := range workChan {
 				fh := domain.FullHost()
-				if domain.CrawledAt.Equal(DomainStateUnchecked) {
-					// domain_check: DNS lookup for liveness
-					err := c.domainCheck(fh)
-					errStr := ""
-					if err != nil {
-						errStr = err.Error()
-					}
-					if err := c.markDomainChecked(domain.Host, domain.TLD, errStr); err != nil {
-						fmt.Printf("Error marking domain %s as checked: %v\n", fh, err)
-					}
-					atomic.AddInt32(&c.domainsChecked, 1)
-				} else {
-					// feed_crawl: crawl domain to discover feeds
-					feedsFound, crawlErr := c.feedCrawl(fh)
-					errStr := ""
-					if crawlErr != nil {
-						errStr = crawlErr.Error()
-					}
-					if err := c.markDomainCrawled(domain.Host, domain.TLD, feedsFound, errStr); err != nil {
-						fmt.Printf("Error marking domain %s as crawled: %v\n", fh, err)
-					}
-					atomic.AddInt32(&c.domainsCrawled, 1)
+				err := c.domainCheck(fh)
+				errStr := ""
+				if err != nil {
+					errStr = err.Error()
 				}
+				if err := c.markDomainChecked(domain.Host, domain.TLD, errStr); err != nil {
+					fmt.Printf("Error marking domain %s as checked: %v\n", fh, err)
+				}
+				atomic.AddInt32(&c.domainsChecked, 1)
 			}
 		}()
 	}
@@ -214,9 +200,9 @@ func (c *Crawler) StartDomainLoop() {
 			return
 		}
 
-		domains, err := c.GetDomainsToProcess(fetchSize)
+		domains, err := c.GetDomainsToCheck(fetchSize)
 		if err != nil {
-			fmt.Printf("Error fetching domains to process: %v\n", err)
+			fmt.Printf("Error fetching domains to check: %v\n", err)
 		}
 
 		if len(domains) == 0 {
@@ -224,19 +210,60 @@
 			continue
 		}
 
-		// Count unchecked vs checked for logging
-		unchecked := 0
-		for _, d := range domains {
-			if d.CrawledAt.Equal(DomainStateUnchecked) {
-				unchecked++
-			}
-		}
-		checked := len(domains) - unchecked
+		fmt.Printf("%s domain_check: %d domains\n", time.Now().Format("15:04:05"), len(domains))
 
-		if unchecked > 0 || checked > 0 {
-			fmt.Printf("%s domain: %d domain_check, %d feed_crawl\n", time.Now().Format("15:04:05"), unchecked, checked)
+		for _, domain := range domains {
+			workChan <- domain
 		}
+
+		time.Sleep(1 * time.Second)
+	}
+}
+
+// StartFeedCrawlLoop runs the feed discovery loop (feed_crawl)
+// Crawls DNS-verified domains to find RSS/Atom feeds
+func (c *Crawler) StartFeedCrawlLoop() {
+	numWorkers := 100 // Fewer workers since crawling is heavier than DNS
+
+	workChan := make(chan *Domain, 100)
+
+	// Start workers
+	for i := 0; i < numWorkers; i++ {
+		go func() {
+			for domain := range workChan {
+				fh := domain.FullHost()
+				feedsFound, crawlErr := c.feedCrawl(fh)
+				errStr := ""
+				if crawlErr != nil {
+					errStr = crawlErr.Error()
+				}
+				if err := c.markDomainCrawled(domain.Host, domain.TLD, feedsFound, errStr); err != nil {
+					fmt.Printf("Error marking domain %s as crawled: %v\n", fh, err)
+				}
+				atomic.AddInt32(&c.domainsCrawled, 1)
+			}
+		}()
+	}
+
+	const fetchSize = 100
+	for {
+		if c.IsShuttingDown() {
+			close(workChan)
+			return
+		}
+
+		domains, err := c.GetDomainsToCrawl(fetchSize)
+		if err != nil {
+			fmt.Printf("Error fetching domains to crawl: %v\n", err)
+		}
+
+		if len(domains) == 0 {
+			time.Sleep(1 * time.Second)
+			continue
+		}
+
+		fmt.Printf("%s feed_crawl: %d domains\n", time.Now().Format("15:04:05"), len(domains))
+
 		for _, domain := range domains {
 			workChan <- domain
 		}
diff --git a/domain.go b/domain.go
index 9e8767d..c2ca453 100644
--- a/domain.go
+++ b/domain.go
@@ -141,19 +141,38 @@ func (c *Crawler) getDomain(host string) (*Domain, error) {
 	return domain, nil
 }
 
-// GetDomainsToProcess returns domains needing processing (domain_check or feed_crawl)
-// crawled_at = zero time means needs domain_check, +1 sec means needs feed_crawl
-// Domains with errors are retried when crawled_at < now (scheduled by ErrorRetryDelay)
-func (c *Crawler) GetDomainsToProcess(limit int) ([]*Domain, error) {
+// GetDomainsToCheck returns unchecked domains needing DNS lookup (domain_check)
+// crawled_at = zero time means needs domain_check
+func (c *Crawler) GetDomainsToCheck(limit int) ([]*Domain, error) {
+	rows, err := c.db.Query(`
+		SELECT host, status, crawled_at, feeds_found, last_error, tld
+		FROM domains
+		WHERE status = 'pass'
+		  AND crawled_at = '0001-01-01 00:00:00'
+		  AND last_error IS NULL
+		LIMIT $1
+	`, limit)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	return c.scanDomains(rows)
+}
+
+// GetDomainsToCrawl returns DNS-verified domains needing feed discovery (feed_crawl)
+// crawled_at = +1 sec means passed DNS check, ready for crawl
+// Also includes domains with errors that are due for retry
+func (c *Crawler) GetDomainsToCrawl(limit int) ([]*Domain, error) {
 	now := time.Now()
 	rows, err := c.db.Query(`
 		SELECT host, status, crawled_at, feeds_found, last_error, tld
 		FROM domains
 		WHERE status = 'pass'
 		  AND (
-			(crawled_at < '0001-01-02' AND last_error IS NULL) -- new domains
-			OR (crawled_at < $1 AND last_error IS NOT NULL) -- retry errors after delay
+			(crawled_at = '0001-01-01 00:00:01' AND last_error IS NULL) -- passed DNS, ready to crawl
+			OR (crawled_at < $1 AND crawled_at > '0001-01-01 00:00:01' AND last_error IS NOT NULL) -- retry errors
 		  )
-		ORDER BY crawled_at ASC
+		ORDER BY last_error IS NULL DESC, crawled_at ASC
 		LIMIT $2
 	`, now, limit)
 	if err != nil {
diff --git a/main.go b/main.go
index b272b26..6e7eb80 100644
--- a/main.go
+++ b/main.go
@@ -43,8 +43,11 @@ func main() {
 	// TLD sync loop (background) - syncs with IANA, marks dead TLDs, adds new ones
 	go crawler.startTLDSyncLoop()
 
-	// Domain loop (background) - domain_check + feed_crawl
-	go crawler.StartDomainLoop()
+	// Domain check loop (background) - DNS verification
+	go crawler.StartDomainCheckLoop()
+
+	// Feed crawl loop (background) - feed discovery on DNS-verified domains
+	go crawler.StartFeedCrawlLoop()
 
 	// Wait for shutdown signal
 	sig := <-sigChan