crawler/cdx.go

package main
import (
"bufio"
"compress/gzip"
"context"
"database/sql"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"sync/atomic"
"time"
_ "github.com/duckdb/duckdb-go/v2"
)
const cdxDataDir = "cdx-data"
const cdxFileTimeout = 100 * time.Minute
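// cdxFileTimeout bounds how long a single remote parquet query may run before it is
// abandoned and retried; it matches the 100-minute http_timeout configured for DuckDB
// in queryOneRemoteParquet.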
// checkAndImportCDXFeeds looks for completed CDX TSV files and bulk imports them on startup
func (c *Crawler) checkAndImportCDXFeeds() {
// Look for TSV files in cdx-data directory
pattern := filepath.Join(cdxDataDir, "*-feeds.tsv")
matches, err := filepath.Glob(pattern)
if err != nil || len(matches) == 0 {
return
}
// Get current feed count
feedCount, _ := c.GetFeedCount()
for _, tsvFile := range matches {
// Count lines in TSV
lineCount := countLines(tsvFile)
if lineCount == 0 {
continue
}
		// If the database holds fewer than half the feeds listed in the TSV,
		// assume the bulk import never completed and run it
if feedCount < lineCount/2 {
fmt.Printf("CDX bulk import: found %s with %d feeds (db has %d), importing...\n",
filepath.Base(tsvFile), lineCount, feedCount)
if err := c.BulkImportFeedsFromTSV(tsvFile); err != nil {
fmt.Printf("CDX bulk import: error: %v\n", err)
}
} else {
fmt.Printf("CDX bulk import: %s already processed (%d feeds in db)\n",
filepath.Base(tsvFile), feedCount)
}
}
}
// countLines counts lines in a file efficiently
func countLines(filename string) int {
f, err := os.Open(filename)
if err != nil {
return 0
}
defer f.Close()
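	// Read fixed-size chunks and count newline bytes; this avoids bufio.Scanner's
	// per-line size limits and keeps memory use constant regardless of file size.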
buf := make([]byte, 32*1024)
count := 0
for {
n, err := f.Read(buf)
for i := 0; i < n; i++ {
if buf[i] == '\n' {
count++
}
}
if err != nil {
break
}
}
return count
}
// StartCDXMonthlyLoop runs CDX import on the 1st of each month
func (c *Crawler) StartCDXMonthlyLoop() {
fmt.Println("CDX monthly: starting scheduler...")
	firstRun := true
	for {
if c.IsShuttingDown() {
return
}
// Calculate time until next 1st of month at 00:00 UTC
now := time.Now().UTC()
nextRun := time.Date(now.Year(), now.Month()+1, 1, 0, 0, 0, 0, time.UTC)
		if firstRun && now.Day() == 1 && now.Hour() < 12 {
			// If we just started on the 1st before noon, run immediately.
			// Restricting this to the first pass avoids re-running in a tight
			// loop until noon once the import finishes.
			nextRun = time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)
		}
		firstRun = false
waitDuration := time.Until(nextRun)
fmt.Printf("CDX monthly: next run at %s (in %v)\n", nextRun.Format(time.RFC3339), waitDuration.Round(time.Hour))
// Wait until next run time (or shutdown)
select {
case <-c.shutdownCh:
return
case <-time.After(waitDuration):
}
// Run the import
c.runCDXImport()
}
}
// runCDXImport performs the actual CDX import for the latest crawl
func (c *Crawler) runCDXImport() {
// Ensure data directory exists
if err := os.MkdirAll(cdxDataDir, 0755); err != nil {
fmt.Printf("CDX import: failed to create data dir: %v\n", err)
return
}
// Get latest crawl ID
crawlID := c.getLatestCrawlID()
if crawlID == "" {
fmt.Println("CDX import: could not determine latest crawl ID")
return
}
fmt.Printf("CDX import: latest crawl is %s\n", crawlID)
// Check if this crawl is already complete
progress := c.db.GetCDXProgress()
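	// A finished crawl is recorded with an empty CurrentFile and a nonzero TotalFeeds
	// (see CompleteCDXProgress in extractFeedsFromRemoteParquet).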
if progress.CrawlID == crawlID && progress.CurrentFile == "" && progress.TotalFeeds > 0 {
fmt.Printf("CDX import: crawl %s already complete (%d feeds)\n", crawlID, progress.TotalFeeds)
return
}
// Check if we already have final results for this crawl
feedsFile := filepath.Join(cdxDataDir, crawlID+"-feeds.tsv")
if _, err := os.Stat(feedsFile); err == nil {
fmt.Printf("CDX import: found existing %s, processing...\n", feedsFile)
c.processFeedsFile(feedsFile)
return
}
// Process parquet files directly from remote
if err := c.extractFeedsFromRemoteParquet(crawlID, feedsFile); err != nil {
fmt.Printf("CDX import error: %v\n", err)
return
}
// Process the feeds file
c.processFeedsFile(feedsFile)
}
// extractFeedsFromRemoteParquet queries remote parquet files directly via DuckDB httpfs
func (c *Crawler) extractFeedsFromRemoteParquet(crawlID, outputFile string) error {
// Get manifest of parquet files
parquetPaths, err := c.fetchParquetManifest(crawlID)
if err != nil {
return err
}
totalFiles := len(parquetPaths)
fmt.Printf("CDX import: found %d parquet files to process\n", totalFiles)
// Check for resume point
progress := c.db.GetCDXProgress()
startIndex := 0
var totalFeeds int64
if progress.CrawlID == crawlID && progress.CurrentFile != "" {
// Resume from where we left off
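		// The file named in CurrentFile may already have been partially appended to the
		// TSV, so resuming can produce duplicate rows; the ON CONFLICT(url) DO NOTHING
		// insert used at import time tolerates them.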
totalFeeds = int64(progress.TotalFeeds)
for i, path := range parquetPaths {
if filepath.Base(path) == progress.CurrentFile {
startIndex = i
fmt.Printf("CDX import: resuming from file %d/%d (%s), %d feeds so far\n",
i+1, totalFiles, progress.CurrentFile, totalFeeds)
break
}
}
}
// Create/append output file
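	// Results go to a .tmp file that is renamed only after the crawl finishes, so the
	// presence of the final *-feeds.tsv acts as a completion marker for runCDXImport
	// and checkAndImportCDXFeeds.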
tmpFile := outputFile + ".tmp"
var f *os.File
if startIndex > 0 {
f, err = os.OpenFile(tmpFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
} else {
f, err = os.Create(tmpFile)
}
if err != nil {
return fmt.Errorf("failed to open output file: %w", err)
}
defer f.Close()
writer := bufio.NewWriter(f)
startTime := time.Now()
// Process each file sequentially with infinite retries
for i := startIndex; i < totalFiles; i++ {
		if c.IsShuttingDown() {
			fmt.Println("CDX import: shutdown requested, stopping...")
			// Abort without marking the crawl complete: keep the .tmp file and the
			// saved progress so the next run can resume from this file.
			writer.Flush()
			return fmt.Errorf("shutdown requested, crawl %s incomplete", crawlID)
		}
path := parquetPaths[i]
fileName := filepath.Base(path)
remoteURL := "https://data.commoncrawl.org/" + path
// Save progress before starting this file
c.db.SetCDXProgress(crawlID, fileName, int(totalFeeds))
// Process this file with infinite retries
count := c.queryRemoteParquetWithRetry(remoteURL, fileName, writer, i+1, totalFiles)
atomic.AddInt64(&totalFeeds, int64(count))
// Progress update
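		// rate is measured in files per minute; the ETA assumes the remaining files
		// take about as long per file as the ones processed so far.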
processed := i - startIndex + 1
elapsed := time.Since(startTime)
rate := float64(processed) / elapsed.Seconds() * 60
remaining := totalFiles - i - 1
eta := time.Duration(float64(remaining) / rate * float64(time.Minute))
fmt.Printf("CDX import: [%d/%d] %s - %d feeds (total: %d, %.1f files/min, ETA: %v)\n",
i+1, totalFiles, fileName, count, atomic.LoadInt64(&totalFeeds), rate, eta.Round(time.Minute))
}
if err := writer.Flush(); err != nil {
return fmt.Errorf("failed to flush output: %w", err)
}
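	// Close explicitly before renaming; the later deferred Close then returns a
	// harmless "file already closed" error.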
f.Close()
// Mark as complete
c.db.CompleteCDXProgress(crawlID, int(totalFeeds))
// Rename to final filename
if err := os.Rename(tmpFile, outputFile); err != nil {
return fmt.Errorf("failed to rename output: %w", err)
}
fmt.Printf("CDX import: extracted %d feed candidates in %v\n",
atomic.LoadInt64(&totalFeeds), time.Since(startTime).Round(time.Second))
return nil
}
// fetchParquetManifest downloads and parses the manifest file
func (c *Crawler) fetchParquetManifest(crawlID string) ([]string, error) {
manifestURL := fmt.Sprintf("https://data.commoncrawl.org/crawl-data/%s/cc-index-table.paths.gz", crawlID)
fmt.Printf("CDX import: downloading manifest from %s\n", manifestURL)
resp, err := c.client.Get(manifestURL)
if err != nil {
return nil, fmt.Errorf("failed to download manifest: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("manifest download failed: %s", resp.Status)
}
gzReader, err := gzip.NewReader(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to decompress manifest: %w", err)
}
defer gzReader.Close()
// Parse manifest and filter for warc subset
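	// The manifest also lists non-warc subsets (e.g. robots.txt and crawl-diagnostics
	// captures); only subset=warc rows describe the actual page responses we want.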
var parquetPaths []string
scanner := bufio.NewScanner(gzReader)
for scanner.Scan() {
path := scanner.Text()
if strings.Contains(path, "subset=warc/") {
parquetPaths = append(parquetPaths, path)
}
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("error reading manifest: %w", err)
}
return parquetPaths, nil
}
// queryRemoteParquetWithRetry queries a remote parquet file with infinite retries and timeout
func (c *Crawler) queryRemoteParquetWithRetry(remoteURL, fileName string, writer *bufio.Writer, fileNum, totalFiles int) int {
retryCount := 0
for {
if c.IsShuttingDown() {
return 0
}
retryCount++
if retryCount > 1 {
fmt.Printf("CDX import: [%d/%d] %s - retry #%d\n", fileNum, totalFiles, fileName, retryCount-1)
time.Sleep(30 * time.Second) // Wait before retry
}
// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), cdxFileTimeout)
// Channel for result
resultCh := make(chan int, 1)
errCh := make(chan error, 1)
		// Run the query in a goroutine, passing ctx so a timeout actually cancels the
		// in-flight DuckDB query instead of leaving it running against the shared writer.
		go func() {
			count, err := c.queryOneRemoteParquet(ctx, remoteURL, writer)
if err != nil {
errCh <- err
} else {
resultCh <- count
}
}()
select {
case <-ctx.Done():
cancel()
fmt.Printf("CDX import: [%d/%d] %s - timeout after %v, retrying...\n", fileNum, totalFiles, fileName, cdxFileTimeout)
continue
case err := <-errCh:
cancel()
fmt.Printf("CDX import: [%d/%d] %s - error: %v, retrying...\n", fileNum, totalFiles, fileName, err)
continue
case count := <-resultCh:
cancel()
return count
}
}
}
// queryOneRemoteParquet queries a single remote parquet file directly via httpfs.
// The context bounds the whole operation so the caller's timeout can cancel a stuck transfer.
func (c *Crawler) queryOneRemoteParquet(ctx context.Context, remoteURL string, writer *bufio.Writer) (int, error) {
// Open DuckDB connection
db, err := sql.Open("duckdb", "")
if err != nil {
return 0, fmt.Errorf("duckdb open failed: %w", err)
}
defer db.Close()
// Configure httpfs with 100 minute timeout (in milliseconds)
	if _, err := db.ExecContext(ctx, "INSTALL httpfs; LOAD httpfs; SET http_timeout = 6000000;"); err != nil {
return 0, fmt.Errorf("httpfs setup failed: %w", err)
}
// Query remote parquet directly
// Include text/html because many feeds serve with wrong MIME type (e.g., news.ycombinator.com)
// False positives will be filtered during feed_check when detectFeedType returns "unknown"
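	// remoteURL comes from the Common Crawl manifest rather than user input, so
	// interpolating it into the SQL string is acceptable here.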
query := fmt.Sprintf(`
SELECT DISTINCT url, url_host_name, content_mime_detected
FROM read_parquet('%s')
WHERE fetch_status = 200
AND content_mime_detected IN (
'application/rss+xml',
'application/atom+xml',
'application/feed+json',
'text/html'
)
`, remoteURL)
	rows, err := db.QueryContext(ctx, query)
if err != nil {
return 0, fmt.Errorf("query failed: %w", err)
}
defer rows.Close()
count := 0
for rows.Next() {
var feedURL, host, mime string
if err := rows.Scan(&feedURL, &host, &mime); err != nil {
continue
}
fmt.Fprintf(writer, "%s\t%s\t%s\n", feedURL, host, mime)
count++
}
if err := rows.Err(); err != nil {
return 0, fmt.Errorf("rows error: %w", err)
}
return count, nil
}
// processFeedsFile reads the TSV file and verifies/imports feeds
func (c *Crawler) processFeedsFile(feedsFile string) {
f, err := os.Open(feedsFile)
if err != nil {
fmt.Printf("CDX import: failed to open feeds file: %v\n", err)
return
}
defer f.Close()
scanner := bufio.NewScanner(f)
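	// Raise the scanner's token limit from the 64 KiB default to 1 MiB so unusually
	// long lines don't abort the scan.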
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
var processed, skipped, verified int
startTime := time.Now()
for scanner.Scan() {
if c.IsShuttingDown() {
fmt.Println("CDX import: shutdown requested")
break
}
line := scanner.Text()
parts := strings.Split(line, "\t")
if len(parts) < 3 {
continue
}
feedURL := parts[0]
processed++
// Skip if we already have this feed
if c.feedExists(feedURL) {
skipped++
continue
}
// Verify and add the feed
if c.verifyAndAddFeed(feedURL) {
verified++
}
// Progress every 1000
if processed%1000 == 0 {
elapsed := time.Since(startTime)
rate := float64(processed) / elapsed.Seconds()
fmt.Printf("CDX import: processed %d, skipped %d, verified %d (%.1f/sec)\n",
processed, skipped, verified, rate)
}
// Rate limit
time.Sleep(50 * time.Millisecond)
}
fmt.Printf("CDX import: complete. Processed %d, skipped %d, verified %d feeds\n",
processed, skipped, verified)
}
// verifyAndAddFeed fetches a URL and adds it if it's a valid feed
func (c *Crawler) verifyAndAddFeed(feedURL string) bool {
// Ensure URL has scheme
if !strings.HasPrefix(feedURL, "http://") && !strings.HasPrefix(feedURL, "https://") {
feedURL = "https://" + feedURL
}
// Parse to get host
parsed, err := url.Parse(feedURL)
if err != nil {
return false
}
// Create request
req, err := http.NewRequest("GET", feedURL, nil)
if err != nil {
return false
}
req.Header.Set("User-Agent", c.UserAgent)
// Fetch
resp, err := c.client.Do(req)
if err != nil {
// Try http if https failed
if strings.HasPrefix(feedURL, "https://") {
feedURL = "http://" + strings.TrimPrefix(feedURL, "https://")
req, _ = http.NewRequest("GET", feedURL, nil)
req.Header.Set("User-Agent", c.UserAgent)
resp, err = c.client.Do(req)
if err != nil {
return false
}
} else {
return false
}
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return false
}
// Read body (limit to 5MB)
bodyBytes, err := io.ReadAll(io.LimitReader(resp.Body, 5*1024*1024))
if err != nil {
return false
}
body := string(bodyBytes)
// Detect feed type
feedType := c.detectFeedType(body)
if feedType == "unknown" {
return false
}
// Ensure domain exists (foreign key)
host := normalizeHost(parsed.Host)
tld := getTLD(host)
c.db.Exec(`
INSERT INTO domains (host, crawl_status, tld)
VALUES ($1, 'pass', $2)
ON CONFLICT(host, tld) DO NOTHING
`, stripTLD(host), tld)
// Process the feed
c.processFeed(feedURL, parsed.Host, body, resp.Header)
return true
}
// BulkImportFeedsFromTSV imports feeds directly from a TSV file without HTTP verification
// This is much faster than processFeedsFile - inserts millions of feeds in minutes
// Spam check is applied: spam feeds get IGNORE, others get STANDBY
func (c *Crawler) BulkImportFeedsFromTSV(tsvFile string) error {
f, err := os.Open(tsvFile)
if err != nil {
return fmt.Errorf("failed to open TSV file: %w", err)
}
defer f.Close()
scanner := bufio.NewScanner(f)
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
var processed, inserted, ignored, skipped, errors int
startTime := time.Now()
for scanner.Scan() {
if c.IsShuttingDown() {
fmt.Println("CDX bulk import: shutdown requested")
break
}
line := scanner.Text()
parts := strings.Split(line, "\t")
if len(parts) < 3 {
continue
}
feedURL := parts[0]
host := parts[1]
mime := parts[2]
processed++
// Normalize URL
feedURL = normalizeURL(feedURL)
// Determine feed type from mime
// text/html becomes "html" - will be content-sniffed during feed_check
feedType := "unknown"
switch mime {
case "application/rss+xml":
feedType = "rss"
case "application/atom+xml":
feedType = "atom"
case "application/feed+json":
feedType = "json"
case "text/html":
feedType = "html" // Will be content-sniffed during feed_check
}
// Normalize host
host = normalizeHost(host)
hostWithoutTLD := stripTLD(host)
// Determine status based on spam check
// Note: language is empty for bulk import since we haven't fetched the feed yet
// So we only check domain-based spam patterns here
status := "STANDBY"
if isSpamDomain(hostWithoutTLD) {
status = "IGNORE"
ignored++
}
// Insert feed with status
rowsAffected, err := c.db.Exec(`
INSERT INTO feeds (url, type, status, miss_count)
VALUES ($1, $2, $3, 0)
ON CONFLICT(url) DO NOTHING
`, feedURL, feedType, status)
if err != nil {
errors++
skipped++
} else if rowsAffected > 0 {
inserted++
} else {
skipped++
}
// Progress update every 10000
if processed%10000 == 0 {
elapsed := time.Since(startTime)
rate := float64(processed) / elapsed.Seconds()
fmt.Printf("CDX bulk import: %d processed, %d inserted (%d ignored), %d skipped, %d errors (%.0f/sec)\n",
processed, inserted, ignored, skipped, errors, rate)
}
}
elapsed := time.Since(startTime)
fmt.Printf("CDX bulk import: complete. %d processed, %d inserted (%d ignored), %d skipped, %d errors in %v\n",
processed, inserted, ignored, skipped, errors, elapsed.Round(time.Second))
return scanner.Err()
}
// isSpamDomain checks domain-only spam patterns (for bulk import when language is unknown)
func isSpamDomain(host string) bool {
// Never spam our own domain
if host == "1440.news" || strings.HasSuffix(host, ".1440.news") {
return false
}
// Bare TLD (no dot)
if !strings.Contains(host, ".") {
return true
}
// Domain starts with digit
if len(host) > 0 && host[0] >= '0' && host[0] <= '9' {
return true
}
// Domain starts with letter-dash (e.g., "a-example.com")
if len(host) > 1 && ((host[0] >= 'a' && host[0] <= 'z') || (host[0] >= 'A' && host[0] <= 'Z')) && host[1] == '-' {
return true
}
return false
}
// getLatestCrawlID fetches the most recent crawl ID from Common Crawl
func (c *Crawler) getLatestCrawlID() string {
// Try data.commoncrawl.org crawl-data directory listing
resp, err := c.client.Get("https://data.commoncrawl.org/crawl-data/")
if err != nil {
fmt.Printf("CDX import: failed to fetch crawl-data listing: %v\n", err)
return "CC-MAIN-2026-04" // fallback to known recent crawl
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Printf("CDX import: failed to read crawl-data listing: %v\n", err)
return "CC-MAIN-2026-04"
}
// Find all CC-MAIN-YYYY-WW patterns and get the latest
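	// "CC-MAIN-YYYY-WW" is exactly 15 characters: "CC-MAIN-" (8) plus "YYYY-WW" (7).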
content := string(body)
var latestCrawl string
idx := 0
for {
pos := strings.Index(content[idx:], "CC-MAIN-")
if pos == -1 {
break
}
idx += pos
if idx+15 <= len(content) {
crawlID := content[idx : idx+15]
			// Loose format check: right length and a plausible year (the week part is not validated)
if len(crawlID) == 15 && crawlID[8:12] >= "2020" {
if crawlID > latestCrawl {
latestCrawl = crawlID
}
}
}
idx += 8 // move past "CC-MAIN-"
}
if latestCrawl != "" {
return latestCrawl
}
fmt.Println("CDX import: no CC-MAIN pattern found, using fallback")
return "CC-MAIN-2026-04"
}