// crawler/domain.go
package main

import (
	"bufio"
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"github.com/jackc/pgx/v5"
)
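
// Note: this file leans on helpers assumed to be defined elsewhere in
// package main (they are referenced here but not shown): normalizeHost,
// reverseHost, getTLD, NullableTime, NullableString, TimeValue,
// StringValue, and the Crawler type, whose db field appears to wrap a
// pgx connection pool (it exposes Exec, Query, QueryRow, and Acquire).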

// Domain represents a host to be crawled for feeds.
// Status: hold (pending review), pass (approved), skip (not processing).
type Domain struct {
	Host          string    `json:"host"`
	Status        string    `json:"status"`
	DiscoveredAt  time.Time `json:"discovered_at"`
	LastCheckedAt time.Time `json:"last_checked_at,omitempty"`
	LastCrawledAt time.Time `json:"last_crawled_at,omitempty"`
	FeedsFound    int       `json:"feeds_found,omitempty"`
	LastError     string    `json:"last_error,omitempty"`
	TLD           string    `json:"tld,omitempty"`
}
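
// The queries in this file assume a PostgreSQL table shaped roughly like
// the sketch below, inferred from the columns referenced here (host is
// the conflict target, so it is presumably the primary key); this is not
// the authoritative migration:
//
//	CREATE TABLE domains (
//	    host            TEXT PRIMARY KEY,
//	    status          TEXT NOT NULL,
//	    discovered_at   TIMESTAMPTZ NOT NULL,
//	    last_checked_at TIMESTAMPTZ,
//	    last_crawled_at TIMESTAMPTZ,
//	    feeds_found     INT NOT NULL DEFAULT 0,
//	    last_error      TEXT,
//	    tld             TEXT
//	);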

// shouldAutoSkipDomain checks if a domain should be auto-skipped based on
// spam patterns.
func shouldAutoSkipDomain(host string) bool {
	// Never skip our own domain or its subdomains.
	if host == "1440.news" || strings.HasSuffix(host, ".1440.news") {
		return false
	}
	// Skip bare TLDs (no dot means it's just "com", "net", etc.).
	if !strings.Contains(host, ".") {
		return true
	}
	// Skip domains starting with a digit (spam pattern).
	if len(host) > 0 && host[0] >= '0' && host[0] <= '9' {
		return true
	}
	// Skip domains starting with letter-dash (spam pattern, e.g. "a-example.com").
	if len(host) > 1 && ((host[0] >= 'a' && host[0] <= 'z') || (host[0] >= 'A' && host[0] <= 'Z')) && host[1] == '-' {
		return true
	}
	return false
}
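
// For illustration, hypothetical inputs and the results the checks above
// produce:
//
//	shouldAutoSkipDomain("com")           // true:  bare TLD
//	shouldAutoSkipDomain("1example.com")  // true:  leading digit
//	shouldAutoSkipDomain("a-example.com") // true:  letter-dash prefix
//	shouldAutoSkipDomain("example.com")   // false
//	shouldAutoSkipDomain("1440.news")     // false: own domain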

// saveDomain stores a domain in PostgreSQL, upserting on host; the original
// discovered_at is kept on conflict.
func (c *Crawler) saveDomain(domain *Domain) error {
	// Auto-skip domains matching spam patterns.
	status := domain.Status
	if shouldAutoSkipDomain(domain.Host) {
		status = "skip"
	}
	_, err := c.db.Exec(`
		INSERT INTO domains (host, status, discovered_at, last_checked_at, last_crawled_at, feeds_found, last_error, tld)
		VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
		ON CONFLICT(host) DO UPDATE SET
			status = EXCLUDED.status,
			last_checked_at = EXCLUDED.last_checked_at,
			last_crawled_at = EXCLUDED.last_crawled_at,
			feeds_found = EXCLUDED.feeds_found,
			last_error = EXCLUDED.last_error,
			tld = EXCLUDED.tld
	`, domain.Host, status, domain.DiscoveredAt, NullableTime(domain.LastCheckedAt),
		NullableTime(domain.LastCrawledAt), domain.FeedsFound, NullableString(domain.LastError), domain.TLD)
	return err
}

// saveDomainTx stores a domain within a transaction; unlike saveDomain it
// never overwrites an existing row (ON CONFLICT DO NOTHING).
func (c *Crawler) saveDomainTx(tx pgx.Tx, domain *Domain) error {
	// Auto-skip domains matching spam patterns.
	status := domain.Status
	if shouldAutoSkipDomain(domain.Host) {
		status = "skip"
	}
	_, err := tx.Exec(context.Background(), `
		INSERT INTO domains (host, status, discovered_at, last_checked_at, last_crawled_at, feeds_found, last_error, tld)
		VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
		ON CONFLICT(host) DO NOTHING
	`, domain.Host, status, domain.DiscoveredAt, NullableTime(domain.LastCheckedAt),
		NullableTime(domain.LastCrawledAt), domain.FeedsFound, NullableString(domain.LastError), domain.TLD)
	return err
}

// domainExists checks if a domain already exists in the database.
func (c *Crawler) domainExists(host string) bool {
	var exists bool
	err := c.db.QueryRow("SELECT EXISTS(SELECT 1 FROM domains WHERE host = $1)", normalizeHost(host)).Scan(&exists)
	return err == nil && exists
}

// getDomain retrieves a domain from PostgreSQL; it returns (nil, nil) when
// the host is unknown.
func (c *Crawler) getDomain(host string) (*Domain, error) {
	domain := &Domain{}
	var lastCheckedAt, lastCrawledAt *time.Time
	var lastError *string
	err := c.db.QueryRow(`
		SELECT host, status, discovered_at, last_checked_at, last_crawled_at, feeds_found, last_error, tld
		FROM domains WHERE host = $1
	`, normalizeHost(host)).Scan(
		&domain.Host, &domain.Status, &domain.DiscoveredAt, &lastCheckedAt, &lastCrawledAt,
		&domain.FeedsFound, &lastError, &domain.TLD,
	)
	if err == pgx.ErrNoRows {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	domain.LastCheckedAt = TimeValue(lastCheckedAt)
	domain.LastCrawledAt = TimeValue(lastCrawledAt)
	domain.LastError = StringValue(lastError)
	return domain, nil
}

// GetDomainsToCrawl returns domains ready for crawling (status = 'pass',
// not yet crawled), newest discoveries first.
func (c *Crawler) GetDomainsToCrawl(limit int) ([]*Domain, error) {
	rows, err := c.db.Query(`
		SELECT host, status, discovered_at, last_checked_at, last_crawled_at, feeds_found, last_error, tld
		FROM domains WHERE status = 'pass' AND last_crawled_at IS NULL
		ORDER BY discovered_at DESC
		LIMIT $1
	`, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	return c.scanDomains(rows)
}

// scanDomains is a helper to scan multiple domain rows.
func (c *Crawler) scanDomains(rows pgx.Rows) ([]*Domain, error) {
	var domains []*Domain
	for rows.Next() {
		domain := &Domain{}
		var lastCheckedAt, lastCrawledAt *time.Time
		var lastError *string
		if err := rows.Scan(
			&domain.Host, &domain.Status, &domain.DiscoveredAt, &lastCheckedAt, &lastCrawledAt,
			&domain.FeedsFound, &lastError, &domain.TLD,
		); err != nil {
			// Skip rows that fail to scan rather than aborting the batch.
			continue
		}
		domain.LastCheckedAt = TimeValue(lastCheckedAt)
		domain.LastCrawledAt = TimeValue(lastCrawledAt)
		domain.LastError = StringValue(lastError)
		domains = append(domains, domain)
	}
	return domains, rows.Err()
}

// markDomainCrawled updates a domain after the crawl stage.
func (c *Crawler) markDomainCrawled(host string, feedsFound int, lastError string) error {
	now := time.Now()
	if lastError != "" {
		_, err := c.db.Exec(`
			UPDATE domains SET last_crawled_at = $1, feeds_found = $2, last_error = $3
			WHERE host = $4
		`, now, feedsFound, lastError, normalizeHost(host))
		return err
	}
	_, err := c.db.Exec(`
		UPDATE domains SET last_crawled_at = $1, feeds_found = $2, last_error = NULL
		WHERE host = $3
	`, now, feedsFound, normalizeHost(host))
	return err
}

// GetDomainCount returns the total number of domains and the number still
// awaiting review (status = 'hold').
func (c *Crawler) GetDomainCount() (total int, hold int, err error) {
	err = c.db.QueryRow("SELECT COUNT(*) FROM domains").Scan(&total)
	if err != nil {
		return 0, 0, err
	}
	err = c.db.QueryRow("SELECT COUNT(*) FROM domains WHERE status = 'hold'").Scan(&hold)
	return total, hold, err
}

// ImportTestDomains adds a list of specific domains for testing.
func (c *Crawler) ImportTestDomains(domains []string) {
	now := time.Now()
	for _, host := range domains {
		_, err := c.db.Exec(`
			INSERT INTO domains (host, status, discovered_at, tld)
			VALUES ($1, 'pass', $2, $3)
			ON CONFLICT(host) DO NOTHING
		`, host, now, getTLD(host))
		if err != nil {
			fmt.Printf("Error adding test domain %s: %v\n", host, err)
		} else {
			fmt.Printf("Added test domain: %s\n", host)
		}
	}
}

// ImportDomainsFromFile reads a vertices file and stores new domains with
// status "pass", up to limit (0 means no limit).
func (c *Crawler) ImportDomainsFromFile(filename string, limit int) (imported int, skipped int, err error) {
	file, err := os.Open(filename)
	if err != nil {
		return 0, 0, fmt.Errorf("failed to open file: %v", err)
	}
	defer file.Close()
	return c.parseAndStoreDomains(file, limit)
}
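
// A vertices file is expected to contain tab-separated lines whose second
// field is a host name in reversed-domain order, which reverseHost flips
// back. A hypothetical line:
//
//	42	com.example.blog
//
// yields the host "blog.example.com". Input may be plain text or
// gzip-compressed; both import paths sniff the gzip magic bytes.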

// ImportDomainsInBackground starts domain import in a background goroutine.
func (c *Crawler) ImportDomainsInBackground(filename string) {
	go func() {
		file, err := os.Open(filename)
		if err != nil {
			fmt.Printf("Failed to open vertices file: %v\n", err)
			return
		}
		defer file.Close()

		// Transparently handle gzip-compressed files by sniffing the
		// two-byte gzip magic number (0x1f 0x8b).
		var bodyReader io.Reader
		bufReader := bufio.NewReader(file)
		peekBytes, err := bufReader.Peek(2)
		if err != nil && err != io.EOF {
			fmt.Printf("Failed to peek at file: %v\n", err)
			return
		}
		if len(peekBytes) >= 2 && peekBytes[0] == 0x1f && peekBytes[1] == 0x8b {
			gzReader, err := gzip.NewReader(bufReader)
			if err != nil {
				fmt.Printf("Failed to create gzip reader: %v\n", err)
				return
			}
			defer gzReader.Close()
			bodyReader = gzReader
		} else {
			bodyReader = bufReader
		}

		scanner := bufio.NewScanner(bodyReader)
		buf := make([]byte, 0, 64*1024)
		scanner.Buffer(buf, 1024*1024)

		const batchSize = 100
		now := time.Now()
		totalImported := 0
		batchCount := 0

		type domainEntry struct {
			host string
			tld  string
		}

		for {
			// Read and canonicalize one batch of hosts.
			var domains []domainEntry
			for len(domains) < batchSize && scanner.Scan() {
				line := scanner.Text()
				parts := strings.Split(line, "\t")
				if len(parts) >= 2 {
					reverseHostName := strings.TrimSpace(parts[1])
					if reverseHostName != "" {
						host := normalizeHost(reverseHost(reverseHostName))
						tld := getTLD(host)
						domains = append(domains, domainEntry{host: host, tld: tld})
					}
				}
			}
			if len(domains) == 0 {
				break
			}

			// Use COPY for bulk insert (much faster than individual INSERTs).
			ctx := context.Background()
			conn, err := c.db.Acquire(ctx)
			if err != nil {
				fmt.Printf("Failed to acquire connection: %v\n", err)
				break
			}

			// Build rows for COPY, applying auto-skip for spam patterns.
			rows := make([][]interface{}, len(domains))
			for i, d := range domains {
				status := "pass"
				if shouldAutoSkipDomain(d.host) {
					status = "skip"
				}
				rows[i] = []interface{}{d.host, status, now, d.tld}
			}

			imported, err := conn.CopyFrom(
				ctx,
				pgx.Identifier{"domains"},
				[]string{"host", "status", "discovered_at", "tld"},
				pgx.CopyFromRows(rows),
			)
			conn.Release()
			if err != nil {
				// COPY has no ON CONFLICT handling, so a batch containing any
				// already-known host fails as a whole; fall back to individual
				// inserts with ON CONFLICT DO NOTHING.
				for _, d := range domains {
					status := "pass"
					if shouldAutoSkipDomain(d.host) {
						status = "skip"
					}
					c.db.Exec(`
						INSERT INTO domains (host, status, discovered_at, tld)
						VALUES ($1, $2, $3, $4)
						ON CONFLICT(host) DO NOTHING
					`, d.host, status, now, d.tld)
				}
				// Approximate: counts the whole batch, even though some rows
				// may have been duplicates.
				imported = int64(len(domains))
			}

			batchCount++
			totalImported += int(imported)
			atomic.AddInt32(&c.domainsImported, int32(imported))
			fmt.Printf("Import batch %d: %d domains (total: %d)\n", batchCount, imported, totalImported)

			// Throttle: wait one second before the next batch.
			time.Sleep(1 * time.Second)
		}
		if err := scanner.Err(); err != nil {
			fmt.Printf("Error reading vertices file: %v\n", err)
		}
		fmt.Printf("Background import complete: %d domains imported\n", totalImported)
	}()
}

// parseAndStoreDomains reads tab-separated vertices lines from reader,
// optionally gzip-compressed, and inserts up to limit new domains.
func (c *Crawler) parseAndStoreDomains(reader io.Reader, limit int) (imported int, skipped int, err error) {
	// Transparently handle gzip input by sniffing the magic bytes.
	var bodyReader io.Reader
	bufReader := bufio.NewReader(reader)
	peekBytes, err := bufReader.Peek(2)
	if err != nil && err != io.EOF {
		return 0, 0, fmt.Errorf("failed to peek at file: %v", err)
	}
	if len(peekBytes) >= 2 && peekBytes[0] == 0x1f && peekBytes[1] == 0x8b {
		gzReader, err := gzip.NewReader(bufReader)
		if err != nil {
			return 0, 0, fmt.Errorf("failed to create gzip reader: %v", err)
		}
		defer gzReader.Close()
		bodyReader = gzReader
	} else {
		bodyReader = bufReader
	}

	scanner := bufio.NewScanner(bodyReader)
	buf := make([]byte, 0, 64*1024)
	scanner.Buffer(buf, 1024*1024)

	now := time.Now()
	count := 0
	const batchSize = 100

	type domainEntry struct {
		host string
		tld  string
	}

	for {
		// Read and canonicalize one batch of hosts.
		var domains []domainEntry
		for len(domains) < batchSize && scanner.Scan() {
			if limit > 0 && count >= limit {
				break
			}
			line := scanner.Text()
			parts := strings.Split(line, "\t")
			if len(parts) >= 2 {
				reverseHostName := strings.TrimSpace(parts[1])
				if reverseHostName != "" {
					host := normalizeHost(reverseHost(reverseHostName))
					domains = append(domains, domainEntry{host: host, tld: getTLD(host)})
					count++
				}
			}
		}
		if len(domains) == 0 {
			break
		}

		// Insert with ON CONFLICT, applying auto-skip for spam patterns.
		for _, d := range domains {
			status := "pass"
			if shouldAutoSkipDomain(d.host) {
				status = "skip"
			}
			result, err := c.db.Exec(`
				INSERT INTO domains (host, status, discovered_at, tld)
				VALUES ($1, $2, $3, $4)
				ON CONFLICT(host) DO NOTHING
			`, d.host, status, now, d.tld)
			if err != nil {
				skipped++
			} else if result > 0 {
				// A row was affected, so the host was newly inserted;
				// duplicates hit ON CONFLICT DO NOTHING and affect zero rows.
				imported++
			} else {
				skipped++
			}
		}
		if limit > 0 && count >= limit {
			break
		}
	}
	if err := scanner.Err(); err != nil {
		return imported, skipped, fmt.Errorf("error reading file: %v", err)
	}
	return imported, skipped, nil
}
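
// exampleImportUsage sketches how the import entry points above might be
// wired together. The file names and printed messages are hypothetical;
// this is an illustration, not part of the crawler's actual CLI.
func exampleImportUsage(c *Crawler) {
	// Bounded, synchronous import that reports new vs. already-known hosts.
	imported, skipped, err := c.ImportDomainsFromFile("vertices.txt.gz", 10000)
	if err != nil {
		fmt.Printf("import failed: %v\n", err)
		return
	}
	fmt.Printf("imported %d new domains, skipped %d\n", imported, skipped)

	// Fire-and-forget import of a full vertices dump, throttled to one
	// batch per second by ImportDomainsInBackground itself.
	c.ImportDomainsInBackground("vertices-full.txt.gz")
}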