package main import ( "bufio" "compress/gzip" "context" "fmt" "io" "os" "strings" "sync/atomic" "time" "github.com/jackc/pgx/v5" ) // Domain represents a host to be crawled for feeds // Status: hold (pending review), pass (approved), skip (not processing), fail (error) type Domain struct { Host string `json:"host"` Status string `json:"status"` DiscoveredAt time.Time `json:"discovered_at"` LastCheckedAt time.Time `json:"last_checked_at,omitempty"` LastCrawledAt time.Time `json:"last_crawled_at,omitempty"` FeedsFound int `json:"feeds_found,omitempty"` LastError string `json:"last_error,omitempty"` TLD string `json:"tld,omitempty"` } // shouldAutoSkipDomain checks if a domain should be auto-skipped based on patterns func shouldAutoSkipDomain(host string) bool { // Never skip our own domain if strings.HasSuffix(host, "1440.news") || host == "1440.news" { return false } // Skip bare TLDs (no dot means it's just "com", "net", etc.) if !strings.Contains(host, ".") { return true } // Skip domains starting with a digit (spam pattern) if len(host) > 0 && host[0] >= '0' && host[0] <= '9' { return true } // Skip domains starting with letter-dash (spam pattern, e.g., "a-example.com") if len(host) > 1 && ((host[0] >= 'a' && host[0] <= 'z') || (host[0] >= 'A' && host[0] <= 'Z')) && host[1] == '-' { return true } return false } // saveDomain stores a domain in PostgreSQL func (c *Crawler) saveDomain(domain *Domain) error { // Auto-skip domains matching spam patterns status := domain.Status if shouldAutoSkipDomain(domain.Host) { status = "skip" } _, err := c.db.Exec(` INSERT INTO domains (host, status, discovered_at, last_checked_at, last_crawled_at, feeds_found, last_error, tld) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT(host) DO UPDATE SET status = EXCLUDED.status, last_checked_at = EXCLUDED.last_checked_at, last_crawled_at = EXCLUDED.last_crawled_at, feeds_found = EXCLUDED.feeds_found, last_error = EXCLUDED.last_error, tld = EXCLUDED.tld `, domain.Host, status, domain.DiscoveredAt, NullableTime(domain.LastCheckedAt), NullableTime(domain.LastCrawledAt), domain.FeedsFound, NullableString(domain.LastError), domain.TLD) return err } // saveDomainTx stores a domain using a transaction func (c *Crawler) saveDomainTx(tx pgx.Tx, domain *Domain) error { // Auto-skip domains matching spam patterns status := domain.Status if shouldAutoSkipDomain(domain.Host) { status = "skip" } _, err := tx.Exec(context.Background(), ` INSERT INTO domains (host, status, discovered_at, last_checked_at, last_crawled_at, feeds_found, last_error, tld) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT(host) DO NOTHING `, domain.Host, status, domain.DiscoveredAt, NullableTime(domain.LastCheckedAt), NullableTime(domain.LastCrawledAt), domain.FeedsFound, NullableString(domain.LastError), domain.TLD) return err } // domainExists checks if a domain already exists in the database func (c *Crawler) domainExists(host string) bool { var exists bool err := c.db.QueryRow("SELECT EXISTS(SELECT 1 FROM domains WHERE host = $1)", normalizeHost(host)).Scan(&exists) return err == nil && exists } // getDomain retrieves a domain from PostgreSQL func (c *Crawler) getDomain(host string) (*Domain, error) { domain := &Domain{} var lastCheckedAt, lastCrawledAt *time.Time var lastError *string err := c.db.QueryRow(` SELECT host, status, discovered_at, last_checked_at, last_crawled_at, feeds_found, last_error, tld FROM domains WHERE host = $1 `, normalizeHost(host)).Scan( &domain.Host, &domain.Status, &domain.DiscoveredAt, &lastCheckedAt, &lastCrawledAt, &domain.FeedsFound, &lastError, &domain.TLD, ) if err == pgx.ErrNoRows { return nil, nil } if err != nil { return nil, err } domain.LastCheckedAt = TimeValue(lastCheckedAt) domain.LastCrawledAt = TimeValue(lastCrawledAt) domain.LastError = StringValue(lastError) return domain, nil } // GetDomainsToCheck returns domains ready for checking (status='pass', never checked) func (c *Crawler) GetDomainsToCheck(limit int) ([]*Domain, error) { rows, err := c.db.Query(` SELECT host, status, discovered_at, last_checked_at, last_crawled_at, feeds_found, last_error, tld FROM domains WHERE status = 'pass' AND last_checked_at IS NULL ORDER BY discovered_at ASC LIMIT $1 `, limit) if err != nil { return nil, err } defer rows.Close() return c.scanDomains(rows) } // GetDomainsToCrawl returns domains ready for crawling (status='pass', checked but not crawled) func (c *Crawler) GetDomainsToCrawl(limit int) ([]*Domain, error) { rows, err := c.db.Query(` SELECT host, status, discovered_at, last_checked_at, last_crawled_at, feeds_found, last_error, tld FROM domains WHERE status = 'pass' AND last_checked_at IS NOT NULL AND last_crawled_at IS NULL ORDER BY discovered_at ASC LIMIT $1 `, limit) if err != nil { return nil, err } defer rows.Close() return c.scanDomains(rows) } // scanDomains is a helper to scan multiple domain rows func (c *Crawler) scanDomains(rows pgx.Rows) ([]*Domain, error) { var domains []*Domain for rows.Next() { domain := &Domain{} var lastCheckedAt, lastCrawledAt *time.Time var lastError *string if err := rows.Scan( &domain.Host, &domain.Status, &domain.DiscoveredAt, &lastCheckedAt, &lastCrawledAt, &domain.FeedsFound, &lastError, &domain.TLD, ); err != nil { continue } domain.LastCheckedAt = TimeValue(lastCheckedAt) domain.LastCrawledAt = TimeValue(lastCrawledAt) domain.LastError = StringValue(lastError) domains = append(domains, domain) } return domains, rows.Err() } // markDomainChecked updates a domain after the check (HEAD request) stage func (c *Crawler) markDomainChecked(host string, lastError string) error { now := time.Now() if lastError != "" { _, err := c.db.Exec(` UPDATE domains SET status = 'fail', last_checked_at = $1, last_error = $2 WHERE host = $3 `, now, lastError, normalizeHost(host)) return err } _, err := c.db.Exec(` UPDATE domains SET last_checked_at = $1, last_error = NULL WHERE host = $2 `, now, normalizeHost(host)) return err } // markDomainCrawled updates a domain after the crawl stage func (c *Crawler) markDomainCrawled(host string, feedsFound int, lastError string) error { now := time.Now() if lastError != "" { _, err := c.db.Exec(` UPDATE domains SET status = 'fail', last_crawled_at = $1, feeds_found = $2, last_error = $3 WHERE host = $4 `, now, feedsFound, lastError, normalizeHost(host)) return err } _, err := c.db.Exec(` UPDATE domains SET last_crawled_at = $1, feeds_found = $2, last_error = NULL WHERE host = $3 `, now, feedsFound, normalizeHost(host)) return err } // GetDomainCount returns the total number of domains and counts by status func (c *Crawler) GetDomainCount() (total int, hold int, err error) { err = c.db.QueryRow("SELECT COUNT(*) FROM domains").Scan(&total) if err != nil { return 0, 0, err } err = c.db.QueryRow("SELECT COUNT(*) FROM domains WHERE status = 'hold'").Scan(&hold) return total, hold, err } // ImportTestDomains adds a list of specific domains for testing func (c *Crawler) ImportTestDomains(domains []string) { now := time.Now() for _, host := range domains { _, err := c.db.Exec(` INSERT INTO domains (host, status, discovered_at, tld) VALUES ($1, 'pass', $2, $3) ON CONFLICT(host) DO NOTHING `, host, now, getTLD(host)) if err != nil { fmt.Printf("Error adding test domain %s: %v\n", host, err) } else { fmt.Printf("Added test domain: %s\n", host) } } } // ImportDomainsFromFile reads a vertices file and stores new domains as "pass" func (c *Crawler) ImportDomainsFromFile(filename string, limit int) (imported int, skipped int, err error) { file, err := os.Open(filename) if err != nil { return 0, 0, fmt.Errorf("failed to open file: %v", err) } defer file.Close() return c.parseAndStoreDomains(file, limit) } // ImportDomainsInBackground starts domain import in a background goroutine func (c *Crawler) ImportDomainsInBackground(filename string) { go func() { file, err := os.Open(filename) if err != nil { fmt.Printf("Failed to open vertices file: %v\n", err) return } defer file.Close() var bodyReader io.Reader bufReader := bufio.NewReader(file) peekBytes, err := bufReader.Peek(2) if err != nil && err != io.EOF { fmt.Printf("Failed to peek at file: %v\n", err) return } if len(peekBytes) >= 2 && peekBytes[0] == 0x1f && peekBytes[1] == 0x8b { gzReader, err := gzip.NewReader(bufReader) if err != nil { fmt.Printf("Failed to create gzip reader: %v\n", err) return } defer gzReader.Close() bodyReader = gzReader } else { bodyReader = bufReader } scanner := bufio.NewScanner(bodyReader) buf := make([]byte, 0, 64*1024) scanner.Buffer(buf, 1024*1024) const batchSize = 100 now := time.Now() totalImported := 0 batchCount := 0 type domainEntry struct { host string tld string } for { // Read and canonicalize batch var domains []domainEntry for len(domains) < batchSize && scanner.Scan() { line := scanner.Text() parts := strings.Split(line, "\t") if len(parts) >= 2 { reverseHostName := strings.TrimSpace(parts[1]) if reverseHostName != "" { host := normalizeHost(reverseHost(reverseHostName)) tld := getTLD(host) domains = append(domains, domainEntry{host: host, tld: tld}) } } } if len(domains) == 0 { break } // Use COPY for bulk insert (much faster than individual INSERTs) ctx := context.Background() conn, err := c.db.Acquire(ctx) if err != nil { fmt.Printf("Failed to acquire connection: %v\n", err) break } // Build rows for copy, applying auto-skip for spam patterns rows := make([][]interface{}, len(domains)) for i, d := range domains { status := "pass" if shouldAutoSkipDomain(d.host) { status = "skip" } rows[i] = []interface{}{d.host, status, now, d.tld} } // Use CopyFrom for bulk insert imported, err := conn.CopyFrom( ctx, pgx.Identifier{"domains"}, []string{"host", "status", "discovered_at", "tld"}, pgx.CopyFromRows(rows), ) conn.Release() if err != nil { // Fall back to individual inserts with ON CONFLICT for _, d := range domains { status := "pass" if shouldAutoSkipDomain(d.host) { status = "skip" } c.db.Exec(` INSERT INTO domains (host, status, discovered_at, tld) VALUES ($1, $2, $3, $4) ON CONFLICT(host) DO NOTHING `, d.host, status, now, d.tld) } imported = int64(len(domains)) } batchCount++ totalImported += int(imported) atomic.AddInt32(&c.domainsImported, int32(imported)) fmt.Printf("Import batch %d: %d domains (total: %d)\n", batchCount, imported, totalImported) // Wait 1 second before the next batch time.Sleep(1 * time.Second) } if err := scanner.Err(); err != nil { fmt.Printf("Error reading vertices file: %v\n", err) } fmt.Printf("Background import complete: %d domains imported\n", totalImported) }() } func (c *Crawler) parseAndStoreDomains(reader io.Reader, limit int) (imported int, skipped int, err error) { var bodyReader io.Reader bufReader := bufio.NewReader(reader) peekBytes, err := bufReader.Peek(2) if err != nil && err != io.EOF { return 0, 0, fmt.Errorf("failed to peek at file: %v", err) } if len(peekBytes) >= 2 && peekBytes[0] == 0x1f && peekBytes[1] == 0x8b { gzReader, err := gzip.NewReader(bufReader) if err != nil { return 0, 0, fmt.Errorf("failed to create gzip reader: %v", err) } defer gzReader.Close() bodyReader = gzReader } else { bodyReader = bufReader } scanner := bufio.NewScanner(bodyReader) buf := make([]byte, 0, 64*1024) scanner.Buffer(buf, 1024*1024) now := time.Now() count := 0 const batchSize = 100 type domainEntry struct { host string tld string } for { // Read and canonicalize batch var domains []domainEntry for len(domains) < batchSize && scanner.Scan() { if limit > 0 && count >= limit { break } line := scanner.Text() parts := strings.Split(line, "\t") if len(parts) >= 2 { reverseHostName := strings.TrimSpace(parts[1]) if reverseHostName != "" { host := normalizeHost(reverseHost(reverseHostName)) domains = append(domains, domainEntry{host: host, tld: getTLD(host)}) count++ } } } if len(domains) == 0 { break } // Insert with ON CONFLICT, applying auto-skip for spam patterns for _, d := range domains { status := "pass" if shouldAutoSkipDomain(d.host) { status = "skip" } result, err := c.db.Exec(` INSERT INTO domains (host, status, discovered_at, tld) VALUES ($1, $2, $3, $4) ON CONFLICT(host) DO NOTHING `, d.host, status, now, d.tld) if err != nil { skipped++ } else if result > 0 { imported++ } else { skipped++ } } if limit > 0 && count >= limit { break } } if err := scanner.Err(); err != nil { return imported, skipped, fmt.Errorf("error reading file: %v", err) } return imported, skipped, nil }