Add PebbleDB storage, domain tracking, and web dashboard
- Split main.go into separate files for better organization: crawler.go, domain.go, feed.go, parser.go, html.go, util.go
- Add PebbleDB for persistent storage of feeds and domains
- Store feeds with metadata: title, TTL, update frequency, ETag, etc.
- Track domains with crawl status (uncrawled/crawled/error)
- Normalize URLs by stripping scheme and www. prefix (see the sketch below)
- Add web dashboard on port 4321 with real-time stats:
  - Crawl progress with completion percentage
  - Feed counts by type (RSS/Atom)
  - Top TLDs and domains by feed count
  - Recent feeds table
- Filter out comment feeds from results

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
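The URL normalization mentioned above is implemented outside the file shown in this diff (most likely in util.go or domain.go). A minimal sketch of such a helper; the function name, file placement, and trailing-slash handling are assumptions:

package main // sketch only; the real helper lives in one of the new files

import "strings"

// normalizeURL collapses "https://www.example.com/feed" and
// "http://example.com/feed" into the same key, "example.com/feed",
// by stripping the scheme and a leading "www." prefix.
func normalizeURL(raw string) string {
    u := strings.TrimSpace(raw)
    u = strings.TrimPrefix(u, "https://")
    u = strings.TrimPrefix(u, "http://")
    u = strings.TrimPrefix(u, "www.")
    return strings.TrimSuffix(u, "/") // trailing-slash trim is an assumption
}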
crawler.go
@@ -0,0 +1,237 @@
package main

import (
    "fmt"
    "io"
    "math/rand"
    "net/http"
    "runtime"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/cockroachdb/pebble"
    "golang.org/x/net/html"
)

type Crawler struct {
    MaxDepth        int
    MaxPagesPerHost int
    Timeout         time.Duration
    UserAgent       string
    visited         sync.Map
    feedsMu         sync.Mutex
    client          *http.Client
    hostsProcessed  int32
    db              *pebble.DB
}

func NewCrawler(dbPath string) (*Crawler, error) {
    db, err := pebble.Open(dbPath, &pebble.Options{})
    if err != nil {
        return nil, fmt.Errorf("failed to open pebble db: %v", err)
    }

    return &Crawler{
        MaxDepth:        10,
        MaxPagesPerHost: 10,
        Timeout:         10 * time.Second,
        UserAgent:       "FeedCrawler/1.0",
        db:              db,
        client: &http.Client{
            Timeout: 10 * time.Second,
            CheckRedirect: func(req *http.Request, via []*http.Request) error {
                if len(via) >= 10 {
                    return fmt.Errorf("stopped after 10 redirects")
                }
                return nil
            },
        },
    }, nil
}

func (c *Crawler) Close() error {
    if c.db != nil {
        return c.db.Close()
    }
    return nil
}

// CrawlUncrawledDomains fetches uncrawled domains and crawls them
func (c *Crawler) CrawlUncrawledDomains() error {
    domains, err := c.GetUncrawledDomains()
    if err != nil {
        return fmt.Errorf("failed to get uncrawled domains: %v", err)
    }

    if len(domains) == 0 {
        return nil
    }

    // Shuffle for randomized crawling
    rand.Shuffle(len(domains), func(i, j int) {
        domains[i], domains[j] = domains[j], domains[i]
    })

    numWorkers := runtime.NumCPU() - 1
    if numWorkers < 1 {
        numWorkers = 1
    }

    type crawlResult struct {
        host       string
        feedsFound int
        lastError  string
    }

    domainChan := make(chan *Domain, numWorkers*2)
    resultChan := make(chan crawlResult, numWorkers*2)
    var wg sync.WaitGroup

    // Start workers
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for domain := range domainChan {
                feedsFound, crawlErr := c.crawlHost(domain.Host)
                errStr := ""
                if crawlErr != nil {
                    errStr = crawlErr.Error()
                }
                resultChan <- crawlResult{
                    host:       domain.Host,
                    feedsFound: feedsFound,
                    lastError:  errStr,
                }
            }
        }()
    }

    // Start result processor
    done := make(chan bool)
    go func() {
        for result := range resultChan {
            if err := c.markDomainCrawled(result.host, result.feedsFound, result.lastError); err != nil {
                fmt.Printf("Error marking domain %s as crawled: %v\n", result.host, err)
            }
        }
        done <- true
    }()

    // Send domains to workers
    for _, domain := range domains {
        domainChan <- domain
    }

    close(domainChan)
    wg.Wait()
    close(resultChan)
    <-done

    return nil
}
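// The Domain type and the GetUncrawledDomains / markDomainCrawled helpers used
// above are defined in domain.go, which is not part of this diff. A minimal
// sketch, assuming JSON-encoded records keyed by "domain:<host>" in Pebble
// (the key layout and field names are assumptions):
//
//    type Domain struct {
//        Host       string    `json:"host"`
//        Status     string    `json:"status"` // "uncrawled", "crawled", or "error"
//        FeedsFound int       `json:"feeds_found"`
//        LastError  string    `json:"last_error,omitempty"`
//        CrawledAt  time.Time `json:"crawled_at,omitempty"`
//    }
//
//    func (c *Crawler) markDomainCrawled(host string, feedsFound int, lastError string) error {
//        d := Domain{Host: host, Status: "crawled", FeedsFound: feedsFound, LastError: lastError, CrawledAt: time.Now()}
//        if lastError != "" {
//            d.Status = "error"
//        }
//        val, err := json.Marshal(d) // requires encoding/json
//        if err != nil {
//            return err
//        }
//        return c.db.Set([]byte("domain:"+host), val, pebble.Sync)
//    }
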
func (c *Crawler) crawlHost(host string) (feedsFound int, err error) {
    atomic.AddInt32(&c.hostsProcessed, 1)

    // Count feeds before crawling
    initialCount, _ := c.GetFeedCount()

    localVisited := make(map[string]bool)
    pagesVisited := 0

    // Try HTTPS first, fall back to HTTP if no pages were visited
    c.crawlPage("https://"+host, host, 0, localVisited, &pagesVisited)
    if pagesVisited == 0 {
        c.crawlPage("http://"+host, host, 0, localVisited, &pagesVisited)
    }

    // Count feeds after crawling
    finalCount, _ := c.GetFeedCount()
    feedsFound = finalCount - initialCount

    if pagesVisited == 0 {
        return feedsFound, fmt.Errorf("could not connect")
    }

    return feedsFound, nil
}

func (c *Crawler) crawlPage(pageURL, sourceHost string, depth int, localVisited map[string]bool, pagesVisited *int) {
    if *pagesVisited >= c.MaxPagesPerHost || depth > c.MaxDepth {
        return
    }

    if localVisited[pageURL] {
        return
    }

    if _, visited := c.visited.LoadOrStore(pageURL, true); visited {
        return
    }

    localVisited[pageURL] = true
    *pagesVisited++

    body, contentType, headers, err := c.fetchPage(pageURL)
    if err != nil {
        return
    }

    if c.isFeedContent(body, contentType) {
        c.processFeed(pageURL, sourceHost, body, headers)
        return
    }

    doc, err := html.Parse(strings.NewReader(body))
    if err != nil {
        return
    }

    feedLinks := c.extractFeedLinks(doc, pageURL)
    for _, fl := range feedLinks {
        c.addFeed(fl.URL, fl.Type, sourceHost, pageURL)
    }

    anchorFeeds := c.extractAnchorFeeds(doc, pageURL)
    for _, fl := range anchorFeeds {
        c.addFeed(fl.URL, fl.Type, sourceHost, pageURL)
    }

    if depth < c.MaxDepth {
        links := c.extractLinks(doc, pageURL)
        for _, link := range links {
            if shouldCrawl(link, pageURL) {
                c.crawlPage(link, sourceHost, depth+1, localVisited, pagesVisited)
            }
        }
    }
}
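// addFeed, processFeed, and the feed record they persist live in feed.go and
// parser.go, outside this file. A minimal sketch of the stored metadata,
// assuming JSON values keyed by the normalized feed URL (field names follow
// the commit message; the exact layout is an assumption):
//
//    type Feed struct {
//        URL        string    `json:"url"`         // normalized: scheme and "www." stripped
//        Type       string    `json:"type"`        // "rss" or "atom"
//        Title      string    `json:"title,omitempty"`
//        TTL        int       `json:"ttl,omitempty"`         // minutes, from <ttl> when present
//        UpdateFreq string    `json:"update_freq,omitempty"` // e.g. from sy:updatePeriod / sy:updateFrequency
//        ETag       string    `json:"etag,omitempty"`
//        SourceHost string    `json:"source_host"`
//        FoundOn    string    `json:"found_on"`
//        FetchedAt  time.Time `json:"fetched_at"`
//    }
//
// Per the commit message, comment feeds (e.g. WordPress "/comments/feed/" URLs)
// are filtered out before a record is written.
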
func (c *Crawler) fetchPage(pageURL string) (string, string, http.Header, error) {
    req, err := http.NewRequest("GET", pageURL, nil)
    if err != nil {
        return "", "", nil, err
    }
    req.Header.Set("User-Agent", c.UserAgent)

    resp, err := c.client.Do(req)
    if err != nil {
        return "", "", nil, err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return "", "", nil, fmt.Errorf("status code: %d", resp.StatusCode)
    }

    bodyBytes, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", "", nil, err
    }

    contentType := resp.Header.Get("Content-Type")
    return string(bodyBytes), contentType, resp.Header, nil
}
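crawler.go only defines the crawler itself; the entry point that ties it to the Pebble database and the port-4321 dashboard is not shown here. A minimal sketch of how main.go might do that wiring; the database path, route, and handler are assumptions, and the real dashboard renders richer stats:

package main // sketch of main.go; names and paths below are assumptions

import (
    "fmt"
    "log"
    "net/http"
)

func main() {
    crawler, err := NewCrawler("crawler.db")
    if err != nil {
        log.Fatalf("open crawler: %v", err)
    }
    defer crawler.Close()

    // Minimal stand-in for the dashboard on port 4321.
    go func() {
        http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
            count, _ := crawler.GetFeedCount()
            fmt.Fprintf(w, "feeds stored: %d\n", count)
        })
        log.Println(http.ListenAndServe(":4321", nil))
    }()

    if err := crawler.CrawlUncrawledDomains(); err != nil {
        log.Printf("crawl error: %v", err)
    }
}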