diff --git a/CLAUDE.md b/CLAUDE.md index eaf5ae7..18f6a74 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -11,20 +11,47 @@ go fmt ./... # Format go vet ./... # Static analysis ``` +### Database Setup + +Requires PostgreSQL. Start the database first: + +```bash +cd ../postgres && docker compose up -d +``` + +### Environment Variables + +Set via environment or create a `.env` file: + +```bash +# Database connection (individual vars) +DB_HOST=atproto-postgres # Default: atproto-postgres +DB_PORT=5432 # Default: 5432 +DB_USER=news_1440 # Default: news_1440 +DB_PASSWORD= # Or use DB_PASSWORD_FILE +DB_NAME=news_1440 # Default: news_1440 + +# Or use a connection string +DATABASE_URL=postgres://news_1440:password@atproto-postgres:5432/news_1440?sslmode=disable +``` + +For Docker, use `DB_PASSWORD_FILE=/run/secrets/db_password` with Docker secrets. + Requires `vertices.txt.gz` (Common Crawl domain list) in the working directory. ## Architecture -Multi-file Go application that crawls websites for RSS/Atom feeds, stores them in SQLite, and provides a web dashboard. +Multi-file Go application that crawls websites for RSS/Atom feeds, stores them in PostgreSQL, and provides a web dashboard. ### Concurrent Loops (main.go) -The application runs five independent goroutine loops: +The application runs six independent goroutine loops: - **Import loop** - Reads `vertices.txt.gz` and inserts domains into DB in 10k batches - **Crawl loop** - Worker pool processes unchecked domains, discovers feeds - **Check loop** - Worker pool re-checks known feeds for updates (conditional HTTP) - **Stats loop** - Updates cached dashboard statistics every minute - **Cleanup loop** - Removes items older than 12 months (weekly) +- **Publish loop** - Autopublishes items from approved feeds to AT Protocol PDS ### File Structure @@ -36,16 +63,19 @@ The application runs five independent goroutine loops: | `parser.go` | RSS/Atom XML parsing, date parsing, next-crawl calculation | | `html.go` | HTML parsing: feed link extraction, anchor feed detection | | `util.go` | URL normalization, host utilities, TLD extraction | -| `db.go` | SQLite schema (domains, feeds, items tables with FTS5) | +| `db.go` | PostgreSQL schema (domains, feeds, items tables with tsvector FTS) | | `dashboard.go` | HTTP server, JSON APIs, HTML template | +| `publisher.go` | AT Protocol PDS integration for posting items | ### Database Schema -SQLite with WAL mode at `feeds/feeds.db`: +PostgreSQL with pgx driver, using connection pooling: - **domains** - Hosts to crawl (status: unchecked/checked/error) - **feeds** - Discovered RSS/Atom feeds with metadata and cache headers -- **items** - Individual feed entries (guid + feedUrl unique) -- **feeds_fts / items_fts** - FTS5 virtual tables for search +- **items** - Individual feed entries (guid + feed_url unique) +- **search_vector** - GENERATED tsvector columns for full-text search (GIN indexed) + +Column naming: snake_case (e.g., `source_host`, `pub_date`, `item_count`) ### Crawl Logic @@ -53,13 +83,18 @@ SQLite with WAL mode at `feeds/feeds.db`: 2. Try HTTPS, fall back to HTTP 3. Recursive crawl up to MaxDepth=10, MaxPagesPerHost=10 4. Extract `` and anchor hrefs containing rss/atom/feed -5. Parse discovered feeds for metadata, save with nextCrawlAt +5. Parse discovered feeds for metadata, save with next_crawl_at ### Feed Checking Uses conditional HTTP (ETag, If-Modified-Since). Adaptive backoff: base 100s + 100s per consecutive no-change. Respects RSS `` and Syndication namespace hints. 
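
As a rough illustration of this check cycle, here is a minimal Go sketch; the struct and function names are hypothetical (the real implementation keeps this state in the `feeds` table and lives in the checker code), and it only shows the conditional-GET plus backoff behaviour described above:

```go
package feedcheck

import (
	"net/http"
	"time"
)

// FeedState holds the cache validators and backoff counter tracked per feed.
// (Hypothetical names; the actual code stores these as feed columns.)
type FeedState struct {
	ETag                string
	LastModified        string
	ConsecutiveNoChange int
}

// CheckFeed issues a conditional GET; a 304 response means the feed is unchanged.
func CheckFeed(url string, st *FeedState) (changed bool, err error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return false, err
	}
	if st.ETag != "" {
		req.Header.Set("If-None-Match", st.ETag)
	}
	if st.LastModified != "" {
		req.Header.Set("If-Modified-Since", st.LastModified)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusNotModified {
		st.ConsecutiveNoChange++
		return false, nil
	}
	// Feed changed: remember the new validators and reset the backoff.
	st.ETag = resp.Header.Get("ETag")
	st.LastModified = resp.Header.Get("Last-Modified")
	st.ConsecutiveNoChange = 0
	return true, nil
}

// NextCheckDelay applies the adaptive backoff: 100s base plus 100s per
// consecutive unchanged check (TTL / Syndication hints from the feed can
// override this in the real scheduler).
func NextCheckDelay(st *FeedState) time.Duration {
	return time.Duration(100+100*st.ConsecutiveNoChange) * time.Second
}
```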
-## AT Protocol Integration (Planned) +### Publishing + +Feeds with `publish_status = 'pass'` have their items automatically posted to AT Protocol. +Status values: `held` (default), `pass` (approved), `deny` (rejected). + +## AT Protocol Integration Domain: 1440.news @@ -68,9 +103,8 @@ User structure: - `{domain}.1440.news` - Catch-all feed per source (e.g., `wsj.com.1440.news`) - `{category}.{domain}.1440.news` - Category-specific feeds (future) -Phases: -1. Local PDS setup -2. Account management -3. Auto-create domain users -4. Post articles to accounts -5. Category detection +PDS configuration in `pds.env`: +``` +PDS_HOST=https://pds.1440.news +PDS_ADMIN_PASSWORD= +``` diff --git a/crawler.go b/crawler.go index bd43d6e..a6a4d23 100644 --- a/crawler.go +++ b/crawler.go @@ -1,10 +1,10 @@ package main import ( - "database/sql" "fmt" "io" "net/http" + "os" "runtime" "strings" "sync" @@ -25,17 +25,17 @@ type Crawler struct { hostsProcessed int32 feedsChecked int32 startTime time.Time - db *sql.DB + db *DB displayedCrawlRate int displayedCheckRate int domainsImported int32 - cachedStats *DashboardStats - cachedAllDomains []DomainStat - statsMu sync.RWMutex + cachedStats *DashboardStats + cachedAllDomains []DomainStat + statsMu sync.RWMutex } -func NewCrawler(dbPath string) (*Crawler, error) { - db, err := OpenDatabase(dbPath) +func NewCrawler(connString string) (*Crawler, error) { + db, err := OpenDatabase(connString) if err != nil { return nil, fmt.Errorf("failed to open database: %v", err) } @@ -61,12 +61,6 @@ func NewCrawler(dbPath string) (*Crawler, error) { func (c *Crawler) Close() error { if c.db != nil { - // Checkpoint WAL to merge it back into main database before closing - // This prevents corruption if the container is stopped mid-write - fmt.Println("Checkpointing WAL...") - if _, err := c.db.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil { - fmt.Printf("WAL checkpoint warning: %v\n", err) - } fmt.Println("Closing database...") return c.db.Close() } @@ -95,53 +89,247 @@ func (c *Crawler) StartCleanupLoop() { } // StartMaintenanceLoop performs periodic database maintenance -// - WAL checkpoint every 5 minutes to prevent WAL bloat and reduce corruption risk -// - Quick integrity check every hour to detect issues early -// - Hot backup every 24 hours for recovery func (c *Crawler) StartMaintenanceLoop() { - checkpointTicker := time.NewTicker(5 * time.Minute) - integrityTicker := time.NewTicker(1 * time.Hour) - backupTicker := time.NewTicker(24 * time.Hour) - defer checkpointTicker.Stop() - defer integrityTicker.Stop() - defer backupTicker.Stop() + vacuumTicker := time.NewTicker(24 * time.Hour) + analyzeTicker := time.NewTicker(1 * time.Hour) + defer vacuumTicker.Stop() + defer analyzeTicker.Stop() for { select { - case <-checkpointTicker.C: - // Passive checkpoint - doesn't block writers - if _, err := c.db.Exec("PRAGMA wal_checkpoint(PASSIVE)"); err != nil { - fmt.Printf("WAL checkpoint error: %v\n", err) + case <-analyzeTicker.C: + // Update statistics for query planner + if _, err := c.db.Exec("ANALYZE"); err != nil { + fmt.Printf("ANALYZE error: %v\n", err) } - case <-integrityTicker.C: - // Quick check is faster than full integrity_check - var result string - if err := c.db.QueryRow("PRAGMA quick_check").Scan(&result); err != nil { - fmt.Printf("Integrity check error: %v\n", err) - } else if result != "ok" { - fmt.Printf("WARNING: Database integrity issue detected: %s\n", result) + case <-vacuumTicker.C: + // Reclaim dead tuple space (VACUUM is lighter than VACUUM FULL) + 
fmt.Println("Running VACUUM...") + if _, err := c.db.Exec("VACUUM"); err != nil { + fmt.Printf("VACUUM error: %v\n", err) + } else { + fmt.Println("VACUUM complete") } - - case <-backupTicker.C: - c.createBackup() } } } -// createBackup creates a hot backup of the database using SQLite's backup API -func (c *Crawler) createBackup() { - backupPath := "feeds/feeds.db.backup" - fmt.Println("Creating database backup...") +// StartPublishLoop automatically publishes unpublished items for approved feeds +// Grabs up to 50 items sorted by discovered_at, publishes one per second, then reloops +func (c *Crawler) StartPublishLoop() { + // Load PDS credentials from environment or pds.env file + pdsHost := os.Getenv("PDS_HOST") + pdsAdminPassword := os.Getenv("PDS_ADMIN_PASSWORD") - // Use SQLite's online backup via VACUUM INTO (available in SQLite 3.27+) - // This creates a consistent snapshot without blocking writers - if _, err := c.db.Exec("VACUUM INTO ?", backupPath); err != nil { - fmt.Printf("Backup error: %v\n", err) + if pdsHost == "" || pdsAdminPassword == "" { + if data, err := os.ReadFile("pds.env"); err == nil { + for _, line := range strings.Split(string(data), "\n") { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "#") || line == "" { + continue + } + parts := strings.SplitN(line, "=", 2) + if len(parts) == 2 { + key := strings.TrimSpace(parts[0]) + value := strings.TrimSpace(parts[1]) + switch key { + case "PDS_HOST": + pdsHost = value + case "PDS_ADMIN_PASSWORD": + pdsAdminPassword = value + } + } + } + } + } + + if pdsHost == "" || pdsAdminPassword == "" { + fmt.Println("Publish loop: PDS credentials not configured, skipping") return } - fmt.Printf("Backup created: %s\n", backupPath) + fmt.Printf("Publish loop: starting with PDS %s\n", pdsHost) + feedPassword := "feed1440!" 
+ + // Cache sessions per account + sessions := make(map[string]*PDSSession) + publisher := NewPublisher(pdsHost) + + for { + // Get up to 50 unpublished items from approved feeds, sorted by discovered_at ASC + items, err := c.GetAllUnpublishedItems(50) + if err != nil { + fmt.Printf("Publish loop error: %v\n", err) + time.Sleep(1 * time.Second) + continue + } + + if len(items) == 0 { + time.Sleep(1 * time.Second) + continue + } + + // Publish one item per second + for _, item := range items { + // Get or create session for this feed's account + account := c.getAccountForFeed(item.FeedURL) + if account == "" { + time.Sleep(1 * time.Second) + continue + } + + session, ok := sessions[account] + if !ok { + // Try to log in + session, err = publisher.CreateSession(account, feedPassword) + if err != nil { + // Account might not exist - try to create it + inviteCode, err := publisher.CreateInviteCode(pdsAdminPassword, 1) + if err != nil { + fmt.Printf("Publish: failed to create invite for %s: %v\n", account, err) + time.Sleep(1 * time.Second) + continue + } + + email := account + "@1440.news" + session, err = publisher.CreateAccount(account, email, feedPassword, inviteCode) + if err != nil { + fmt.Printf("Publish: failed to create account %s: %v\n", account, err) + time.Sleep(1 * time.Second) + continue + } + fmt.Printf("Publish: created account %s\n", account) + c.db.Exec("UPDATE feeds SET publish_account = $1 WHERE url = $2", account, item.FeedURL) + + // Set up profile for new account + feedInfo := c.getFeedInfo(item.FeedURL) + if feedInfo != nil { + displayName := feedInfo.Title + if displayName == "" { + displayName = account + } + description := feedInfo.Description + if description == "" { + description = "News feed via 1440.news" + } + // Truncate if needed + if len(displayName) > 64 { + displayName = displayName[:61] + "..." + } + if len(description) > 256 { + description = description[:253] + "..." 
+ } + if err := publisher.UpdateProfile(session, displayName, description, nil); err != nil { + fmt.Printf("Publish: failed to set profile for %s: %v\n", account, err) + } else { + fmt.Printf("Publish: set profile for %s\n", account) + } + } + } + sessions[account] = session + } + + // Publish the item + uri, err := publisher.PublishItem(session, &item) + if err != nil { + fmt.Printf("Publish: failed item %d: %v\n", item.ID, err) + // Clear session cache on auth errors + if strings.Contains(err.Error(), "401") || strings.Contains(err.Error(), "auth") { + delete(sessions, account) + } + } else { + c.MarkItemPublished(item.ID, uri) + fmt.Printf("Publish: %s -> %s\n", item.Title[:min(40, len(item.Title))], account) + } + + time.Sleep(1 * time.Second) + } + + time.Sleep(1 * time.Second) + } +} + +// getAccountForFeed returns the publish account for a feed URL +func (c *Crawler) getAccountForFeed(feedURL string) string { + var account *string + err := c.db.QueryRow(` + SELECT publish_account FROM feeds + WHERE url = $1 AND publish_status = 'pass' AND status = 'active' + `, feedURL).Scan(&account) + if err != nil || account == nil || *account == "" { + // Derive handle from feed URL + return DeriveHandleFromFeed(feedURL) + } + return *account +} + +// FeedInfo holds basic feed metadata for profile setup +type FeedInfo struct { + Title string + Description string + SiteURL string +} + +// getFeedInfo returns feed metadata for profile setup +func (c *Crawler) getFeedInfo(feedURL string) *FeedInfo { + var title, description, siteURL *string + err := c.db.QueryRow(` + SELECT title, description, site_url FROM feeds WHERE url = $1 + `, feedURL).Scan(&title, &description, &siteURL) + if err != nil { + return nil + } + return &FeedInfo{ + Title: StringValue(title), + Description: StringValue(description), + SiteURL: StringValue(siteURL), + } +} + +// GetAllUnpublishedItems returns unpublished items from all approved feeds +func (c *Crawler) GetAllUnpublishedItems(limit int) ([]Item, error) { + rows, err := c.db.Query(` + SELECT i.id, i.feed_url, i.guid, i.title, i.link, i.description, i.content, + i.author, i.pub_date, i.discovered_at + FROM items i + JOIN feeds f ON i.feed_url = f.url + WHERE f.publish_status = 'pass' + AND f.status = 'active' + AND i.published_at IS NULL + ORDER BY i.discovered_at ASC + LIMIT $1 + `, limit) + if err != nil { + return nil, err + } + defer rows.Close() + + var items []Item + for rows.Next() { + var item Item + var guid, title, link, description, content, author *string + var pubDate, discoveredAt *time.Time + + err := rows.Scan(&item.ID, &item.FeedURL, &guid, &title, &link, &description, + &content, &author, &pubDate, &discoveredAt) + if err != nil { + continue + } + + item.GUID = StringValue(guid) + item.Title = StringValue(title) + item.Link = StringValue(link) + item.Description = StringValue(description) + item.Content = StringValue(content) + item.Author = StringValue(author) + item.PubDate = TimeValue(pubDate) + item.DiscoveredAt = TimeValue(discoveredAt) + + items = append(items, item) + } + + return items, nil } // StartCrawlLoop runs the domain crawling loop independently diff --git a/dashboard.go b/dashboard.go index 83afdea..d008e04 100644 --- a/dashboard.go +++ b/dashboard.go @@ -1,12 +1,13 @@ package main import ( - "database/sql" "encoding/json" "fmt" "html/template" "net/http" "time" + + "github.com/jackc/pgx/v5" ) // DashboardStats holds all statistics for the dashboard @@ -85,10 +86,10 @@ func (c *Crawler) UpdateStats() { } func (c *Crawler) 
fetchAllDomainsFromDB() []DomainStat { - rows, err := c.db.Query(` - SELECT tld, sourceHost, COUNT(*) as cnt FROM feeds - GROUP BY tld, sourceHost - ORDER BY tld, sourceHost + rows, err := c.db.Query( ` + SELECT tld, source_host, COUNT(*) as cnt FROM feeds + GROUP BY tld, source_host + ORDER BY tld, source_host `) if err != nil { fmt.Printf("fetchAllDomainsFromDB error: %v\n", err) @@ -163,14 +164,14 @@ func (c *Crawler) calculateStats() (*DashboardStats, error) { } func (c *Crawler) collectDomainStats(stats *DashboardStats) error { - // Use MAX(rowid) for fast approximate total count - err := c.db.QueryRow("SELECT COALESCE(MAX(rowid), 0) FROM domains").Scan(&stats.TotalDomains) + // Use COUNT(*) for total count + err := c.db.QueryRow( "SELECT COUNT(*) FROM domains").Scan(&stats.TotalDomains) if err != nil { return err } // Single query to get all status counts (one index scan instead of three) - rows, err := c.db.Query("SELECT status, COUNT(*) FROM domains GROUP BY status") + rows, err := c.db.Query( "SELECT status, COUNT(*) FROM domains GROUP BY status") if err != nil { return err } @@ -197,32 +198,36 @@ func (c *Crawler) collectDomainStats(stats *DashboardStats) error { } func (c *Crawler) collectFeedStats(stats *DashboardStats) error { - // Use MAX(rowid) for fast approximate total count - err := c.db.QueryRow("SELECT COALESCE(MAX(rowid), 0) FROM feeds").Scan(&stats.TotalFeeds) + // Use COUNT(*) for total count + err := c.db.QueryRow( "SELECT COUNT(*) FROM feeds").Scan(&stats.TotalFeeds) if err != nil { return err } // Single query to get all type counts (one index scan instead of three) - rows, err := c.db.Query("SELECT type, COUNT(*) FROM feeds GROUP BY type") + rows, err := c.db.Query( "SELECT type, COUNT(*) FROM feeds GROUP BY type") if err != nil { return err } defer rows.Close() for rows.Next() { - var feedType sql.NullString + var feedType *string var count int if err := rows.Scan(&feedType, &count); err != nil { continue } - switch feedType.String { - case "rss": - stats.RSSFeeds = count - case "atom": - stats.AtomFeeds = count - default: + if feedType == nil { stats.UnknownFeeds += count + } else { + switch *feedType { + case "rss": + stats.RSSFeeds = count + case "atom": + stats.AtomFeeds = count + default: + stats.UnknownFeeds += count + } } } return rows.Err() @@ -284,6 +289,9 @@ func (c *Crawler) StartDashboard(addr string) error { http.HandleFunc("/api/publishEnabled", func(w http.ResponseWriter, r *http.Request) { c.handleAPIPublishEnabled(w, r) }) + http.HandleFunc("/api/publishDenied", func(w http.ResponseWriter, r *http.Request) { + c.handleAPIPublishDenied(w, r) + }) http.HandleFunc("/api/publishCandidates", func(w http.ResponseWriter, r *http.Request) { c.handleAPIPublishCandidates(w, r) }) @@ -360,10 +368,10 @@ func (c *Crawler) handleAPIDomainFeeds(w http.ResponseWriter, r *http.Request) { return } - rows, err := c.db.Query(` - SELECT url, title, type, status, errorCount, lastError, itemCount + rows, err := c.db.Query( ` + SELECT url, title, type, status, error_count, last_error, item_count FROM feeds - WHERE sourceHost = ? 
+ WHERE source_host = $1 ORDER BY url ASC LIMIT 1000 `, host) @@ -386,25 +394,19 @@ func (c *Crawler) handleAPIDomainFeeds(w http.ResponseWriter, r *http.Request) { var feeds []FeedInfo for rows.Next() { var f FeedInfo - var title, status, lastError sql.NullString - var errorCount, itemCount sql.NullInt64 + var title, status, lastError *string + var errorCount, itemCount *int if err := rows.Scan(&f.URL, &title, &f.Type, &status, &errorCount, &lastError, &itemCount); err != nil { continue } - if title.Valid { - f.Title = title.String + f.Title = StringValue(title) + f.Status = StringValue(status) + f.LastError = StringValue(lastError) + if errorCount != nil { + f.ErrorCount = *errorCount } - if status.Valid { - f.Status = status.String - } - if errorCount.Valid { - f.ErrorCount = int(errorCount.Int64) - } - if lastError.Valid { - f.LastError = lastError.String - } - if itemCount.Valid { - f.ItemCount = int(itemCount.Int64) + if itemCount != nil { + f.ItemCount = *itemCount } feeds = append(feeds, f) } @@ -443,27 +445,30 @@ func (c *Crawler) handleAPIFeedInfo(w http.ResponseWriter, r *http.Request) { } var f FeedDetails - var title, description, language, siteUrl, lastCrawledAt, lastBuildDate sql.NullString - var updatePeriod, status, lastError, oldestItemDate, newestItemDate sql.NullString - var ttlMinutes, updateFreq, errorCount, itemCount sql.NullInt64 - var avgPostFreqHrs sql.NullFloat64 + var title, description, language, siteUrl *string + var lastCrawledAt, lastBuildDate *time.Time + var updatePeriod, status, lastError *string + var oldestItemDate, newestItemDate *time.Time + var ttlMinutes, updateFreq, errorCount, itemCount *int + var avgPostFreqHrs *float64 + var discoveredAt time.Time - err := c.db.QueryRow(` - SELECT url, type, title, description, language, siteUrl, - discoveredAt, lastCrawledAt, lastBuildDate, - ttlMinutes, updatePeriod, updateFreq, - status, errorCount, lastError, - itemCount, avgPostFreqHrs, oldestItemDate, newestItemDate - FROM feeds WHERE url = ? 
+ err := c.db.QueryRow( ` + SELECT url, type, title, description, language, site_url, + discovered_at, last_crawled_at, last_build_date, + ttl_minutes, update_period, update_freq, + status, error_count, last_error, + item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date + FROM feeds WHERE url = $1 `, feedURL).Scan( &f.URL, &f.Type, &title, &description, &language, &siteUrl, - &f.DiscoveredAt, &lastCrawledAt, &lastBuildDate, + &discoveredAt, &lastCrawledAt, &lastBuildDate, &ttlMinutes, &updatePeriod, &updateFreq, &status, &errorCount, &lastError, &itemCount, &avgPostFreqHrs, &oldestItemDate, &newestItemDate, ) - if err == sql.ErrNoRows { + if err == pgx.ErrNoRows { http.Error(w, "feed not found", http.StatusNotFound) return } @@ -472,53 +477,40 @@ func (c *Crawler) handleAPIFeedInfo(w http.ResponseWriter, r *http.Request) { return } - if title.Valid { - f.Title = title.String + f.Title = StringValue(title) + f.Description = StringValue(description) + f.Language = StringValue(language) + f.SiteURL = StringValue(siteUrl) + f.DiscoveredAt = discoveredAt.Format(time.RFC3339) + if lastCrawledAt != nil { + f.LastCrawledAt = lastCrawledAt.Format(time.RFC3339) } - if description.Valid { - f.Description = description.String + if lastBuildDate != nil { + f.LastBuildDate = lastBuildDate.Format(time.RFC3339) } - if language.Valid { - f.Language = language.String + if ttlMinutes != nil { + f.TTLMinutes = *ttlMinutes } - if siteUrl.Valid { - f.SiteURL = siteUrl.String + f.UpdatePeriod = StringValue(updatePeriod) + if updateFreq != nil { + f.UpdateFreq = *updateFreq } - if lastCrawledAt.Valid { - f.LastCrawledAt = lastCrawledAt.String + f.Status = StringValue(status) + if errorCount != nil { + f.ErrorCount = *errorCount } - if lastBuildDate.Valid { - f.LastBuildDate = lastBuildDate.String + f.LastError = StringValue(lastError) + if itemCount != nil { + f.ItemCount = *itemCount } - if ttlMinutes.Valid { - f.TTLMinutes = int(ttlMinutes.Int64) + if avgPostFreqHrs != nil { + f.AvgPostFreqHrs = *avgPostFreqHrs } - if updatePeriod.Valid { - f.UpdatePeriod = updatePeriod.String + if oldestItemDate != nil { + f.OldestItemDate = oldestItemDate.Format(time.RFC3339) } - if updateFreq.Valid { - f.UpdateFreq = int(updateFreq.Int64) - } - if status.Valid { - f.Status = status.String - } - if errorCount.Valid { - f.ErrorCount = int(errorCount.Int64) - } - if lastError.Valid { - f.LastError = lastError.String - } - if itemCount.Valid { - f.ItemCount = int(itemCount.Int64) - } - if avgPostFreqHrs.Valid { - f.AvgPostFreqHrs = avgPostFreqHrs.Float64 - } - if oldestItemDate.Valid { - f.OldestItemDate = oldestItemDate.String - } - if newestItemDate.Valid { - f.NewestItemDate = newestItemDate.String + if newestItemDate != nil { + f.NewestItemDate = newestItemDate.Format(time.RFC3339) } w.Header().Set("Content-Type", "application/json") @@ -622,16 +614,18 @@ func (c *Crawler) handleAPISearch(w http.ResponseWriter, r *http.Request) { results := make(map[string]*SearchResult) // Helper to scan feed row into SearchFeed - scanFeed := func(rows *sql.Rows) (string, SearchFeed, bool) { + scanFeed := func(rows pgx.Rows) (string, SearchFeed, bool) { var url string - var feedType, category, title, description, language, siteUrl sql.NullString - var discoveredAt, lastCrawledAt, nextCrawlAt, lastBuildDate sql.NullString - var ttlMinutes, updateFreq, errorCount, itemCount sql.NullInt64 - var updatePeriod, status, lastError, lastErrorAt sql.NullString - var sourceUrl, sourceHost, tld sql.NullString - var avgPostFreqHrs 
sql.NullFloat64 - var oldestItemDate, newestItemDate sql.NullString - var noUpdate sql.NullInt64 + var feedType, category, title, description, language, siteUrl *string + var discoveredAt time.Time + var lastCrawledAt, nextCrawlAt, lastBuildDate *time.Time + var ttlMinutes, updateFreq, errorCount, itemCount *int + var updatePeriod, status, lastError *string + var lastErrorAt *time.Time + var sourceUrl, sourceHost, tld *string + var avgPostFreqHrs *float64 + var oldestItemDate, newestItemDate *time.Time + var noUpdate *bool if err := rows.Scan(&url, &feedType, &category, &title, &description, &language, &siteUrl, &discoveredAt, &lastCrawledAt, &nextCrawlAt, &lastBuildDate, @@ -641,52 +635,77 @@ func (c *Crawler) handleAPISearch(w http.ResponseWriter, r *http.Request) { &itemCount, &avgPostFreqHrs, &oldestItemDate, &newestItemDate, &noUpdate); err != nil { return "", SearchFeed{}, false } - cat := category.String + cat := StringValue(category) if cat == "" { cat = "main" } - return url, SearchFeed{ + sf := SearchFeed{ URL: url, - Type: feedType.String, + Type: StringValue(feedType), Category: cat, - Title: title.String, - Description: description.String, - Language: language.String, - SiteURL: siteUrl.String, - DiscoveredAt: discoveredAt.String, - LastCrawledAt: lastCrawledAt.String, - NextCrawlAt: nextCrawlAt.String, - LastBuildDate: lastBuildDate.String, - TTLMinutes: int(ttlMinutes.Int64), - UpdatePeriod: updatePeriod.String, - UpdateFreq: int(updateFreq.Int64), - Status: status.String, - ErrorCount: int(errorCount.Int64), - LastError: lastError.String, - LastErrorAt: lastErrorAt.String, - SourceURL: sourceUrl.String, - SourceHost: sourceHost.String, - TLD: tld.String, - ItemCount: int(itemCount.Int64), - AvgPostFreqHrs: avgPostFreqHrs.Float64, - OldestItemDate: oldestItemDate.String, - NewestItemDate: newestItemDate.String, - NoUpdate: noUpdate.Int64 != 0, - }, true + Title: StringValue(title), + Description: StringValue(description), + Language: StringValue(language), + SiteURL: StringValue(siteUrl), + DiscoveredAt: discoveredAt.Format(time.RFC3339), + UpdatePeriod: StringValue(updatePeriod), + Status: StringValue(status), + LastError: StringValue(lastError), + SourceURL: StringValue(sourceUrl), + SourceHost: StringValue(sourceHost), + TLD: StringValue(tld), + } + if lastCrawledAt != nil { + sf.LastCrawledAt = lastCrawledAt.Format(time.RFC3339) + } + if nextCrawlAt != nil { + sf.NextCrawlAt = nextCrawlAt.Format(time.RFC3339) + } + if lastBuildDate != nil { + sf.LastBuildDate = lastBuildDate.Format(time.RFC3339) + } + if ttlMinutes != nil { + sf.TTLMinutes = *ttlMinutes + } + if updateFreq != nil { + sf.UpdateFreq = *updateFreq + } + if errorCount != nil { + sf.ErrorCount = *errorCount + } + if lastErrorAt != nil { + sf.LastErrorAt = lastErrorAt.Format(time.RFC3339) + } + if itemCount != nil { + sf.ItemCount = *itemCount + } + if avgPostFreqHrs != nil { + sf.AvgPostFreqHrs = *avgPostFreqHrs + } + if oldestItemDate != nil { + sf.OldestItemDate = oldestItemDate.Format(time.RFC3339) + } + if newestItemDate != nil { + sf.NewestItemDate = newestItemDate.Format(time.RFC3339) + } + if noUpdate != nil { + sf.NoUpdate = *noUpdate + } + return url, sf, true } - // Search feeds by sourceHost (LIKE search for domain matching) - hostRows, err := c.db.Query(` - SELECT url, type, category, title, description, language, siteUrl, - discoveredAt, lastCrawledAt, nextCrawlAt, lastBuildDate, - ttlMinutes, updatePeriod, updateFreq, - status, errorCount, lastError, lastErrorAt, - sourceUrl, sourceHost, tld, - 
itemCount, avgPostFreqHrs, oldestItemDate, newestItemDate, noUpdate + // Search feeds by source_host (LIKE search for domain matching) + hostRows, err := c.db.Query( ` + SELECT url, type, category, title, description, language, site_url, + discovered_at, last_crawled_at, next_crawl_at, last_build_date, + ttl_minutes, update_period, update_freq, + status, error_count, last_error, last_error_at, + source_url, source_host, tld, + item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date, no_update FROM feeds - WHERE sourceHost LIKE ? OR url LIKE ? - LIMIT ? - `, "%"+query+"%", "%"+query+"%", limit) + WHERE source_host ILIKE $1 OR url ILIKE $1 + LIMIT $2 + `, "%"+query+"%", limit) if err == nil { defer hostRows.Close() for hostRows.Next() { @@ -698,19 +717,19 @@ func (c *Crawler) handleAPISearch(w http.ResponseWriter, r *http.Request) { } } - // Search feeds via FTS (title, description, url content) - feedRows, err := c.db.Query(` - SELECT f.url, f.type, f.category, f.title, f.description, f.language, f.siteUrl, - f.discoveredAt, f.lastCrawledAt, f.nextCrawlAt, f.lastBuildDate, - f.ttlMinutes, f.updatePeriod, f.updateFreq, - f.status, f.errorCount, f.lastError, f.lastErrorAt, - f.sourceUrl, f.sourceHost, f.tld, - f.itemCount, f.avgPostFreqHrs, f.oldestItemDate, f.newestItemDate, f.noUpdate - FROM feeds f - JOIN feeds_fts fts ON f.rowid = fts.rowid - WHERE feeds_fts MATCH ? - LIMIT ? - `, query, limit) + // Search feeds via full-text search + tsQuery := ToSearchQuery(query) + feedRows, err := c.db.Query( ` + SELECT url, type, category, title, description, language, site_url, + discovered_at, last_crawled_at, next_crawl_at, last_build_date, + ttl_minutes, update_period, update_freq, + status, error_count, last_error, last_error_at, + source_url, source_host, tld, + item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date, no_update + FROM feeds + WHERE search_vector @@ to_tsquery('english', $1) + LIMIT $2 + `, tsQuery, limit) if err == nil { defer feedRows.Close() for feedRows.Next() { @@ -722,37 +741,43 @@ func (c *Crawler) handleAPISearch(w http.ResponseWriter, r *http.Request) { } } - // Search items - itemRows, err := c.db.Query(` - SELECT i.id, i.feedUrl, i.guid, i.title, i.link, i.description, i.content, i.author, i.pubDate, i.discoveredAt, i.updatedAt + // Search items via full-text search + itemRows, err := c.db.Query( ` + SELECT i.id, i.feed_url, i.guid, i.title, i.link, i.description, i.content, i.author, i.pub_date, i.discovered_at, i.updated_at FROM items i - JOIN items_fts fts ON i.id = fts.rowid - WHERE items_fts MATCH ? - ORDER BY i.pubDate DESC - LIMIT ? 
- `, query, limit) + WHERE i.search_vector @@ to_tsquery('english', $1) + ORDER BY i.pub_date DESC + LIMIT $2 + `, tsQuery, limit) if err == nil { defer itemRows.Close() for itemRows.Next() { var id int64 var feedUrl string - var guid, title, link, description, content, author, pubDate, discoveredAt, updatedAt sql.NullString + var guid, title, link, description, content, author *string + var pubDate, discoveredAt, updatedAt *time.Time if err := itemRows.Scan(&id, &feedUrl, &guid, &title, &link, &description, &content, &author, &pubDate, &discoveredAt, &updatedAt); err != nil { continue } item := SearchItem{ - ID: id, - FeedURL: feedUrl, - GUID: guid.String, - Title: title.String, - Link: link.String, - Description: description.String, - Content: content.String, - Author: author.String, - PubDate: pubDate.String, - DiscoveredAt: discoveredAt.String, - UpdatedAt: updatedAt.String, + ID: id, + FeedURL: feedUrl, + GUID: StringValue(guid), + Title: StringValue(title), + Link: StringValue(link), + Description: StringValue(description), + Content: StringValue(content), + Author: StringValue(author), + } + if pubDate != nil { + item.PubDate = pubDate.Format(time.RFC3339) + } + if discoveredAt != nil { + item.DiscoveredAt = discoveredAt.Format(time.RFC3339) + } + if updatedAt != nil { + item.UpdatedAt = updatedAt.Format(time.RFC3339) } // Add to existing result or create new one @@ -760,23 +785,25 @@ func (c *Crawler) handleAPISearch(w http.ResponseWriter, r *http.Request) { result.Items = append(result.Items, item) } else { // Fetch feed info for this item's feed - var fType, fCategory, fTitle, fDesc, fLang, fSiteUrl sql.NullString - var fDiscoveredAt, fLastCrawledAt, fNextCrawlAt, fLastBuildDate sql.NullString - var fTTLMinutes, fUpdateFreq, fErrorCount, fItemCount sql.NullInt64 - var fUpdatePeriod, fStatus, fLastError, fLastErrorAt sql.NullString - var fSourceUrl, fSourceHost, fTLD sql.NullString - var fAvgPostFreqHrs sql.NullFloat64 - var fOldestItemDate, fNewestItemDate sql.NullString - var fNoUpdate sql.NullInt64 + var fType, fCategory, fTitle, fDesc, fLang, fSiteUrl *string + var fDiscoveredAt time.Time + var fLastCrawledAt, fNextCrawlAt, fLastBuildDate *time.Time + var fTTLMinutes, fUpdateFreq, fErrorCount, fItemCount *int + var fUpdatePeriod, fStatus, fLastError *string + var fLastErrorAt *time.Time + var fSourceUrl, fSourceHost, fTLD *string + var fAvgPostFreqHrs *float64 + var fOldestItemDate, fNewestItemDate *time.Time + var fNoUpdate *bool - c.db.QueryRow(` - SELECT type, category, title, description, language, siteUrl, - discoveredAt, lastCrawledAt, nextCrawlAt, lastBuildDate, - ttlMinutes, updatePeriod, updateFreq, - status, errorCount, lastError, lastErrorAt, - sourceUrl, sourceHost, tld, - itemCount, avgPostFreqHrs, oldestItemDate, newestItemDate, noUpdate - FROM feeds WHERE url = ? 
+ c.db.QueryRow( ` + SELECT type, category, title, description, language, site_url, + discovered_at, last_crawled_at, next_crawl_at, last_build_date, + ttl_minutes, update_period, update_freq, + status, error_count, last_error, last_error_at, + source_url, source_host, tld, + item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date, no_update + FROM feeds WHERE url = $1 `, feedUrl).Scan(&fType, &fCategory, &fTitle, &fDesc, &fLang, &fSiteUrl, &fDiscoveredAt, &fLastCrawledAt, &fNextCrawlAt, &fLastBuildDate, &fTTLMinutes, &fUpdatePeriod, &fUpdateFreq, @@ -784,39 +811,64 @@ func (c *Crawler) handleAPISearch(w http.ResponseWriter, r *http.Request) { &fSourceUrl, &fSourceHost, &fTLD, &fItemCount, &fAvgPostFreqHrs, &fOldestItemDate, &fNewestItemDate, &fNoUpdate) - fCat := fCategory.String + fCat := StringValue(fCategory) if fCat == "" { fCat = "main" } + sf := SearchFeed{ + URL: feedUrl, + Type: StringValue(fType), + Category: fCat, + Title: StringValue(fTitle), + Description: StringValue(fDesc), + Language: StringValue(fLang), + SiteURL: StringValue(fSiteUrl), + DiscoveredAt: fDiscoveredAt.Format(time.RFC3339), + UpdatePeriod: StringValue(fUpdatePeriod), + Status: StringValue(fStatus), + LastError: StringValue(fLastError), + SourceURL: StringValue(fSourceUrl), + SourceHost: StringValue(fSourceHost), + TLD: StringValue(fTLD), + } + if fLastCrawledAt != nil { + sf.LastCrawledAt = fLastCrawledAt.Format(time.RFC3339) + } + if fNextCrawlAt != nil { + sf.NextCrawlAt = fNextCrawlAt.Format(time.RFC3339) + } + if fLastBuildDate != nil { + sf.LastBuildDate = fLastBuildDate.Format(time.RFC3339) + } + if fTTLMinutes != nil { + sf.TTLMinutes = *fTTLMinutes + } + if fUpdateFreq != nil { + sf.UpdateFreq = *fUpdateFreq + } + if fErrorCount != nil { + sf.ErrorCount = *fErrorCount + } + if fLastErrorAt != nil { + sf.LastErrorAt = fLastErrorAt.Format(time.RFC3339) + } + if fItemCount != nil { + sf.ItemCount = *fItemCount + } + if fAvgPostFreqHrs != nil { + sf.AvgPostFreqHrs = *fAvgPostFreqHrs + } + if fOldestItemDate != nil { + sf.OldestItemDate = fOldestItemDate.Format(time.RFC3339) + } + if fNewestItemDate != nil { + sf.NewestItemDate = fNewestItemDate.Format(time.RFC3339) + } + if fNoUpdate != nil { + sf.NoUpdate = *fNoUpdate + } results[feedUrl] = &SearchResult{ - Feed: SearchFeed{ - URL: feedUrl, - Type: fType.String, - Category: fCat, - Title: fTitle.String, - Description: fDesc.String, - Language: fLang.String, - SiteURL: fSiteUrl.String, - DiscoveredAt: fDiscoveredAt.String, - LastCrawledAt: fLastCrawledAt.String, - NextCrawlAt: fNextCrawlAt.String, - LastBuildDate: fLastBuildDate.String, - TTLMinutes: int(fTTLMinutes.Int64), - UpdatePeriod: fUpdatePeriod.String, - UpdateFreq: int(fUpdateFreq.Int64), - Status: fStatus.String, - ErrorCount: int(fErrorCount.Int64), - LastError: fLastError.String, - LastErrorAt: fLastErrorAt.String, - SourceURL: fSourceUrl.String, - SourceHost: fSourceHost.String, - TLD: fTLD.String, - ItemCount: int(fItemCount.Int64), - AvgPostFreqHrs: fAvgPostFreqHrs.Float64, - OldestItemDate: fOldestItemDate.String, - NewestItemDate: fNewestItemDate.String, - NoUpdate: fNoUpdate.Int64 != 0, - }, + Feed: sf, Items: []SearchItem{item}, } } @@ -852,17 +904,17 @@ func (c *Crawler) handleAPIDomainsByStatus(w http.ResponseWriter, r *http.Reques fmt.Sscanf(o, "%d", &offset) } - rows, err := c.db.Query(` - SELECT d.host, d.tld, d.status, d.lastError, COALESCE(f.feed_count, 0) as feed_count + rows, err := c.db.Query( ` + SELECT d.host, d.tld, d.status, d.last_error, COALESCE(f.feed_count, 
0) as feed_count FROM domains d LEFT JOIN ( - SELECT sourceHost, COUNT(*) as feed_count + SELECT source_host, COUNT(*) as feed_count FROM feeds - GROUP BY sourceHost - ) f ON d.host = f.sourceHost - WHERE d.status = ? + GROUP BY source_host + ) f ON d.host = f.source_host + WHERE d.status = $1 ORDER BY d.host ASC - LIMIT ? OFFSET ? + LIMIT $2 OFFSET $3 `, status, limit, offset) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -881,16 +933,12 @@ func (c *Crawler) handleAPIDomainsByStatus(w http.ResponseWriter, r *http.Reques var domains []DomainInfo for rows.Next() { var d DomainInfo - var tld, lastError sql.NullString + var tld, lastError *string if err := rows.Scan(&d.Host, &tld, &d.Status, &lastError, &d.FeedCount); err != nil { continue } - if tld.Valid { - d.TLD = tld.String - } - if lastError.Valid { - d.LastError = lastError.String - } + d.TLD = StringValue(tld) + d.LastError = StringValue(lastError) domains = append(domains, d) } @@ -917,12 +965,12 @@ func (c *Crawler) handleAPIFeedsByStatus(w http.ResponseWriter, r *http.Request) fmt.Sscanf(o, "%d", &offset) } - rows, err := c.db.Query(` - SELECT url, title, type, sourceHost, tld, status, errorCount, lastError, itemCount + rows, err := c.db.Query( ` + SELECT url, title, type, source_host, tld, status, error_count, last_error, item_count FROM feeds - WHERE status = ? + WHERE status = $1 ORDER BY url ASC - LIMIT ? OFFSET ? + LIMIT $2 OFFSET $3 `, status, limit, offset) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -945,28 +993,20 @@ func (c *Crawler) handleAPIFeedsByStatus(w http.ResponseWriter, r *http.Request) var feeds []FeedInfo for rows.Next() { var f FeedInfo - var title, sourceHost, tld, lastError sql.NullString - var errorCount, itemCount sql.NullInt64 + var title, sourceHost, tld, lastError *string + var errorCount, itemCount *int if err := rows.Scan(&f.URL, &title, &f.Type, &sourceHost, &tld, &f.Status, &errorCount, &lastError, &itemCount); err != nil { continue } - if title.Valid { - f.Title = title.String + f.Title = StringValue(title) + f.SourceHost = StringValue(sourceHost) + f.TLD = StringValue(tld) + f.LastError = StringValue(lastError) + if errorCount != nil { + f.ErrorCount = *errorCount } - if sourceHost.Valid { - f.SourceHost = sourceHost.String - } - if tld.Valid { - f.TLD = tld.String - } - if errorCount.Valid { - f.ErrorCount = int(errorCount.Int64) - } - if lastError.Valid { - f.LastError = lastError.String - } - if itemCount.Valid { - f.ItemCount = int(itemCount.Int64) + if itemCount != nil { + f.ItemCount = *itemCount } feeds = append(feeds, f) } @@ -982,9 +1022,9 @@ func (c *Crawler) handleAPIRevisitDomain(w http.ResponseWriter, r *http.Request) return } - _, err := c.db.Exec(` - UPDATE domains SET status = 'unchecked', lastError = NULL - WHERE host = ? + _, err := c.db.Exec( ` + UPDATE domains SET status = 'unchecked', last_error = NULL + WHERE host = $1 `, host) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -1006,10 +1046,10 @@ func (c *Crawler) handleAPIPriorityCrawl(w http.ResponseWriter, r *http.Request) host = normalizeHost(host) // Add domain if it doesn't exist, or reset to unchecked - _, err := c.db.Exec(` - INSERT INTO domains (host, status, discoveredAt, tld) - VALUES (?, 'unchecked', datetime('now'), ?) 
- ON CONFLICT(host) DO UPDATE SET status = 'unchecked', lastError = NULL + _, err := c.db.Exec( ` + INSERT INTO domains (host, status, discovered_at, tld) + VALUES ($1, 'unchecked', NOW(), $2) + ON CONFLICT(host) DO UPDATE SET status = 'unchecked', last_error = NULL `, host, getTLD(host)) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -1178,29 +1218,32 @@ func (c *Crawler) handleAPIFilter(w http.ResponseWriter, r *http.Request) { func (c *Crawler) filterDomains(w http.ResponseWriter, tld, status string, limit, offset int) { var args []interface{} + argNum := 1 query := ` - SELECT d.host, d.tld, d.status, d.lastError, COALESCE(f.feed_count, 0) as feed_count + SELECT d.host, d.tld, d.status, d.last_error, COALESCE(f.feed_count, 0) as feed_count FROM domains d LEFT JOIN ( - SELECT sourceHost, COUNT(*) as feed_count + SELECT source_host, COUNT(*) as feed_count FROM feeds - GROUP BY sourceHost - ) f ON d.host = f.sourceHost + GROUP BY source_host + ) f ON d.host = f.source_host WHERE 1=1` if tld != "" { - query += " AND d.tld = ?" + query += fmt.Sprintf(" AND d.tld = $%d", argNum) args = append(args, tld) + argNum++ } if status != "" { - query += " AND d.status = ?" + query += fmt.Sprintf(" AND d.status = $%d", argNum) args = append(args, status) + argNum++ } - query += " ORDER BY d.host ASC LIMIT ? OFFSET ?" + query += fmt.Sprintf(" ORDER BY d.host ASC LIMIT $%d OFFSET $%d", argNum, argNum+1) args = append(args, limit, offset) - rows, err := c.db.Query(query, args...) + rows, err := c.db.Query( query, args...) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -1218,16 +1261,12 @@ func (c *Crawler) filterDomains(w http.ResponseWriter, tld, status string, limit var domains []DomainInfo for rows.Next() { var d DomainInfo - var tldVal, lastError sql.NullString + var tldVal, lastError *string if err := rows.Scan(&d.Host, &tldVal, &d.Status, &lastError, &d.FeedCount); err != nil { continue } - if tldVal.Valid { - d.TLD = tldVal.String - } - if lastError.Valid { - d.LastError = lastError.String - } + d.TLD = StringValue(tldVal) + d.LastError = StringValue(lastError) domains = append(domains, d) } @@ -1240,28 +1279,32 @@ func (c *Crawler) filterDomains(w http.ResponseWriter, tld, status string, limit func (c *Crawler) filterFeeds(w http.ResponseWriter, tld, domain, status string, limit, offset int) { var args []interface{} + argNum := 1 query := ` - SELECT url, title, type, category, sourceHost, tld, status, errorCount, lastError, itemCount + SELECT url, title, type, category, source_host, tld, status, error_count, last_error, item_count FROM feeds WHERE 1=1` if tld != "" { - query += " AND tld = ?" + query += fmt.Sprintf(" AND tld = $%d", argNum) args = append(args, tld) + argNum++ } if domain != "" { - query += " AND sourceHost = ?" + query += fmt.Sprintf(" AND source_host = $%d", argNum) args = append(args, domain) + argNum++ } if status != "" { - query += " AND status = ?" + query += fmt.Sprintf(" AND status = $%d", argNum) args = append(args, status) + argNum++ } - query += " ORDER BY url ASC LIMIT ? OFFSET ?" + query += fmt.Sprintf(" ORDER BY url ASC LIMIT $%d OFFSET $%d", argNum, argNum+1) args = append(args, limit, offset) - rows, err := c.db.Query(query, args...) + rows, err := c.db.Query( query, args...) 
if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -1284,33 +1327,25 @@ func (c *Crawler) filterFeeds(w http.ResponseWriter, tld, domain, status string, var feeds []FeedInfo for rows.Next() { var f FeedInfo - var title, category, sourceHost, tldVal, lastError sql.NullString - var errorCount, itemCount sql.NullInt64 + var title, category, sourceHost, tldVal, lastError *string + var errorCount, itemCount *int if err := rows.Scan(&f.URL, &title, &f.Type, &category, &sourceHost, &tldVal, &f.Status, &errorCount, &lastError, &itemCount); err != nil { continue } - if title.Valid { - f.Title = title.String - } - if category.Valid { - f.Category = category.String + f.Title = StringValue(title) + if category != nil && *category != "" { + f.Category = *category } else { f.Category = "main" } - if sourceHost.Valid { - f.SourceHost = sourceHost.String + f.SourceHost = StringValue(sourceHost) + f.TLD = StringValue(tldVal) + f.LastError = StringValue(lastError) + if errorCount != nil { + f.ErrorCount = *errorCount } - if tldVal.Valid { - f.TLD = tldVal.String - } - if errorCount.Valid { - f.ErrorCount = int(errorCount.Int64) - } - if lastError.Valid { - f.LastError = lastError.String - } - if itemCount.Valid { - f.ItemCount = int(itemCount.Int64) + if itemCount != nil { + f.ItemCount = *itemCount } feeds = append(feeds, f) } @@ -1324,13 +1359,13 @@ func (c *Crawler) filterFeeds(w http.ResponseWriter, tld, domain, status string, func (c *Crawler) handleAPITLDs(w http.ResponseWriter, r *http.Request) { rows, err := c.db.Query(` - SELECT d.tld, COUNT(DISTINCT d.host) as domain_count, COALESCE(SUM(f.feed_count), 0) as feed_count + SELECT d.tld, COUNT(DISTINCT d.host)::int as domain_count, COALESCE(SUM(f.feed_count), 0)::int as feed_count FROM domains d LEFT JOIN ( - SELECT sourceHost, COUNT(*) as feed_count + SELECT source_host, COUNT(*)::int as feed_count FROM feeds - GROUP BY sourceHost - ) f ON d.host = f.sourceHost + GROUP BY source_host + ) f ON d.host = f.source_host WHERE d.tld IS NOT NULL AND d.tld != '' GROUP BY d.tld ORDER BY d.tld ASC @@ -1347,7 +1382,7 @@ func (c *Crawler) handleAPITLDs(w http.ResponseWriter, r *http.Request) { FeedCount int `json:"feed_count"` } - var tlds []TLDInfo + tlds := []TLDInfo{} // Initialize as empty slice, not nil for rows.Next() { var t TLDInfo if err := rows.Scan(&t.TLD, &t.DomainCount, &t.FeedCount); err != nil { @@ -1379,17 +1414,17 @@ func (c *Crawler) handleAPITLDDomains(w http.ResponseWriter, r *http.Request) { fmt.Sscanf(o, "%d", &offset) } - rows, err := c.db.Query(` - SELECT d.host, d.status, d.lastError, COALESCE(f.feed_count, 0) as feed_count + rows, err := c.db.Query( ` + SELECT d.host, d.status, d.last_error, COALESCE(f.feed_count, 0) as feed_count FROM domains d LEFT JOIN ( - SELECT sourceHost, COUNT(*) as feed_count + SELECT source_host, COUNT(*) as feed_count FROM feeds - GROUP BY sourceHost - ) f ON d.host = f.sourceHost - WHERE d.tld = ? + GROUP BY source_host + ) f ON d.host = f.source_host + WHERE d.tld = $1 ORDER BY d.host ASC - LIMIT ? OFFSET ? 
+ LIMIT $2 OFFSET $3 `, tld, limit, offset) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -1407,13 +1442,11 @@ func (c *Crawler) handleAPITLDDomains(w http.ResponseWriter, r *http.Request) { var domains []DomainInfo for rows.Next() { var d DomainInfo - var lastError sql.NullString + var lastError *string if err := rows.Scan(&d.Host, &d.Status, &lastError, &d.FeedCount); err != nil { continue } - if lastError.Valid { - d.LastError = lastError.String - } + d.LastError = StringValue(lastError) domains = append(domains, d) } @@ -1487,7 +1520,7 @@ func (c *Crawler) handleAPIDeriveHandle(w http.ResponseWriter, r *http.Request) }) } -// handleAPIDisablePublish sets a feed's publish status to 'fail' +// handleAPIDisablePublish sets a feed's publish status to 'deny' func (c *Crawler) handleAPIDisablePublish(w http.ResponseWriter, r *http.Request) { feedURL := r.URL.Query().Get("url") if feedURL == "" { @@ -1497,14 +1530,14 @@ func (c *Crawler) handleAPIDisablePublish(w http.ResponseWriter, r *http.Request feedURL = normalizeURL(feedURL) - if err := c.SetPublishStatus(feedURL, "fail", ""); err != nil { + if err := c.SetPublishStatus(feedURL, "deny", ""); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ - "status": "fail", + "status": "deny", "url": feedURL, }) } @@ -1543,6 +1576,37 @@ func (c *Crawler) handleAPIPublishEnabled(w http.ResponseWriter, r *http.Request json.NewEncoder(w).Encode(result) } +// handleAPIPublishDenied returns all feeds with publish status 'deny' +func (c *Crawler) handleAPIPublishDenied(w http.ResponseWriter, r *http.Request) { + feeds, err := c.GetFeedsByPublishStatus("deny") + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + type FeedDeniedInfo struct { + URL string `json:"url"` + Title string `json:"title"` + SourceHost string `json:"source_host"` + } + + var result []FeedDeniedInfo + for _, f := range feeds { + result = append(result, FeedDeniedInfo{ + URL: f.URL, + Title: f.Title, + SourceHost: f.SourceHost, + }) + } + + if result == nil { + result = []FeedDeniedInfo{} + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(result) +} + // handleAPIPublishCandidates returns feeds pending review that have items func (c *Crawler) handleAPIPublishCandidates(w http.ResponseWriter, r *http.Request) { limit := 50 @@ -1589,7 +1653,7 @@ func (c *Crawler) handleAPIPublishCandidates(w http.ResponseWriter, r *http.Requ } // handleAPISetPublishStatus sets the publish status for a feed -// status must be 'pass', 'fail', or 'pending' +// status must be 'pass', 'deny', or 'held' func (c *Crawler) handleAPISetPublishStatus(w http.ResponseWriter, r *http.Request) { feedURL := r.URL.Query().Get("url") status := r.URL.Query().Get("status") @@ -1599,8 +1663,8 @@ func (c *Crawler) handleAPISetPublishStatus(w http.ResponseWriter, r *http.Reque http.Error(w, "url parameter required", http.StatusBadRequest) return } - if status != "pass" && status != "fail" && status != "held" { - http.Error(w, "status must be 'pass', 'fail', or 'held'", http.StatusBadRequest) + if status != "pass" && status != "deny" && status != "held" { + http.Error(w, "status must be 'pass', 'deny', or 'held'", http.StatusBadRequest) return } @@ -1679,13 +1743,13 @@ func (c *Crawler) handleAPITestPublish(w http.ResponseWriter, r *http.Request) { // Get the item var item Item - var guid, 
title, link, description, content, author sql.NullString - var pubDate, updatedAt, publishedAt sql.NullTime - var publishedUri sql.NullString + var guid, title, link, description, content, author *string + var pubDate, updatedAt, publishedAt *time.Time + var publishedUri *string - err := c.db.QueryRow(` - SELECT id, feedUrl, guid, title, link, description, content, author, pubDate, discoveredAt, updatedAt, publishedAt, publishedUri - FROM items WHERE id = ? + err := c.db.QueryRow( ` + SELECT id, feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at, published_at, published_uri + FROM items WHERE id = $1 `, itemID).Scan( &item.ID, &item.FeedURL, &guid, &title, &link, &description, &content, &author, &pubDate, @@ -1696,26 +1760,14 @@ func (c *Crawler) handleAPITestPublish(w http.ResponseWriter, r *http.Request) { return } - if guid.Valid { - item.GUID = guid.String - } - if title.Valid { - item.Title = title.String - } - if link.Valid { - item.Link = link.String - } - if description.Valid { - item.Description = description.String - } - if content.Valid { - item.Content = content.String - } - if author.Valid { - item.Author = author.String - } - if pubDate.Valid { - item.PubDate = pubDate.Time + item.GUID = StringValue(guid) + item.Title = StringValue(title) + item.Link = StringValue(link) + item.Description = StringValue(description) + item.Content = StringValue(content) + item.Author = StringValue(author) + if pubDate != nil { + item.PubDate = *pubDate } // Create publisher and authenticate @@ -1738,11 +1790,11 @@ func (c *Crawler) handleAPITestPublish(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(map[string]interface{}{ - "status": "published", - "uri": uri, - "itemId": item.ID, - "title": item.Title, - "rkey": GenerateRkey(item.GUID, item.DiscoveredAt), + "status": "published", + "uri": uri, + "itemId": item.ID, + "title": item.Title, + "rkey": GenerateRkey(item.GUID, item.DiscoveredAt), }) } @@ -1811,7 +1863,7 @@ func (c *Crawler) handleAPIPublishFeed(w http.ResponseWriter, r *http.Request) { published := 0 failed := 0 - for _, item := range items { + for i, item := range items { result := PublishResult{ ItemID: item.ID, Title: item.Title, @@ -1828,6 +1880,11 @@ func (c *Crawler) handleAPIPublishFeed(w http.ResponseWriter, r *http.Request) { } results = append(results, result) + + // Add delay between posts to ensure unique timestamps for relay indexing + if i < len(items)-1 { + time.Sleep(1100 * time.Millisecond) + } } w.Header().Set("Content-Type", "application/json") @@ -2019,7 +2076,7 @@ func (c *Crawler) handleAPIPublishFeedFull(w http.ResponseWriter, r *http.Reques published := 0 failed := 0 - for _, item := range items { + for i, item := range items { result := PublishResult{ ItemID: item.ID, Title: item.Title, @@ -2036,6 +2093,11 @@ func (c *Crawler) handleAPIPublishFeedFull(w http.ResponseWriter, r *http.Reques } results = append(results, result) + + // Add delay between posts to ensure unique timestamps for relay indexing + if i < len(items)-1 { + time.Sleep(1100 * time.Millisecond) + } } w.Header().Set("Content-Type", "application/json") @@ -2166,7 +2228,7 @@ const dashboardHTML = ` 1440.news Feed Crawler - +

1440.news Feed Crawler

@@ -2240,6 +2302,8 @@ const dashboardHTML = `
+
v18
+
Last updated: {{.UpdatedAt.Format "2006-01-02 15:04:05"}}
` diff --git a/db.go b/db.go index 03d48cb..db3d740 100644 --- a/db.go +++ b/db.go @@ -1,27 +1,31 @@ package main import ( - "database/sql" + "context" "fmt" + "net/url" + "os" + "strings" "time" - _ "modernc.org/sqlite" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" ) const schema = ` CREATE TABLE IF NOT EXISTS domains ( host TEXT PRIMARY KEY, status TEXT NOT NULL DEFAULT 'unchecked', - discoveredAt DATETIME NOT NULL, - lastCrawledAt DATETIME, - feedsFound INTEGER DEFAULT 0, - lastError TEXT, + discovered_at TIMESTAMPTZ NOT NULL, + last_crawled_at TIMESTAMPTZ, + feeds_found INTEGER DEFAULT 0, + last_error TEXT, tld TEXT ); CREATE INDEX IF NOT EXISTS idx_domains_status ON domains(status); CREATE INDEX IF NOT EXISTS idx_domains_tld ON domains(tld); -CREATE INDEX IF NOT EXISTS idx_domains_feedsFound ON domains(feedsFound DESC) WHERE feedsFound > 0; +CREATE INDEX IF NOT EXISTS idx_domains_feeds_found ON domains(feeds_found DESC) WHERE feeds_found > 0; CREATE TABLE IF NOT EXISTS feeds ( url TEXT PRIMARY KEY, @@ -30,196 +34,195 @@ CREATE TABLE IF NOT EXISTS feeds ( title TEXT, description TEXT, language TEXT, - siteUrl TEXT, + site_url TEXT, - discoveredAt DATETIME NOT NULL, - lastCrawledAt DATETIME, - nextCrawlAt DATETIME, - lastBuildDate DATETIME, + discovered_at TIMESTAMPTZ NOT NULL, + last_crawled_at TIMESTAMPTZ, + next_crawl_at TIMESTAMPTZ, + last_build_date TIMESTAMPTZ, etag TEXT, - lastModified TEXT, + last_modified TEXT, - ttlMinutes INTEGER, - updatePeriod TEXT, - updateFreq INTEGER, + ttl_minutes INTEGER, + update_period TEXT, + update_freq INTEGER, status TEXT DEFAULT 'active', - errorCount INTEGER DEFAULT 0, - lastError TEXT, - lastErrorAt DATETIME, + error_count INTEGER DEFAULT 0, + last_error TEXT, + last_error_at TIMESTAMPTZ, - sourceUrl TEXT, - sourceHost TEXT, + source_url TEXT, + source_host TEXT, tld TEXT, - itemCount INTEGER, - avgPostFreqHrs REAL, - oldestItemDate DATETIME, - newestItemDate DATETIME, + item_count INTEGER, + avg_post_freq_hrs DOUBLE PRECISION, + oldest_item_date TIMESTAMPTZ, + newest_item_date TIMESTAMPTZ, - noUpdate INTEGER DEFAULT 0, + no_update INTEGER DEFAULT 0, -- Publishing to PDS - publishStatus TEXT DEFAULT 'held' CHECK(publishStatus IN ('held', 'pass', 'fail')), - publishAccount TEXT + publish_status TEXT DEFAULT 'held' CHECK(publish_status IN ('held', 'pass', 'deny')), + publish_account TEXT, + + -- Full-text search vector + search_vector tsvector GENERATED ALWAYS AS ( + setweight(to_tsvector('english', coalesce(title, '')), 'A') || + setweight(to_tsvector('english', coalesce(description, '')), 'B') || + setweight(to_tsvector('english', coalesce(url, '')), 'C') + ) STORED ); -CREATE INDEX IF NOT EXISTS idx_feeds_sourceHost ON feeds(sourceHost); -CREATE INDEX IF NOT EXISTS idx_feeds_publishStatus ON feeds(publishStatus); -CREATE INDEX IF NOT EXISTS idx_feeds_sourceHost_url ON feeds(sourceHost, url); +CREATE INDEX IF NOT EXISTS idx_feeds_source_host ON feeds(source_host); +CREATE INDEX IF NOT EXISTS idx_feeds_publish_status ON feeds(publish_status); +CREATE INDEX IF NOT EXISTS idx_feeds_source_host_url ON feeds(source_host, url); CREATE INDEX IF NOT EXISTS idx_feeds_tld ON feeds(tld); -CREATE INDEX IF NOT EXISTS idx_feeds_tld_sourceHost ON feeds(tld, sourceHost); +CREATE INDEX IF NOT EXISTS idx_feeds_tld_source_host ON feeds(tld, source_host); CREATE INDEX IF NOT EXISTS idx_feeds_type ON feeds(type); CREATE INDEX IF NOT EXISTS idx_feeds_category ON feeds(category); CREATE INDEX IF NOT EXISTS idx_feeds_status ON feeds(status); -CREATE 
INDEX IF NOT EXISTS idx_feeds_discoveredAt ON feeds(discoveredAt); +CREATE INDEX IF NOT EXISTS idx_feeds_discovered_at ON feeds(discovered_at); CREATE INDEX IF NOT EXISTS idx_feeds_title ON feeds(title); +CREATE INDEX IF NOT EXISTS idx_feeds_search ON feeds USING GIN(search_vector); CREATE TABLE IF NOT EXISTS items ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - feedUrl TEXT NOT NULL, + id BIGSERIAL PRIMARY KEY, + feed_url TEXT NOT NULL, guid TEXT, title TEXT, link TEXT, description TEXT, content TEXT, author TEXT, - pubDate DATETIME, - discoveredAt DATETIME NOT NULL, - updatedAt DATETIME, + pub_date TIMESTAMPTZ, + discovered_at TIMESTAMPTZ NOT NULL, + updated_at TIMESTAMPTZ, -- Media attachments - enclosureUrl TEXT, - enclosureType TEXT, - enclosureLength INTEGER, - imageUrls TEXT, -- JSON array of image URLs + enclosure_url TEXT, + enclosure_type TEXT, + enclosure_length BIGINT, + image_urls TEXT, -- JSON array of image URLs -- Publishing to PDS - publishedAt DATETIME, - publishedUri TEXT, + published_at TIMESTAMPTZ, + published_uri TEXT, - UNIQUE(feedUrl, guid) + -- Full-text search vector + search_vector tsvector GENERATED ALWAYS AS ( + setweight(to_tsvector('english', coalesce(title, '')), 'A') || + setweight(to_tsvector('english', coalesce(description, '')), 'B') || + setweight(to_tsvector('english', coalesce(content, '')), 'C') || + setweight(to_tsvector('english', coalesce(author, '')), 'D') + ) STORED, + + UNIQUE(feed_url, guid) ); -CREATE INDEX IF NOT EXISTS idx_items_feedUrl ON items(feedUrl); -CREATE INDEX IF NOT EXISTS idx_items_pubDate ON items(pubDate DESC); +CREATE INDEX IF NOT EXISTS idx_items_feed_url ON items(feed_url); +CREATE INDEX IF NOT EXISTS idx_items_pub_date ON items(pub_date DESC); CREATE INDEX IF NOT EXISTS idx_items_link ON items(link); -CREATE INDEX IF NOT EXISTS idx_items_feedUrl_pubDate ON items(feedUrl, pubDate DESC); -CREATE INDEX IF NOT EXISTS idx_items_unpublished ON items(feedUrl, publishedAt) WHERE publishedAt IS NULL; +CREATE INDEX IF NOT EXISTS idx_items_feed_url_pub_date ON items(feed_url, pub_date DESC); +CREATE INDEX IF NOT EXISTS idx_items_unpublished ON items(feed_url, published_at) WHERE published_at IS NULL; +CREATE INDEX IF NOT EXISTS idx_items_search ON items USING GIN(search_vector); --- Full-text search for feeds -CREATE VIRTUAL TABLE IF NOT EXISTS feeds_fts USING fts5( - url, - title, - description, - content='feeds', - content_rowid='rowid' -); - --- Triggers to keep FTS in sync -CREATE TRIGGER IF NOT EXISTS feeds_ai AFTER INSERT ON feeds BEGIN - INSERT INTO feeds_fts(rowid, url, title, description) - VALUES (NEW.rowid, NEW.url, NEW.title, NEW.description); +-- Trigger to normalize feed URLs on insert/update (strips https://, http://, www.) 
+CREATE OR REPLACE FUNCTION normalize_feed_url() +RETURNS TRIGGER AS $$ +BEGIN + NEW.url = regexp_replace(NEW.url, '^https?://', ''); + NEW.url = regexp_replace(NEW.url, '^www\.', ''); + RETURN NEW; END; +$$ LANGUAGE plpgsql; -CREATE TRIGGER IF NOT EXISTS feeds_ad AFTER DELETE ON feeds BEGIN - INSERT INTO feeds_fts(feeds_fts, rowid, url, title, description) - VALUES ('delete', OLD.rowid, OLD.url, OLD.title, OLD.description); -END; - -CREATE TRIGGER IF NOT EXISTS feeds_au AFTER UPDATE ON feeds BEGIN - INSERT INTO feeds_fts(feeds_fts, rowid, url, title, description) - VALUES ('delete', OLD.rowid, OLD.url, OLD.title, OLD.description); - INSERT INTO feeds_fts(rowid, url, title, description) - VALUES (NEW.rowid, NEW.url, NEW.title, NEW.description); -END; - --- Full-text search for items -CREATE VIRTUAL TABLE IF NOT EXISTS items_fts USING fts5( - title, - description, - content, - author, - content='items', - content_rowid='id' -); - --- Triggers to keep items FTS in sync -CREATE TRIGGER IF NOT EXISTS items_ai AFTER INSERT ON items BEGIN - INSERT INTO items_fts(rowid, title, description, content, author) - VALUES (NEW.id, NEW.title, NEW.description, NEW.content, NEW.author); -END; - -CREATE TRIGGER IF NOT EXISTS items_ad AFTER DELETE ON items BEGIN - INSERT INTO items_fts(items_fts, rowid, title, description, content, author) - VALUES ('delete', OLD.id, OLD.title, OLD.description, OLD.content, OLD.author); -END; - -CREATE TRIGGER IF NOT EXISTS items_au AFTER UPDATE ON items BEGIN - INSERT INTO items_fts(items_fts, rowid, title, description, content, author) - VALUES ('delete', OLD.id, OLD.title, OLD.description, OLD.content, OLD.author); - INSERT INTO items_fts(rowid, title, description, content, author) - VALUES (NEW.id, NEW.title, NEW.description, NEW.content, NEW.author); -END; +DROP TRIGGER IF EXISTS normalize_feed_url_trigger ON feeds; +CREATE TRIGGER normalize_feed_url_trigger + BEFORE INSERT OR UPDATE ON feeds + FOR EACH ROW + EXECUTE FUNCTION normalize_feed_url(); ` -func OpenDatabase(dbPath string) (*sql.DB, error) { - fmt.Printf("Opening database: %s\n", dbPath) +// DB wraps pgxpool.Pool with helper methods +type DB struct { + *pgxpool.Pool +} - // Use pragmas in connection string for consistent application - // - busy_timeout: wait up to 10s for locks instead of failing immediately - // - journal_mode: WAL for better concurrency and crash recovery - // - synchronous: NORMAL is safe with WAL (fsync at checkpoint, not every commit) - // - wal_autocheckpoint: checkpoint every 1000 pages (~4MB) to prevent WAL bloat - // - foreign_keys: enforce referential integrity - connStr := dbPath + "?_pragma=busy_timeout(10000)&_pragma=journal_mode(WAL)&_pragma=synchronous(NORMAL)&_pragma=wal_autocheckpoint(1000)&_pragma=foreign_keys(ON)" - db, err := sql.Open("sqlite", connStr) +func OpenDatabase(connString string) (*DB, error) { + fmt.Printf("Connecting to database...\n") + + // If connection string not provided, try environment variables + if connString == "" { + connString = os.Getenv("DATABASE_URL") + } + if connString == "" { + // Build from individual env vars + host := getEnvOrDefault("DB_HOST", "atproto-postgres") + port := getEnvOrDefault("DB_PORT", "5432") + user := getEnvOrDefault("DB_USER", "news_1440") + dbname := getEnvOrDefault("DB_NAME", "news_1440") + + // Support Docker secrets (password file) or direct password + password := os.Getenv("DB_PASSWORD") + if password == "" { + if passwordFile := os.Getenv("DB_PASSWORD_FILE"); passwordFile != "" { + data, err := 
os.ReadFile(passwordFile) + if err != nil { + return nil, fmt.Errorf("failed to read password file: %v", err) + } + password = strings.TrimSpace(string(data)) + } + } + + connString = fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", + user, url.QueryEscape(password), host, port, dbname) + } + + config, err := pgxpool.ParseConfig(connString) if err != nil { - return nil, fmt.Errorf("failed to open database: %v", err) + return nil, fmt.Errorf("failed to parse connection string: %v", err) } - // Connection pool settings for stability - db.SetMaxOpenConns(4) // Limit concurrent connections - db.SetMaxIdleConns(2) // Keep some connections warm - db.SetConnMaxLifetime(5 * time.Minute) // Recycle connections periodically - db.SetConnMaxIdleTime(1 * time.Minute) // Close idle connections + // Connection pool settings + config.MaxConns = 10 + config.MinConns = 2 + config.MaxConnLifetime = 5 * time.Minute + config.MaxConnIdleTime = 1 * time.Minute - // Verify connection and show journal mode - var journalMode string - if err := db.QueryRow("PRAGMA journal_mode").Scan(&journalMode); err != nil { - fmt.Printf(" Warning: could not query journal_mode: %v\n", err) - } else { - fmt.Printf(" Journal mode: %s\n", journalMode) + ctx := context.Background() + pool, err := pgxpool.NewWithConfig(ctx, config) + if err != nil { + return nil, fmt.Errorf("failed to connect to database: %v", err) } + // Verify connection + if err := pool.Ping(ctx); err != nil { + pool.Close() + return nil, fmt.Errorf("failed to ping database: %v", err) + } + fmt.Println(" Connected to PostgreSQL") + + db := &DB{pool} + // Create schema - if _, err := db.Exec(schema); err != nil { - db.Close() + if _, err := pool.Exec(ctx, schema); err != nil { + pool.Close() return nil, fmt.Errorf("failed to create schema: %v", err) } fmt.Println(" Schema OK") - // Migrations for existing databases - migrations := []string{ - "ALTER TABLE items ADD COLUMN enclosureUrl TEXT", - "ALTER TABLE items ADD COLUMN enclosureType TEXT", - "ALTER TABLE items ADD COLUMN enclosureLength INTEGER", - "ALTER TABLE items ADD COLUMN imageUrls TEXT", - } - for _, m := range migrations { - db.Exec(m) // Ignore errors (column may already exist) - } - - // Run stats and ANALYZE in background to avoid blocking startup with large databases + // Run stats in background go func() { var domainCount, feedCount int - db.QueryRow("SELECT COUNT(*) FROM domains").Scan(&domainCount) - db.QueryRow("SELECT COUNT(*) FROM feeds").Scan(&feedCount) + pool.QueryRow(context.Background(), "SELECT COUNT(*) FROM domains").Scan(&domainCount) + pool.QueryRow(context.Background(), "SELECT COUNT(*) FROM feeds").Scan(&feedCount) fmt.Printf(" Existing data: %d domains, %d feeds\n", domainCount, feedCount) fmt.Println(" Running ANALYZE...") - if _, err := db.Exec("ANALYZE"); err != nil { + if _, err := pool.Exec(context.Background(), "ANALYZE"); err != nil { fmt.Printf(" Warning: ANALYZE failed: %v\n", err) } else { fmt.Println(" ANALYZE complete") @@ -228,3 +231,82 @@ func OpenDatabase(dbPath string) (*sql.DB, error) { return db, nil } + +func getEnvOrDefault(key, defaultVal string) string { + if val := os.Getenv(key); val != "" { + return val + } + return defaultVal +} + +// QueryRow wraps pool.QueryRow for compatibility +func (db *DB) QueryRow(query string, args ...interface{}) pgx.Row { + return db.Pool.QueryRow(context.Background(), query, args...) 
+} + +// Query wraps pool.Query for compatibility +func (db *DB) Query(query string, args ...interface{}) (pgx.Rows, error) { + return db.Pool.Query(context.Background(), query, args...) +} + +// Exec wraps pool.Exec for compatibility +func (db *DB) Exec(query string, args ...interface{}) (int64, error) { + result, err := db.Pool.Exec(context.Background(), query, args...) + if err != nil { + return 0, err + } + return result.RowsAffected(), nil +} + +// Begin starts a transaction +func (db *DB) Begin() (pgx.Tx, error) { + return db.Pool.Begin(context.Background()) +} + +// Close closes the connection pool +func (db *DB) Close() error { + db.Pool.Close() + return nil +} + +// NullableString returns nil for empty strings, otherwise the string pointer +func NullableString(s string) *string { + if s == "" { + return nil + } + return &s +} + +// NullableTime returns nil for zero times, otherwise the time pointer +func NullableTime(t time.Time) *time.Time { + if t.IsZero() { + return nil + } + return &t +} + +// StringValue returns empty string for nil, otherwise the dereferenced value +func StringValue(s *string) string { + if s == nil { + return "" + } + return *s +} + +// TimeValue returns zero time for nil, otherwise the dereferenced value +func TimeValue(t *time.Time) time.Time { + if t == nil { + return time.Time{} + } + return *t +} + +// ToSearchQuery converts a user query to PostgreSQL tsquery format +func ToSearchQuery(query string) string { + // Simple conversion: split on spaces and join with & + words := strings.Fields(query) + if len(words) == 0 { + return "" + } + return strings.Join(words, " & ") +} diff --git a/docker-compose.yml b/docker-compose.yml index c067351..6abd36a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,11 +6,19 @@ services: stop_grace_period: 30s env_file: - pds.env + environment: + DB_HOST: atproto-postgres + DB_PORT: 5432 + DB_USER: news_1440 + DB_PASSWORD_FILE: /run/secrets/db_password + DB_NAME: news_1440 + secrets: + - db_password volumes: - - ./feeds:/app/feeds - ./vertices.txt.gz:/app/vertices.txt.gz:ro networks: - proxy + - atproto labels: - "traefik.enable=true" # Production: HTTPS with Let's Encrypt @@ -29,6 +37,12 @@ services: # Shared service - "traefik.http.services.app-1440-news.loadbalancer.server.port=4321" +secrets: + db_password: + file: ../postgres/secrets/news_1440_password.txt + networks: proxy: external: true + atproto: + external: true diff --git a/domain.go b/domain.go index 86186eb..75f36d4 100644 --- a/domain.go +++ b/domain.go @@ -3,13 +3,15 @@ package main import ( "bufio" "compress/gzip" - "database/sql" + "context" "fmt" "io" "os" "strings" "sync/atomic" "time" + + "github.com/jackc/pgx/v5" ) // Domain represents a host to be crawled for feeds @@ -23,78 +25,74 @@ type Domain struct { TLD string `json:"tld,omitempty"` } -// saveDomain stores a domain in SQLite +// saveDomain stores a domain in PostgreSQL func (c *Crawler) saveDomain(domain *Domain) error { _, err := c.db.Exec(` - INSERT INTO domains (host, status, discoveredAt, lastCrawledAt, feedsFound, lastError, tld) - VALUES (?, ?, ?, ?, ?, ?, ?) 
+ INSERT INTO domains (host, status, discovered_at, last_crawled_at, feeds_found, last_error, tld) + VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT(host) DO UPDATE SET - status = excluded.status, - lastCrawledAt = excluded.lastCrawledAt, - feedsFound = excluded.feedsFound, - lastError = excluded.lastError, - tld = excluded.tld - `, domain.Host, domain.Status, domain.DiscoveredAt, nullTime(domain.LastCrawledAt), - domain.FeedsFound, nullString(domain.LastError), domain.TLD) + status = EXCLUDED.status, + last_crawled_at = EXCLUDED.last_crawled_at, + feeds_found = EXCLUDED.feeds_found, + last_error = EXCLUDED.last_error, + tld = EXCLUDED.tld + `, domain.Host, domain.Status, domain.DiscoveredAt, NullableTime(domain.LastCrawledAt), + domain.FeedsFound, NullableString(domain.LastError), domain.TLD) return err } // saveDomainTx stores a domain using a transaction -func (c *Crawler) saveDomainTx(tx *sql.Tx, domain *Domain) error { - _, err := tx.Exec(` - INSERT INTO domains (host, status, discoveredAt, lastCrawledAt, feedsFound, lastError, tld) - VALUES (?, ?, ?, ?, ?, ?, ?) +func (c *Crawler) saveDomainTx(tx pgx.Tx, domain *Domain) error { + _, err := tx.Exec(context.Background(), ` + INSERT INTO domains (host, status, discovered_at, last_crawled_at, feeds_found, last_error, tld) + VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT(host) DO NOTHING - `, domain.Host, domain.Status, domain.DiscoveredAt, nullTime(domain.LastCrawledAt), - domain.FeedsFound, nullString(domain.LastError), domain.TLD) + `, domain.Host, domain.Status, domain.DiscoveredAt, NullableTime(domain.LastCrawledAt), + domain.FeedsFound, NullableString(domain.LastError), domain.TLD) return err } // domainExists checks if a domain already exists in the database func (c *Crawler) domainExists(host string) bool { var exists bool - err := c.db.QueryRow("SELECT EXISTS(SELECT 1 FROM domains WHERE host = ?)", normalizeHost(host)).Scan(&exists) + err := c.db.QueryRow("SELECT EXISTS(SELECT 1 FROM domains WHERE host = $1)", normalizeHost(host)).Scan(&exists) return err == nil && exists } -// getDomain retrieves a domain from SQLite +// getDomain retrieves a domain from PostgreSQL func (c *Crawler) getDomain(host string) (*Domain, error) { domain := &Domain{} - var lastCrawledAt sql.NullTime - var lastError sql.NullString + var lastCrawledAt *time.Time + var lastError *string err := c.db.QueryRow(` - SELECT host, status, discoveredAt, lastCrawledAt, feedsFound, lastError, tld - FROM domains WHERE host = ? 
+ SELECT host, status, discovered_at, last_crawled_at, feeds_found, last_error, tld + FROM domains WHERE host = $1 `, normalizeHost(host)).Scan( &domain.Host, &domain.Status, &domain.DiscoveredAt, &lastCrawledAt, &domain.FeedsFound, &lastError, &domain.TLD, ) - if err == sql.ErrNoRows { + if err == pgx.ErrNoRows { return nil, nil } if err != nil { return nil, err } - if lastCrawledAt.Valid { - domain.LastCrawledAt = lastCrawledAt.Time - } - if lastError.Valid { - domain.LastError = lastError.String - } + domain.LastCrawledAt = TimeValue(lastCrawledAt) + domain.LastError = StringValue(lastError) return domain, nil } -// GetUncheckedDomains returns up to limit unchecked domains ordered by discoveredAt (FIFO) +// GetUncheckedDomains returns up to limit unchecked domains ordered by discovered_at (FIFO) func (c *Crawler) GetUncheckedDomains(limit int) ([]*Domain, error) { rows, err := c.db.Query(` - SELECT host, status, discoveredAt, lastCrawledAt, feedsFound, lastError, tld + SELECT host, status, discovered_at, last_crawled_at, feeds_found, last_error, tld FROM domains WHERE status = 'unchecked' - ORDER BY discoveredAt ASC - LIMIT ? + ORDER BY discovered_at ASC + LIMIT $1 `, limit) if err != nil { return nil, err @@ -105,12 +103,12 @@ func (c *Crawler) GetUncheckedDomains(limit int) ([]*Domain, error) { } // scanDomains is a helper to scan multiple domain rows -func (c *Crawler) scanDomains(rows *sql.Rows) ([]*Domain, error) { +func (c *Crawler) scanDomains(rows pgx.Rows) ([]*Domain, error) { var domains []*Domain for rows.Next() { domain := &Domain{} - var lastCrawledAt sql.NullTime - var lastError sql.NullString + var lastCrawledAt *time.Time + var lastError *string if err := rows.Scan( &domain.Host, &domain.Status, &domain.DiscoveredAt, &lastCrawledAt, @@ -119,12 +117,8 @@ func (c *Crawler) scanDomains(rows *sql.Rows) ([]*Domain, error) { continue } - if lastCrawledAt.Valid { - domain.LastCrawledAt = lastCrawledAt.Time - } - if lastError.Valid { - domain.LastError = lastError.String - } + domain.LastCrawledAt = TimeValue(lastCrawledAt) + domain.LastError = StringValue(lastError) domains = append(domains, domain) } @@ -142,13 +136,13 @@ func (c *Crawler) markDomainCrawled(host string, feedsFound int, lastError strin var err error if lastError != "" { _, err = c.db.Exec(` - UPDATE domains SET status = ?, lastCrawledAt = ?, feedsFound = ?, lastError = ? - WHERE host = ? + UPDATE domains SET status = $1, last_crawled_at = $2, feeds_found = $3, last_error = $4 + WHERE host = $5 `, status, time.Now(), feedsFound, lastError, normalizeHost(host)) } else { _, err = c.db.Exec(` - UPDATE domains SET status = ?, lastCrawledAt = ?, feedsFound = ?, lastError = NULL - WHERE host = ? 
+ UPDATE domains SET status = $1, last_crawled_at = $2, feeds_found = $3, last_error = NULL + WHERE host = $4 `, status, time.Now(), feedsFound, normalizeHost(host)) } return err @@ -164,6 +158,23 @@ func (c *Crawler) GetDomainCount() (total int, unchecked int, err error) { return total, unchecked, err } +// ImportTestDomains adds a list of specific domains for testing +func (c *Crawler) ImportTestDomains(domains []string) { + now := time.Now() + for _, host := range domains { + _, err := c.db.Exec(` + INSERT INTO domains (host, status, discovered_at, tld) + VALUES ($1, 'unchecked', $2, $3) + ON CONFLICT(host) DO NOTHING + `, host, now, getTLD(host)) + if err != nil { + fmt.Printf("Error adding test domain %s: %v\n", host, err) + } else { + fmt.Printf("Added test domain: %s\n", host) + } + } +} + // ImportDomainsFromFile reads a vertices file and stores new domains as "unchecked" func (c *Crawler) ImportDomainsFromFile(filename string, limit int) (imported int, skipped int, err error) { file, err := os.Open(filename) @@ -212,7 +223,6 @@ func (c *Crawler) ImportDomainsInBackground(filename string) { const batchSize = 1000 now := time.Now() - nowStr := now.Format("2006-01-02 15:04:05") totalImported := 0 batchCount := 0 @@ -240,31 +250,43 @@ func (c *Crawler) ImportDomainsInBackground(filename string) { break } - // Build bulk INSERT statement - var sb strings.Builder - sb.WriteString("INSERT INTO domains (host, status, discoveredAt, tld) VALUES ") - args := make([]interface{}, 0, len(domains)*4) - for i, d := range domains { - if i > 0 { - sb.WriteString(",") - } - sb.WriteString("(?, 'unchecked', ?, ?)") - args = append(args, d.host, nowStr, d.tld) - } - sb.WriteString(" ON CONFLICT(host) DO NOTHING") - - // Execute bulk insert - result, err := c.db.Exec(sb.String(), args...) 
- imported := 0 + // Use COPY for bulk insert (much faster than individual INSERTs) + ctx := context.Background() + conn, err := c.db.Acquire(ctx) if err != nil { - fmt.Printf("Bulk insert error: %v\n", err) - } else { - rowsAffected, _ := result.RowsAffected() - imported = int(rowsAffected) + fmt.Printf("Failed to acquire connection: %v\n", err) + break + } + + // Build rows for copy + rows := make([][]interface{}, len(domains)) + for i, d := range domains { + rows[i] = []interface{}{d.host, "unchecked", now, d.tld} + } + + // Use CopyFrom for bulk insert + imported, err := conn.CopyFrom( + ctx, + pgx.Identifier{"domains"}, + []string{"host", "status", "discovered_at", "tld"}, + pgx.CopyFromRows(rows), + ) + conn.Release() + + if err != nil { + // Fall back to individual inserts with ON CONFLICT + for _, d := range domains { + c.db.Exec(` + INSERT INTO domains (host, status, discovered_at, tld) + VALUES ($1, 'unchecked', $2, $3) + ON CONFLICT(host) DO NOTHING + `, d.host, now, d.tld) + } + imported = int64(len(domains)) } batchCount++ - totalImported += imported + totalImported += int(imported) atomic.AddInt32(&c.domainsImported, int32(imported)) // Wait 1 second before the next batch @@ -304,7 +326,6 @@ func (c *Crawler) parseAndStoreDomains(reader io.Reader, limit int) (imported in scanner.Buffer(buf, 1024*1024) now := time.Now() - nowStr := now.Format("2006-01-02 15:04:05") count := 0 const batchSize = 1000 @@ -336,28 +357,21 @@ func (c *Crawler) parseAndStoreDomains(reader io.Reader, limit int) (imported in break } - // Build bulk INSERT statement - var sb strings.Builder - sb.WriteString("INSERT INTO domains (host, status, discoveredAt, tld) VALUES ") - args := make([]interface{}, 0, len(domains)*4) - for i, d := range domains { - if i > 0 { - sb.WriteString(",") + // Insert with ON CONFLICT + for _, d := range domains { + result, err := c.db.Exec(` + INSERT INTO domains (host, status, discovered_at, tld) + VALUES ($1, 'unchecked', $2, $3) + ON CONFLICT(host) DO NOTHING + `, d.host, now, d.tld) + if err != nil { + skipped++ + } else if result > 0 { + imported++ + } else { + skipped++ } - sb.WriteString("(?, 'unchecked', ?, ?)") - args = append(args, d.host, nowStr, d.tld) } - sb.WriteString(" ON CONFLICT(host) DO NOTHING") - - // Execute bulk insert - result, execErr := c.db.Exec(sb.String(), args...) 
- if execErr != nil { - skipped += len(domains) - continue - } - rowsAffected, _ := result.RowsAffected() - imported += int(rowsAffected) - skipped += len(domains) - int(rowsAffected) if limit > 0 && count >= limit { break @@ -370,18 +384,3 @@ func (c *Crawler) parseAndStoreDomains(reader io.Reader, limit int) (imported in return imported, skipped, nil } - -// Helper functions for SQL null handling -func nullTime(t time.Time) sql.NullTime { - if t.IsZero() { - return sql.NullTime{} - } - return sql.NullTime{Time: t, Valid: true} -} - -func nullString(s string) sql.NullString { - if s == "" { - return sql.NullString{} - } - return sql.NullString{String: s, Valid: true} -} diff --git a/feed.go b/feed.go index 697be82..de55287 100644 --- a/feed.go +++ b/feed.go @@ -1,7 +1,7 @@ package main import ( - "database/sql" + "context" "encoding/json" "fmt" "io" @@ -11,6 +11,8 @@ import ( "strings" "sync/atomic" "time" + + "github.com/jackc/pgx/v5" ) // classifyFeed determines the category of a feed based on URL patterns @@ -115,7 +117,7 @@ type Item struct { UpdatedAt time.Time `json:"updated_at,omitempty"` // Media attachments - Enclosure *Enclosure `json:"enclosure,omitempty"` // Primary enclosure (podcast audio, etc.) + Enclosure *Enclosure `json:"enclosure,omitempty"` // Primary enclosure (podcast audio, etc.) ImageURLs []string `json:"image_urls,omitempty"` // Image URLs extracted from content // Publishing to PDS @@ -144,7 +146,7 @@ type Feed struct { LastModified string `json:"last_modified,omitempty"` // Feed hints for crawl scheduling - TTLMinutes int `json:"ttl_minutes,omitempty"` // From RSS element + TTLMinutes int `json:"ttl_minutes,omitempty"` // From RSS element UpdatePeriod string `json:"update_period,omitempty"` // From sy:updatePeriod (hourly, daily, weekly, monthly, yearly) UpdateFreq int `json:"update_freq,omitempty"` // From sy:updateFrequency @@ -155,7 +157,7 @@ type Feed struct { LastErrorAt time.Time `json:"last_error_at,omitempty"` // Discovery source - SourceURL string `json:"source_url,omitempty"` // Where we found this feed + SourceURL string `json:"source_url,omitempty"` SourceHost string `json:"source_host,omitempty"` TLD string `json:"tld,omitempty"` @@ -169,11 +171,11 @@ type Feed struct { NoUpdate int `json:"no_update"` // Consecutive checks with no change // Publishing to PDS - PublishStatus string `json:"publish_status"` // "held", "pass", "fail" + PublishStatus string `json:"publish_status"` // "held", "pass", "deny" PublishAccount string `json:"publish_account,omitempty"` // e.g., "news.ycombinator.com.1440.news" } -// saveFeed stores a feed in SQLite +// saveFeed stores a feed in PostgreSQL func (c *Crawler) saveFeed(feed *Feed) error { // Default publishStatus to "held" if not set publishStatus := feed.PublishStatus @@ -183,77 +185,77 @@ func (c *Crawler) saveFeed(feed *Feed) error { _, err := c.db.Exec(` INSERT INTO feeds ( - url, type, category, title, description, language, siteUrl, - discoveredAt, lastCrawledAt, nextCrawlAt, lastBuildDate, - etag, lastModified, - ttlMinutes, updatePeriod, updateFreq, - status, errorCount, lastError, lastErrorAt, - sourceUrl, sourceHost, tld, - itemCount, avgPostFreqHrs, oldestItemDate, newestItemDate, - noUpdate, - publishStatus, publishAccount - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ url, type, category, title, description, language, site_url, + discovered_at, last_crawled_at, next_crawl_at, last_build_date, + etag, last_modified, + ttl_minutes, update_period, update_freq, + status, error_count, last_error, last_error_at, + source_url, source_host, tld, + item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date, + no_update, + publish_status, publish_account + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30) ON CONFLICT(url) DO UPDATE SET - type = excluded.type, - category = excluded.category, - title = excluded.title, - description = excluded.description, - language = excluded.language, - siteUrl = excluded.siteUrl, - lastCrawledAt = excluded.lastCrawledAt, - nextCrawlAt = excluded.nextCrawlAt, - lastBuildDate = excluded.lastBuildDate, - etag = excluded.etag, - lastModified = excluded.lastModified, - ttlMinutes = excluded.ttlMinutes, - updatePeriod = excluded.updatePeriod, - updateFreq = excluded.updateFreq, - status = excluded.status, - errorCount = excluded.errorCount, - lastError = excluded.lastError, - lastErrorAt = excluded.lastErrorAt, - itemCount = excluded.itemCount, - avgPostFreqHrs = excluded.avgPostFreqHrs, - oldestItemDate = excluded.oldestItemDate, - newestItemDate = excluded.newestItemDate, - noUpdate = excluded.noUpdate, - publishStatus = excluded.publishStatus, - publishAccount = excluded.publishAccount + type = EXCLUDED.type, + category = EXCLUDED.category, + title = EXCLUDED.title, + description = EXCLUDED.description, + language = EXCLUDED.language, + site_url = EXCLUDED.site_url, + last_crawled_at = EXCLUDED.last_crawled_at, + next_crawl_at = EXCLUDED.next_crawl_at, + last_build_date = EXCLUDED.last_build_date, + etag = EXCLUDED.etag, + last_modified = EXCLUDED.last_modified, + ttl_minutes = EXCLUDED.ttl_minutes, + update_period = EXCLUDED.update_period, + update_freq = EXCLUDED.update_freq, + status = EXCLUDED.status, + error_count = EXCLUDED.error_count, + last_error = EXCLUDED.last_error, + last_error_at = EXCLUDED.last_error_at, + item_count = EXCLUDED.item_count, + avg_post_freq_hrs = EXCLUDED.avg_post_freq_hrs, + oldest_item_date = EXCLUDED.oldest_item_date, + newest_item_date = EXCLUDED.newest_item_date, + no_update = EXCLUDED.no_update, + publish_status = EXCLUDED.publish_status, + publish_account = EXCLUDED.publish_account `, - feed.URL, feed.Type, feed.Category, nullString(feed.Title), nullString(feed.Description), - nullString(feed.Language), nullString(feed.SiteURL), - feed.DiscoveredAt, nullTime(feed.LastCrawledAt), nullTime(feed.NextCrawlAt), nullTime(feed.LastBuildDate), - nullString(feed.ETag), nullString(feed.LastModified), - feed.TTLMinutes, nullString(feed.UpdatePeriod), feed.UpdateFreq, - feed.Status, feed.ErrorCount, nullString(feed.LastError), nullTime(feed.LastErrorAt), - nullString(feed.SourceURL), nullString(feed.SourceHost), nullString(feed.TLD), - feed.ItemCount, feed.AvgPostFreqHrs, nullTime(feed.OldestItemDate), nullTime(feed.NewestItemDate), + feed.URL, feed.Type, feed.Category, NullableString(feed.Title), NullableString(feed.Description), + NullableString(feed.Language), NullableString(feed.SiteURL), + feed.DiscoveredAt, NullableTime(feed.LastCrawledAt), NullableTime(feed.NextCrawlAt), NullableTime(feed.LastBuildDate), + NullableString(feed.ETag), NullableString(feed.LastModified), + feed.TTLMinutes, NullableString(feed.UpdatePeriod), feed.UpdateFreq, + feed.Status, feed.ErrorCount, 
NullableString(feed.LastError), NullableTime(feed.LastErrorAt), + NullableString(feed.SourceURL), NullableString(feed.SourceHost), NullableString(feed.TLD), + feed.ItemCount, feed.AvgPostFreqHrs, NullableTime(feed.OldestItemDate), NullableTime(feed.NewestItemDate), feed.NoUpdate, - publishStatus, nullString(feed.PublishAccount), + publishStatus, NullableString(feed.PublishAccount), ) return err } -// getFeed retrieves a feed from SQLite +// getFeed retrieves a feed from PostgreSQL func (c *Crawler) getFeed(feedURL string) (*Feed, error) { feed := &Feed{} - var category, title, description, language, siteURL sql.NullString - var lastCrawledAt, nextCrawlAt, lastBuildDate, lastErrorAt, oldestItemDate, newestItemDate sql.NullTime - var etag, lastModified, updatePeriod, lastError, sourceURL, sourceHost, tld sql.NullString - var avgPostFreqHrs sql.NullFloat64 - var publishStatus, publishAccount sql.NullString + var category, title, description, language, siteURL *string + var lastCrawledAt, nextCrawlAt, lastBuildDate, lastErrorAt, oldestItemDate, newestItemDate *time.Time + var etag, lastModified, updatePeriod, lastError, sourceURL, sourceHost, tld *string + var avgPostFreqHrs *float64 + var publishStatus, publishAccount *string err := c.db.QueryRow(` - SELECT url, type, category, title, description, language, siteUrl, - discoveredAt, lastCrawledAt, nextCrawlAt, lastBuildDate, - etag, lastModified, - ttlMinutes, updatePeriod, updateFreq, - status, errorCount, lastError, lastErrorAt, - sourceUrl, sourceHost, tld, - itemCount, avgPostFreqHrs, oldestItemDate, newestItemDate, - noUpdate, - publishStatus, publishAccount - FROM feeds WHERE url = ? + SELECT url, type, category, title, description, language, site_url, + discovered_at, last_crawled_at, next_crawl_at, last_build_date, + etag, last_modified, + ttl_minutes, update_period, update_freq, + status, error_count, last_error, last_error_at, + source_url, source_host, tld, + item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date, + no_update, + publish_status, publish_account + FROM feeds WHERE url = $1 `, normalizeURL(feedURL)).Scan( &feed.URL, &feed.Type, &category, &title, &description, &language, &siteURL, &feed.DiscoveredAt, &lastCrawledAt, &nextCrawlAt, &lastBuildDate, @@ -266,7 +268,7 @@ func (c *Crawler) getFeed(feedURL string) (*Feed, error) { &publishStatus, &publishAccount, ) - if err == sql.ErrNoRows { + if err == pgx.ErrNoRows { return nil, nil } if err != nil { @@ -274,73 +276,37 @@ func (c *Crawler) getFeed(feedURL string) (*Feed, error) { } // Handle nullable fields - if category.Valid { - feed.Category = category.String + if category != nil { + feed.Category = *category } else { - feed.Category = "main" // Default + feed.Category = "main" } - if title.Valid { - feed.Title = title.String + feed.Title = StringValue(title) + feed.Description = StringValue(description) + feed.Language = StringValue(language) + feed.SiteURL = StringValue(siteURL) + feed.LastCrawledAt = TimeValue(lastCrawledAt) + feed.NextCrawlAt = TimeValue(nextCrawlAt) + feed.LastBuildDate = TimeValue(lastBuildDate) + feed.ETag = StringValue(etag) + feed.LastModified = StringValue(lastModified) + feed.UpdatePeriod = StringValue(updatePeriod) + feed.LastError = StringValue(lastError) + feed.LastErrorAt = TimeValue(lastErrorAt) + feed.SourceURL = StringValue(sourceURL) + feed.SourceHost = StringValue(sourceHost) + feed.TLD = StringValue(tld) + if avgPostFreqHrs != nil { + feed.AvgPostFreqHrs = *avgPostFreqHrs } - if description.Valid { - feed.Description = 
description.String - } - if language.Valid { - feed.Language = language.String - } - if siteURL.Valid { - feed.SiteURL = siteURL.String - } - if lastCrawledAt.Valid { - feed.LastCrawledAt = lastCrawledAt.Time - } - if nextCrawlAt.Valid { - feed.NextCrawlAt = nextCrawlAt.Time - } - if lastBuildDate.Valid { - feed.LastBuildDate = lastBuildDate.Time - } - if etag.Valid { - feed.ETag = etag.String - } - if lastModified.Valid { - feed.LastModified = lastModified.String - } - if updatePeriod.Valid { - feed.UpdatePeriod = updatePeriod.String - } - if lastError.Valid { - feed.LastError = lastError.String - } - if lastErrorAt.Valid { - feed.LastErrorAt = lastErrorAt.Time - } - if sourceURL.Valid { - feed.SourceURL = sourceURL.String - } - if sourceHost.Valid { - feed.SourceHost = sourceHost.String - } - if tld.Valid { - feed.TLD = tld.String - } - if avgPostFreqHrs.Valid { - feed.AvgPostFreqHrs = avgPostFreqHrs.Float64 - } - if oldestItemDate.Valid { - feed.OldestItemDate = oldestItemDate.Time - } - if newestItemDate.Valid { - feed.NewestItemDate = newestItemDate.Time - } - if publishStatus.Valid { - feed.PublishStatus = publishStatus.String + feed.OldestItemDate = TimeValue(oldestItemDate) + feed.NewestItemDate = TimeValue(newestItemDate) + if publishStatus != nil { + feed.PublishStatus = *publishStatus } else { feed.PublishStatus = "held" } - if publishAccount.Valid { - feed.PublishAccount = publishAccount.String - } + feed.PublishAccount = StringValue(publishAccount) return feed, nil } @@ -348,22 +314,22 @@ func (c *Crawler) getFeed(feedURL string) (*Feed, error) { // feedExists checks if a feed URL already exists in the database func (c *Crawler) feedExists(feedURL string) bool { var exists bool - err := c.db.QueryRow("SELECT EXISTS(SELECT 1 FROM feeds WHERE url = ?)", normalizeURL(feedURL)).Scan(&exists) + err := c.db.QueryRow("SELECT EXISTS(SELECT 1 FROM feeds WHERE url = $1)", normalizeURL(feedURL)).Scan(&exists) return err == nil && exists } // GetAllFeeds returns all feeds from the database func (c *Crawler) GetAllFeeds() ([]*Feed, error) { rows, err := c.db.Query(` - SELECT url, type, category, title, description, language, siteUrl, - discoveredAt, lastCrawledAt, nextCrawlAt, lastBuildDate, - etag, lastModified, - ttlMinutes, updatePeriod, updateFreq, - status, errorCount, lastError, lastErrorAt, - sourceUrl, sourceHost, tld, - itemCount, avgPostFreqHrs, oldestItemDate, newestItemDate, - noUpdate, - publishStatus, publishAccount + SELECT url, type, category, title, description, language, site_url, + discovered_at, last_crawled_at, next_crawl_at, last_build_date, + etag, last_modified, + ttl_minutes, update_period, update_freq, + status, error_count, last_error, last_error_at, + source_url, source_host, tld, + item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date, + no_update, + publish_status, publish_account FROM feeds `) if err != nil { @@ -384,26 +350,26 @@ func (c *Crawler) GetFeedCount() (int, error) { // GetFeedCountByHost returns the number of feeds for a specific host func (c *Crawler) GetFeedCountByHost(host string) (int, error) { var count int - err := c.db.QueryRow("SELECT COUNT(*) FROM feeds WHERE sourceHost = ?", host).Scan(&count) + err := c.db.QueryRow("SELECT COUNT(*) FROM feeds WHERE source_host = $1", host).Scan(&count) return count, err } -// GetFeedsDueForCheck returns feeds where nextCrawlAt <= now, ordered randomly, limited to n +// GetFeedsDueForCheck returns feeds where next_crawl_at <= now, ordered randomly, limited to n func (c *Crawler) 
GetFeedsDueForCheck(limit int) ([]*Feed, error) { rows, err := c.db.Query(` - SELECT url, type, category, title, description, language, siteUrl, - discoveredAt, lastCrawledAt, nextCrawlAt, lastBuildDate, - etag, lastModified, - ttlMinutes, updatePeriod, updateFreq, - status, errorCount, lastError, lastErrorAt, - sourceUrl, sourceHost, tld, - itemCount, avgPostFreqHrs, oldestItemDate, newestItemDate, - noUpdate, - publishStatus, publishAccount + SELECT url, type, category, title, description, language, site_url, + discovered_at, last_crawled_at, next_crawl_at, last_build_date, + etag, last_modified, + ttl_minutes, update_period, update_freq, + status, error_count, last_error, last_error_at, + source_url, source_host, tld, + item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date, + no_update, + publish_status, publish_account FROM feeds - WHERE nextCrawlAt <= datetime('now') AND status != 'dead' + WHERE next_crawl_at <= NOW() AND status != 'dead' ORDER BY RANDOM() - LIMIT ? + LIMIT $1 `, limit) if err != nil { return nil, err @@ -416,16 +382,16 @@ func (c *Crawler) GetFeedsDueForCheck(limit int) ([]*Feed, error) { // GetFeedsByHost returns all feeds from a specific host func (c *Crawler) GetFeedsByHost(host string) ([]*Feed, error) { rows, err := c.db.Query(` - SELECT url, type, category, title, description, language, siteUrl, - discoveredAt, lastCrawledAt, nextCrawlAt, lastBuildDate, - etag, lastModified, - ttlMinutes, updatePeriod, updateFreq, - status, errorCount, lastError, lastErrorAt, - sourceUrl, sourceHost, tld, - itemCount, avgPostFreqHrs, oldestItemDate, newestItemDate, - noUpdate, - publishStatus, publishAccount - FROM feeds WHERE sourceHost = ? + SELECT url, type, category, title, description, language, site_url, + discovered_at, last_crawled_at, next_crawl_at, last_build_date, + etag, last_modified, + ttl_minutes, update_period, update_freq, + status, error_count, last_error, last_error_at, + source_url, source_host, tld, + item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date, + no_update, + publish_status, publish_account + FROM feeds WHERE source_host = $1 `, host) if err != nil { return nil, err @@ -437,20 +403,21 @@ func (c *Crawler) GetFeedsByHost(host string) ([]*Feed, error) { // SearchFeeds performs a full-text search on feeds func (c *Crawler) SearchFeeds(query string) ([]*Feed, error) { + tsquery := ToSearchQuery(query) rows, err := c.db.Query(` - SELECT f.url, f.type, f.category, f.title, f.description, f.language, f.siteUrl, - f.discoveredAt, f.lastCrawledAt, f.nextCrawlAt, f.lastBuildDate, - f.etag, f.lastModified, - f.ttlMinutes, f.updatePeriod, f.updateFreq, - f.status, f.errorCount, f.lastError, f.lastErrorAt, - f.sourceUrl, f.sourceHost, f.tld, - f.itemCount, f.avgPostFreqHrs, f.oldestItemDate, f.newestItemDate, - f.noUpdate, - f.publishEnabled, f.publishAccount - FROM feeds f - JOIN feeds_fts fts ON f.rowid = fts.rowid - WHERE feeds_fts MATCH ? 
- `, query) + SELECT url, type, category, title, description, language, site_url, + discovered_at, last_crawled_at, next_crawl_at, last_build_date, + etag, last_modified, + ttl_minutes, update_period, update_freq, + status, error_count, last_error, last_error_at, + source_url, source_host, tld, + item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date, + no_update, + publish_status, publish_account + FROM feeds + WHERE search_vector @@ to_tsquery('english', $1) + ORDER BY ts_rank(search_vector, to_tsquery('english', $1)) DESC + `, tsquery) if err != nil { return nil, err } @@ -460,99 +427,82 @@ func (c *Crawler) SearchFeeds(query string) ([]*Feed, error) { } // scanFeeds is a helper to scan multiple feed rows -func scanFeeds(rows *sql.Rows) ([]*Feed, error) { +func scanFeeds(rows pgx.Rows) ([]*Feed, error) { var feeds []*Feed for rows.Next() { feed := &Feed{} - var category, title, description, language, siteURL sql.NullString - var lastCrawledAt, nextCrawlAt, lastBuildDate, lastErrorAt, oldestItemDate, newestItemDate sql.NullTime - var etag, lastModified, updatePeriod, lastError, sourceURL, sourceHost, tld sql.NullString - var avgPostFreqHrs sql.NullFloat64 - var publishStatus, publishAccount sql.NullString + var feedType, category, title, description, language, siteURL *string + var lastCrawledAt, nextCrawlAt, lastBuildDate, lastErrorAt, oldestItemDate, newestItemDate *time.Time + var etag, lastModified, updatePeriod, lastError, sourceURL, sourceHost, tld *string + var ttlMinutes, updateFreq, errorCount, itemCount, noUpdate *int + var avgPostFreqHrs *float64 + var status *string + var publishStatus, publishAccount *string if err := rows.Scan( - &feed.URL, &feed.Type, &category, &title, &description, &language, &siteURL, + &feed.URL, &feedType, &category, &title, &description, &language, &siteURL, &feed.DiscoveredAt, &lastCrawledAt, &nextCrawlAt, &lastBuildDate, &etag, &lastModified, - &feed.TTLMinutes, &updatePeriod, &feed.UpdateFreq, - &feed.Status, &feed.ErrorCount, &lastError, &lastErrorAt, + &ttlMinutes, &updatePeriod, &updateFreq, + &status, &errorCount, &lastError, &lastErrorAt, &sourceURL, &sourceHost, &tld, - &feed.ItemCount, &avgPostFreqHrs, &oldestItemDate, &newestItemDate, - &feed.NoUpdate, + &itemCount, &avgPostFreqHrs, &oldestItemDate, &newestItemDate, + &noUpdate, &publishStatus, &publishAccount, ); err != nil { - continue + return nil, err } // Handle nullable fields - if category.Valid { - feed.Category = category.String + feed.Type = StringValue(feedType) + if category != nil && *category != "" { + feed.Category = *category } else { feed.Category = "main" } - if title.Valid { - feed.Title = title.String + feed.Title = StringValue(title) + feed.Description = StringValue(description) + feed.Language = StringValue(language) + feed.SiteURL = StringValue(siteURL) + feed.LastCrawledAt = TimeValue(lastCrawledAt) + feed.NextCrawlAt = TimeValue(nextCrawlAt) + feed.LastBuildDate = TimeValue(lastBuildDate) + feed.ETag = StringValue(etag) + feed.LastModified = StringValue(lastModified) + if ttlMinutes != nil { + feed.TTLMinutes = *ttlMinutes } - if description.Valid { - feed.Description = description.String + feed.UpdatePeriod = StringValue(updatePeriod) + if updateFreq != nil { + feed.UpdateFreq = *updateFreq } - if language.Valid { - feed.Language = language.String + feed.Status = StringValue(status) + if errorCount != nil { + feed.ErrorCount = *errorCount } - if siteURL.Valid { - feed.SiteURL = siteURL.String + feed.LastError = StringValue(lastError) + feed.LastErrorAt 
= TimeValue(lastErrorAt) + feed.SourceURL = StringValue(sourceURL) + feed.SourceHost = StringValue(sourceHost) + feed.TLD = StringValue(tld) + if itemCount != nil { + feed.ItemCount = *itemCount } - if lastCrawledAt.Valid { - feed.LastCrawledAt = lastCrawledAt.Time + if avgPostFreqHrs != nil { + feed.AvgPostFreqHrs = *avgPostFreqHrs } - if nextCrawlAt.Valid { - feed.NextCrawlAt = nextCrawlAt.Time + feed.OldestItemDate = TimeValue(oldestItemDate) + feed.NewestItemDate = TimeValue(newestItemDate) + if noUpdate != nil { + feed.NoUpdate = *noUpdate } - if lastBuildDate.Valid { - feed.LastBuildDate = lastBuildDate.Time - } - if etag.Valid { - feed.ETag = etag.String - } - if lastModified.Valid { - feed.LastModified = lastModified.String - } - if updatePeriod.Valid { - feed.UpdatePeriod = updatePeriod.String - } - if lastError.Valid { - feed.LastError = lastError.String - } - if lastErrorAt.Valid { - feed.LastErrorAt = lastErrorAt.Time - } - if sourceURL.Valid { - feed.SourceURL = sourceURL.String - } - if sourceHost.Valid { - feed.SourceHost = sourceHost.String - } - if tld.Valid { - feed.TLD = tld.String - } - if avgPostFreqHrs.Valid { - feed.AvgPostFreqHrs = avgPostFreqHrs.Float64 - } - if oldestItemDate.Valid { - feed.OldestItemDate = oldestItemDate.Time - } - if newestItemDate.Valid { - feed.NewestItemDate = newestItemDate.Time - } - if publishStatus.Valid { - feed.PublishStatus = publishStatus.String + if publishStatus != nil { + feed.PublishStatus = *publishStatus } else { feed.PublishStatus = "held" } - if publishAccount.Valid { - feed.PublishAccount = publishAccount.String - } + feed.PublishAccount = StringValue(publishAccount) feeds = append(feeds, feed) } @@ -560,45 +510,48 @@ func scanFeeds(rows *sql.Rows) ([]*Feed, error) { return feeds, rows.Err() } -// saveItem stores an item in SQLite (upsert by feedUrl + guid) +// saveItem stores an item in PostgreSQL (upsert by feed_url + guid) func (c *Crawler) saveItem(item *Item) error { // Serialize enclosure fields - var enclosureUrl, enclosureType sql.NullString - var enclosureLength sql.NullInt64 + var enclosureUrl, enclosureType *string + var enclosureLength *int64 if item.Enclosure != nil { - enclosureUrl = sql.NullString{String: item.Enclosure.URL, Valid: item.Enclosure.URL != ""} - enclosureType = sql.NullString{String: item.Enclosure.Type, Valid: item.Enclosure.Type != ""} - enclosureLength = sql.NullInt64{Int64: item.Enclosure.Length, Valid: item.Enclosure.Length > 0} + enclosureUrl = NullableString(item.Enclosure.URL) + enclosureType = NullableString(item.Enclosure.Type) + if item.Enclosure.Length > 0 { + enclosureLength = &item.Enclosure.Length + } } // Serialize imageUrls as JSON - var imageUrlsJSON sql.NullString + var imageUrlsJSON *string if len(item.ImageURLs) > 0 { if data, err := json.Marshal(item.ImageURLs); err == nil { - imageUrlsJSON = sql.NullString{String: string(data), Valid: true} + s := string(data) + imageUrlsJSON = &s } } _, err := c.db.Exec(` - INSERT INTO items (feedUrl, guid, title, link, description, content, author, pubDate, discoveredAt, updatedAt, - enclosureUrl, enclosureType, enclosureLength, imageUrls) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
- ON CONFLICT(feedUrl, guid) DO UPDATE SET - title = excluded.title, - link = excluded.link, - description = excluded.description, - content = excluded.content, - author = excluded.author, - pubDate = excluded.pubDate, - updatedAt = excluded.updatedAt, - enclosureUrl = excluded.enclosureUrl, - enclosureType = excluded.enclosureType, - enclosureLength = excluded.enclosureLength, - imageUrls = excluded.imageUrls + INSERT INTO items (feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at, + enclosure_url, enclosure_type, enclosure_length, image_urls) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) + ON CONFLICT(feed_url, guid) DO UPDATE SET + title = EXCLUDED.title, + link = EXCLUDED.link, + description = EXCLUDED.description, + content = EXCLUDED.content, + author = EXCLUDED.author, + pub_date = EXCLUDED.pub_date, + updated_at = EXCLUDED.updated_at, + enclosure_url = EXCLUDED.enclosure_url, + enclosure_type = EXCLUDED.enclosure_type, + enclosure_length = EXCLUDED.enclosure_length, + image_urls = EXCLUDED.image_urls `, - item.FeedURL, item.GUID, nullString(item.Title), nullString(item.Link), - nullString(item.Description), nullString(item.Content), nullString(item.Author), - nullTime(item.PubDate), item.DiscoveredAt, nullTime(item.UpdatedAt), + item.FeedURL, item.GUID, NullableString(item.Title), NullableString(item.Link), + NullableString(item.Description), NullableString(item.Content), NullableString(item.Author), + NullableTime(item.PubDate), item.DiscoveredAt, NullableTime(item.UpdatedAt), enclosureUrl, enclosureType, enclosureLength, imageUrlsJSON, ) return err @@ -614,29 +567,7 @@ func (c *Crawler) saveItems(items []*Item) error { if err != nil { return err } - defer tx.Rollback() - - stmt, err := tx.Prepare(` - INSERT INTO items (feedUrl, guid, title, link, description, content, author, pubDate, discoveredAt, updatedAt, - enclosureUrl, enclosureType, enclosureLength, imageUrls) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
- ON CONFLICT(feedUrl, guid) DO UPDATE SET - title = excluded.title, - link = excluded.link, - description = excluded.description, - content = excluded.content, - author = excluded.author, - pubDate = excluded.pubDate, - updatedAt = excluded.updatedAt, - enclosureUrl = excluded.enclosureUrl, - enclosureType = excluded.enclosureType, - enclosureLength = excluded.enclosureLength, - imageUrls = excluded.imageUrls - `) - if err != nil { - return err - } - defer stmt.Close() + defer tx.Rollback(context.Background()) for _, item := range items { if item == nil || item.GUID == "" { @@ -644,26 +575,45 @@ func (c *Crawler) saveItems(items []*Item) error { } // Serialize enclosure fields - var enclosureUrl, enclosureType sql.NullString - var enclosureLength sql.NullInt64 + var enclosureUrl, enclosureType *string + var enclosureLength *int64 if item.Enclosure != nil { - enclosureUrl = sql.NullString{String: item.Enclosure.URL, Valid: item.Enclosure.URL != ""} - enclosureType = sql.NullString{String: item.Enclosure.Type, Valid: item.Enclosure.Type != ""} - enclosureLength = sql.NullInt64{Int64: item.Enclosure.Length, Valid: item.Enclosure.Length > 0} - } - - // Serialize imageUrls as JSON - var imageUrlsJSON sql.NullString - if len(item.ImageURLs) > 0 { - if data, err := json.Marshal(item.ImageURLs); err == nil { - imageUrlsJSON = sql.NullString{String: string(data), Valid: true} + enclosureUrl = NullableString(item.Enclosure.URL) + enclosureType = NullableString(item.Enclosure.Type) + if item.Enclosure.Length > 0 { + enclosureLength = &item.Enclosure.Length } } - _, err := stmt.Exec( - item.FeedURL, item.GUID, nullString(item.Title), nullString(item.Link), - nullString(item.Description), nullString(item.Content), nullString(item.Author), - nullTime(item.PubDate), item.DiscoveredAt, nullTime(item.UpdatedAt), + // Serialize imageUrls as JSON + var imageUrlsJSON *string + if len(item.ImageURLs) > 0 { + if data, err := json.Marshal(item.ImageURLs); err == nil { + s := string(data) + imageUrlsJSON = &s + } + } + + _, err := tx.Exec(context.Background(), ` + INSERT INTO items (feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at, + enclosure_url, enclosure_type, enclosure_length, image_urls) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) + ON CONFLICT(feed_url, guid) DO UPDATE SET + title = EXCLUDED.title, + link = EXCLUDED.link, + description = EXCLUDED.description, + content = EXCLUDED.content, + author = EXCLUDED.author, + pub_date = EXCLUDED.pub_date, + updated_at = EXCLUDED.updated_at, + enclosure_url = EXCLUDED.enclosure_url, + enclosure_type = EXCLUDED.enclosure_type, + enclosure_length = EXCLUDED.enclosure_length, + image_urls = EXCLUDED.image_urls + `, + item.FeedURL, item.GUID, NullableString(item.Title), NullableString(item.Link), + NullableString(item.Description), NullableString(item.Content), NullableString(item.Author), + NullableTime(item.PubDate), item.DiscoveredAt, NullableTime(item.UpdatedAt), enclosureUrl, enclosureType, enclosureLength, imageUrlsJSON, ) if err != nil { @@ -671,19 +621,19 @@ func (c *Crawler) saveItems(items []*Item) error { } } - return tx.Commit() + return tx.Commit(context.Background()) } // GetItemsByFeed returns all items for a specific feed func (c *Crawler) GetItemsByFeed(feedURL string, limit int) ([]*Item, error) { rows, err := c.db.Query(` - SELECT id, feedUrl, guid, title, link, description, content, author, pubDate, discoveredAt, updatedAt, - enclosureUrl, enclosureType, enclosureLength, 
imageUrls, - publishedAt, publishedUri + SELECT id, feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at, + enclosure_url, enclosure_type, enclosure_length, image_urls, + published_at, published_uri FROM items - WHERE feedUrl = ? - ORDER BY pubDate DESC - LIMIT ? + WHERE feed_url = $1 + ORDER BY pub_date DESC + LIMIT $2 `, feedURL, limit) if err != nil { return nil, err @@ -695,16 +645,16 @@ func (c *Crawler) GetItemsByFeed(feedURL string, limit int) ([]*Item, error) { // SearchItems performs a full-text search on items func (c *Crawler) SearchItems(query string, limit int) ([]*Item, error) { + tsquery := ToSearchQuery(query) rows, err := c.db.Query(` - SELECT i.id, i.feedUrl, i.guid, i.title, i.link, i.description, i.content, i.author, i.pubDate, i.discoveredAt, i.updatedAt, - i.enclosureUrl, i.enclosureType, i.enclosureLength, i.imageUrls, - i.publishedAt, i.publishedUri - FROM items i - JOIN items_fts fts ON i.id = fts.rowid - WHERE items_fts MATCH ? - ORDER BY i.pubDate DESC - LIMIT ? - `, query, limit) + SELECT id, feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at, + enclosure_url, enclosure_type, enclosure_length, image_urls, + published_at, published_uri + FROM items + WHERE search_vector @@ to_tsquery('english', $1) + ORDER BY ts_rank(search_vector, to_tsquery('english', $1)) DESC, pub_date DESC + LIMIT $2 + `, tsquery, limit) if err != nil { return nil, err } @@ -714,16 +664,16 @@ func (c *Crawler) SearchItems(query string, limit int) ([]*Item, error) { } // scanItems is a helper to scan multiple item rows -func scanItems(rows *sql.Rows) ([]*Item, error) { +func scanItems(rows pgx.Rows) ([]*Item, error) { var items []*Item for rows.Next() { item := &Item{} - var guid, title, link, description, content, author sql.NullString - var pubDate, updatedAt, publishedAt sql.NullTime - var enclosureUrl, enclosureType sql.NullString - var enclosureLength sql.NullInt64 - var imageUrlsJSON sql.NullString - var publishedUri sql.NullString + var guid, title, link, description, content, author *string + var pubDate, updatedAt, publishedAt *time.Time + var enclosureUrl, enclosureType *string + var enclosureLength *int64 + var imageUrlsJSON *string + var publishedUri *string if err := rows.Scan( &item.ID, &item.FeedURL, &guid, &title, &link, @@ -735,56 +685,36 @@ func scanItems(rows *sql.Rows) ([]*Item, error) { continue } - if guid.Valid { - item.GUID = guid.String - } - if title.Valid { - item.Title = title.String - } - if link.Valid { - item.Link = link.String - } - if description.Valid { - item.Description = description.String - } - if content.Valid { - item.Content = content.String - } - if author.Valid { - item.Author = author.String - } - if pubDate.Valid { - item.PubDate = pubDate.Time - } - if updatedAt.Valid { - item.UpdatedAt = updatedAt.Time - } + item.GUID = StringValue(guid) + item.Title = StringValue(title) + item.Link = StringValue(link) + item.Description = StringValue(description) + item.Content = StringValue(content) + item.Author = StringValue(author) + item.PubDate = TimeValue(pubDate) + item.UpdatedAt = TimeValue(updatedAt) // Parse enclosure - if enclosureUrl.Valid && enclosureUrl.String != "" { + if enclosureUrl != nil && *enclosureUrl != "" { item.Enclosure = &Enclosure{ - URL: enclosureUrl.String, - Type: enclosureType.String, + URL: *enclosureUrl, + Type: StringValue(enclosureType), } - if enclosureLength.Valid { - item.Enclosure.Length = enclosureLength.Int64 + if enclosureLength != 
nil { + item.Enclosure.Length = *enclosureLength } } // Parse imageUrls JSON - if imageUrlsJSON.Valid && imageUrlsJSON.String != "" { + if imageUrlsJSON != nil && *imageUrlsJSON != "" { var urls []string - if err := json.Unmarshal([]byte(imageUrlsJSON.String), &urls); err == nil { + if err := json.Unmarshal([]byte(*imageUrlsJSON), &urls); err == nil { item.ImageURLs = urls } } - if publishedAt.Valid { - item.PublishedAt = publishedAt.Time - } - if publishedUri.Valid { - item.PublishedUri = publishedUri.String - } + item.PublishedAt = TimeValue(publishedAt) + item.PublishedUri = StringValue(publishedUri) items = append(items, item) } @@ -796,12 +726,12 @@ func scanItems(rows *sql.Rows) ([]*Item, error) { func (c *Crawler) CleanupOldItems() (int64, error) { cutoff := time.Now().AddDate(-1, 0, 0) // 12 months ago result, err := c.db.Exec(` - DELETE FROM items WHERE pubDate < ? AND pubDate IS NOT NULL + DELETE FROM items WHERE pub_date < $1 AND pub_date IS NOT NULL `, cutoff) if err != nil { return 0, err } - return result.RowsAffected() + return result, nil } // processFeed parses and stores a feed with full metadata @@ -1032,7 +962,7 @@ func (c *Crawler) CheckFeed(feed *Feed) (bool, error) { return true, nil } -// SetPublishStatus sets the publish status for a feed ('held', 'pass', 'fail') +// SetPublishStatus sets the publish status for a feed ('held', 'pass', 'deny') // If status is 'pass', the account handle is also set (auto-derived if empty) func (c *Crawler) SetPublishStatus(feedURL, status, account string) error { feedURL = normalizeURL(feedURL) @@ -1043,25 +973,25 @@ func (c *Crawler) SetPublishStatus(feedURL, status, account string) error { } _, err := c.db.Exec(` - UPDATE feeds SET publishStatus = ?, publishAccount = ? WHERE url = ? - `, status, nullString(account), feedURL) + UPDATE feeds SET publish_status = $1, publish_account = $2 WHERE url = $3 + `, status, NullableString(account), feedURL) return err } // GetFeedsByPublishStatus returns all feeds with a specific publish status func (c *Crawler) GetFeedsByPublishStatus(status string) ([]*Feed, error) { rows, err := c.db.Query(` - SELECT url, type, category, title, description, language, siteUrl, - discoveredAt, lastCrawledAt, nextCrawlAt, lastBuildDate, - etag, lastModified, - ttlMinutes, updatePeriod, updateFreq, - status, errorCount, lastError, lastErrorAt, - sourceUrl, sourceHost, tld, - itemCount, avgPostFreqHrs, oldestItemDate, newestItemDate, - noUpdate, - publishStatus, publishAccount + SELECT url, type, category, title, description, language, site_url, + discovered_at, last_crawled_at, next_crawl_at, last_build_date, + etag, last_modified, + ttl_minutes, update_period, update_freq, + status, error_count, last_error, last_error_at, + source_url, source_host, tld, + item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date, + no_update, + publish_status, publish_account FROM feeds - WHERE publishStatus = ? 
+ WHERE publish_status = $1 `, status) if err != nil { return nil, err @@ -1071,22 +1001,22 @@ func (c *Crawler) GetFeedsByPublishStatus(status string) ([]*Feed, error) { return scanFeeds(rows) } -// GetPublishCandidates returns feeds that are held review and have items +// GetPublishCandidates returns feeds that are held for review and have items func (c *Crawler) GetPublishCandidates(limit int) ([]*Feed, error) { rows, err := c.db.Query(` - SELECT url, type, category, title, description, language, siteUrl, - discoveredAt, lastCrawledAt, nextCrawlAt, lastBuildDate, - etag, lastModified, - ttlMinutes, updatePeriod, updateFreq, - status, errorCount, lastError, lastErrorAt, - sourceUrl, sourceHost, tld, - itemCount, avgPostFreqHrs, oldestItemDate, newestItemDate, - noUpdate, - publishStatus, publishAccount + SELECT url, type, category, title, description, language, site_url, + discovered_at, last_crawled_at, next_crawl_at, last_build_date, + etag, last_modified, + ttl_minutes, update_period, update_freq, + status, error_count, last_error, last_error_at, + source_url, source_host, tld, + item_count, avg_post_freq_hrs, oldest_item_date, newest_item_date, + no_update, + publish_status, publish_account FROM feeds - WHERE publishStatus = 'held' AND itemCount > 0 AND status = 'active' - ORDER BY itemCount DESC - LIMIT ? + WHERE publish_status = 'held' AND item_count > 0 AND status = 'active' + ORDER BY item_count DESC + LIMIT $1 `, limit) if err != nil { return nil, err @@ -1099,13 +1029,13 @@ func (c *Crawler) GetPublishCandidates(limit int) ([]*Feed, error) { // GetUnpublishedItems returns items for a feed that haven't been published yet func (c *Crawler) GetUnpublishedItems(feedURL string, limit int) ([]*Item, error) { rows, err := c.db.Query(` - SELECT id, feedUrl, guid, title, link, description, content, author, pubDate, discoveredAt, updatedAt, - enclosureUrl, enclosureType, enclosureLength, imageUrls, - publishedAt, publishedUri + SELECT id, feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at, + enclosure_url, enclosure_type, enclosure_length, image_urls, + published_at, published_uri FROM items - WHERE feedUrl = ? AND publishedAt IS NULL - ORDER BY pubDate ASC - LIMIT ? + WHERE feed_url = $1 AND published_at IS NULL + ORDER BY pub_date ASC + LIMIT $2 `, feedURL, limit) if err != nil { return nil, err @@ -1118,7 +1048,7 @@ func (c *Crawler) GetUnpublishedItems(feedURL string, limit int) ([]*Item, error // MarkItemPublished marks an item as published with the given URI func (c *Crawler) MarkItemPublished(itemID int64, uri string) error { _, err := c.db.Exec(` - UPDATE items SET publishedAt = datetime('now'), publishedUri = ? WHERE id = ? + UPDATE items SET published_at = NOW(), published_uri = $1 WHERE id = $2 `, uri, itemID) return err } @@ -1127,7 +1057,7 @@ func (c *Crawler) MarkItemPublished(itemID int64, uri string) error { func (c *Crawler) GetUnpublishedItemCount(feedURL string) (int, error) { var count int err := c.db.QueryRow(` - SELECT COUNT(*) FROM items WHERE feedUrl = ? 
AND publishedAt IS NULL + SELECT COUNT(*) FROM items WHERE feed_url = $1 AND published_at IS NULL `, feedURL).Scan(&count) return count, err } diff --git a/html.go b/html.go index ffcabf1..2fbb0d2 100644 --- a/html.go +++ b/html.go @@ -77,7 +77,11 @@ func (c *Crawler) extractFeedLinks(n *html.Node, baseURL string) []simpleFeed { func (c *Crawler) extractAnchorFeeds(n *html.Node, baseURL string) []simpleFeed { feeds := make([]simpleFeed, 0) - feedPattern := regexp.MustCompile(`(?i)(rss|atom|feed)`) + // Match feed URLs more precisely: + // - /feed, /rss, /atom as path segments (not "feeds" or "feedback") + // - .rss, .atom, .xml file extensions + // - ?feed=, ?format=rss, etc. + feedPattern := regexp.MustCompile(`(?i)(/feed/?$|/feed/|/rss/?$|/rss/|/atom/?$|/atom/|\.rss|\.atom|\.xml|\?.*feed=|\?.*format=rss|\?.*format=atom)`) var f func(*html.Node) f = func(n *html.Node) { diff --git a/main.go b/main.go index b65a7ad..b0ecc3a 100644 --- a/main.go +++ b/main.go @@ -8,13 +8,8 @@ import ( ) func main() { - // Ensure feeds directory exists - if err := os.MkdirAll("feeds", 0755); err != nil { - fmt.Fprintf(os.Stderr, "Error creating feeds directory: %v\n", err) - os.Exit(1) - } - - crawler, err := NewCrawler("feeds/feeds.db") + // Connection string from environment (DATABASE_URL or DB_* vars) + crawler, err := NewCrawler("") if err != nil { fmt.Fprintf(os.Stderr, "Error initializing crawler: %v\n", err) os.Exit(1) @@ -37,8 +32,14 @@ func main() { // Start all loops independently fmt.Println("Starting import, crawl, check, and stats loops...") - // Import loop (background) - go crawler.ImportDomainsInBackground("vertices.txt.gz") + // Import loop (background) - DISABLED for testing, using manual domains + // go crawler.ImportDomainsInBackground("vertices.txt.gz") + + // Add only ycombinator domains for testing + go crawler.ImportTestDomains([]string{ + "news.ycombinator.com", + "ycombinator.com", + }) // Check loop (background) go crawler.StartCheckLoop() @@ -52,6 +53,9 @@ func main() { // Maintenance loop (background) - WAL checkpoints and integrity checks go crawler.StartMaintenanceLoop() + // Publish loop (background) - autopublishes items for approved feeds + go crawler.StartPublishLoop() + // Crawl loop (background) go crawler.StartCrawlLoop() diff --git a/publisher.go b/publisher.go index d62577c..06cde2c 100644 --- a/publisher.go +++ b/publisher.go @@ -3,7 +3,6 @@ package main import ( "bytes" "crypto/sha256" - "encoding/base32" "encoding/json" "fmt" "io" @@ -12,6 +11,7 @@ import ( "regexp" "strings" "time" + "unicode/utf8" ) // Publisher handles posting items to AT Protocol PDS @@ -196,22 +196,41 @@ func (p *Publisher) CreateInviteCode(adminPassword string, useCount int) (string return result.Code, nil } -// GenerateRkey creates a deterministic rkey from a GUID and timestamp -// Uses a truncated base32-encoded SHA256 hash -// Including the timestamp allows regenerating a new rkey by updating discoveredAt +// TID alphabet for base32-sortable encoding +const tidAlphabet = "234567abcdefghijklmnopqrstuvwxyz" + +// GenerateRkey creates a deterministic TID-format rkey from a GUID and timestamp +// TIDs are required by Bluesky relay for indexing - custom rkeys don't sync +// Format: 13 chars base32-sortable, 53 bits timestamp + 10 bits clock ID func GenerateRkey(guid string, timestamp time.Time) string { if guid == "" { return "" } - // Combine GUID with timestamp for the hash input - // Format timestamp to second precision for consistency - input := guid + "|" + 
timestamp.UTC().Format(time.RFC3339) - hash := sha256.Sum256([]byte(input)) - // Use first 10 bytes (80 bits) - plenty for uniqueness - // Base32 encode without padding, lowercase for rkey compatibility - encoded := base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(hash[:10]) - return strings.ToLower(encoded) + // Get microseconds since Unix epoch (53 bits) + microsInt := timestamp.UnixMicro() + if microsInt < 0 { + microsInt = 0 + } + // Convert to uint64 and mask to 53 bits + micros := uint64(microsInt) & ((1 << 53) - 1) + + // Generate deterministic 10-bit clock ID from GUID hash + hash := sha256.Sum256([]byte(guid)) + clockID := uint64(hash[0])<<2 | uint64(hash[1])>>6 + clockID = clockID & ((1 << 10) - 1) // 10 bits = 0-1023 + + // Combine: top bit 0, 53 bits timestamp, 10 bits clock ID + tid := (micros << 10) | clockID + + // Encode as base32-sortable (13 characters) + var result [13]byte + for i := 12; i >= 0; i-- { + result[i] = tidAlphabet[tid&0x1f] + tid >>= 5 + } + + return string(result[:]) } // extractURLs finds all URLs in a string @@ -239,7 +258,8 @@ func (p *Publisher) PublishItem(session *PDSSession, item *Item) (string, error) return "", fmt.Errorf("item has no GUID or link, cannot publish") } - // Collect all unique URLs: main link + any URLs in description + // Collect URLs: main link + HN comments link (if applicable) + // Limit to 2 URLs max to stay under 300 grapheme limit urlSet := make(map[string]bool) var allURLs []string @@ -249,8 +269,18 @@ func (p *Publisher) PublishItem(session *PDSSession, item *Item) (string, error) allURLs = append(allURLs, item.Link) } - // Add enclosure URL for podcasts/media (audio/video) - if item.Enclosure != nil && item.Enclosure.URL != "" { + // For HN feeds, add comments link from description (looks like "https://news.ycombinator.com/item?id=...") + descURLs := extractURLs(item.Description) + for _, u := range descURLs { + if strings.Contains(u, "news.ycombinator.com/item") && !urlSet[u] { + urlSet[u] = true + allURLs = append(allURLs, u) + break // Only add one comments link + } + } + + // Add enclosure URL for podcasts/media (audio/video) if we have room + if len(allURLs) < 2 && item.Enclosure != nil && item.Enclosure.URL != "" { encType := strings.ToLower(item.Enclosure.Type) if strings.HasPrefix(encType, "audio/") || strings.HasPrefix(encType, "video/") { if !urlSet[item.Enclosure.URL] { @@ -260,59 +290,52 @@ func (p *Publisher) PublishItem(session *PDSSession, item *Item) (string, error) } } - // Extract URLs from description - descURLs := extractURLs(item.Description) - for _, u := range descURLs { - if !urlSet[u] { - urlSet[u] = true - allURLs = append(allURLs, u) - } - } - - // Extract URLs from content if available - contentURLs := extractURLs(item.Content) - for _, u := range contentURLs { - if !urlSet[u] { - urlSet[u] = true - allURLs = append(allURLs, u) - } - } - // Build post text: title + all links - // Bluesky has 300 grapheme limit - var textBuilder strings.Builder - textBuilder.WriteString(item.Title) + // Bluesky has 300 grapheme limit - use rune count as approximation + const maxGraphemes = 295 // Leave some margin + // Calculate space needed for URLs (in runes) + urlSpace := 0 for _, u := range allURLs { - textBuilder.WriteString("\n\n") - textBuilder.WriteString(u) + urlSpace += utf8.RuneCountInString(u) + 2 // +2 for \n\n } - text := textBuilder.String() + // Truncate title if needed + title := item.Title + titleRunes := utf8.RuneCountInString(title) + maxTitleRunes := maxGraphemes - urlSpace - 3 
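+ // Worked example (illustrative numbers, not taken from real feed data):
+ // two URLs of 30 and 60 runes reserve urlSpace = (30+2)+(60+2) = 94 runes,
+ // leaving maxTitleRunes = 295 - 94 - 3 = 198 runes of title before "..." is appended.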
// -3 for "..." - // Truncate title if text is too long (keep URLs intact) - const maxLen = 300 - if len(text) > maxLen { - // Calculate space needed for URLs - urlSpace := 0 - for _, u := range allURLs { - urlSpace += len(u) + 2 // +2 for \n\n - } - - maxTitleLen := maxLen - urlSpace - 3 // -3 for "..." - if maxTitleLen > 10 { - text = item.Title[:maxTitleLen] + "..." - for _, u := range allURLs { - text += "\n\n" + u + if titleRunes+urlSpace > maxGraphemes { + if maxTitleRunes > 10 { + // Truncate title to fit + runes := []rune(title) + if len(runes) > maxTitleRunes { + title = string(runes[:maxTitleRunes]) + "..." + } + } else { + // Title too long even with minimal space - just truncate hard + runes := []rune(title) + if len(runes) > 50 { + title = string(runes[:50]) + "..." } } } - // Use item's pubDate for createdAt, fall back to now - createdAt := time.Now() - if !item.PubDate.IsZero() { - createdAt = item.PubDate + // Build final text + var textBuilder strings.Builder + textBuilder.WriteString(title) + for _, u := range allURLs { + textBuilder.WriteString("\n\n") + textBuilder.WriteString(u) } + text := textBuilder.String() + + // Use current time for createdAt (Bluesky won't index backdated posts) + // TODO: Restore original pubDate once Bluesky indexing is understood + createdAt := time.Now() + // if !item.PubDate.IsZero() { + // createdAt = item.PubDate + // } post := BskyPost{ Type: "app.bsky.feed.post", diff --git a/static/dashboard.js b/static/dashboard.js index 37348d6..2501cb0 100644 --- a/static/dashboard.js +++ b/static/dashboard.js @@ -258,6 +258,7 @@ function initDashboard() { output.innerHTML = html; attachTldHandlers(output.querySelector('.tld-list')); } catch (err) { + console.error('TLDs error:', err); output.innerHTML = '
Error: ' + escapeHtml(err.message) + '
'; } } @@ -301,7 +302,7 @@ function initDashboard() { const result = await response.json(); if (!result.data || result.data.length === 0) { - infiniteScrollState.ended = true; + if (infiniteScrollState) infiniteScrollState.ended = true; document.getElementById('infiniteLoader').textContent = offset === 0 ? 'No results found' : 'End of list'; return; } @@ -319,11 +320,12 @@ function initDashboard() { offset += result.data.length; if (result.data.length < limit) { - infiniteScrollState.ended = true; + if (infiniteScrollState) infiniteScrollState.ended = true; document.getElementById('infiniteLoader').textContent = 'End of list'; } } catch (err) { - document.getElementById('infiniteLoader').textContent = 'Error loading'; + console.error('Filter error:', err); + document.getElementById('infiniteLoader').textContent = 'Error loading: ' + err.message; } } @@ -479,17 +481,26 @@ function initDashboard() { output.innerHTML = '
Loading publish data...
'; try { - const [candidatesRes, passedRes] = await Promise.all([ + const [candidatesRes, passedRes, deniedRes] = await Promise.all([ fetch('/api/publishCandidates?limit=50'), - fetch('/api/publishEnabled') + fetch('/api/publishEnabled'), + fetch('/api/publishDenied') ]); const candidates = await candidatesRes.json(); const passed = await passedRes.json(); + const denied = await deniedRes.json(); let html = '
'; + // Filter buttons + html += '
'; + html += ''; + html += ''; + html += ''; + html += '
'; + // Passed feeds (approved for publishing) - html += '
'; + html += '
'; html += '
✓ Approved for Publishing (' + passed.length + ')
'; if (passed.length === 0) { html += '
No feeds approved yet
'; @@ -501,14 +512,14 @@ function initDashboard() { html += '
' + escapeHtml(f.url) + '
'; html += '
→ ' + escapeHtml(f.account) + ' (' + f.unpublished_count + ' unpublished)
'; html += '
'; - html += ''; + html += ''; html += '
'; }); } html += '
'; // Candidates (held for review) - html += '
'; + html += '
'; html += '
⏳ Held for Review (' + candidates.length + ')
'; if (candidates.length === 0) { html += '
No candidates held
'; @@ -523,7 +534,28 @@ function initDashboard() { html += '
' + escapeHtml(f.source_host) + ' · ' + f.item_count + ' items · ' + escapeHtml(f.category) + '
'; html += '
'; html += ''; - html += ''; + html += ''; + html += '
'; + html += ''; + }); + } + html += ''; + + // Denied feeds + html += ''; output.innerHTML = html; - // Attach handlers for pass/fail buttons + // Filter button handlers + output.querySelectorAll('.filter-btn').forEach(btn => { + btn.addEventListener('click', () => { + const filter = btn.dataset.filter; + document.getElementById('section-pass').style.display = filter === 'pass' ? 'block' : 'none'; + document.getElementById('section-held').style.display = filter === 'held' ? 'block' : 'none'; + document.getElementById('section-deny').style.display = filter === 'deny' ? 'block' : 'none'; + // Update button styles + output.querySelectorAll('.filter-btn').forEach(b => { + b.style.opacity = b.dataset.filter === filter ? '1' : '0.5'; + }); + }); + }); + + // Attach handlers for pass/deny buttons output.querySelectorAll('.status-btn').forEach(btn => { btn.addEventListener('click', async () => { const url = btn.dataset.url;