Restore working codebase with all methods

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
primal
2026-02-01 19:08:53 -05:00
parent 211812363a
commit 8a9001c02c
18 changed files with 2357 additions and 331 deletions
+126 -71
View File
@@ -14,19 +14,38 @@ import (
"github.com/jackc/pgx/v5"
)
// Domain represents a host to be crawled for feeds
// Domain represents a host to process for feeds
// Status: hold (pending review), pass (approved), skip (not processing)
// CrawledAt: zero time = needs domain_check, +1 sec = needs feed_crawl, real time = done
// (the two sentinel times are DomainStateUnchecked and DomainStateChecked).
// After a failed check or crawl, CrawledAt instead holds the scheduled retry
// time (time.Now() + ErrorRetryDelay), as written by markDomainChecked and
// markDomainCrawled.
// NOTE(review): this span appears to interleave the removed and the added
// field lists of a diff; the group containing DiscoveredAt/LastCheckedAt/
// LastCrawledAt looks like the pre-change definition — confirm against the
// checked-out file.
type Domain struct {
Host string `json:"host"`
Status string `json:"status"`
DiscoveredAt time.Time `json:"discovered_at"`
LastCheckedAt time.Time `json:"last_checked_at,omitempty"`
LastCrawledAt time.Time `json:"last_crawled_at,omitempty"`
FeedsFound int `json:"feeds_found,omitempty"`
LastError string `json:"last_error,omitempty"`
TLD string `json:"tld,omitempty"`
// Host and TLD are combined by FullHost into the complete hostname.
Host string `json:"host"`
Status string `json:"status"`
CrawledAt time.Time `json:"crawled_at"`
FeedsFound int `json:"feeds_found,omitempty"`
// LastError is stored as NULL when empty (see NullableString in saveDomain).
LastError string `json:"last_error,omitempty"`
TLD string `json:"tld,omitempty"`
// MissCount corresponds to the miss_count column, which is incremented on
// each failure and reset to 0 on success by markDomainChecked/markDomainCrawled.
// NOTE(review): none of the SELECT statements visible here read miss_count —
// confirm where this field is populated.
MissCount int `json:"miss_count,omitempty"`
}
// MissCountThreshold is the number of consecutive failed checks/crawls after
// which a domain's status is set to "hold". markDomainChecked and
// markDomainCrawled compare miss_count + 1 against it, and reset miss_count
// to 0 on success.
const MissCountThreshold = 100
// ErrorRetryDelay is the minimum wait before a domain with an error is retried:
// it is added to time.Now() to schedule the next attempt (1 hour).
// For comparison, retrying at the ~100 second rate imposed by the queue would
// reach 100 misses in ~2.8 hours; with the 1 hour minimum delay, 100 misses
// take at least 100 hours, i.e. ~4+ days in practice.
// NOTE(review): declared as a var rather than a const, presumably so it can be
// overridden — confirm.
var ErrorRetryDelay = 1 * time.Hour
// FullHost returns the complete hostname for the domain by passing its Host
// and TLD fields to fullHost.
func (d *Domain) FullHost() string {
	host, tld := d.Host, d.TLD
	return fullHost(host, tld)
}
// Sentinel crawled_at values that encode a domain's processing state.
var (
	// DomainStateUnchecked is the zero time (0001-01-01 00:00:00): the domain
	// still needs domain_check.
	DomainStateUnchecked time.Time

	// DomainStateChecked is one second past the zero time
	// (0001-01-01 00:00:01): the domain needs feed_crawl.
	DomainStateChecked = DomainStateUnchecked.Add(time.Second)
)
// shouldAutoSkipDomain checks if a domain should be auto-skipped based on patterns
func shouldAutoSkipDomain(host string) bool {
// Never skip our own domain
@@ -51,62 +70,63 @@ func shouldAutoSkipDomain(host string) bool {
// saveDomain stores a domain in PostgreSQL
// NOTE(review): this span shows both the removed and the added lines of the
// change (two shouldAutoSkipDomain checks, two INSERT column lists, two
// argument lists). The variant that uses fh/stripTLD, crawled_at and
// ON CONFLICT(host, tld) appears to be the current one — confirm against the
// checked-out file.
// In that variant the row is upserted on (host, tld): a conflicting row has
// its status, crawled_at, feeds_found and last_error overwritten with the new
// values. miss_count is not written by this statement.
// Returns the error from c.db.Exec.
func (c *Crawler) saveDomain(domain *Domain) error {
// Auto-skip domains matching spam patterns
fh := domain.FullHost()
// Start from the caller-supplied status; a match below replaces it with "skip".
status := domain.Status
if shouldAutoSkipDomain(domain.Host) {
if shouldAutoSkipDomain(fh) {
status = "skip"
}
_, err := c.db.Exec(`
INSERT INTO domains (host, status, discovered_at, last_checked_at, last_crawled_at, feeds_found, last_error, tld)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT(host) DO UPDATE SET
INSERT INTO domains (host, status, crawled_at, feeds_found, last_error, tld)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT(host, tld) DO UPDATE SET
status = EXCLUDED.status,
last_checked_at = EXCLUDED.last_checked_at,
last_crawled_at = EXCLUDED.last_crawled_at,
crawled_at = EXCLUDED.crawled_at,
feeds_found = EXCLUDED.feeds_found,
last_error = EXCLUDED.last_error,
tld = EXCLUDED.tld
`, domain.Host, status, domain.DiscoveredAt, NullableTime(domain.LastCheckedAt),
NullableTime(domain.LastCrawledAt), domain.FeedsFound, NullableString(domain.LastError), domain.TLD)
last_error = EXCLUDED.last_error
`, stripTLD(fh), status, domain.CrawledAt,
domain.FeedsFound, NullableString(domain.LastError), domain.TLD)
return err
}
// saveDomainTx stores a domain using a transaction
// The insert runs on the supplied tx with context.Background() and uses
// ON CONFLICT ... DO NOTHING, so unlike saveDomain an already-existing row is
// left untouched. Returns the error from tx.Exec.
// NOTE(review): this span shows both the removed and the added lines of the
// change; the variant using fh/stripTLD, crawled_at and ON CONFLICT(host, tld)
// appears to be the current one — confirm against the checked-out file.
func (c *Crawler) saveDomainTx(tx pgx.Tx, domain *Domain) error {
// Auto-skip domains matching spam patterns
fh := domain.FullHost()
// Start from the caller-supplied status; a match below replaces it with "skip".
status := domain.Status
if shouldAutoSkipDomain(domain.Host) {
if shouldAutoSkipDomain(fh) {
status = "skip"
}
_, err := tx.Exec(context.Background(), `
INSERT INTO domains (host, status, discovered_at, last_checked_at, last_crawled_at, feeds_found, last_error, tld)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT(host) DO NOTHING
`, domain.Host, status, domain.DiscoveredAt, NullableTime(domain.LastCheckedAt),
NullableTime(domain.LastCrawledAt), domain.FeedsFound, NullableString(domain.LastError), domain.TLD)
INSERT INTO domains (host, status, crawled_at, feeds_found, last_error, tld)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT(host, tld) DO NOTHING
`, stripTLD(fh), status, domain.CrawledAt,
domain.FeedsFound, NullableString(domain.LastError), domain.TLD)
return err
}
// domainExists checks if a domain already exists in the database
// The host argument is normalized with normalizeHost before the lookup.
// Returns true only when the query succeeds and a row is found; a query or
// scan error is reported as false, so callers cannot distinguish "absent"
// from "lookup failed".
// NOTE(review): this span shows both the removed (host-only) and the added
// (host AND tld, via stripTLD/getTLD) query lines — confirm which is live.
func (c *Crawler) domainExists(host string) bool {
host = normalizeHost(host)
var exists bool
err := c.db.QueryRow("SELECT EXISTS(SELECT 1 FROM domains WHERE host = $1)", normalizeHost(host)).Scan(&exists)
err := c.db.QueryRow("SELECT EXISTS(SELECT 1 FROM domains WHERE host = $1 AND tld = $2)", stripTLD(host), getTLD(host)).Scan(&exists)
return err == nil && exists
}
// getDomain retrieves a domain from PostgreSQL
func (c *Crawler) getDomain(host string) (*Domain, error) {
host = normalizeHost(host)
domain := &Domain{}
var lastCheckedAt, lastCrawledAt *time.Time
var lastError *string
err := c.db.QueryRow(`
SELECT host, status, discovered_at, last_checked_at, last_crawled_at, feeds_found, last_error, tld
FROM domains WHERE host = $1
`, normalizeHost(host)).Scan(
&domain.Host, &domain.Status, &domain.DiscoveredAt, &lastCheckedAt, &lastCrawledAt,
&domain.FeedsFound, &lastError, &domain.TLD,
SELECT host, tld, status, crawled_at, feeds_found, last_error
FROM domains WHERE host = $1 AND tld = $2
`, stripTLD(host), getTLD(host)).Scan(
&domain.Host, &domain.TLD, &domain.Status, &domain.CrawledAt,
&domain.FeedsFound, &lastError,
)
if err == pgx.ErrNoRows {
@@ -116,21 +136,26 @@ func (c *Crawler) getDomain(host string) (*Domain, error) {
return nil, err
}
domain.LastCheckedAt = TimeValue(lastCheckedAt)
domain.LastCrawledAt = TimeValue(lastCrawledAt)
domain.LastError = StringValue(lastError)
return domain, nil
}
// GetDomainsToCrawl returns domains ready for crawling (status='pass', not yet crawled)
func (c *Crawler) GetDomainsToCrawl(limit int) ([]*Domain, error) {
// GetDomainsToProcess returns domains needing processing (domain_check or feed_crawl)
// crawled_at = zero time means needs domain_check, +1 sec means needs feed_crawl
// Domains with errors are retried when crawled_at < now (scheduled by ErrorRetryDelay)
func (c *Crawler) GetDomainsToProcess(limit int) ([]*Domain, error) {
now := time.Now()
rows, err := c.db.Query(`
SELECT host, status, discovered_at, last_checked_at, last_crawled_at, feeds_found, last_error, tld
FROM domains WHERE status = 'pass' AND last_crawled_at IS NULL
ORDER BY discovered_at DESC
LIMIT $1
`, limit)
SELECT host, status, crawled_at, feeds_found, last_error, tld
FROM domains
WHERE status = 'pass' AND (
(crawled_at < '0001-01-02' AND last_error IS NULL) -- new domains
OR (crawled_at < $1 AND last_error IS NOT NULL) -- retry errors after delay
)
ORDER BY crawled_at ASC
LIMIT $2
`, now, limit)
if err != nil {
return nil, err
}
@@ -139,23 +164,45 @@ func (c *Crawler) GetDomainsToCrawl(limit int) ([]*Domain, error) {
return c.scanDomains(rows)
}
// markDomainChecked records the result of the domain_check stage for the row
// identified by (host, tld). The host parameter should be the stripped host
// (without TLD).
//
// An empty lastError means success: crawled_at is set to DomainStateChecked
// (the +1 sec sentinel, i.e. ready for feed_crawl), last_error is cleared and
// miss_count is reset to 0.
//
// A non-empty lastError schedules a retry: crawled_at is set to
// time.Now() + ErrorRetryDelay, last_error is stored, miss_count is
// incremented, and status becomes 'hold' once the incremented count reaches
// MissCountThreshold.
//
// Returns the error from c.db.Exec.
func (c *Crawler) markDomainChecked(host, tld, lastError string) error {
	if lastError == "" {
		// Success path: advance to the feed_crawl stage and clear error state.
		_, err := c.db.Exec(`
UPDATE domains SET crawled_at = $1, last_error = NULL, miss_count = 0
WHERE host = $2 AND tld = $3
`, DomainStateChecked, host, tld)
		return err
	}

	// Failure path: count the miss and push the next attempt out by the retry
	// delay; only reaching the threshold flips the status to 'hold'.
	nextAttempt := time.Now().Add(ErrorRetryDelay)
	_, err := c.db.Exec(`
UPDATE domains SET
crawled_at = $1,
last_error = $2,
miss_count = miss_count + 1,
status = CASE WHEN miss_count + 1 >= $3 THEN 'hold' ELSE status END
WHERE host = $4 AND tld = $5
`, nextAttempt, lastError, MissCountThreshold, host, tld)
	return err
}
// scanDomains is a helper to scan multiple domain rows
func (c *Crawler) scanDomains(rows pgx.Rows) ([]*Domain, error) {
var domains []*Domain
for rows.Next() {
domain := &Domain{}
var lastCheckedAt, lastCrawledAt *time.Time
var lastError *string
if err := rows.Scan(
&domain.Host, &domain.Status, &domain.DiscoveredAt, &lastCheckedAt, &lastCrawledAt,
&domain.Host, &domain.Status, &domain.CrawledAt,
&domain.FeedsFound, &lastError, &domain.TLD,
); err != nil {
continue
}
domain.LastCheckedAt = TimeValue(lastCheckedAt)
domain.LastCrawledAt = TimeValue(lastCrawledAt)
domain.LastError = StringValue(lastError)
domains = append(domains, domain)
@@ -164,20 +211,30 @@ func (c *Crawler) scanDomains(rows pgx.Rows) ([]*Domain, error) {
return domains, rows.Err()
}
// NOTE(review): this span shows both the removed and the added lines of the
// change, including two function headers; the (host, tld string, ...) variant
// that writes crawled_at and miss_count appears to be the current one —
// confirm against the checked-out file.
// markDomainCrawled updates a domain after the crawl stage
func (c *Crawler) markDomainCrawled(host string, feedsFound int, lastError string) error {
now := time.Now()
// markDomainCrawled updates a domain after feed_crawl (sets to NOW())
// host parameter should be the stripped host (without TLD)
// With a non-empty lastError the row's crawled_at is set to
// time.Now() + ErrorRetryDelay, feeds_found and last_error are stored,
// miss_count is incremented, and status becomes 'hold' once the incremented
// count reaches MissCountThreshold. With an empty lastError crawled_at is set
// to the current time, feeds_found is stored, last_error is cleared and
// miss_count is reset to 0. Returns the error from c.db.Exec.
func (c *Crawler) markDomainCrawled(host, tld string, feedsFound int, lastError string) error {
if lastError != "" {
// Increment miss_count, set to 'hold' only at threshold
// Schedule retry after ErrorRetryDelay
retryAt := time.Now().Add(ErrorRetryDelay)
_, err := c.db.Exec(`
UPDATE domains SET last_crawled_at = $1, feeds_found = $2, last_error = $3
WHERE host = $4
`, now, feedsFound, lastError, normalizeHost(host))
UPDATE domains SET
crawled_at = $1,
feeds_found = $2,
last_error = $3,
miss_count = miss_count + 1,
status = CASE WHEN miss_count + 1 >= $4 THEN 'hold' ELSE status END
WHERE host = $5 AND tld = $6
`, retryAt, feedsFound, lastError, MissCountThreshold, host, tld)
return err
}
// Success - reset miss_count
now := time.Now()
_, err := c.db.Exec(`
UPDATE domains SET last_crawled_at = $1, feeds_found = $2, last_error = NULL
WHERE host = $3
`, now, feedsFound, normalizeHost(host))
UPDATE domains SET crawled_at = $1, feeds_found = $2, last_error = NULL, miss_count = 0
WHERE host = $3 AND tld = $4
`, now, feedsFound, host, tld)
return err
}
@@ -193,13 +250,13 @@ func (c *Crawler) GetDomainCount() (total int, hold int, err error) {
// ImportTestDomains adds a list of specific domains for testing
func (c *Crawler) ImportTestDomains(domains []string) {
now := time.Now()
for _, host := range domains {
host = normalizeHost(host)
_, err := c.db.Exec(`
INSERT INTO domains (host, status, discovered_at, tld)
VALUES ($1, 'pass', $2, $3)
ON CONFLICT(host) DO NOTHING
`, host, now, getTLD(host))
INSERT INTO domains (host, status, tld)
VALUES ($1, 'pass', $2)
ON CONFLICT(host, tld) DO NOTHING
`, stripTLD(host), getTLD(host))
if err != nil {
fmt.Printf("Error adding test domain %s: %v\n", host, err)
} else {
@@ -255,7 +312,6 @@ func (c *Crawler) ImportDomainsInBackground(filename string) {
scanner.Buffer(buf, 1024*1024)
const batchSize = 100
now := time.Now()
totalImported := 0
batchCount := 0
@@ -299,14 +355,14 @@ func (c *Crawler) ImportDomainsInBackground(filename string) {
if shouldAutoSkipDomain(d.host) {
status = "skip"
}
rows[i] = []interface{}{d.host, status, now, d.tld}
rows[i] = []interface{}{stripTLD(d.host), status, d.tld}
}
// Use CopyFrom for bulk insert
imported, err := conn.CopyFrom(
ctx,
pgx.Identifier{"domains"},
[]string{"host", "status", "discovered_at", "tld"},
[]string{"host", "status", "tld"},
pgx.CopyFromRows(rows),
)
conn.Release()
@@ -319,10 +375,10 @@ func (c *Crawler) ImportDomainsInBackground(filename string) {
status = "skip"
}
c.db.Exec(`
INSERT INTO domains (host, status, discovered_at, tld)
VALUES ($1, $2, $3, $4)
ON CONFLICT(host) DO NOTHING
`, d.host, status, now, d.tld)
INSERT INTO domains (host, status, tld)
VALUES ($1, $2, $3)
ON CONFLICT(host, tld) DO NOTHING
`, stripTLD(d.host), status, d.tld)
}
imported = int64(len(domains))
}
@@ -369,7 +425,6 @@ func (c *Crawler) parseAndStoreDomains(reader io.Reader, limit int) (imported in
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
now := time.Now()
count := 0
const batchSize = 100
@@ -408,10 +463,10 @@ func (c *Crawler) parseAndStoreDomains(reader io.Reader, limit int) (imported in
status = "skip"
}
result, err := c.db.Exec(`
INSERT INTO domains (host, status, discovered_at, tld)
VALUES ($1, $2, $3, $4)
ON CONFLICT(host) DO NOTHING
`, d.host, status, now, d.tld)
INSERT INTO domains (host, status, tld)
VALUES ($1, $2, $3)
ON CONFLICT(host, tld) DO NOTHING
`, stripTLD(d.host), status, d.tld)
if err != nil {
skipped++
} else if result > 0 {