Migrate from SQLite to PostgreSQL

- Replace modernc.org/sqlite with jackc/pgx/v5
- Update all SQL queries for PostgreSQL syntax ($1, $2 placeholders)
- Use snake_case column names throughout
- Replace SQLite FTS5 with PostgreSQL tsvector/tsquery full-text search
- Add connection pooling with pgxpool
- Support Docker secrets for database password
- Add trigger to normalize feed URLs (strip https://, http://, www.)
- Fix anchor feed detection regex to avoid false positives
- Connect app container to atproto network for PostgreSQL access
- Add version indicator to dashboard UI

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Author: primal
Date: 2026-01-28 20:38:13 -05:00
Parent: 75835d771d
Commit: f4afb29980

11 changed files with 1525 additions and 1137 deletions
+48 -14
@@ -11,20 +11,47 @@ go fmt ./...   # Format
go vet ./...   # Static analysis
```

### Database Setup

Requires PostgreSQL. Start the database first:

```bash
cd ../postgres && docker compose up -d
```

### Environment Variables

Set via environment or create a `.env` file:

```bash
# Database connection (individual vars)
DB_HOST=atproto-postgres   # Default: atproto-postgres
DB_PORT=5432               # Default: 5432
DB_USER=news_1440          # Default: news_1440
DB_PASSWORD=<password>     # Or use DB_PASSWORD_FILE
DB_NAME=news_1440          # Default: news_1440

# Or use a connection string
DATABASE_URL=postgres://news_1440:password@atproto-postgres:5432/news_1440?sslmode=disable
```

For Docker, use `DB_PASSWORD_FILE=/run/secrets/db_password` with Docker secrets.
Requires `vertices.txt.gz` (Common Crawl domain list) in the working directory.

## Architecture

Multi-file Go application that crawls websites for RSS/Atom feeds, stores them in PostgreSQL, and provides a web dashboard.

### Concurrent Loops (main.go)

The application runs six independent goroutine loops:

- **Import loop** - Reads `vertices.txt.gz` and inserts domains into the DB in 10k batches
- **Crawl loop** - Worker pool that processes unchecked domains and discovers feeds
- **Check loop** - Worker pool that re-checks known feeds for updates (conditional HTTP)
- **Stats loop** - Updates cached dashboard statistics every minute
- **Cleanup loop** - Removes items older than 12 months (weekly)
- **Publish loop** - Autopublishes items from approved feeds to the AT Protocol PDS

### File Structure
@@ -36,16 +63,19 @@ The application runs six independent goroutine loops:
| `parser.go` | RSS/Atom XML parsing, date parsing, next-crawl calculation |
| `html.go` | HTML parsing: feed link extraction, anchor feed detection |
| `util.go` | URL normalization, host utilities, TLD extraction |
| `db.go` | PostgreSQL schema (domains, feeds, items tables with tsvector FTS) |
| `dashboard.go` | HTTP server, JSON APIs, HTML template |
| `publisher.go` | AT Protocol PDS integration for posting items |

### Database Schema

PostgreSQL with the pgx driver, using connection pooling:

- **domains** - Hosts to crawl (status: unchecked/checked/error)
- **feeds** - Discovered RSS/Atom feeds with metadata and cache headers
- **items** - Individual feed entries (guid + feed_url unique)
- **search_vector** - GENERATED tsvector columns for full-text search (GIN indexed)

Column naming: snake_case (e.g., `source_host`, `pub_date`, `item_count`)
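A minimal sketch of how the generated `search_vector` columns are meant to be queried (illustrative only; the dashboard's actual search queries live in `dashboard.go`, which is not shown in this diff, and `DB` is the pgxpool wrapper defined in `db.go`):

```go
// searchItems shows the intended use of the GIN-indexed search_vector column.
// plainto_tsquery and ts_rank are standard PostgreSQL functions.
func searchItems(db *DB, query string, limit int) (pgx.Rows, error) {
	return db.Query(`
		SELECT title, link, ts_rank(search_vector, plainto_tsquery('english', $1)) AS rank
		FROM items
		WHERE search_vector @@ plainto_tsquery('english', $1)
		ORDER BY rank DESC
		LIMIT $2
	`, query, limit)
}
```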
### Crawl Logic

@@ -53,13 +83,18 @@ PostgreSQL with the pgx driver, using connection pooling:
2. Try HTTPS, fall back to HTTP
3. Recursive crawl up to MaxDepth=10, MaxPagesPerHost=10
4. Extract `<link rel="alternate">` and anchor hrefs containing rss/atom/feed
5. Parse discovered feeds for metadata, save with next_crawl_at

### Feed Checking

Uses conditional HTTP (ETag, If-Modified-Since). Adaptive backoff: base 100s + 100s per consecutive no-change. Respects RSS `<ttl>` and Syndication namespace hints.
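A rough illustration of that scheduling rule (a sketch only; the real calculation lives in `parser.go`'s next-crawl logic, which is not part of this diff, and the parameter names are assumptions):

```go
// nextCheckDelay sketches the adaptive backoff described above: base 100s plus
// 100s per consecutive unchanged fetch, with an RSS <ttl> hint (minutes)
// taking precedence when the feed provides one.
func nextCheckDelay(consecutiveNoChange int, ttlMinutes int) time.Duration {
	if ttlMinutes > 0 {
		return time.Duration(ttlMinutes) * time.Minute
	}
	return time.Duration(100+100*consecutiveNoChange) * time.Second
}
```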
### Publishing

Feeds with `publish_status = 'pass'` have their items automatically posted to AT Protocol.

Status values: `held` (default), `pass` (approved), `deny` (rejected).
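Approving a feed is just an update to that column (a sketch; the dashboard's Pass/Deny buttons drive the same change through its JSON API, and the example URL is illustrative):

```go
// approveFeed flips a feed to publish_status='pass' so the publish loop
// will start posting its items. DB is the pgx wrapper from db.go; feed
// URLs are stored scheme-less (see the normalize_feed_url trigger).
func approveFeed(db *DB, feedURL string) error {
	_, err := db.Exec(`UPDATE feeds SET publish_status = 'pass' WHERE url = $1`, feedURL)
	return err
}
```

For example, `approveFeed(db, "news.ycombinator.com/rss")`.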
## AT Protocol Integration

Domain: 1440.news

@@ -68,9 +103,8 @@ User structure:
- `{domain}.1440.news` - Catch-all feed per source (e.g., `wsj.com.1440.news`)
- `{category}.{domain}.1440.news` - Category-specific feeds (future)

PDS configuration in `pds.env`:

```
PDS_HOST=https://pds.1440.news
PDS_ADMIN_PASSWORD=<admin_password>
```
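The publish loop maps each feed to such a handle via `DeriveHandleFromFeed`, which is defined in a file not included in this diff. A purely hypothetical sketch of that mapping, for illustration only:

```go
// deriveHandleSketch illustrates the {domain}.1440.news scheme described above;
// it is NOT the project's actual DeriveHandleFromFeed implementation.
func deriveHandleSketch(feedURL string) string {
	host := feedURL
	if i := strings.Index(host, "/"); i >= 0 {
		host = host[:i] // feed URLs are stored scheme-less, so the host is the first segment
	}
	host = strings.TrimPrefix(host, "www.")
	return host + ".1440.news"
}
```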
+233 -45
@@ -1,10 +1,10 @@
package main

import (
"fmt"
"io"
"net/http"
"os"
"runtime"
"strings"
"sync"
@@ -25,17 +25,17 @@ type Crawler struct {
hostsProcessed int32
feedsChecked int32
startTime time.Time
db *DB
displayedCrawlRate int
displayedCheckRate int
domainsImported int32
cachedStats *DashboardStats
cachedAllDomains []DomainStat
statsMu sync.RWMutex
}

func NewCrawler(connString string) (*Crawler, error) {
db, err := OpenDatabase(connString)
if err != nil {
return nil, fmt.Errorf("failed to open database: %v", err)
}
@@ -61,12 +61,6 @@ func NewCrawler(connString string) (*Crawler, error) {

func (c *Crawler) Close() error {
if c.db != nil {
fmt.Println("Closing database...")
return c.db.Close()
}
@@ -95,53 +89,247 @@ func (c *Crawler) StartCleanupLoop() {
}

// StartMaintenanceLoop performs periodic database maintenance
func (c *Crawler) StartMaintenanceLoop() {
vacuumTicker := time.NewTicker(24 * time.Hour)
analyzeTicker := time.NewTicker(1 * time.Hour)
defer vacuumTicker.Stop()
defer analyzeTicker.Stop()

for {
select {
case <-analyzeTicker.C:
// Update statistics for query planner
if _, err := c.db.Exec("ANALYZE"); err != nil {
fmt.Printf("ANALYZE error: %v\n", err)
}
case <-vacuumTicker.C:
// Reclaim dead tuple space (VACUUM is lighter than VACUUM FULL)
fmt.Println("Running VACUUM...")
if _, err := c.db.Exec("VACUUM"); err != nil {
fmt.Printf("VACUUM error: %v\n", err)
} else {
fmt.Println("VACUUM complete")
}
}
}
}
// StartPublishLoop automatically publishes unpublished items for approved feeds
// Grabs up to 50 items sorted by discovered_at, publishes one per second, then reloops
func (c *Crawler) StartPublishLoop() {
// Load PDS credentials from environment or pds.env file
pdsHost := os.Getenv("PDS_HOST")
pdsAdminPassword := os.Getenv("PDS_ADMIN_PASSWORD")
if pdsHost == "" || pdsAdminPassword == "" {
if data, err := os.ReadFile("pds.env"); err == nil {
for _, line := range strings.Split(string(data), "\n") {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "#") || line == "" {
continue
}
parts := strings.SplitN(line, "=", 2)
if len(parts) == 2 {
key := strings.TrimSpace(parts[0])
value := strings.TrimSpace(parts[1])
switch key {
case "PDS_HOST":
pdsHost = value
case "PDS_ADMIN_PASSWORD":
pdsAdminPassword = value
}
}
}
}
}
if pdsHost == "" || pdsAdminPassword == "" {
fmt.Println("Publish loop: PDS credentials not configured, skipping")
return
}

fmt.Printf("Publish loop: starting with PDS %s\n", pdsHost)
feedPassword := "feed1440!"
// Cache sessions per account
sessions := make(map[string]*PDSSession)
publisher := NewPublisher(pdsHost)
for {
// Get up to 50 unpublished items from approved feeds, sorted by discovered_at ASC
items, err := c.GetAllUnpublishedItems(50)
if err != nil {
fmt.Printf("Publish loop error: %v\n", err)
time.Sleep(1 * time.Second)
continue
}
if len(items) == 0 {
time.Sleep(1 * time.Second)
continue
}
// Publish one item per second
for _, item := range items {
// Get or create session for this feed's account
account := c.getAccountForFeed(item.FeedURL)
if account == "" {
time.Sleep(1 * time.Second)
continue
}
session, ok := sessions[account]
if !ok {
// Try to log in
session, err = publisher.CreateSession(account, feedPassword)
if err != nil {
// Account might not exist - try to create it
inviteCode, err := publisher.CreateInviteCode(pdsAdminPassword, 1)
if err != nil {
fmt.Printf("Publish: failed to create invite for %s: %v\n", account, err)
time.Sleep(1 * time.Second)
continue
}
email := account + "@1440.news"
session, err = publisher.CreateAccount(account, email, feedPassword, inviteCode)
if err != nil {
fmt.Printf("Publish: failed to create account %s: %v\n", account, err)
time.Sleep(1 * time.Second)
continue
}
fmt.Printf("Publish: created account %s\n", account)
c.db.Exec("UPDATE feeds SET publish_account = $1 WHERE url = $2", account, item.FeedURL)
// Set up profile for new account
feedInfo := c.getFeedInfo(item.FeedURL)
if feedInfo != nil {
displayName := feedInfo.Title
if displayName == "" {
displayName = account
}
description := feedInfo.Description
if description == "" {
description = "News feed via 1440.news"
}
// Truncate if needed
if len(displayName) > 64 {
displayName = displayName[:61] + "..."
}
if len(description) > 256 {
description = description[:253] + "..."
}
if err := publisher.UpdateProfile(session, displayName, description, nil); err != nil {
fmt.Printf("Publish: failed to set profile for %s: %v\n", account, err)
} else {
fmt.Printf("Publish: set profile for %s\n", account)
}
}
}
sessions[account] = session
}
// Publish the item
uri, err := publisher.PublishItem(session, &item)
if err != nil {
fmt.Printf("Publish: failed item %d: %v\n", item.ID, err)
// Clear session cache on auth errors
if strings.Contains(err.Error(), "401") || strings.Contains(err.Error(), "auth") {
delete(sessions, account)
}
} else {
c.MarkItemPublished(item.ID, uri)
fmt.Printf("Publish: %s -> %s\n", item.Title[:min(40, len(item.Title))], account)
}
time.Sleep(1 * time.Second)
}
time.Sleep(1 * time.Second)
}
}
// getAccountForFeed returns the publish account for a feed URL
func (c *Crawler) getAccountForFeed(feedURL string) string {
var account *string
err := c.db.QueryRow(`
SELECT publish_account FROM feeds
WHERE url = $1 AND publish_status = 'pass' AND status = 'active'
`, feedURL).Scan(&account)
if err != nil || account == nil || *account == "" {
// Derive handle from feed URL
return DeriveHandleFromFeed(feedURL)
}
return *account
}
// FeedInfo holds basic feed metadata for profile setup
type FeedInfo struct {
Title string
Description string
SiteURL string
}
// getFeedInfo returns feed metadata for profile setup
func (c *Crawler) getFeedInfo(feedURL string) *FeedInfo {
var title, description, siteURL *string
err := c.db.QueryRow(`
SELECT title, description, site_url FROM feeds WHERE url = $1
`, feedURL).Scan(&title, &description, &siteURL)
if err != nil {
return nil
}
return &FeedInfo{
Title: StringValue(title),
Description: StringValue(description),
SiteURL: StringValue(siteURL),
}
}
// GetAllUnpublishedItems returns unpublished items from all approved feeds
func (c *Crawler) GetAllUnpublishedItems(limit int) ([]Item, error) {
rows, err := c.db.Query(`
SELECT i.id, i.feed_url, i.guid, i.title, i.link, i.description, i.content,
i.author, i.pub_date, i.discovered_at
FROM items i
JOIN feeds f ON i.feed_url = f.url
WHERE f.publish_status = 'pass'
AND f.status = 'active'
AND i.published_at IS NULL
ORDER BY i.discovered_at ASC
LIMIT $1
`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Item
for rows.Next() {
var item Item
var guid, title, link, description, content, author *string
var pubDate, discoveredAt *time.Time
err := rows.Scan(&item.ID, &item.FeedURL, &guid, &title, &link, &description,
&content, &author, &pubDate, &discoveredAt)
if err != nil {
continue
}
item.GUID = StringValue(guid)
item.Title = StringValue(title)
item.Link = StringValue(link)
item.Description = StringValue(description)
item.Content = StringValue(content)
item.Author = StringValue(author)
item.PubDate = TimeValue(pubDate)
item.DiscoveredAt = TimeValue(discoveredAt)
items = append(items, item)
}
return items, nil
}
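`MarkItemPublished`, called by the publish loop above, is defined elsewhere in the crawler and is not part of this diff. A hedged sketch of the update it presumably performs, using the column names from `db.go`:

```go
// markItemPublishedSketch illustrates the update implied by the publish loop:
// stamp the item with the post's AT URI and the publish time so the
// idx_items_unpublished partial index no longer matches it.
func markItemPublishedSketch(db *DB, itemID int64, uri string) error {
	_, err := db.Exec(
		`UPDATE items SET published_at = $1, published_uri = $2 WHERE id = $3`,
		time.Now(), uri, itemID,
	)
	return err
}
```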
// StartCrawlLoop runs the domain crawling loop independently // StartCrawlLoop runs the domain crawling loop independently
+417 -353
File diff suppressed because it is too large
+222 -140
@@ -1,27 +1,31 @@
package main

import (
"context"
"fmt"
"net/url"
"os"
"strings"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
const schema = `
CREATE TABLE IF NOT EXISTS domains (
host TEXT PRIMARY KEY,
status TEXT NOT NULL DEFAULT 'unchecked',
discovered_at TIMESTAMPTZ NOT NULL,
last_crawled_at TIMESTAMPTZ,
feeds_found INTEGER DEFAULT 0,
last_error TEXT,
tld TEXT
);
CREATE INDEX IF NOT EXISTS idx_domains_status ON domains(status);
CREATE INDEX IF NOT EXISTS idx_domains_tld ON domains(tld);
CREATE INDEX IF NOT EXISTS idx_domains_feeds_found ON domains(feeds_found DESC) WHERE feeds_found > 0;
CREATE TABLE IF NOT EXISTS feeds (
url TEXT PRIMARY KEY,
@@ -30,196 +34,195 @@ CREATE TABLE IF NOT EXISTS feeds (
title TEXT,
description TEXT,
language TEXT,
site_url TEXT,
discovered_at TIMESTAMPTZ NOT NULL,
last_crawled_at TIMESTAMPTZ,
next_crawl_at TIMESTAMPTZ,
last_build_date TIMESTAMPTZ,
etag TEXT,
last_modified TEXT,
ttl_minutes INTEGER,
update_period TEXT,
update_freq INTEGER,
status TEXT DEFAULT 'active',
error_count INTEGER DEFAULT 0,
last_error TEXT,
last_error_at TIMESTAMPTZ,
source_url TEXT,
source_host TEXT,
tld TEXT,
item_count INTEGER,
avg_post_freq_hrs DOUBLE PRECISION,
oldest_item_date TIMESTAMPTZ,
newest_item_date TIMESTAMPTZ,
no_update INTEGER DEFAULT 0,
-- Publishing to PDS
publish_status TEXT DEFAULT 'held' CHECK(publish_status IN ('held', 'pass', 'deny')),
publish_account TEXT,
-- Full-text search vector
search_vector tsvector GENERATED ALWAYS AS (
setweight(to_tsvector('english', coalesce(title, '')), 'A') ||
setweight(to_tsvector('english', coalesce(description, '')), 'B') ||
setweight(to_tsvector('english', coalesce(url, '')), 'C')
) STORED
);

CREATE INDEX IF NOT EXISTS idx_feeds_source_host ON feeds(source_host);
CREATE INDEX IF NOT EXISTS idx_feeds_publish_status ON feeds(publish_status);
CREATE INDEX IF NOT EXISTS idx_feeds_source_host_url ON feeds(source_host, url);
CREATE INDEX IF NOT EXISTS idx_feeds_tld ON feeds(tld);
CREATE INDEX IF NOT EXISTS idx_feeds_tld_source_host ON feeds(tld, source_host);
CREATE INDEX IF NOT EXISTS idx_feeds_type ON feeds(type);
CREATE INDEX IF NOT EXISTS idx_feeds_category ON feeds(category);
CREATE INDEX IF NOT EXISTS idx_feeds_status ON feeds(status);
CREATE INDEX IF NOT EXISTS idx_feeds_discovered_at ON feeds(discovered_at);
CREATE INDEX IF NOT EXISTS idx_feeds_title ON feeds(title);
CREATE INDEX IF NOT EXISTS idx_feeds_search ON feeds USING GIN(search_vector);

CREATE TABLE IF NOT EXISTS items (
id BIGSERIAL PRIMARY KEY,
feed_url TEXT NOT NULL,
guid TEXT,
title TEXT,
link TEXT,
description TEXT,
content TEXT,
author TEXT,
pub_date TIMESTAMPTZ,
discovered_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ,
-- Media attachments
enclosure_url TEXT,
enclosure_type TEXT,
enclosure_length BIGINT,
image_urls TEXT, -- JSON array of image URLs
-- Publishing to PDS
published_at TIMESTAMPTZ,
published_uri TEXT,
-- Full-text search vector
search_vector tsvector GENERATED ALWAYS AS (
setweight(to_tsvector('english', coalesce(title, '')), 'A') ||
setweight(to_tsvector('english', coalesce(description, '')), 'B') ||
setweight(to_tsvector('english', coalesce(content, '')), 'C') ||
setweight(to_tsvector('english', coalesce(author, '')), 'D')
) STORED,
UNIQUE(feed_url, guid)
);

CREATE INDEX IF NOT EXISTS idx_items_feed_url ON items(feed_url);
CREATE INDEX IF NOT EXISTS idx_items_pub_date ON items(pub_date DESC);
CREATE INDEX IF NOT EXISTS idx_items_link ON items(link);
CREATE INDEX IF NOT EXISTS idx_items_feed_url_pub_date ON items(feed_url, pub_date DESC);
CREATE INDEX IF NOT EXISTS idx_items_unpublished ON items(feed_url, published_at) WHERE published_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_items_search ON items USING GIN(search_vector);

-- Trigger to normalize feed URLs on insert/update (strips https://, http://, www.)
CREATE OR REPLACE FUNCTION normalize_feed_url()
RETURNS TRIGGER AS $$
BEGIN
NEW.url = regexp_replace(NEW.url, '^https?://', '');
NEW.url = regexp_replace(NEW.url, '^www\.', '');
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS normalize_feed_url_trigger ON feeds;
CREATE TRIGGER normalize_feed_url_trigger
BEFORE INSERT OR UPDATE ON feeds
FOR EACH ROW
EXECUTE FUNCTION normalize_feed_url();
`
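A small illustration of what the normalization trigger buys (a sketch; `DB` is the wrapper defined just below and the example URL is made up):

```go
// insertFeedExample shows the normalize_feed_url trigger in action: the
// scheme and "www." prefix are stripped before the row is stored, so the
// primary key is always the bare host + path form.
func insertFeedExample(db *DB) (string, error) {
	_, err := db.Exec(
		`INSERT INTO feeds (url, discovered_at) VALUES ($1, NOW()) ON CONFLICT (url) DO NOTHING`,
		"https://www.example.com/rss.xml",
	)
	if err != nil {
		return "", err
	}
	var stored string
	err = db.QueryRow(`SELECT url FROM feeds WHERE url = $1`, "example.com/rss.xml").Scan(&stored)
	return stored, err // "example.com/rss.xml"
}
```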
// DB wraps pgxpool.Pool with helper methods
type DB struct {
*pgxpool.Pool
}

func OpenDatabase(connString string) (*DB, error) {
fmt.Printf("Connecting to database...\n")

// If connection string not provided, try environment variables
if connString == "" {
connString = os.Getenv("DATABASE_URL")
}
if connString == "" {
// Build from individual env vars
host := getEnvOrDefault("DB_HOST", "atproto-postgres")
port := getEnvOrDefault("DB_PORT", "5432")
user := getEnvOrDefault("DB_USER", "news_1440")
dbname := getEnvOrDefault("DB_NAME", "news_1440")

// Support Docker secrets (password file) or direct password
password := os.Getenv("DB_PASSWORD")
if password == "" {
if passwordFile := os.Getenv("DB_PASSWORD_FILE"); passwordFile != "" {
data, err := os.ReadFile(passwordFile)
if err != nil {
return nil, fmt.Errorf("failed to read password file: %v", err)
}
password = strings.TrimSpace(string(data))
}
}

connString = fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable",
user, url.QueryEscape(password), host, port, dbname)
}

config, err := pgxpool.ParseConfig(connString)
if err != nil {
return nil, fmt.Errorf("failed to parse connection string: %v", err)
}

// Connection pool settings
config.MaxConns = 10
config.MinConns = 2
config.MaxConnLifetime = 5 * time.Minute
config.MaxConnIdleTime = 1 * time.Minute

ctx := context.Background()
pool, err := pgxpool.NewWithConfig(ctx, config)
if err != nil {
return nil, fmt.Errorf("failed to connect to database: %v", err)
}

// Verify connection
if err := pool.Ping(ctx); err != nil {
pool.Close()
return nil, fmt.Errorf("failed to ping database: %v", err)
}
fmt.Println(" Connected to PostgreSQL")

db := &DB{pool}

// Create schema
if _, err := pool.Exec(ctx, schema); err != nil {
pool.Close()
return nil, fmt.Errorf("failed to create schema: %v", err)
}
fmt.Println(" Schema OK")

// Run stats in background
go func() {
var domainCount, feedCount int
pool.QueryRow(context.Background(), "SELECT COUNT(*) FROM domains").Scan(&domainCount)
pool.QueryRow(context.Background(), "SELECT COUNT(*) FROM feeds").Scan(&feedCount)
fmt.Printf(" Existing data: %d domains, %d feeds\n", domainCount, feedCount)
fmt.Println(" Running ANALYZE...")
if _, err := pool.Exec(context.Background(), "ANALYZE"); err != nil {
fmt.Printf(" Warning: ANALYZE failed: %v\n", err)
} else {
fmt.Println(" ANALYZE complete")
@@ -228,3 +231,82 @@ func OpenDatabase(connString string) (*DB, error) {
return db, nil
}
func getEnvOrDefault(key, defaultVal string) string {
if val := os.Getenv(key); val != "" {
return val
}
return defaultVal
}
// QueryRow wraps pool.QueryRow for compatibility
func (db *DB) QueryRow(query string, args ...interface{}) pgx.Row {
return db.Pool.QueryRow(context.Background(), query, args...)
}
// Query wraps pool.Query for compatibility
func (db *DB) Query(query string, args ...interface{}) (pgx.Rows, error) {
return db.Pool.Query(context.Background(), query, args...)
}
// Exec wraps pool.Exec for compatibility
func (db *DB) Exec(query string, args ...interface{}) (int64, error) {
result, err := db.Pool.Exec(context.Background(), query, args...)
if err != nil {
return 0, err
}
return result.RowsAffected(), nil
}
// Begin starts a transaction
func (db *DB) Begin() (pgx.Tx, error) {
return db.Pool.Begin(context.Background())
}
// Close closes the connection pool
func (db *DB) Close() error {
db.Pool.Close()
return nil
}
// NullableString returns nil for empty strings, otherwise the string pointer
func NullableString(s string) *string {
if s == "" {
return nil
}
return &s
}
// NullableTime returns nil for zero times, otherwise the time pointer
func NullableTime(t time.Time) *time.Time {
if t.IsZero() {
return nil
}
return &t
}
// StringValue returns empty string for nil, otherwise the dereferenced value
func StringValue(s *string) string {
if s == nil {
return ""
}
return *s
}
// TimeValue returns zero time for nil, otherwise the dereferenced value
func TimeValue(t *time.Time) time.Time {
if t == nil {
return time.Time{}
}
return *t
}
// ToSearchQuery converts a user query to PostgreSQL tsquery format
func ToSearchQuery(query string) string {
// Simple conversion: split on spaces and join with &
words := strings.Fields(query)
if len(words) == 0 {
return ""
}
return strings.Join(words, " & ")
}
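For instance, the helper's output is meant to be fed to `to_tsquery` against the generated columns (an illustrative pairing; the calling code in `dashboard.go` is not shown in this diff):

```go
// Example: "postgres migration" -> "postgres & migration", usable as a tsquery.
func searchFeeds(db *DB, userQuery string) (pgx.Rows, error) {
	tsq := ToSearchQuery(userQuery)
	if tsq == "" {
		return nil, fmt.Errorf("empty query")
	}
	return db.Query(`
		SELECT url, title
		FROM feeds
		WHERE search_vector @@ to_tsquery('english', $1)
		ORDER BY ts_rank(search_vector, to_tsquery('english', $1)) DESC
		LIMIT 50
	`, tsq)
}
```

Note that `to_tsquery` is strict about its input syntax, so raw user input containing punctuation may need extra sanitizing (or `plainto_tsquery`).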
+15 -1
@@ -6,11 +6,19 @@ services:
stop_grace_period: 30s
env_file:
- pds.env
environment:
DB_HOST: atproto-postgres
DB_PORT: 5432
DB_USER: news_1440
DB_PASSWORD_FILE: /run/secrets/db_password
DB_NAME: news_1440
secrets:
- db_password
volumes:
- ./vertices.txt.gz:/app/vertices.txt.gz:ro
networks:
- proxy
- atproto
labels:
- "traefik.enable=true"
# Production: HTTPS with Let's Encrypt
@@ -29,6 +37,12 @@ services:
# Shared service
- "traefik.http.services.app-1440-news.loadbalancer.server.port=4321"
secrets:
db_password:
file: ../postgres/secrets/news_1440_password.txt
networks:
proxy:
external: true
atproto:
external: true
+104 -105
@@ -3,13 +3,15 @@ package main
import (
"bufio"
"compress/gzip"
"context"
"fmt"
"io"
"os"
"strings"
"sync/atomic"
"time"

"github.com/jackc/pgx/v5"
)
// Domain represents a host to be crawled for feeds
@@ -23,78 +25,74 @@ type Domain struct {
TLD string `json:"tld,omitempty"`
}

// saveDomain stores a domain in PostgreSQL
func (c *Crawler) saveDomain(domain *Domain) error {
_, err := c.db.Exec(`
INSERT INTO domains (host, status, discovered_at, last_crawled_at, feeds_found, last_error, tld)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT(host) DO UPDATE SET
status = EXCLUDED.status,
last_crawled_at = EXCLUDED.last_crawled_at,
feeds_found = EXCLUDED.feeds_found,
last_error = EXCLUDED.last_error,
tld = EXCLUDED.tld
`, domain.Host, domain.Status, domain.DiscoveredAt, NullableTime(domain.LastCrawledAt),
domain.FeedsFound, NullableString(domain.LastError), domain.TLD)
return err
}

// saveDomainTx stores a domain using a transaction
func (c *Crawler) saveDomainTx(tx pgx.Tx, domain *Domain) error {
_, err := tx.Exec(context.Background(), `
INSERT INTO domains (host, status, discovered_at, last_crawled_at, feeds_found, last_error, tld)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT(host) DO NOTHING
`, domain.Host, domain.Status, domain.DiscoveredAt, NullableTime(domain.LastCrawledAt),
domain.FeedsFound, NullableString(domain.LastError), domain.TLD)
return err
}
// domainExists checks if a domain already exists in the database
func (c *Crawler) domainExists(host string) bool {
var exists bool
err := c.db.QueryRow("SELECT EXISTS(SELECT 1 FROM domains WHERE host = $1)", normalizeHost(host)).Scan(&exists)
return err == nil && exists
}

// getDomain retrieves a domain from PostgreSQL
func (c *Crawler) getDomain(host string) (*Domain, error) {
domain := &Domain{}
var lastCrawledAt *time.Time
var lastError *string

err := c.db.QueryRow(`
SELECT host, status, discovered_at, last_crawled_at, feeds_found, last_error, tld
FROM domains WHERE host = $1
`, normalizeHost(host)).Scan(
&domain.Host, &domain.Status, &domain.DiscoveredAt, &lastCrawledAt,
&domain.FeedsFound, &lastError, &domain.TLD,
)
if err == pgx.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}

domain.LastCrawledAt = TimeValue(lastCrawledAt)
domain.LastError = StringValue(lastError)

return domain, nil
}

// GetUncheckedDomains returns up to limit unchecked domains ordered by discovered_at (FIFO)
func (c *Crawler) GetUncheckedDomains(limit int) ([]*Domain, error) {
rows, err := c.db.Query(`
SELECT host, status, discovered_at, last_crawled_at, feeds_found, last_error, tld
FROM domains WHERE status = 'unchecked'
ORDER BY discovered_at ASC
LIMIT $1
`, limit)
if err != nil {
return nil, err
@@ -105,12 +103,12 @@ func (c *Crawler) GetUncheckedDomains(limit int) ([]*Domain, error) {
}

// scanDomains is a helper to scan multiple domain rows
func (c *Crawler) scanDomains(rows pgx.Rows) ([]*Domain, error) {
var domains []*Domain
for rows.Next() {
domain := &Domain{}
var lastCrawledAt *time.Time
var lastError *string

if err := rows.Scan(
&domain.Host, &domain.Status, &domain.DiscoveredAt, &lastCrawledAt,
@@ -119,12 +117,8 @@ func (c *Crawler) scanDomains(rows pgx.Rows) ([]*Domain, error) {
continue
}

domain.LastCrawledAt = TimeValue(lastCrawledAt)
domain.LastError = StringValue(lastError)

domains = append(domains, domain)
}

@@ -142,13 +136,13 @@ func (c *Crawler) markDomainCrawled(host string, feedsFound int, lastError strin
var err error
if lastError != "" {
_, err = c.db.Exec(`
UPDATE domains SET status = $1, last_crawled_at = $2, feeds_found = $3, last_error = $4
WHERE host = $5
`, status, time.Now(), feedsFound, lastError, normalizeHost(host))
} else {
_, err = c.db.Exec(`
UPDATE domains SET status = $1, last_crawled_at = $2, feeds_found = $3, last_error = NULL
WHERE host = $4
`, status, time.Now(), feedsFound, normalizeHost(host))
}
return err
@@ -164,6 +158,23 @@ func (c *Crawler) GetDomainCount() (total int, unchecked int, err error) {
return total, unchecked, err
}
// ImportTestDomains adds a list of specific domains for testing
func (c *Crawler) ImportTestDomains(domains []string) {
now := time.Now()
for _, host := range domains {
_, err := c.db.Exec(`
INSERT INTO domains (host, status, discovered_at, tld)
VALUES ($1, 'unchecked', $2, $3)
ON CONFLICT(host) DO NOTHING
`, host, now, getTLD(host))
if err != nil {
fmt.Printf("Error adding test domain %s: %v\n", host, err)
} else {
fmt.Printf("Added test domain: %s\n", host)
}
}
}
// ImportDomainsFromFile reads a vertices file and stores new domains as "unchecked"
func (c *Crawler) ImportDomainsFromFile(filename string, limit int) (imported int, skipped int, err error) {
file, err := os.Open(filename)
@@ -212,7 +223,6 @@ func (c *Crawler) ImportDomainsInBackground(filename string) {
const batchSize = 1000
now := time.Now()

totalImported := 0
batchCount := 0
@@ -240,31 +250,43 @@ func (c *Crawler) ImportDomainsInBackground(filename string) {
break
}
// Use COPY for bulk insert (much faster than individual INSERTs)
ctx := context.Background()
conn, err := c.db.Acquire(ctx)
if err != nil {
fmt.Printf("Failed to acquire connection: %v\n", err)
break
}
// Build rows for copy
rows := make([][]interface{}, len(domains))
for i, d := range domains {
rows[i] = []interface{}{d.host, "unchecked", now, d.tld}
}
// Use CopyFrom for bulk insert
imported, err := conn.CopyFrom(
ctx,
pgx.Identifier{"domains"},
[]string{"host", "status", "discovered_at", "tld"},
pgx.CopyFromRows(rows),
)
conn.Release()
if err != nil {
// Fall back to individual inserts with ON CONFLICT
for _, d := range domains {
c.db.Exec(`
INSERT INTO domains (host, status, discovered_at, tld)
VALUES ($1, 'unchecked', $2, $3)
ON CONFLICT(host) DO NOTHING
`, d.host, now, d.tld)
}
imported = int64(len(domains))
}

batchCount++
totalImported += int(imported)
atomic.AddInt32(&c.domainsImported, int32(imported))

// Wait 1 second before the next batch
@@ -304,7 +326,6 @@ func (c *Crawler) parseAndStoreDomains(reader io.Reader, limit int) (imported in
scanner.Buffer(buf, 1024*1024)

now := time.Now()

count := 0
const batchSize = 1000
@@ -336,28 +357,21 @@ func (c *Crawler) parseAndStoreDomains(reader io.Reader, limit int) (imported in
break
}
// Insert with ON CONFLICT
for _, d := range domains {
result, err := c.db.Exec(`
INSERT INTO domains (host, status, discovered_at, tld)
VALUES ($1, 'unchecked', $2, $3)
ON CONFLICT(host) DO NOTHING
`, d.host, now, d.tld)
if err != nil {
skipped++
} else if result > 0 {
imported++
} else {
skipped++
}
}
if limit > 0 && count >= limit {
break
@@ -370,18 +384,3 @@ func (c *Crawler) parseAndStoreDomains(reader io.Reader, limit int) (imported in
return imported, skipped, nil
}
// Helper functions for SQL null handling
func nullTime(t time.Time) sql.NullTime {
if t.IsZero() {
return sql.NullTime{}
}
return sql.NullTime{Time: t, Valid: true}
}
func nullString(s string) sql.NullString {
if s == "" {
return sql.NullString{}
}
return sql.NullString{String: s, Valid: true}
}
+332 -402
File diff suppressed because it is too large
+5 -1
@@ -77,7 +77,11 @@ func (c *Crawler) extractFeedLinks(n *html.Node, baseURL string) []simpleFeed {
func (c *Crawler) extractAnchorFeeds(n *html.Node, baseURL string) []simpleFeed {
feeds := make([]simpleFeed, 0)

// Match feed URLs more precisely:
// - /feed, /rss, /atom as path segments (not "feeds" or "feedback")
// - .rss, .atom, .xml file extensions
// - ?feed=, ?format=rss, etc.
feedPattern := regexp.MustCompile(`(?i)(/feed/?$|/feed/|/rss/?$|/rss/|/atom/?$|/atom/|\.rss|\.atom|\.xml|\?.*feed=|\?.*format=rss|\?.*format=atom)`)
var f func(*html.Node)
f = func(n *html.Node) {
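The hunk is truncated here. For reference, a quick standalone check of the tightened pattern against typical hrefs (a sketch, not part of the diff; the URLs are made up):

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Same pattern as extractAnchorFeeds above.
	feedPattern := regexp.MustCompile(`(?i)(/feed/?$|/feed/|/rss/?$|/rss/|/atom/?$|/atom/|\.rss|\.atom|\.xml|\?.*feed=|\?.*format=rss|\?.*format=atom)`)

	for _, href := range []string{
		"https://example.com/feed",         // match: /feed at end of path
		"https://example.com/rss.xml",      // match: .xml extension
		"https://example.com/?format=atom", // match: query hint
		"https://example.com/feedback",     // no match: the old pattern's false positive
		"https://example.com/newsfeed",     // no match
	} {
		fmt.Printf("%-40s %v\n", href, feedPattern.MatchString(href))
	}
}
```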
+13 -9
@@ -8,13 +8,8 @@ import (
)

func main() {
// Connection string from environment (DATABASE_URL or DB_* vars)
crawler, err := NewCrawler("")
if err != nil {
fmt.Fprintf(os.Stderr, "Error initializing crawler: %v\n", err)
os.Exit(1)
@@ -37,8 +32,14 @@ func main() {
// Start all loops independently
fmt.Println("Starting import, crawl, check, and stats loops...")

// Import loop (background) - DISABLED for testing, using manual domains
// go crawler.ImportDomainsInBackground("vertices.txt.gz")
// Add only ycombinator domains for testing
go crawler.ImportTestDomains([]string{
"news.ycombinator.com",
"ycombinator.com",
})
// Check loop (background)
go crawler.StartCheckLoop()
@@ -52,6 +53,9 @@ func main() {
// Maintenance loop (background) - periodic ANALYZE and VACUUM
go crawler.StartMaintenanceLoop()
// Publish loop (background) - autopublishes items for approved feeds
go crawler.StartPublishLoop()
// Crawl loop (background) // Crawl loop (background)
go crawler.StartCrawlLoop() go crawler.StartCrawlLoop()
+80 -57
@@ -3,7 +3,6 @@ package main
import (
"bytes"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
@@ -12,6 +11,7 @@ import (
"regexp" "regexp"
"strings" "strings"
"time" "time"
"unicode/utf8"
)

// Publisher handles posting items to AT Protocol PDS
@@ -196,22 +196,41 @@ func (p *Publisher) CreateInviteCode(adminPassword string, useCount int) (string
return result.Code, nil
}
// TID alphabet for base32-sortable encoding
const tidAlphabet = "234567abcdefghijklmnopqrstuvwxyz"

// GenerateRkey creates a deterministic TID-format rkey from a GUID and timestamp
// TIDs are required by Bluesky relay for indexing - custom rkeys don't sync
// Format: 13 chars base32-sortable, 53 bits timestamp + 10 bits clock ID
func GenerateRkey(guid string, timestamp time.Time) string {
if guid == "" {
return ""
}

// Get microseconds since Unix epoch (53 bits)
microsInt := timestamp.UnixMicro()
if microsInt < 0 {
microsInt = 0
}
// Convert to uint64 and mask to 53 bits
micros := uint64(microsInt) & ((1 << 53) - 1)
// Generate deterministic 10-bit clock ID from GUID hash
hash := sha256.Sum256([]byte(guid))
clockID := uint64(hash[0])<<2 | uint64(hash[1])>>6
clockID = clockID & ((1 << 10) - 1) // 10 bits = 0-1023
// Combine: top bit 0, 53 bits timestamp, 10 bits clock ID
tid := (micros << 10) | clockID
// Encode as base32-sortable (13 characters)
var result [13]byte
for i := 12; i >= 0; i-- {
result[i] = tidAlphabet[tid&0x1f]
tid >>= 5
}
return string(result[:])
}
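A quick usage sketch of the TID-format rkey (the GUIDs here are made up):

```go
// Two items published at different times sort chronologically by rkey,
// while the same GUID + timestamp always yields the same 13-character TID.
rkeyA := GenerateRkey("hn-item-42", time.Date(2026, 1, 28, 12, 0, 0, 0, time.UTC))
rkeyB := GenerateRkey("hn-item-43", time.Date(2026, 1, 28, 12, 0, 1, 0, time.UTC))
fmt.Println(len(rkeyA), rkeyA < rkeyB) // 13 true
```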
// extractURLs finds all URLs in a string // extractURLs finds all URLs in a string
@@ -239,7 +258,8 @@ func (p *Publisher) PublishItem(session *PDSSession, item *Item) (string, error)
return "", fmt.Errorf("item has no GUID or link, cannot publish") return "", fmt.Errorf("item has no GUID or link, cannot publish")
} }
// Collect all unique URLs: main link + any URLs in description // Collect URLs: main link + HN comments link (if applicable)
// Limit to 2 URLs max to stay under 300 grapheme limit
urlSet := make(map[string]bool)
var allURLs []string
@@ -249,8 +269,18 @@ func (p *Publisher) PublishItem(session *PDSSession, item *Item) (string, error)
allURLs = append(allURLs, item.Link)
}

// For HN feeds, add comments link from description (looks like "https://news.ycombinator.com/item?id=...")
descURLs := extractURLs(item.Description)
for _, u := range descURLs {
if strings.Contains(u, "news.ycombinator.com/item") && !urlSet[u] {
urlSet[u] = true
allURLs = append(allURLs, u)
break // Only add one comments link
}
}
// Add enclosure URL for podcasts/media (audio/video) if we have room
if len(allURLs) < 2 && item.Enclosure != nil && item.Enclosure.URL != "" {
encType := strings.ToLower(item.Enclosure.Type)
if strings.HasPrefix(encType, "audio/") || strings.HasPrefix(encType, "video/") {
if !urlSet[item.Enclosure.URL] {
@@ -260,59 +290,52 @@ func (p *Publisher) PublishItem(session *PDSSession, item *Item) (string, error)
}
}
// Extract URLs from description
descURLs := extractURLs(item.Description)
for _, u := range descURLs {
if !urlSet[u] {
urlSet[u] = true
allURLs = append(allURLs, u)
}
}
// Extract URLs from content if available
contentURLs := extractURLs(item.Content)
for _, u := range contentURLs {
if !urlSet[u] {
urlSet[u] = true
allURLs = append(allURLs, u)
}
}
// Build post text: title + all links
// Bluesky has 300 grapheme limit - use rune count as approximation
const maxGraphemes = 295 // Leave some margin

// Calculate space needed for URLs (in runes)
urlSpace := 0
for _, u := range allURLs {
urlSpace += utf8.RuneCountInString(u) + 2 // +2 for \n\n
}

// Truncate title if needed
title := item.Title
titleRunes := utf8.RuneCountInString(title)
maxTitleRunes := maxGraphemes - urlSpace - 3 // -3 for "..."
if titleRunes+urlSpace > maxGraphemes {
if maxTitleRunes > 10 {
// Truncate title to fit
runes := []rune(title)
if len(runes) > maxTitleRunes {
title = string(runes[:maxTitleRunes]) + "..."
}
} else {
// Title too long even with minimal space - just truncate hard
runes := []rune(title)
if len(runes) > 50 {
title = string(runes[:50]) + "..."
}
}
}

// Build final text
var textBuilder strings.Builder
textBuilder.WriteString(title)
for _, u := range allURLs {
textBuilder.WriteString("\n\n")
textBuilder.WriteString(u)
}
text := textBuilder.String()
// Use current time for createdAt (Bluesky won't index backdated posts)
// TODO: Restore original pubDate once Bluesky indexing is understood
createdAt := time.Now()
// if !item.PubDate.IsZero() {
// createdAt = item.PubDate
// }
post := BskyPost{
Type: "app.bsky.feed.post",
+56 -10
@@ -258,6 +258,7 @@ function initDashboard() {
output.innerHTML = html;
attachTldHandlers(output.querySelector('.tld-list'));
} catch (err) {
console.error('TLDs error:', err);
output.innerHTML = '<div style="color: #f66; padding: 10px;">Error: ' + escapeHtml(err.message) + '</div>';
}
}
@@ -301,7 +302,7 @@ function initDashboard() {
const result = await response.json();

if (!result.data || result.data.length === 0) {
if (infiniteScrollState) infiniteScrollState.ended = true;
document.getElementById('infiniteLoader').textContent = offset === 0 ? 'No results found' : 'End of list';
return;
}
@@ -319,11 +320,12 @@ function initDashboard() {
offset += result.data.length;

if (result.data.length < limit) {
if (infiniteScrollState) infiniteScrollState.ended = true;
document.getElementById('infiniteLoader').textContent = 'End of list';
}
} catch (err) {
console.error('Filter error:', err);
document.getElementById('infiniteLoader').textContent = 'Error loading: ' + err.message;
}
}
@@ -479,17 +481,26 @@ function initDashboard() {
output.innerHTML = '<div style="color: #666; padding: 10px;">Loading publish data...</div>';

try {
const [candidatesRes, passedRes, deniedRes] = await Promise.all([
fetch('/api/publishCandidates?limit=50'),
fetch('/api/publishEnabled'),
fetch('/api/publishDenied')
]);
const candidates = await candidatesRes.json();
const passed = await passedRes.json();
const denied = await deniedRes.json();

let html = '<div style="padding: 10px;">';
// Filter buttons
html += '<div style="margin-bottom: 15px; display: flex; gap: 10px;">';
html += '<button class="filter-btn" data-filter="pass" style="padding: 6px 16px; background: #040; border: 1px solid #060; border-radius: 3px; color: #0a0; cursor: pointer;">Pass (' + passed.length + ')</button>';
html += '<button class="filter-btn" data-filter="held" style="padding: 6px 16px; background: #330; border: 1px solid #550; border-radius: 3px; color: #f90; cursor: pointer;">Held (' + candidates.length + ')</button>';
html += '<button class="filter-btn" data-filter="deny" style="padding: 6px 16px; background: #400; border: 1px solid #600; border-radius: 3px; color: #f66; cursor: pointer;">Deny (' + denied.length + ')</button>';
html += '</div>';
// Passed feeds (approved for publishing) // Passed feeds (approved for publishing)
html += '<div style="margin-bottom: 20px;">'; html += '<div id="section-pass" style="margin-bottom: 20px;">';
html += '<div style="color: #0a0; font-weight: bold; margin-bottom: 10px; border-bottom: 1px solid #333; padding-bottom: 5px;">✓ Approved for Publishing (' + passed.length + ')</div>'; html += '<div style="color: #0a0; font-weight: bold; margin-bottom: 10px; border-bottom: 1px solid #333; padding-bottom: 5px;">✓ Approved for Publishing (' + passed.length + ')</div>';
if (passed.length === 0) { if (passed.length === 0) {
html += '<div style="color: #666; padding: 10px;">No feeds approved yet</div>'; html += '<div style="color: #666; padding: 10px;">No feeds approved yet</div>';
@@ -501,14 +512,14 @@ function initDashboard() {
html += '<div style="color: #666; font-size: 0.85em;">' + escapeHtml(f.url) + '</div>'; html += '<div style="color: #666; font-size: 0.85em;">' + escapeHtml(f.url) + '</div>';
html += '<div style="color: #888; font-size: 0.85em;">→ ' + escapeHtml(f.account) + ' (' + f.unpublished_count + ' unpublished)</div>'; html += '<div style="color: #888; font-size: 0.85em;">→ ' + escapeHtml(f.account) + ' (' + f.unpublished_count + ' unpublished)</div>';
html += '</div>'; html += '</div>';
html += '<button class="status-btn" data-url="' + escapeHtml(f.url) + '" data-status="fail" style="padding: 4px 12px; background: #400; border: 1px solid #600; border-radius: 3px; color: #f66; cursor: pointer; margin-left: 10px;">Revoke</button>'; html += '<button class="status-btn" data-url="' + escapeHtml(f.url) + '" data-status="deny" style="padding: 4px 12px; background: #400; border: 1px solid #600; border-radius: 3px; color: #f66; cursor: pointer; margin-left: 10px;">Revoke</button>';
html += '</div>'; html += '</div>';
}); });
} }
html += '</div>'; html += '</div>';
// Candidates (held for review) // Candidates (held for review)
html += '<div>'; html += '<div id="section-held">';
html += '<div style="color: #f90; font-weight: bold; margin-bottom: 10px; border-bottom: 1px solid #333; padding-bottom: 5px;">⏳ Held for Review (' + candidates.length + ')</div>'; html += '<div style="color: #f90; font-weight: bold; margin-bottom: 10px; border-bottom: 1px solid #333; padding-bottom: 5px;">⏳ Held for Review (' + candidates.length + ')</div>';
if (candidates.length === 0) { if (candidates.length === 0) {
html += '<div style="color: #666; padding: 10px;">No candidates held</div>'; html += '<div style="color: #666; padding: 10px;">No candidates held</div>';
@@ -523,7 +534,28 @@ function initDashboard() {
html += '<div style="color: #555; font-size: 0.8em;">' + escapeHtml(f.source_host) + ' · ' + f.item_count + ' items · ' + escapeHtml(f.category) + '</div>'; html += '<div style="color: #555; font-size: 0.8em;">' + escapeHtml(f.source_host) + ' · ' + f.item_count + ' items · ' + escapeHtml(f.category) + '</div>';
html += '</div>'; html += '</div>';
html += '<button class="status-btn pass-btn" data-url="' + escapeHtml(f.url) + '" data-status="pass" style="padding: 4px 12px; background: #040; border: 1px solid #060; border-radius: 3px; color: #0a0; cursor: pointer; margin-left: 10px;">Pass</button>'; html += '<button class="status-btn pass-btn" data-url="' + escapeHtml(f.url) + '" data-status="pass" style="padding: 4px 12px; background: #040; border: 1px solid #060; border-radius: 3px; color: #0a0; cursor: pointer; margin-left: 10px;">Pass</button>';
html += '<button class="status-btn fail-btn" data-url="' + escapeHtml(f.url) + '" data-status="fail" style="padding: 4px 12px; background: #400; border: 1px solid #600; border-radius: 3px; color: #f66; cursor: pointer; margin-left: 5px;">Fail</button>'; html += '<button class="status-btn deny-btn" data-url="' + escapeHtml(f.url) + '" data-status="deny" style="padding: 4px 12px; background: #400; border: 1px solid #600; border-radius: 3px; color: #f66; cursor: pointer; margin-left: 5px;">Deny</button>';
html += '</div>';
html += '</div>';
});
}
html += '</div>';
// Denied feeds
html += '<div id="section-deny" style="display: none;">';
html += '<div style="color: #f66; font-weight: bold; margin-bottom: 10px; border-bottom: 1px solid #333; padding-bottom: 5px;">✗ Denied (' + denied.length + ')</div>';
if (denied.length === 0) {
html += '<div style="color: #666; padding: 10px;">No feeds denied</div>';
} else {
denied.forEach(f => {
html += '<div class="publish-row" style="padding: 8px; border-bottom: 1px solid #202020;">';
html += '<div style="display: flex; align-items: center;">';
html += '<div style="flex: 1;">';
html += '<div style="color: #0af;">' + escapeHtml(f.title || f.url) + '</div>';
html += '<div style="color: #666; font-size: 0.85em;">' + escapeHtml(f.url) + '</div>';
html += '<div style="color: #555; font-size: 0.8em;">' + escapeHtml(f.source_host) + ' · ' + f.item_count + ' items</div>';
html += '</div>';
html += '<button class="status-btn" data-url="' + escapeHtml(f.url) + '" data-status="held" style="padding: 4px 12px; background: #330; border: 1px solid #550; border-radius: 3px; color: #f90; cursor: pointer; margin-left: 10px;">Restore</button>';
html += '</div>'; html += '</div>';
html += '</div>'; html += '</div>';
}); });
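The buttons above carry the status values `pass`, `held`, and `deny` (replacing the earlier `fail`). A minimal sketch of how the Go side might validate those values before updating a feed row is shown below; the `setPublishStatus` name and the `feeds.publish_status` column are assumptions for illustration, not code from this repository.

```go
// Hypothetical sketch: validate dashboard status values before writing them.
package api

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5/pgxpool"
)

var validPublishStatuses = map[string]bool{
	"pass": true, // approved for publishing
	"held": true, // awaiting review
	"deny": true, // rejected (previously labeled "fail")
}

func setPublishStatus(ctx context.Context, pool *pgxpool.Pool, feedURL, status string) error {
	if !validPublishStatuses[status] {
		return fmt.Errorf("invalid publish status %q", status)
	}
	_, err := pool.Exec(ctx,
		`UPDATE feeds SET publish_status = $1 WHERE url = $2`, // names assumed
		status, feedURL)
	return err
}
```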
@@ -533,7 +565,21 @@ function initDashboard() {
 			html += '</div>';
 			output.innerHTML = html;
-			// Attach handlers for pass/fail buttons
+			// Filter button handlers
+			output.querySelectorAll('.filter-btn').forEach(btn => {
+				btn.addEventListener('click', () => {
+					const filter = btn.dataset.filter;
+					document.getElementById('section-pass').style.display = filter === 'pass' ? 'block' : 'none';
+					document.getElementById('section-held').style.display = filter === 'held' ? 'block' : 'none';
+					document.getElementById('section-deny').style.display = filter === 'deny' ? 'block' : 'none';
+					// Update button styles
+					output.querySelectorAll('.filter-btn').forEach(b => {
+						b.style.opacity = b.dataset.filter === filter ? '1' : '0.5';
+					});
+				});
+			});
+			// Attach handlers for pass/deny buttons
 			output.querySelectorAll('.status-btn').forEach(btn => {
 				btn.addEventListener('click', async () => {
 					const url = btn.dataset.url;