package main

import (
	"bufio"
	"compress/gzip"
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"strings"
	"sync/atomic"
	"time"

	"github.com/jackc/pgx/v5"
)

// Domain represents a host to process for feeds.
//
// Status: hold (pending review), pass (approved), skip (not processing).
// CrawledAt encodes processing state: zero time = needs domain_check,
// zero time + 1s = needs feed_crawl, a real timestamp = done (see the
// DomainState* sentinels below).
type Domain struct {
	Host       string    `json:"host"`
	Status     string    `json:"status"`
	CrawledAt  time.Time `json:"crawled_at"`
	FeedsFound int       `json:"feeds_found,omitempty"`
	LastError  string    `json:"last_error,omitempty"`
	TLD        string    `json:"tld,omitempty"`
	MissCount  int       `json:"miss_count,omitempty"`
}

// MissCountThreshold is the number of consecutive errors before setting status to hold.
const MissCountThreshold = 100

// ErrorRetryDelay is how long to wait before retrying a domain with errors (1 hour minimum).
// At 100 seconds actual rate due to queue, 100 misses = ~2.8 hours.
// At 1 hour minimum delay, 100 misses = ~4+ days in practice.
var ErrorRetryDelay = 1 * time.Hour

// FullHost returns the complete hostname (host + tld).
func (d *Domain) FullHost() string {
	return fullHost(d.Host, d.TLD)
}

// Sentinel values for domain processing state, stored in crawled_at.
var (
	DomainStateUnchecked = time.Time{}                  // 0001-01-01 00:00:00 - needs domain_check
	DomainStateChecked   = time.Time{}.Add(time.Second) // 0001-01-01 00:00:01 - needs feed_crawl
)

// shouldAutoSkipDomain reports whether a host matches a known spam pattern
// and should be stored with status "skip" instead of "pass".
func shouldAutoSkipDomain(host string) bool {
	// Never skip our own domain or its subdomains. Requiring an exact match
	// or a dot-prefixed suffix prevents unrelated hosts such as
	// "evil1440.news" from slipping through the whitelist.
	if host == "1440.news" || strings.HasSuffix(host, ".1440.news") {
		return false
	}
	// Skip bare TLDs (no dot means it's just "com", "net", etc.).
	// Note: this branch also catches the empty string, so host[0] below is safe.
	if !strings.Contains(host, ".") {
		return true
	}
	// Skip domains starting with a digit (spam pattern).
	if host[0] >= '0' && host[0] <= '9' {
		return true
	}
	// Skip domains starting with letter-dash (spam pattern, e.g., "a-example.com").
	if len(host) > 1 && host[1] == '-' {
		if c := host[0]; (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') {
			return true
		}
	}
	return false
}

// saveDomain stores a domain in PostgreSQL, inserting or updating on a
// (host, tld) conflict. Domains matching spam patterns are stored with
// status "skip" regardless of the status on the struct.
func (c *Crawler) saveDomain(domain *Domain) error {
	fh := domain.FullHost()
	status := domain.Status
	if shouldAutoSkipDomain(fh) {
		status = "skip"
	}
	_, err := c.db.Exec(`
		INSERT INTO domains (host, status, crawled_at, feeds_found, last_error, tld)
		VALUES ($1, $2, $3, $4, $5, $6)
		ON CONFLICT(host, tld) DO UPDATE SET
			status = EXCLUDED.status,
			crawled_at = EXCLUDED.crawled_at,
			feeds_found = EXCLUDED.feeds_found,
			last_error = EXCLUDED.last_error
	`, stripTLD(fh), status, domain.CrawledAt, domain.FeedsFound, NullableString(domain.LastError), domain.TLD)
	return err
}

// saveDomainTx stores a domain inside an existing transaction. Unlike
// saveDomain it uses ON CONFLICT DO NOTHING, so an existing row is left
// untouched rather than updated.
func (c *Crawler) saveDomainTx(tx pgx.Tx, domain *Domain) error {
	fh := domain.FullHost()
	status := domain.Status
	if shouldAutoSkipDomain(fh) {
		status = "skip"
	}
	_, err := tx.Exec(context.Background(), `
		INSERT INTO domains (host, status, crawled_at, feeds_found, last_error, tld)
		VALUES ($1, $2, $3, $4, $5, $6)
		ON CONFLICT(host, tld) DO NOTHING
	`, stripTLD(fh), status, domain.CrawledAt, domain.FeedsFound, NullableString(domain.LastError), domain.TLD)
	return err
}

// domainExists checks if a domain already exists in the database.
// Query errors are treated as "does not exist".
func (c *Crawler) domainExists(host string) bool {
	host = normalizeHost(host)
	var exists bool
	err := c.db.QueryRow("SELECT EXISTS(SELECT 1 FROM domains WHERE host = $1 AND tld = $2)",
		stripTLD(host), getTLD(host)).Scan(&exists)
	return err == nil && exists
}

// getDomain retrieves a domain from PostgreSQL, returning (nil, nil) when
// no row matches the normalized host.
func (c *Crawler) getDomain(host string) (*Domain, error) {
	host = normalizeHost(host)
	domain := &Domain{}
	var lastError *string
	err := c.db.QueryRow(`
		SELECT host, tld, status, crawled_at, feeds_found, last_error
		FROM domains WHERE host = $1 AND tld = $2
	`, stripTLD(host), getTLD(host)).Scan(
		&domain.Host, &domain.TLD, &domain.Status, &domain.CrawledAt, &domain.FeedsFound, &lastError,
	)
	// pgx v5 may wrap errors, so compare with errors.Is rather than ==.
	if errors.Is(err, pgx.ErrNoRows) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	domain.LastError = StringValue(lastError)
	return domain, nil
}

// GetDomainsToCheck returns unchecked domains needing DNS lookup (domain_check).
// crawled_at = zero time means needs domain_check.
func (c *Crawler) GetDomainsToCheck(limit int) ([]*Domain, error) {
	rows, err := c.db.Query(`
		SELECT host, status, crawled_at, feeds_found, last_error, tld
		FROM domains
		WHERE status = 'pass' AND crawled_at = '0001-01-01 00:00:00' AND last_error IS NULL
		LIMIT $1
	`, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	return c.scanDomains(rows)
}

// GetDomainsToCrawl returns DNS-verified domains needing feed discovery (feed_crawl).
// crawled_at = +1 sec means passed DNS check, ready for crawl.
// Also includes domains with errors whose retry time has passed; never-errored
// domains sort first.
func (c *Crawler) GetDomainsToCrawl(limit int) ([]*Domain, error) {
	now := time.Now()
	rows, err := c.db.Query(`
		SELECT host, status, crawled_at, feeds_found, last_error, tld
		FROM domains
		WHERE status = 'pass' AND (
			(crawled_at = '0001-01-01 00:00:01' AND last_error IS NULL) -- passed DNS, ready to crawl
			OR (crawled_at < $1 AND crawled_at > '0001-01-01 00:00:01' AND last_error IS NOT NULL) -- retry errors
		)
		ORDER BY last_error IS NULL DESC, crawled_at ASC
		LIMIT $2
	`, now, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	return c.scanDomains(rows)
}

// markDomainChecked updates a domain after domain_check. On success,
// crawled_at advances to the DomainStateChecked sentinel (+1 sec) so
// feed_crawl picks it up, and miss_count resets. On error, the retry is
// scheduled ErrorRetryDelay in the future, miss_count increments, and status
// flips to 'hold' once MissCountThreshold consecutive misses are reached.
// host must be the stripped host (without TLD).
func (c *Crawler) markDomainChecked(host, tld, lastError string) error {
	if lastError != "" {
		retryAt := time.Now().Add(ErrorRetryDelay)
		_, err := c.db.Exec(`
			UPDATE domains
			SET crawled_at = $1, last_error = $2, miss_count = miss_count + 1,
				status = CASE WHEN miss_count + 1 >= $3 THEN 'hold' ELSE status END
			WHERE host = $4 AND tld = $5
		`, retryAt, lastError, MissCountThreshold, host, tld)
		return err
	}
	// Success - clear the error state and reset miss_count.
	_, err := c.db.Exec(`
		UPDATE domains SET crawled_at = $1, last_error = NULL, miss_count = 0
		WHERE host = $2 AND tld = $3
	`, DomainStateChecked, host, tld)
	return err
}

// scanDomains is a helper to scan multiple domain rows in the column order
// (host, status, crawled_at, feeds_found, last_error, tld). Rows that fail to
// scan are skipped (best-effort); rows.Err reports iteration-level failures.
func (c *Crawler) scanDomains(rows pgx.Rows) ([]*Domain, error) {
	var domains []*Domain
	for rows.Next() {
		domain := &Domain{}
		var lastError *string
		if err := rows.Scan(
			&domain.Host, &domain.Status, &domain.CrawledAt, &domain.FeedsFound, &lastError, &domain.TLD,
		); err != nil {
			continue
		}
		domain.LastError = StringValue(lastError)
		domains = append(domains, domain)
	}
	return domains, rows.Err()
}

// markDomainCrawled updates a domain after feed_crawl. On success crawled_at
// is set to NOW() and miss_count resets; on error the same retry/hold
// escalation as markDomainChecked applies.
// host must be the stripped host (without TLD).
func (c *Crawler) markDomainCrawled(host, tld string, feedsFound int, lastError string) error {
	if lastError != "" {
		retryAt := time.Now().Add(ErrorRetryDelay)
		_, err := c.db.Exec(`
			UPDATE domains
			SET crawled_at = $1, feeds_found = $2, last_error = $3, miss_count = miss_count + 1,
				status = CASE WHEN miss_count + 1 >= $4 THEN 'hold' ELSE status END
			WHERE host = $5 AND tld = $6
		`, retryAt, feedsFound, lastError, MissCountThreshold, host, tld)
		return err
	}
	// Success - record the real crawl time and reset miss_count.
	now := time.Now()
	_, err := c.db.Exec(`
		UPDATE domains SET crawled_at = $1, feeds_found = $2, last_error = NULL, miss_count = 0
		WHERE host = $3 AND tld = $4
	`, now, feedsFound, host, tld)
	return err
}

// GetDomainCount returns the total number of domains and the number with status 'hold'.
func (c *Crawler) GetDomainCount() (total int, hold int, err error) {
	if err = c.db.QueryRow("SELECT COUNT(*) FROM domains").Scan(&total); err != nil {
		return 0, 0, err
	}
	err = c.db.QueryRow("SELECT COUNT(*) FROM domains WHERE status = 'hold'").Scan(&hold)
	return total, hold, err
}

// ImportTestDomains adds a list of specific domains for testing, inserting
// each with status 'pass' and leaving existing rows untouched.
func (c *Crawler) ImportTestDomains(domains []string) {
	for _, host := range domains {
		host = normalizeHost(host)
		_, err := c.db.Exec(`
			INSERT INTO domains (host, status, tld)
			VALUES ($1, 'pass', $2)
			ON CONFLICT(host, tld) DO NOTHING
		`, stripTLD(host), getTLD(host))
		if err != nil {
			fmt.Printf("Error adding test domain %s: %v\n", host, err)
		} else {
			fmt.Printf("Added test domain: %s\n", host)
		}
	}
}

// ImportDomainsFromFile reads a vertices file and stores new domains as "pass".
func (c *Crawler) ImportDomainsFromFile(filename string, limit int) (imported int, skipped int, err error) {
	file, err := os.Open(filename)
	if err != nil {
		return 0, 0, fmt.Errorf("failed to open file: %w", err)
	}
	defer file.Close()
	return c.parseAndStoreDomains(file, limit)
}

// openVerticesScanner wraps r in a line scanner, transparently decompressing
// gzip input (detected via the 0x1f 0x8b magic bytes). The returned closer is
// non-nil only when a gzip reader was created and must then be closed by the
// caller. The scanner accepts lines up to 1 MiB.
func openVerticesScanner(r io.Reader) (*bufio.Scanner, io.Closer, error) {
	bufReader := bufio.NewReader(r)
	magic, err := bufReader.Peek(2)
	if err != nil && err != io.EOF {
		return nil, nil, fmt.Errorf("failed to peek at file: %w", err)
	}

	var body io.Reader = bufReader
	var closer io.Closer
	if len(magic) >= 2 && magic[0] == 0x1f && magic[1] == 0x8b {
		gz, err := gzip.NewReader(bufReader)
		if err != nil {
			return nil, nil, fmt.Errorf("failed to create gzip reader: %w", err)
		}
		body = gz
		closer = gz
	}

	scanner := bufio.NewScanner(body)
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
	return scanner, closer, nil
}

// parseVerticesLine extracts the normalized host and its TLD from one
// tab-separated vertices line ("<id>\t<reversed host>"). ok is false when the
// line has fewer than two fields or the host field is blank.
func parseVerticesLine(line string) (host, tld string, ok bool) {
	parts := strings.Split(line, "\t")
	if len(parts) < 2 {
		return "", "", false
	}
	reversed := strings.TrimSpace(parts[1])
	if reversed == "" {
		return "", "", false
	}
	host = normalizeHost(reverseHost(reversed))
	return host, getTLD(host), true
}

// ImportDomainsInBackground starts domain import in a background goroutine.
// The vertices file (optionally gzip-compressed) is read in batches of 100,
// bulk-inserted via COPY, with a fallback to individual ON CONFLICT inserts
// when COPY fails (e.g. on duplicate rows). Batches are throttled to one per
// second to limit database load.
func (c *Crawler) ImportDomainsInBackground(filename string) {
	go func() {
		file, err := os.Open(filename)
		if err != nil {
			fmt.Printf("Failed to open vertices file: %v\n", err)
			return
		}
		defer file.Close()

		scanner, closer, err := openVerticesScanner(file)
		if err != nil {
			fmt.Printf("Failed to read vertices file: %v\n", err)
			return
		}
		if closer != nil {
			defer closer.Close()
		}

		const batchSize = 100
		totalImported := 0
		batchCount := 0

		type domainEntry struct {
			host string
			tld  string
		}

		for {
			// Read and canonicalize one batch of lines.
			var domains []domainEntry
			for len(domains) < batchSize && scanner.Scan() {
				if host, tld, ok := parseVerticesLine(scanner.Text()); ok {
					domains = append(domains, domainEntry{host: host, tld: tld})
				}
			}
			if len(domains) == 0 {
				break
			}

			// Use COPY for bulk insert (much faster than individual INSERTs).
			ctx := context.Background()
			conn, err := c.db.Acquire(ctx)
			if err != nil {
				fmt.Printf("Failed to acquire connection: %v\n", err)
				break
			}

			// Build rows for copy, applying auto-skip for spam patterns.
			rows := make([][]any, len(domains))
			for i, d := range domains {
				status := "pass"
				if shouldAutoSkipDomain(d.host) {
					status = "skip"
				}
				rows[i] = []any{stripTLD(d.host), status, d.tld}
			}

			imported, err := conn.CopyFrom(
				ctx,
				pgx.Identifier{"domains"},
				[]string{"host", "status", "tld"},
				pgx.CopyFromRows(rows),
			)
			conn.Release()
			if err != nil {
				// COPY cannot skip conflicts, so fall back to individual
				// inserts with ON CONFLICT DO NOTHING. The reported count is
				// then an upper bound: duplicates and per-row errors are not
				// deducted (best-effort, matching the original behavior).
				for _, d := range domains {
					status := "pass"
					if shouldAutoSkipDomain(d.host) {
						status = "skip"
					}
					c.db.Exec(`
						INSERT INTO domains (host, status, tld)
						VALUES ($1, $2, $3)
						ON CONFLICT(host, tld) DO NOTHING
					`, stripTLD(d.host), status, d.tld)
				}
				imported = int64(len(domains))
			}

			batchCount++
			totalImported += int(imported)
			atomic.AddInt32(&c.domainsImported, int32(imported))
			fmt.Printf("Import batch %d: %d domains (total: %d)\n", batchCount, imported, totalImported)

			// Wait 1 second before the next batch.
			time.Sleep(1 * time.Second)
		}

		if err := scanner.Err(); err != nil {
			fmt.Printf("Error reading vertices file: %v\n", err)
		}
		fmt.Printf("Background import complete: %d domains imported\n", totalImported)
	}()
}

// parseAndStoreDomains reads vertices lines (optionally gzip-compressed) from
// reader and inserts new domains with status "pass" (or "skip" for spam
// patterns). When limit > 0 it stops after that many valid lines. It reports
// how many rows were inserted vs. skipped (conflicts and per-row errors).
func (c *Crawler) parseAndStoreDomains(reader io.Reader, limit int) (imported int, skipped int, err error) {
	scanner, closer, err := openVerticesScanner(reader)
	if err != nil {
		return 0, 0, err
	}
	if closer != nil {
		defer closer.Close()
	}

	count := 0
	const batchSize = 100

	type domainEntry struct {
		host string
		tld  string
	}

	for {
		// Read and canonicalize one batch. Checking the limit before Scan
		// avoids consuming (and silently discarding) one extra line once the
		// limit is reached.
		var domains []domainEntry
		for len(domains) < batchSize && (limit <= 0 || count < limit) && scanner.Scan() {
			if host, tld, ok := parseVerticesLine(scanner.Text()); ok {
				domains = append(domains, domainEntry{host: host, tld: tld})
				count++
			}
		}
		if len(domains) == 0 {
			break
		}

		// Insert with ON CONFLICT, applying auto-skip for spam patterns.
		for _, d := range domains {
			status := "pass"
			if shouldAutoSkipDomain(d.host) {
				status = "skip"
			}
			result, err := c.db.Exec(`
				INSERT INTO domains (host, status, tld)
				VALUES ($1, $2, $3)
				ON CONFLICT(host, tld) DO NOTHING
			`, stripTLD(d.host), status, d.tld)
			if err != nil {
				skipped++
			} else if result > 0 {
				imported++
			} else {
				// Conflict: the row already existed.
				skipped++
			}
		}

		if limit > 0 && count >= limit {
			break
		}
	}

	if err := scanner.Err(); err != nil {
		return imported, skipped, fmt.Errorf("error reading file: %w", err)
	}
	return imported, skipped, nil
}