diff --git a/.gitignore b/.gitignore index ce66a70..460c35d 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ go.* *.gz feeds/ +feeds.db/ diff --git a/crawler.go b/crawler.go new file mode 100644 index 0000000..215179c --- /dev/null +++ b/crawler.go @@ -0,0 +1,237 @@ +package main + +import ( + "fmt" + "io" + "math/rand" + "net/http" + "runtime" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/cockroachdb/pebble" + "golang.org/x/net/html" +) + +type Crawler struct { + MaxDepth int + MaxPagesPerHost int + Timeout time.Duration + UserAgent string + visited sync.Map + feedsMu sync.Mutex + client *http.Client + hostsProcessed int32 + db *pebble.DB +} + +func NewCrawler(dbPath string) (*Crawler, error) { + db, err := pebble.Open(dbPath, &pebble.Options{}) + if err != nil { + return nil, fmt.Errorf("failed to open pebble db: %v", err) + } + + return &Crawler{ + MaxDepth: 10, + MaxPagesPerHost: 10, + Timeout: 10 * time.Second, + UserAgent: "FeedCrawler/1.0", + db: db, + client: &http.Client{ + Timeout: 10 * time.Second, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + if len(via) >= 10 { + return fmt.Errorf("stopped after 10 redirects") + } + return nil + }, + }, + }, nil +} + +func (c *Crawler) Close() error { + if c.db != nil { + return c.db.Close() + } + return nil +} + +// CrawlUncrawledDomains fetches uncrawled domains and crawls them +func (c *Crawler) CrawlUncrawledDomains() error { + domains, err := c.GetUncrawledDomains() + if err != nil { + return fmt.Errorf("failed to get uncrawled domains: %v", err) + } + + if len(domains) == 0 { + return nil + } + + // Shuffle for randomized crawling + rand.Shuffle(len(domains), func(i, j int) { + domains[i], domains[j] = domains[j], domains[i] + }) + + numWorkers := runtime.NumCPU() - 1 + if numWorkers < 1 { + numWorkers = 1 + } + + type crawlResult struct { + host string + feedsFound int + lastError string + } + + domainChan := make(chan *Domain, numWorkers*2) + resultChan := make(chan crawlResult, numWorkers*2) + var wg sync.WaitGroup + + // Start workers + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for domain := range domainChan { + feedsFound, crawlErr := c.crawlHost(domain.Host) + errStr := "" + if crawlErr != nil { + errStr = crawlErr.Error() + } + resultChan <- crawlResult{ + host: domain.Host, + feedsFound: feedsFound, + lastError: errStr, + } + } + }() + } + + // Start result processor + done := make(chan bool) + go func() { + for result := range resultChan { + if err := c.markDomainCrawled(result.host, result.feedsFound, result.lastError); err != nil { + fmt.Printf("Error marking domain %s as crawled: %v\n", result.host, err) + } + } + done <- true + }() + + // Send domains to workers + for _, domain := range domains { + domainChan <- domain + } + + close(domainChan) + wg.Wait() + close(resultChan) + <-done + + return nil +} + +func (c *Crawler) crawlHost(host string) (feedsFound int, err error) { + atomic.AddInt32(&c.hostsProcessed, 1) + + // Count feeds before crawling + initialCount, _ := c.GetFeedCount() + + localVisited := make(map[string]bool) + pagesVisited := 0 + + // Try HTTPS first, fall back to HTTP if no pages were visited + c.crawlPage("https://"+host, host, 0, localVisited, &pagesVisited) + if pagesVisited == 0 { + c.crawlPage("http://"+host, host, 0, localVisited, &pagesVisited) + } + + // Count feeds after crawling + finalCount, _ := c.GetFeedCount() + feedsFound = finalCount - initialCount + + if pagesVisited == 0 { + return feedsFound, 
fmt.Errorf("could not connect") + } + + return feedsFound, nil +} + +func (c *Crawler) crawlPage(pageURL, sourceHost string, depth int, localVisited map[string]bool, pagesVisited *int) { + if *pagesVisited >= c.MaxPagesPerHost || depth > c.MaxDepth { + return + } + + if localVisited[pageURL] { + return + } + + if _, visited := c.visited.LoadOrStore(pageURL, true); visited { + return + } + + localVisited[pageURL] = true + *pagesVisited++ + + body, contentType, headers, err := c.fetchPage(pageURL) + if err != nil { + return + } + + if c.isFeedContent(body, contentType) { + c.processFeed(pageURL, sourceHost, body, headers) + return + } + + doc, err := html.Parse(strings.NewReader(body)) + if err != nil { + return + } + + feedLinks := c.extractFeedLinks(doc, pageURL) + for _, fl := range feedLinks { + c.addFeed(fl.URL, fl.Type, sourceHost, pageURL) + } + + anchorFeeds := c.extractAnchorFeeds(doc, pageURL) + for _, fl := range anchorFeeds { + c.addFeed(fl.URL, fl.Type, sourceHost, pageURL) + } + + if depth < c.MaxDepth { + links := c.extractLinks(doc, pageURL) + for _, link := range links { + if shouldCrawl(link, pageURL) { + c.crawlPage(link, sourceHost, depth+1, localVisited, pagesVisited) + } + } + } +} + +func (c *Crawler) fetchPage(pageURL string) (string, string, http.Header, error) { + req, err := http.NewRequest("GET", pageURL, nil) + if err != nil { + return "", "", nil, err + } + req.Header.Set("User-Agent", c.UserAgent) + + resp, err := c.client.Do(req) + if err != nil { + return "", "", nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return "", "", nil, fmt.Errorf("status code: %d", resp.StatusCode) + } + + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return "", "", nil, err + } + + contentType := resp.Header.Get("Content-Type") + return string(bodyBytes), contentType, resp.Header, nil +} diff --git a/dashboard.go b/dashboard.go new file mode 100644 index 0000000..a362b2d --- /dev/null +++ b/dashboard.go @@ -0,0 +1,422 @@ +package main + +import ( + "encoding/json" + "fmt" + "html/template" + "net/http" + "sort" + "time" +) + +// DashboardStats holds all statistics for the dashboard +type DashboardStats struct { + // Domain stats + TotalDomains int `json:"total_domains"` + CrawledDomains int `json:"crawled_domains"` + UncrawledDomains int `json:"uncrawled_domains"` + ErrorDomains int `json:"error_domains"` + + // Feed stats + TotalFeeds int `json:"total_feeds"` + RSSFeeds int `json:"rss_feeds"` + AtomFeeds int `json:"atom_feeds"` + UnknownFeeds int `json:"unknown_feeds"` + + // Crawl progress + HostsProcessed int32 `json:"hosts_processed"` + CrawlRate float64 `json:"crawl_rate"` // domains per minute + + // Top TLDs by feed count + TopTLDs []TLDStat `json:"top_tlds"` + + // Recent feeds + RecentFeeds []RecentFeed `json:"recent_feeds"` + + // Top domains by feed count + TopDomains []DomainStat `json:"top_domains"` + + // Timing + UpdatedAt time.Time `json:"updated_at"` +} + +type TLDStat struct { + TLD string `json:"tld"` + Count int `json:"count"` +} + +type RecentFeed struct { + URL string `json:"url"` + Title string `json:"title"` + Type string `json:"type"` + DiscoveredAt time.Time `json:"discovered_at"` +} + +type DomainStat struct { + Host string `json:"host"` + FeedsFound int `json:"feeds_found"` +} + +// GetDashboardStats collects all statistics for the dashboard +func (c *Crawler) GetDashboardStats() (*DashboardStats, error) { + stats := &DashboardStats{ + UpdatedAt: time.Now(), + HostsProcessed: c.hostsProcessed, + } + + // 
Get domain stats + if err := c.collectDomainStats(stats); err != nil { + return nil, err + } + + // Get feed stats + if err := c.collectFeedStats(stats); err != nil { + return nil, err + } + + return stats, nil +} + +func (c *Crawler) collectDomainStats(stats *DashboardStats) error { + iter, err := c.db.NewIter(nil) + if err != nil { + return err + } + defer iter.Close() + + domainFeeds := make(map[string]int) + + for iter.SeekGE([]byte("domain:")); iter.Valid(); iter.Next() { + key := string(iter.Key()) + if len(key) < 7 || key[:7] != "domain:" { + break + } + + var domain Domain + if err := json.Unmarshal(iter.Value(), &domain); err != nil { + continue + } + + stats.TotalDomains++ + switch domain.Status { + case "crawled": + stats.CrawledDomains++ + if domain.FeedsFound > 0 { + domainFeeds[domain.Host] = domain.FeedsFound + } + case "uncrawled": + stats.UncrawledDomains++ + case "error": + stats.ErrorDomains++ + } + } + + // Top domains by feed count + type kv struct { + Host string + Count int + } + var sorted []kv + for h, c := range domainFeeds { + sorted = append(sorted, kv{h, c}) + } + sort.Slice(sorted, func(i, j int) bool { + return sorted[i].Count > sorted[j].Count + }) + for i := 0; i < len(sorted) && i < 10; i++ { + stats.TopDomains = append(stats.TopDomains, DomainStat{ + Host: sorted[i].Host, + FeedsFound: sorted[i].Count, + }) + } + + return iter.Error() +} + +func (c *Crawler) collectFeedStats(stats *DashboardStats) error { + iter, err := c.db.NewIter(nil) + if err != nil { + return err + } + defer iter.Close() + + tldCounts := make(map[string]int) + var recentFeeds []RecentFeed + + for iter.SeekGE([]byte("feed:")); iter.Valid(); iter.Next() { + key := string(iter.Key()) + if len(key) < 5 || key[:5] != "feed:" { + break + } + + var feed Feed + if err := json.Unmarshal(iter.Value(), &feed); err != nil { + continue + } + + stats.TotalFeeds++ + switch feed.Type { + case "rss": + stats.RSSFeeds++ + case "atom": + stats.AtomFeeds++ + default: + stats.UnknownFeeds++ + } + + if feed.TLD != "" { + tldCounts[feed.TLD]++ + } + + recentFeeds = append(recentFeeds, RecentFeed{ + URL: feed.URL, + Title: feed.Title, + Type: feed.Type, + DiscoveredAt: feed.DiscoveredAt, + }) + } + + // Top TLDs + type kv struct { + TLD string + Count int + } + var sortedTLDs []kv + for t, c := range tldCounts { + sortedTLDs = append(sortedTLDs, kv{t, c}) + } + sort.Slice(sortedTLDs, func(i, j int) bool { + return sortedTLDs[i].Count > sortedTLDs[j].Count + }) + for i := 0; i < len(sortedTLDs) && i < 10; i++ { + stats.TopTLDs = append(stats.TopTLDs, TLDStat{ + TLD: sortedTLDs[i].TLD, + Count: sortedTLDs[i].Count, + }) + } + + // Recent feeds (last 20, sorted by discovery time) + sort.Slice(recentFeeds, func(i, j int) bool { + return recentFeeds[i].DiscoveredAt.After(recentFeeds[j].DiscoveredAt) + }) + if len(recentFeeds) > 20 { + recentFeeds = recentFeeds[:20] + } + stats.RecentFeeds = recentFeeds + + return iter.Error() +} + +// StartDashboard starts the web dashboard server +func (c *Crawler) StartDashboard(addr string) error { + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + c.handleDashboard(w, r) + }) + http.HandleFunc("/api/stats", func(w http.ResponseWriter, r *http.Request) { + c.handleAPIStats(w, r) + }) + + fmt.Printf("Dashboard running at http://%s\n", addr) + return http.ListenAndServe(addr, nil) +} + +func (c *Crawler) handleDashboard(w http.ResponseWriter, r *http.Request) { + stats, err := c.GetDashboardStats() + if err != nil { + http.Error(w, err.Error(), 
http.StatusInternalServerError)
+		return
+	}
+
+	funcMap := template.FuncMap{
+		"divf": func(a, b int) float64 {
+			if b == 0 {
+				return 0
+			}
+			return float64(a) / float64(b)
+		},
+		"mulf": func(a int, b float64) float64 {
+			return float64(a) * b
+		},
+	}
+
+	tmpl, err := template.New("dashboard").Funcs(funcMap).Parse(dashboardHTML)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	w.Header().Set("Content-Type", "text/html")
+	tmpl.Execute(w, stats)
+}
+
+func (c *Crawler) handleAPIStats(w http.ResponseWriter, r *http.Request) {
+	stats, err := c.GetDashboardStats()
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	w.Header().Set("Content-Type", "application/json")
+	json.NewEncoder(w).Encode(stats)
+}
+
+const dashboardHTML = `<!DOCTYPE html>
+<html>
+<head>
+  <title>1440.news Feed Crawler</title>
+</head>
+<body>
+  <h1>1440.news Feed Crawler</h1>
+
+  <h2>Crawl Progress</h2>
+  <div class="stats">
+    <div class="stat">
+      <div class="value">{{.TotalDomains}}</div>
+      <div class="label">Total Domains</div>
+    </div>
+    <div class="stat">
+      <div class="value">{{.CrawledDomains}}</div>
+      <div class="label">Crawled</div>
+      {{if .TotalDomains}}
+      <div class="bar">
+        <div class="bar-fill" style="width: {{mulf 100 (divf .CrawledDomains .TotalDomains)}}%"></div>
+      </div>
+      {{end}}
+    </div>
+    <div class="stat">
+      <div class="value">{{.UncrawledDomains}}</div>
+      <div class="label">Uncrawled</div>
+    </div>
+    <div class="stat">
+      <div class="value">{{.ErrorDomains}}</div>
+      <div class="label">Errors</div>
+    </div>
+  </div>
+
+  <h2>Feeds Discovered</h2>
+  <div class="stats">
+    <div class="stat">
+      <div class="value">{{.TotalFeeds}}</div>
+      <div class="label">Total Feeds</div>
+    </div>
+    <div class="stat">
+      <div class="value">{{.RSSFeeds}}</div>
+      <div class="label">RSS Feeds</div>
+    </div>
+    <div class="stat">
+      <div class="value">{{.AtomFeeds}}</div>
+      <div class="label">Atom Feeds</div>
+    </div>
+    <div class="stat">
+      <div class="value">{{.UnknownFeeds}}</div>
+      <div class="label">Unknown Type</div>
+    </div>
+  </div>
+
+  <div class="columns">
+    <div class="column">
+      <h2>Top TLDs</h2>
+      {{range .TopTLDs}}
+      <div class="row"><span>.{{.TLD}}</span><span>{{.Count}}</span></div>
+      {{else}}
+      <div class="empty">No data yet</div>
+      {{end}}
+    </div>
+    <div class="column">
+      <h2>Top Domains</h2>
+      {{range .TopDomains}}
+      <div class="row"><span>{{.Host}}</span><span>{{.FeedsFound}}</span></div>
+      {{else}}
+      <div class="empty">No data yet</div>
+      {{end}}
+    </div>
+  </div>
+
+  <h2>Recent Feeds</h2>
+  <table>
+    <thead>
+      <tr><th>URL</th><th>Title</th><th>Type</th><th>Discovered</th></tr>
+    </thead>
+    <tbody>
+      {{range .RecentFeeds}}
+      <tr>
+        <td>{{.URL}}</td>
+        <td>{{if .Title}}{{.Title}}{{else}}-{{end}}</td>
+        <td>{{.Type}}</td>
+        <td>{{.DiscoveredAt.Format "15:04:05"}}</td>
+      </tr>
+      {{else}}
+      <tr><td colspan="4">No feeds discovered yet</td></tr>
+      {{end}}
+    </tbody>
+  </table>
+
+  <p class="updated">Last updated: {{.UpdatedAt.Format "2006-01-02 15:04:05"}}</p>
+</body>
+</html>
+ +` diff --git a/domain.go b/domain.go new file mode 100644 index 0000000..177f970 --- /dev/null +++ b/domain.go @@ -0,0 +1,227 @@ +package main + +import ( + "bufio" + "compress/gzip" + "encoding/json" + "fmt" + "io" + "os" + "strings" + "time" + + "github.com/cockroachdb/pebble" +) + +// Domain represents a host to be crawled for feeds +type Domain struct { + Host string `json:"host"` // Normalized hostname (no scheme, no www.) + Status string `json:"status"` // "uncrawled", "crawled", "error" + DiscoveredAt time.Time `json:"discovered_at"` + LastCrawledAt time.Time `json:"last_crawled_at,omitempty"` + FeedsFound int `json:"feeds_found,omitempty"` + LastError string `json:"last_error,omitempty"` + TLD string `json:"tld,omitempty"` +} + +// saveDomain stores a domain in PebbleDB +func (c *Crawler) saveDomain(domain *Domain) error { + data, err := json.Marshal(domain) + if err != nil { + return fmt.Errorf("failed to marshal domain: %v", err) + } + + key := []byte("domain:" + domain.Host) + return c.db.Set(key, data, pebble.Sync) +} + +// getDomain retrieves a domain from PebbleDB +func (c *Crawler) getDomain(host string) (*Domain, error) { + key := []byte("domain:" + normalizeHost(host)) + data, closer, err := c.db.Get(key) + if err != nil { + if err == pebble.ErrNotFound { + return nil, nil + } + return nil, err + } + defer closer.Close() + + var domain Domain + if err := json.Unmarshal(data, &domain); err != nil { + return nil, fmt.Errorf("failed to unmarshal domain: %v", err) + } + return &domain, nil +} + +// domainExists checks if a domain already exists in the database +func (c *Crawler) domainExists(host string) bool { + key := []byte("domain:" + normalizeHost(host)) + _, closer, err := c.db.Get(key) + if err != nil { + return false + } + closer.Close() + return true +} + +// GetUncrawledDomains returns all domains with status "uncrawled" +func (c *Crawler) GetUncrawledDomains() ([]*Domain, error) { + var domains []*Domain + + iter, err := c.db.NewIter(&pebble.IterOptions{ + LowerBound: []byte("domain:"), + UpperBound: []byte("domain:\xff"), + }) + if err != nil { + return nil, err + } + defer iter.Close() + + for iter.First(); iter.Valid(); iter.Next() { + var domain Domain + if err := json.Unmarshal(iter.Value(), &domain); err != nil { + continue + } + if domain.Status == "uncrawled" { + domains = append(domains, &domain) + } + } + + if err := iter.Error(); err != nil { + return nil, err + } + + return domains, nil +} + +// markDomainCrawled updates a domain's status after crawling +func (c *Crawler) markDomainCrawled(host string, feedsFound int, lastError string) error { + domain, err := c.getDomain(host) + if err != nil { + return err + } + if domain == nil { + return fmt.Errorf("domain not found: %s", host) + } + + domain.LastCrawledAt = time.Now() + domain.FeedsFound = feedsFound + if lastError != "" { + domain.Status = "error" + domain.LastError = lastError + } else { + domain.Status = "crawled" + domain.LastError = "" + } + + return c.saveDomain(domain) +} + +// GetDomainCount returns the total number of domains in the database +func (c *Crawler) GetDomainCount() (total int, uncrawled int, err error) { + iter, err := c.db.NewIter(&pebble.IterOptions{ + LowerBound: []byte("domain:"), + UpperBound: []byte("domain:\xff"), + }) + if err != nil { + return 0, 0, err + } + defer iter.Close() + + for iter.First(); iter.Valid(); iter.Next() { + total++ + var domain Domain + if err := json.Unmarshal(iter.Value(), &domain); err != nil { + continue + } + if domain.Status == "uncrawled" 
{ + uncrawled++ + } + } + + if err := iter.Error(); err != nil { + return 0, 0, err + } + + return total, uncrawled, nil +} + +// ImportDomainsFromFile reads a vertices file and stores new domains as "uncrawled" +func (c *Crawler) ImportDomainsFromFile(filename string, limit int) (imported int, skipped int, err error) { + file, err := os.Open(filename) + if err != nil { + return 0, 0, fmt.Errorf("failed to open file: %v", err) + } + defer file.Close() + + return c.parseAndStoreDomains(file, limit) +} + +func (c *Crawler) parseAndStoreDomains(reader io.Reader, limit int) (imported int, skipped int, err error) { + var bodyReader io.Reader + + bufReader := bufio.NewReader(reader) + peekBytes, err := bufReader.Peek(2) + if err != nil && err != io.EOF { + return 0, 0, fmt.Errorf("failed to peek at file: %v", err) + } + + if len(peekBytes) >= 2 && peekBytes[0] == 0x1f && peekBytes[1] == 0x8b { + gzReader, err := gzip.NewReader(bufReader) + if err != nil { + return 0, 0, fmt.Errorf("failed to create gzip reader: %v", err) + } + defer gzReader.Close() + bodyReader = gzReader + } else { + bodyReader = bufReader + } + + scanner := bufio.NewScanner(bodyReader) + buf := make([]byte, 0, 64*1024) + scanner.Buffer(buf, 1024*1024) + + now := time.Now() + count := 0 + + for scanner.Scan() { + if limit > 0 && count >= limit { + break + } + + line := scanner.Text() + parts := strings.Split(line, "\t") + if len(parts) >= 2 { + reverseHostName := strings.TrimSpace(parts[1]) + if reverseHostName != "" { + host := normalizeHost(reverseHost(reverseHostName)) + count++ + + // Skip if domain already exists + if c.domainExists(host) { + skipped++ + continue + } + + // Store new domain as uncrawled + domain := &Domain{ + Host: host, + Status: "uncrawled", + DiscoveredAt: now, + TLD: getTLD(host), + } + if err := c.saveDomain(domain); err != nil { + continue + } + imported++ + } + } + } + + if err := scanner.Err(); err != nil { + return imported, skipped, fmt.Errorf("error reading file: %v", err) + } + + return imported, skipped, nil +} diff --git a/feed.go b/feed.go new file mode 100644 index 0000000..e8949a1 --- /dev/null +++ b/feed.go @@ -0,0 +1,233 @@ +package main + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" + "time" + + "github.com/cockroachdb/pebble" +) + +// Feed represents a discovered RSS/Atom feed with metadata +type Feed struct { + URL string `json:"url"` + Type string `json:"type"` // "rss", "atom", or "unknown" + Title string `json:"title,omitempty"` + Description string `json:"description,omitempty"` + Language string `json:"language,omitempty"` + SiteURL string `json:"site_url,omitempty"` // The website the feed belongs to + + // Timing + DiscoveredAt time.Time `json:"discovered_at"` + LastCrawledAt time.Time `json:"last_crawled_at,omitempty"` + NextCrawlAt time.Time `json:"next_crawl_at,omitempty"` + LastBuildDate time.Time `json:"last_build_date,omitempty"` // From feed's lastBuildDate/updated + + // Cache headers for conditional requests + ETag string `json:"etag,omitempty"` + LastModified string `json:"last_modified,omitempty"` + + // Feed hints for crawl scheduling + TTLMinutes int `json:"ttl_minutes,omitempty"` // From RSS element + UpdatePeriod string `json:"update_period,omitempty"` // From sy:updatePeriod (hourly, daily, weekly, monthly, yearly) + UpdateFreq int `json:"update_freq,omitempty"` // From sy:updateFrequency + + // Health tracking + Status string `json:"status"` // "active", "dead", "redirect", "error" + ErrorCount int `json:"error_count"` + LastError string 
`json:"last_error,omitempty"` + LastErrorAt time.Time `json:"last_error_at,omitempty"` + + // Discovery source + SourceURL string `json:"source_url,omitempty"` // Where we found this feed + SourceHost string `json:"source_host,omitempty"` + TLD string `json:"tld,omitempty"` + + // Content stats + ItemCount int `json:"item_count,omitempty"` // Number of items in last crawl + AvgPostFreqHrs float64 `json:"avg_post_freq_hrs,omitempty"` // Average hours between posts + OldestItemDate time.Time `json:"oldest_item_date,omitempty"` + NewestItemDate time.Time `json:"newest_item_date,omitempty"` +} + +// saveFeed stores a feed in PebbleDB +func (c *Crawler) saveFeed(feed *Feed) error { + data, err := json.Marshal(feed) + if err != nil { + return fmt.Errorf("failed to marshal feed: %v", err) + } + + key := []byte("feed:" + feed.URL) + return c.db.Set(key, data, pebble.Sync) +} + +// getFeed retrieves a feed from PebbleDB +func (c *Crawler) getFeed(feedURL string) (*Feed, error) { + key := []byte("feed:" + normalizeURL(feedURL)) + data, closer, err := c.db.Get(key) + if err != nil { + if err == pebble.ErrNotFound { + return nil, nil + } + return nil, err + } + defer closer.Close() + + var feed Feed + if err := json.Unmarshal(data, &feed); err != nil { + return nil, fmt.Errorf("failed to unmarshal feed: %v", err) + } + return &feed, nil +} + +// feedExists checks if a feed URL already exists in the database +func (c *Crawler) feedExists(feedURL string) bool { + key := []byte("feed:" + normalizeURL(feedURL)) + _, closer, err := c.db.Get(key) + if err != nil { + return false + } + closer.Close() + return true +} + +// GetAllFeeds returns all feeds from the database +func (c *Crawler) GetAllFeeds() ([]*Feed, error) { + var feeds []*Feed + + iter, err := c.db.NewIter(&pebble.IterOptions{ + LowerBound: []byte("feed:"), + UpperBound: []byte("feed:\xff"), + }) + if err != nil { + return nil, err + } + defer iter.Close() + + for iter.First(); iter.Valid(); iter.Next() { + var feed Feed + if err := json.Unmarshal(iter.Value(), &feed); err != nil { + continue + } + feeds = append(feeds, &feed) + } + + if err := iter.Error(); err != nil { + return nil, err + } + + return feeds, nil +} + +// GetFeedCount returns the total number of feeds in the database +func (c *Crawler) GetFeedCount() (int, error) { + count := 0 + + iter, err := c.db.NewIter(&pebble.IterOptions{ + LowerBound: []byte("feed:"), + UpperBound: []byte("feed:\xff"), + }) + if err != nil { + return 0, err + } + defer iter.Close() + + for iter.First(); iter.Valid(); iter.Next() { + count++ + } + + if err := iter.Error(); err != nil { + return 0, err + } + + return count, nil +} + +// processFeed parses and stores a feed with full metadata +func (c *Crawler) processFeed(feedURL, sourceHost, body string, headers http.Header) { + if strings.Contains(feedURL, "/comment") { + return + } + + // Fast path: check without lock + if c.feedExists(feedURL) { + return + } + + c.feedsMu.Lock() + defer c.feedsMu.Unlock() + + // Double-check after acquiring lock + if c.feedExists(feedURL) { + return + } + + feedType := c.detectFeedType(body) + now := time.Now() + + feed := &Feed{ + URL: normalizeURL(feedURL), + Type: feedType, + DiscoveredAt: now, + LastCrawledAt: now, + Status: "active", + SourceHost: sourceHost, + TLD: getTLD(sourceHost), + ETag: headers.Get("ETag"), + LastModified: headers.Get("Last-Modified"), + } + + // Parse feed-specific metadata + switch feedType { + case "rss": + c.parseRSSMetadata(body, feed) + case "atom": + c.parseAtomMetadata(body, feed) 
+ } + + // Calculate next crawl time + feed.NextCrawlAt = c.calculateNextCrawl(feed) + + if err := c.saveFeed(feed); err != nil { + return + } +} + +// addFeed adds a discovered feed URL (not yet fetched) +func (c *Crawler) addFeed(feedURL, feedType, sourceHost, sourceURL string) { + if strings.Contains(feedURL, "/comment") { + return + } + + // Fast path: check without lock + if c.feedExists(feedURL) { + return + } + + c.feedsMu.Lock() + defer c.feedsMu.Unlock() + + // Double-check after acquiring lock + if c.feedExists(feedURL) { + return + } + + now := time.Now() + normalizedURL := normalizeURL(feedURL) + feed := &Feed{ + URL: normalizedURL, + Type: feedType, + DiscoveredAt: now, + Status: "active", + SourceURL: normalizeURL(sourceURL), + SourceHost: sourceHost, + TLD: getTLD(sourceHost), + NextCrawlAt: now, // Should be crawled immediately + } + + if err := c.saveFeed(feed); err != nil { + return + } +} diff --git a/html.go b/html.go new file mode 100644 index 0000000..ffcabf1 --- /dev/null +++ b/html.go @@ -0,0 +1,122 @@ +package main + +import ( + "regexp" + "strings" + + "golang.org/x/net/html" +) + +// simpleFeed is a lightweight feed reference used during HTML extraction +type simpleFeed struct { + URL string + Type string +} + +func (c *Crawler) isFeedContent(body, contentType string) bool { + if strings.Contains(contentType, "application/rss+xml") || + strings.Contains(contentType, "application/atom+xml") || + strings.Contains(contentType, "application/xml") || + strings.Contains(contentType, "text/xml") { + return true + } + + body = strings.TrimSpace(body) + if strings.HasPrefix(body, "= 10 { - return fmt.Errorf("stopped after 10 redirects") - } - return nil - }, - }, - } -} - -// reverseHost converts a reverse domain notation back to normal -// e.g., "com.example.www" -> "www.example.com" -func reverseHost(reverseHost string) string { - parts := strings.Split(reverseHost, ".") - // Reverse the parts - for i, j := 0, len(parts)-1; i < j; i, j = i+1, j-1 { - parts[i], parts[j] = parts[j], parts[i] - } - return strings.Join(parts, ".") -} - -// getTLD extracts the TLD from a hostname -func getTLD(host string) string { - parts := strings.Split(host, ".") - if len(parts) > 0 { - return parts[len(parts)-1] - } - return "" -} - -func (c *Crawler) GetCommonCrawlHostsFromFile(filename string, limit int) ([]string, error) { - file, err := os.Open(filename) - if err != nil { - return nil, fmt.Errorf("failed to open file: %v", err) - } - defer file.Close() - - hosts, err := c.parseVerticesFile(file, limit) - if err != nil { - return nil, fmt.Errorf("failed to parse vertices: %v", err) - } - - // Randomize the order - rand.Shuffle(len(hosts), func(i, j int) { - hosts[i], hosts[j] = hosts[j], hosts[i] - }) - - return hosts, nil -} - -func (c *Crawler) parseVerticesFile(reader io.Reader, limit int) ([]string, error) { - // Try to detect if it's gzipped - var bodyReader io.Reader - - // Create a buffered reader so we can peek - bufReader := bufio.NewReader(reader) - peekBytes, err := bufReader.Peek(2) - if err != nil && err != io.EOF { - return nil, fmt.Errorf("failed to peek at file: %v", err) - } - - // Check for gzip magic number (0x1f 0x8b) - if len(peekBytes) >= 2 && peekBytes[0] == 0x1f && peekBytes[1] == 0x8b { - gzReader, err := gzip.NewReader(bufReader) - if err != nil { - return nil, fmt.Errorf("failed to create gzip reader: %v", err) - } - defer gzReader.Close() - bodyReader = gzReader - } else { - bodyReader = bufReader - } - - hosts := make([]string, 0) - scanner := 
bufio.NewScanner(bodyReader) - - // Set a larger buffer for scanning - buf := make([]byte, 0, 64*1024) - scanner.Buffer(buf, 1024*1024) - - count := 0 - for scanner.Scan() { - if limit > 0 && count >= limit { - break - } - - line := scanner.Text() - // Vertices file format: line_number\treverse_hostname\tinteger - // Example: 0\tcom.example\t42 - parts := strings.Split(line, "\t") - if len(parts) >= 2 { - reverseHostName := strings.TrimSpace(parts[1]) - if reverseHostName != "" { - // Convert from reverse notation (com.example) to normal (example.com) - normalHost := reverseHost(reverseHostName) - hosts = append(hosts, normalHost) - count++ - } - } - } - - if err := scanner.Err(); err != nil { - return nil, fmt.Errorf("error reading file: %v", err) - } - - return hosts, nil -} - -func (c *Crawler) openTLDFile(tld string) error { - // Close previous file if open - if c.tldFile != nil { - c.sortAndDeduplicateTLDFile() - c.tldFile.Close() - c.tldFile = nil - c.tldFeeds = make(map[string]bool) - } - - // Open new file - if tld != "" { - filename := "feeds/" + tld + ".feed" - file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return fmt.Errorf("failed to open TLD file %s: %v", filename, err) - } - c.tldFile = file - c.currentTLD = tld - } - - return nil -} - -func (c *Crawler) sortAndDeduplicateTLDFile() { - if c.currentTLD == "" { - return - } - - filename := "feeds/" + c.currentTLD + ".feed" - - // Read all lines from the file - file, err := os.Open(filename) - if err != nil { - return - } - - feedSet := make(map[string]bool) - scanner := bufio.NewScanner(file) - for scanner.Scan() { - line := strings.TrimSpace(scanner.Text()) - if line != "" { - feedSet[line] = true - } - } - file.Close() - - // Sort the unique feeds - feeds := make([]string, 0, len(feedSet)) - for feed := range feedSet { - feeds = append(feeds, feed) - } - sort.Strings(feeds) - - // Write back to file - file, err = os.Create(filename) - if err != nil { - return - } - defer file.Close() - - writer := bufio.NewWriter(file) - for _, feed := range feeds { - writer.WriteString(feed + "\n") - } - writer.Flush() -} - -func (c *Crawler) writeFeedToTLDFile(feedURL, host string) { - c.tldMu.Lock() - defer c.tldMu.Unlock() - - tld := getTLD(host) - - // Check if TLD changed - if tld != c.currentTLD { - c.openTLDFile(tld) - } - - // Write feed to file if not already written - if c.tldFile != nil && !c.tldFeeds[feedURL] { - c.tldFile.WriteString(feedURL + "\n") - c.tldFeeds[feedURL] = true - } -} - -func (c *Crawler) Crawl(startURL string) ([]Feed, error) { - pagesVisited := 0 - c.crawlPage(startURL, 0, make(map[string]bool), &pagesVisited) - return c.feeds, nil -} - -func (c *Crawler) CrawlHosts(hosts []string) ([]Feed, error) { - numWorkers := runtime.NumCPU() - 1 - if numWorkers < 1 { - numWorkers = 1 - } - - hostChan := make(chan string, numWorkers*2) - var wg sync.WaitGroup - - // Start workers - for i := 0; i < numWorkers; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for host := range hostChan { - c.crawlHost(host) - } - }() - } - - // Send hosts to workers - for _, host := range hosts { - hostChan <- host - } - - close(hostChan) - wg.Wait() - - // Close final TLD file - c.tldMu.Lock() - c.openTLDFile("") - c.tldMu.Unlock() - - return c.feeds, nil -} - -func (c *Crawler) crawlHost(host string) { - atomic.AddInt32(&c.hostsProcessed, 1) - - hostFeeds := make([]Feed, 0) - localVisited := make(map[string]bool) - pagesVisited := 0 - - // Try both http and https - urls := []string{ - 
"https://" + host, - "http://" + host, - } - - for _, url := range urls { - c.crawlPage(url, 0, localVisited, &pagesVisited) - break // If first succeeds, don't try second - } - - // Collect feeds found for this host - c.feedsMu.Lock() - for _, feed := range c.feeds { - // Check if feed belongs to this host - feedHost := "" - if u, err := url.Parse(feed.URL); err == nil { - feedHost = u.Host - } - if feedHost == host || strings.HasSuffix(feedHost, "."+host) { - hostFeeds = append(hostFeeds, feed) - } - } - c.feedsMu.Unlock() - - // Print and write feeds found for this host - if len(hostFeeds) > 0 { - for _, feed := range hostFeeds { - fmt.Printf("%s\n", feed.URL) - c.writeFeedToTLDFile(feed.URL, host) - } - } -} - -func (c *Crawler) crawlPage(pageURL string, depth int, localVisited map[string]bool, pagesVisited *int) { - if *pagesVisited >= c.MaxPagesPerHost || depth > c.MaxDepth { - return - } - - if localVisited[pageURL] { - return - } - - // Check global visited - if _, visited := c.visited.LoadOrStore(pageURL, true); visited { - return - } - - localVisited[pageURL] = true - *pagesVisited++ - - body, contentType, err := c.fetchPage(pageURL) - if err != nil { - return - } - - // Check if this page itself is a feed - if c.isFeedContent(body, contentType) { - feedType := c.detectFeedType(body) - c.addFeed(pageURL, feedType) - - // Extract links from the feed and crawl them - feedLinks := c.extractLinksFromFeed(body, feedType) - - for _, link := range feedLinks { - c.crawlPage(link, depth+1, localVisited, pagesVisited) - } - return - } - - // Parse HTML and look for feed links - doc, err := html.Parse(strings.NewReader(body)) - if err != nil { - return - } - - // Find feed links in tags - feedLinks := c.extractFeedLinks(doc, pageURL) - for _, feed := range feedLinks { - c.addFeed(feed.URL, feed.Type) - } - - // Find feed links in anchor tags - anchorFeeds := c.extractAnchorFeeds(doc, pageURL) - for _, feed := range anchorFeeds { - c.addFeed(feed.URL, feed.Type) - } - - // Extract all links for further crawling - if depth < c.MaxDepth { - links := c.extractLinks(doc, pageURL) - for _, link := range links { - if c.shouldCrawl(link, pageURL) { - c.crawlPage(link, depth+1, localVisited, pagesVisited) - } - } - } -} - -func (c *Crawler) fetchPage(pageURL string) (string, string, error) { - req, err := http.NewRequest("GET", pageURL, nil) - if err != nil { - return "", "", err - } - req.Header.Set("User-Agent", c.UserAgent) - - resp, err := c.client.Do(req) - if err != nil { - return "", "", err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return "", "", fmt.Errorf("status code: %d", resp.StatusCode) - } - - bodyBytes, err := io.ReadAll(resp.Body) - if err != nil { - return "", "", err - } - - contentType := resp.Header.Get("Content-Type") - return string(bodyBytes), contentType, nil -} - -func (c *Crawler) isFeedContent(body, contentType string) bool { - if strings.Contains(contentType, "application/rss+xml") || - strings.Contains(contentType, "application/atom+xml") || - strings.Contains(contentType, "application/xml") || - strings.Contains(contentType, "text/xml") { - return true - } - - body = strings.TrimSpace(body) - if strings.HasPrefix(body, " 0 { + oldest, newest := dates[0], dates[0] + for _, d := range dates { + if d.Before(oldest) { + oldest = d + } + if d.After(newest) { + newest = d + } + } + feed.OldestItemDate = oldest + feed.NewestItemDate = newest + + if len(dates) > 1 { + totalHours := newest.Sub(oldest).Hours() + feed.AvgPostFreqHrs = totalHours / 
float64(len(dates)-1) + } + } +} + +func (c *Crawler) parseAtomMetadata(body string, feed *Feed) { + var atom AtomFeed + if err := xml.Unmarshal([]byte(body), &atom); err != nil { + return + } + + feed.Title = atom.Title + feed.ItemCount = len(atom.Entries) + + // Get site URL from links + for _, link := range atom.Link { + if link.Rel == "" || link.Rel == "alternate" { + if link.Type == "" || strings.Contains(link.Type, "html") { + feed.SiteURL = normalizeURL(link.Href) + break + } + } + } + + // Parse updated date + if atom.Updated != "" { + if t, err := time.Parse(time.RFC3339, atom.Updated); err == nil { + feed.LastBuildDate = t + } + } + + // Analyze entry dates + var dates []time.Time + for _, entry := range atom.Entries { + dateStr := entry.Updated + if dateStr == "" { + dateStr = entry.Published + } + if dateStr != "" { + if t, err := time.Parse(time.RFC3339, dateStr); err == nil { + dates = append(dates, t) + } + } + } + + if len(dates) > 0 { + oldest, newest := dates[0], dates[0] + for _, d := range dates { + if d.Before(oldest) { + oldest = d + } + if d.After(newest) { + newest = d + } + } + feed.OldestItemDate = oldest + feed.NewestItemDate = newest + + if len(dates) > 1 { + totalHours := newest.Sub(oldest).Hours() + feed.AvgPostFreqHrs = totalHours / float64(len(dates)-1) + } + } +} + +// parseRSSDate attempts to parse various RSS date formats +func parseRSSDate(s string) (time.Time, error) { + formats := []string{ + time.RFC1123Z, + time.RFC1123, + time.RFC822Z, + time.RFC822, + time.RFC3339, + "Mon, 2 Jan 2006 15:04:05 -0700", + "2006-01-02T15:04:05-07:00", + "2006-01-02 15:04:05", + } + + for _, format := range formats { + if t, err := time.Parse(format, s); err == nil { + return t, nil + } + } + return time.Time{}, fmt.Errorf("unable to parse date: %s", s) +} + +// calculateNextCrawl determines when to next crawl this feed +func (c *Crawler) calculateNextCrawl(feed *Feed) time.Time { + now := time.Now() + + // If TTL is specified, use it + if feed.TTLMinutes > 0 { + return now.Add(time.Duration(feed.TTLMinutes) * time.Minute) + } + + // If updatePeriod is specified + if feed.UpdatePeriod != "" { + freq := feed.UpdateFreq + if freq == 0 { + freq = 1 + } + switch strings.ToLower(feed.UpdatePeriod) { + case "hourly": + return now.Add(time.Duration(freq) * time.Hour) + case "daily": + return now.Add(time.Duration(freq) * 24 * time.Hour) + case "weekly": + return now.Add(time.Duration(freq) * 7 * 24 * time.Hour) + case "monthly": + return now.Add(time.Duration(freq) * 30 * 24 * time.Hour) + case "yearly": + return now.Add(time.Duration(freq) * 365 * 24 * time.Hour) + } + } + + // If we have average post frequency, use that + if feed.AvgPostFreqHrs > 0 { + // Crawl at half the average frequency, but at least every hour and at most once per day + crawlInterval := feed.AvgPostFreqHrs / 2 + if crawlInterval < 1 { + crawlInterval = 1 + } + if crawlInterval > 24 { + crawlInterval = 24 + } + return now.Add(time.Duration(crawlInterval * float64(time.Hour))) + } + + // Default: crawl every 6 hours + return now.Add(6 * time.Hour) +} diff --git a/util.go b/util.go new file mode 100644 index 0000000..32f153b --- /dev/null +++ b/util.go @@ -0,0 +1,82 @@ +package main + +import ( + "net/url" + "strings" +) + +// normalizeURL strips scheme (http/https) and www. prefix to save storage space. +// The normalized URL can be reconstructed with https:// for fetching. 
+func normalizeURL(rawURL string) string { + // Remove scheme + u := rawURL + if strings.HasPrefix(u, "https://") { + u = u[8:] + } else if strings.HasPrefix(u, "http://") { + u = u[7:] + } + + // Remove www. prefix + if strings.HasPrefix(u, "www.") { + u = u[4:] + } + + return u +} + +// normalizeHost strips www. prefix from a hostname for canonical storage +func normalizeHost(host string) string { + if strings.HasPrefix(host, "www.") { + return host[4:] + } + return host +} + +// reverseHost converts a reverse domain notation back to normal +// e.g., "com.example.www" -> "www.example.com" +func reverseHost(reverseHost string) string { + parts := strings.Split(reverseHost, ".") + for i, j := 0, len(parts)-1; i < j; i, j = i+1, j-1 { + parts[i], parts[j] = parts[j], parts[i] + } + return strings.Join(parts, ".") +} + +// getTLD extracts the TLD from a hostname +func getTLD(host string) string { + parts := strings.Split(host, ".") + if len(parts) > 0 { + return parts[len(parts)-1] + } + return "" +} + +// makeAbsoluteURL resolves a relative URL against a base URL +func makeAbsoluteURL(href, baseURL string) string { + base, err := url.Parse(baseURL) + if err != nil { + return href + } + + link, err := url.Parse(href) + if err != nil { + return href + } + + return base.ResolveReference(link).String() +} + +// shouldCrawl checks if a link should be crawled (same host as base) +func shouldCrawl(link, baseURL string) bool { + linkURL, err := url.Parse(link) + if err != nil { + return false + } + + baseURLParsed, err := url.Parse(baseURL) + if err != nil { + return false + } + + return linkURL.Host == baseURLParsed.Host +}
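
A minimal wiring sketch, assuming the new API is driven from a single entry point: the database path ("feeds.db"), the vertices filename, and the dashboard address below are illustrative placeholders and not values taken from this change; only the Crawler methods themselves (NewCrawler, ImportDomainsFromFile, CrawlUncrawledDomains, StartDashboard, Close) come from the diff above.

package main

import (
	"fmt"
	"log"
)

// runCrawl is a hypothetical driver; the actual main.go wiring is not shown in this diff.
func runCrawl() error {
	crawler, err := NewCrawler("feeds.db") // Pebble directory; matches the new .gitignore entry
	if err != nil {
		return err
	}
	defer crawler.Close()

	// Seed the domain queue from a Common Crawl vertices file (plain or gzipped);
	// a limit of 0 imports every line.
	imported, skipped, err := crawler.ImportDomainsFromFile("vertices.txt.gz", 0)
	if err != nil {
		return err
	}
	fmt.Printf("imported %d new domains, skipped %d existing\n", imported, skipped)

	// Serve the dashboard in the background while the crawl runs.
	go func() {
		if err := crawler.StartDashboard("localhost:8080"); err != nil {
			log.Printf("dashboard: %v", err)
		}
	}()

	// Crawl everything still marked "uncrawled"; feeds and domain status
	// are persisted in Pebble as they are discovered.
	return crawler.CrawlUncrawledDomains()
}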