// Tuning notes (from the last revision):
//   - Import batch size: 100 (was 100k)
//   - Domain check/crawl fetch size: 100 (was 1000)
//   - Feed check fetch size: 100 (was 1000)
//   - Worker count: 100 fixed (was NumCPU)
//   - Channel buffers: 100 (was 256)
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"compress/gzip"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
)
|
|
|
|
// Domain represents a host to be crawled for feeds.
//
// Status is one of: hold (pending review), pass (approved),
// skip (not processing), fail (error).
type Domain struct {
	Host          string    `json:"host"`                      // normalized host name; primary key in the domains table
	Status        string    `json:"status"`                    // lifecycle state: hold / pass / skip / fail
	DiscoveredAt  time.Time `json:"discovered_at"`             // when the domain was first imported
	LastCheckedAt time.Time `json:"last_checked_at,omitempty"` // zero value means the check stage has not run yet
	LastCrawledAt time.Time `json:"last_crawled_at,omitempty"` // zero value means the crawl stage has not run yet
	FeedsFound    int       `json:"feeds_found,omitempty"`     // feed count recorded by the crawl stage
	LastError     string    `json:"last_error,omitempty"`      // most recent check/crawl error, empty on success
	TLD           string    `json:"tld,omitempty"`             // top-level domain, precomputed for filtering/stats
}
|
|
|
|
// shouldAutoSkipDomain reports whether a host matches a known spam pattern
// and should be stored with status "skip" instead of being processed.
func shouldAutoSkipDomain(host string) bool {
	// Never skip our own domain or its subdomains. Require an exact match or
	// a ".1440.news" suffix: a bare HasSuffix(host, "1440.news") would also
	// exempt unrelated lookalikes such as "91440.news" from the spam checks.
	if host == "1440.news" || strings.HasSuffix(host, ".1440.news") {
		return false
	}
	// Skip bare TLDs: a real host contains at least one dot ("com", "net", ...).
	if !strings.Contains(host, ".") {
		return true
	}
	// host is non-empty here (it contains a dot), so host[0] is safe.
	first := host[0]
	// Skip domains starting with a digit (spam pattern).
	if first >= '0' && first <= '9' {
		return true
	}
	// Skip domains starting with letter-dash (spam pattern, e.g. "a-example.com").
	isLetter := (first >= 'a' && first <= 'z') || (first >= 'A' && first <= 'Z')
	if isLetter && len(host) > 1 && host[1] == '-' {
		return true
	}
	return false
}
|
|
|
|
// saveDomain stores a domain in PostgreSQL
|
|
func (c *Crawler) saveDomain(domain *Domain) error {
|
|
// Auto-skip domains matching spam patterns
|
|
status := domain.Status
|
|
if shouldAutoSkipDomain(domain.Host) {
|
|
status = "skip"
|
|
}
|
|
|
|
_, err := c.db.Exec(`
|
|
INSERT INTO domains (host, status, discovered_at, last_checked_at, last_crawled_at, feeds_found, last_error, tld)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
|
ON CONFLICT(host) DO UPDATE SET
|
|
status = EXCLUDED.status,
|
|
last_checked_at = EXCLUDED.last_checked_at,
|
|
last_crawled_at = EXCLUDED.last_crawled_at,
|
|
feeds_found = EXCLUDED.feeds_found,
|
|
last_error = EXCLUDED.last_error,
|
|
tld = EXCLUDED.tld
|
|
`, domain.Host, status, domain.DiscoveredAt, NullableTime(domain.LastCheckedAt),
|
|
NullableTime(domain.LastCrawledAt), domain.FeedsFound, NullableString(domain.LastError), domain.TLD)
|
|
return err
|
|
}
|
|
|
|
// saveDomainTx stores a domain using a transaction
|
|
func (c *Crawler) saveDomainTx(tx pgx.Tx, domain *Domain) error {
|
|
// Auto-skip domains matching spam patterns
|
|
status := domain.Status
|
|
if shouldAutoSkipDomain(domain.Host) {
|
|
status = "skip"
|
|
}
|
|
|
|
_, err := tx.Exec(context.Background(), `
|
|
INSERT INTO domains (host, status, discovered_at, last_checked_at, last_crawled_at, feeds_found, last_error, tld)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
|
ON CONFLICT(host) DO NOTHING
|
|
`, domain.Host, status, domain.DiscoveredAt, NullableTime(domain.LastCheckedAt),
|
|
NullableTime(domain.LastCrawledAt), domain.FeedsFound, NullableString(domain.LastError), domain.TLD)
|
|
return err
|
|
}
|
|
|
|
// domainExists checks if a domain already exists in the database
|
|
func (c *Crawler) domainExists(host string) bool {
|
|
var exists bool
|
|
err := c.db.QueryRow("SELECT EXISTS(SELECT 1 FROM domains WHERE host = $1)", normalizeHost(host)).Scan(&exists)
|
|
return err == nil && exists
|
|
}
|
|
|
|
// getDomain retrieves a domain from PostgreSQL
|
|
func (c *Crawler) getDomain(host string) (*Domain, error) {
|
|
domain := &Domain{}
|
|
var lastCheckedAt, lastCrawledAt *time.Time
|
|
var lastError *string
|
|
|
|
err := c.db.QueryRow(`
|
|
SELECT host, status, discovered_at, last_checked_at, last_crawled_at, feeds_found, last_error, tld
|
|
FROM domains WHERE host = $1
|
|
`, normalizeHost(host)).Scan(
|
|
&domain.Host, &domain.Status, &domain.DiscoveredAt, &lastCheckedAt, &lastCrawledAt,
|
|
&domain.FeedsFound, &lastError, &domain.TLD,
|
|
)
|
|
|
|
if err == pgx.ErrNoRows {
|
|
return nil, nil
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
domain.LastCheckedAt = TimeValue(lastCheckedAt)
|
|
domain.LastCrawledAt = TimeValue(lastCrawledAt)
|
|
domain.LastError = StringValue(lastError)
|
|
|
|
return domain, nil
|
|
}
|
|
|
|
// GetDomainsToCheck returns domains ready for checking (status='pass', never checked)
|
|
func (c *Crawler) GetDomainsToCheck(limit int) ([]*Domain, error) {
|
|
rows, err := c.db.Query(`
|
|
SELECT host, status, discovered_at, last_checked_at, last_crawled_at, feeds_found, last_error, tld
|
|
FROM domains WHERE status = 'pass' AND last_checked_at IS NULL
|
|
ORDER BY discovered_at ASC
|
|
LIMIT $1
|
|
`, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
return c.scanDomains(rows)
|
|
}
|
|
|
|
// GetDomainsToCrawl returns domains ready for crawling (status='pass', checked but not crawled)
|
|
func (c *Crawler) GetDomainsToCrawl(limit int) ([]*Domain, error) {
|
|
rows, err := c.db.Query(`
|
|
SELECT host, status, discovered_at, last_checked_at, last_crawled_at, feeds_found, last_error, tld
|
|
FROM domains WHERE status = 'pass' AND last_checked_at IS NOT NULL AND last_crawled_at IS NULL
|
|
ORDER BY discovered_at ASC
|
|
LIMIT $1
|
|
`, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
return c.scanDomains(rows)
|
|
}
|
|
|
|
// scanDomains is a helper to scan multiple domain rows
|
|
func (c *Crawler) scanDomains(rows pgx.Rows) ([]*Domain, error) {
|
|
var domains []*Domain
|
|
for rows.Next() {
|
|
domain := &Domain{}
|
|
var lastCheckedAt, lastCrawledAt *time.Time
|
|
var lastError *string
|
|
|
|
if err := rows.Scan(
|
|
&domain.Host, &domain.Status, &domain.DiscoveredAt, &lastCheckedAt, &lastCrawledAt,
|
|
&domain.FeedsFound, &lastError, &domain.TLD,
|
|
); err != nil {
|
|
continue
|
|
}
|
|
|
|
domain.LastCheckedAt = TimeValue(lastCheckedAt)
|
|
domain.LastCrawledAt = TimeValue(lastCrawledAt)
|
|
domain.LastError = StringValue(lastError)
|
|
|
|
domains = append(domains, domain)
|
|
}
|
|
|
|
return domains, rows.Err()
|
|
}
|
|
|
|
// markDomainChecked updates a domain after the check (HEAD request) stage
|
|
func (c *Crawler) markDomainChecked(host string, lastError string) error {
|
|
now := time.Now()
|
|
if lastError != "" {
|
|
_, err := c.db.Exec(`
|
|
UPDATE domains SET status = 'fail', last_checked_at = $1, last_error = $2
|
|
WHERE host = $3
|
|
`, now, lastError, normalizeHost(host))
|
|
return err
|
|
}
|
|
_, err := c.db.Exec(`
|
|
UPDATE domains SET last_checked_at = $1, last_error = NULL
|
|
WHERE host = $2
|
|
`, now, normalizeHost(host))
|
|
return err
|
|
}
|
|
|
|
// markDomainCrawled updates a domain after the crawl stage
|
|
func (c *Crawler) markDomainCrawled(host string, feedsFound int, lastError string) error {
|
|
now := time.Now()
|
|
if lastError != "" {
|
|
_, err := c.db.Exec(`
|
|
UPDATE domains SET status = 'fail', last_crawled_at = $1, feeds_found = $2, last_error = $3
|
|
WHERE host = $4
|
|
`, now, feedsFound, lastError, normalizeHost(host))
|
|
return err
|
|
}
|
|
_, err := c.db.Exec(`
|
|
UPDATE domains SET last_crawled_at = $1, feeds_found = $2, last_error = NULL
|
|
WHERE host = $3
|
|
`, now, feedsFound, normalizeHost(host))
|
|
return err
|
|
}
|
|
|
|
// GetDomainCount returns the total number of domains and counts by status
|
|
func (c *Crawler) GetDomainCount() (total int, hold int, err error) {
|
|
err = c.db.QueryRow("SELECT COUNT(*) FROM domains").Scan(&total)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
err = c.db.QueryRow("SELECT COUNT(*) FROM domains WHERE status = 'hold'").Scan(&hold)
|
|
return total, hold, err
|
|
}
|
|
|
|
// ImportTestDomains adds a list of specific domains for testing
|
|
func (c *Crawler) ImportTestDomains(domains []string) {
|
|
now := time.Now()
|
|
for _, host := range domains {
|
|
_, err := c.db.Exec(`
|
|
INSERT INTO domains (host, status, discovered_at, tld)
|
|
VALUES ($1, 'pass', $2, $3)
|
|
ON CONFLICT(host) DO NOTHING
|
|
`, host, now, getTLD(host))
|
|
if err != nil {
|
|
fmt.Printf("Error adding test domain %s: %v\n", host, err)
|
|
} else {
|
|
fmt.Printf("Added test domain: %s\n", host)
|
|
}
|
|
}
|
|
}
|
|
|
|
// ImportDomainsFromFile reads a vertices file and stores new domains as "pass"
|
|
func (c *Crawler) ImportDomainsFromFile(filename string, limit int) (imported int, skipped int, err error) {
|
|
file, err := os.Open(filename)
|
|
if err != nil {
|
|
return 0, 0, fmt.Errorf("failed to open file: %v", err)
|
|
}
|
|
defer file.Close()
|
|
|
|
return c.parseAndStoreDomains(file, limit)
|
|
}
|
|
|
|
// ImportDomainsInBackground starts domain import in a background goroutine.
// The file may be plain text or gzip-compressed (detected via magic bytes).
// Each line is expected to be tab-separated with a reversed host name in the
// second column. Domains are bulk-inserted in batches of 100 via COPY, with
// a per-row INSERT fallback, pausing one second between batches to limit DB
// load. Progress is reported on stdout; there is no way to cancel or await
// the goroutine once started.
func (c *Crawler) ImportDomainsInBackground(filename string) {
	go func() {
		file, err := os.Open(filename)
		if err != nil {
			fmt.Printf("Failed to open vertices file: %v\n", err)
			return
		}
		defer file.Close()

		var bodyReader io.Reader

		// Sniff the first two bytes to detect the gzip magic number
		// (0x1f 0x8b) without consuming them.
		bufReader := bufio.NewReader(file)
		peekBytes, err := bufReader.Peek(2)
		if err != nil && err != io.EOF {
			fmt.Printf("Failed to peek at file: %v\n", err)
			return
		}

		if len(peekBytes) >= 2 && peekBytes[0] == 0x1f && peekBytes[1] == 0x8b {
			gzReader, err := gzip.NewReader(bufReader)
			if err != nil {
				fmt.Printf("Failed to create gzip reader: %v\n", err)
				return
			}
			defer gzReader.Close()
			bodyReader = gzReader
		} else {
			bodyReader = bufReader
		}

		// Allow lines up to 1 MiB (default Scanner limit is 64 KiB).
		scanner := bufio.NewScanner(bodyReader)
		buf := make([]byte, 0, 64*1024)
		scanner.Buffer(buf, 1024*1024)

		const batchSize = 100
		now := time.Now()
		totalImported := 0
		batchCount := 0

		// Minimal per-line payload carried between the parse and insert steps.
		type domainEntry struct {
			host string
			tld  string
		}

		for {
			// Read and canonicalize batch
			var domains []domainEntry
			for len(domains) < batchSize && scanner.Scan() {
				line := scanner.Text()
				parts := strings.Split(line, "\t")
				if len(parts) >= 2 {
					// Column 2 holds the host in reversed-label order
					// (e.g. "com.example"); un-reverse and normalize it.
					reverseHostName := strings.TrimSpace(parts[1])
					if reverseHostName != "" {
						host := normalizeHost(reverseHost(reverseHostName))
						tld := getTLD(host)
						domains = append(domains, domainEntry{host: host, tld: tld})
					}
				}
			}

			// Empty batch means the scanner is exhausted (or errored).
			if len(domains) == 0 {
				break
			}

			// Use COPY for bulk insert (much faster than individual INSERTs)
			ctx := context.Background()
			conn, err := c.db.Acquire(ctx)
			if err != nil {
				fmt.Printf("Failed to acquire connection: %v\n", err)
				break
			}

			// Build rows for copy, applying auto-skip for spam patterns
			rows := make([][]interface{}, len(domains))
			for i, d := range domains {
				status := "pass"
				if shouldAutoSkipDomain(d.host) {
					status = "skip"
				}
				rows[i] = []interface{}{d.host, status, now, d.tld}
			}

			// Use CopyFrom for bulk insert
			imported, err := conn.CopyFrom(
				ctx,
				pgx.Identifier{"domains"},
				[]string{"host", "status", "discovered_at", "tld"},
				pgx.CopyFromRows(rows),
			)
			conn.Release()

			if err != nil {
				// Fall back to individual inserts with ON CONFLICT
				// NOTE(review): per-row Exec errors are deliberately ignored
				// here (best-effort), and the batch is counted as fully
				// imported even when rows conflicted or failed — so the
				// running totals can overstate actual inserts. Confirm this
				// is acceptable for the progress metric.
				for _, d := range domains {
					status := "pass"
					if shouldAutoSkipDomain(d.host) {
						status = "skip"
					}
					c.db.Exec(`
				INSERT INTO domains (host, status, discovered_at, tld)
				VALUES ($1, $2, $3, $4)
				ON CONFLICT(host) DO NOTHING
			`, d.host, status, now, d.tld)
				}
				imported = int64(len(domains))
			}

			batchCount++
			totalImported += int(imported)
			// domainsImported is read elsewhere; updated atomically.
			atomic.AddInt32(&c.domainsImported, int32(imported))

			fmt.Printf("Import batch %d: %d domains (total: %d)\n", batchCount, imported, totalImported)

			// Wait 1 second before the next batch
			time.Sleep(1 * time.Second)
		}

		// Report a scan failure (e.g. an over-long line) that ended the loop.
		if err := scanner.Err(); err != nil {
			fmt.Printf("Error reading vertices file: %v\n", err)
		}

		fmt.Printf("Background import complete: %d domains imported\n", totalImported)
	}()
}
|
|
|
|
// parseAndStoreDomains reads vertices data (plain or gzip, detected via the
// gzip magic bytes) from reader and inserts each host with status "pass"
// (or "skip" for spam patterns), up to limit entries (0 = unlimited).
// Lines are tab-separated with a reversed host name in the second column.
// Returns the number of rows newly inserted and the number skipped
// (conflicts or insert errors).
func (c *Crawler) parseAndStoreDomains(reader io.Reader, limit int) (imported int, skipped int, err error) {
	var bodyReader io.Reader

	// Sniff the first two bytes for the gzip magic number without
	// consuming them.
	bufReader := bufio.NewReader(reader)
	peekBytes, err := bufReader.Peek(2)
	if err != nil && err != io.EOF {
		return 0, 0, fmt.Errorf("failed to peek at file: %v", err)
	}

	if len(peekBytes) >= 2 && peekBytes[0] == 0x1f && peekBytes[1] == 0x8b {
		gzReader, err := gzip.NewReader(bufReader)
		if err != nil {
			return 0, 0, fmt.Errorf("failed to create gzip reader: %v", err)
		}
		defer gzReader.Close()
		bodyReader = gzReader
	} else {
		bodyReader = bufReader
	}

	// Allow lines up to 1 MiB (default Scanner limit is 64 KiB).
	scanner := bufio.NewScanner(bodyReader)
	buf := make([]byte, 0, 64*1024)
	scanner.Buffer(buf, 1024*1024)

	now := time.Now()
	count := 0
	const batchSize = 100

	// Minimal per-line payload carried between the parse and insert steps.
	type domainEntry struct {
		host string
		tld  string
	}

	for {
		// Read and canonicalize batch
		var domains []domainEntry
		for len(domains) < batchSize && scanner.Scan() {
			// NOTE(review): this limit check runs after Scan(), so the line
			// just read is discarded when the limit is hit — harmless for
			// counting but the reader position advances one extra line.
			if limit > 0 && count >= limit {
				break
			}
			line := scanner.Text()
			parts := strings.Split(line, "\t")
			if len(parts) >= 2 {
				// Column 2 holds the host in reversed-label order; un-reverse
				// and normalize before storing.
				reverseHostName := strings.TrimSpace(parts[1])
				if reverseHostName != "" {
					host := normalizeHost(reverseHost(reverseHostName))
					domains = append(domains, domainEntry{host: host, tld: getTLD(host)})
					count++
				}
			}
		}

		// Empty batch means the input (or the limit) is exhausted.
		if len(domains) == 0 {
			break
		}

		// Insert with ON CONFLICT, applying auto-skip for spam patterns
		for _, d := range domains {
			status := "pass"
			if shouldAutoSkipDomain(d.host) {
				status = "skip"
			}
			result, err := c.db.Exec(`
				INSERT INTO domains (host, status, discovered_at, tld)
				VALUES ($1, $2, $3, $4)
				ON CONFLICT(host) DO NOTHING
			`, d.host, status, now, d.tld)
			// NOTE(review): result is compared with > 0, so c.db.Exec
			// presumably returns an affected-row count (0 on conflict) —
			// verify against the db wrapper's definition. Insert errors are
			// folded into the skipped count rather than aborting the import.
			if err != nil {
				skipped++
			} else if result > 0 {
				imported++
			} else {
				skipped++
			}
		}

		if limit > 0 && count >= limit {
			break
		}
	}

	// Surface a scan failure (e.g. an over-long line) as the import error.
	if err := scanner.Err(); err != nil {
		return imported, skipped, fmt.Errorf("error reading file: %v", err)
	}

	return imported, skipped, nil
}
|