crawler/db.go
primal · f307e6c845 · Add guards to skip migrations if already done

Checks column types before running ALTER TYPE migrations to avoid
slow table scans on every restart. Also guards column renames.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 17:44:35 -05:00
package main

import (
	"context"
	"fmt"
	"net/url"
	"os"
	"strings"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)
const schema = `
-- Note: tld_enum type created separately from tlds.txt (1530 TLDs)
-- CREATE TYPE tld_enum AS ENUM ('aaa', 'aarp', ... 'zw');
-- New TLDs synced daily from IANA: https://data.iana.org/TLD/tlds-alpha-by-domain.txt
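-- Illustrative only (not executed here): the daily sync would register a newly
-- delegated TLD with something like
--   ALTER TYPE tld_enum ADD VALUE IF NOT EXISTS 'newtld';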

CREATE TABLE IF NOT EXISTS domains (
    host TEXT NOT NULL,
    tld tld_enum NOT NULL,
    status TEXT NOT NULL DEFAULT 'hold', -- hold, pass, skip, dead (dead = retired TLD)
    crawled_at TIMESTAMP NOT NULL DEFAULT '0001-01-01 00:00:00', -- domain_check: 0001-01-01 = unchecked, +1s = checked; feed_crawl: real timestamp
    feeds_found INTEGER DEFAULT 0,
    last_error TEXT,
    miss_count INTEGER NOT NULL DEFAULT 0, -- consecutive errors; domain set to hold at 100
    PRIMARY KEY (host, tld)
);
CREATE INDEX IF NOT EXISTS idx_domains_status ON domains(status);
CREATE INDEX IF NOT EXISTS idx_domains_tld ON domains(tld);
CREATE INDEX IF NOT EXISTS idx_domains_feeds_found ON domains(feeds_found DESC) WHERE feeds_found > 0;
CREATE INDEX IF NOT EXISTS idx_domains_to_crawl ON domains(crawled_at) WHERE crawled_at < '0001-01-02';
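-- Illustrative: the domain_check worker can pull unchecked hosts through the
-- partial index above, e.g.
--   SELECT host, tld FROM domains WHERE crawled_at < '0001-01-02' LIMIT 1000;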

-- pg_trgm provides gin_trgm_ops; created here so the trigram indexes below
-- succeed on a fresh database (the migrations also attempt it defensively).
-- LOWER() matches the case-insensitive LIKE queries.
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE INDEX IF NOT EXISTS idx_domains_host_trgm ON domains USING GIN (LOWER(host) gin_trgm_ops);

CREATE TABLE IF NOT EXISTS feeds (
    url TEXT PRIMARY KEY,
    type TEXT,
    category TEXT DEFAULT 'main',
    title TEXT,
    description TEXT,
    language TEXT,
    site_url TEXT,
    discovered_at TIMESTAMP NOT NULL,
    last_checked_at TIMESTAMP, -- feed_check: when last checked for new items
    next_check_at TIMESTAMP, -- feed_check: when to next check
    last_build_date TIMESTAMP,
    etag TEXT,
    last_modified TEXT,
    status TEXT DEFAULT 'pass' CHECK(status IN ('hold', 'pass', 'skip')),
    last_error TEXT,
    last_error_at TIMESTAMP,
    source_url TEXT,
    source_host TEXT,
    tld TEXT,
    item_count INTEGER,
    oldest_item_date TIMESTAMP,
    newest_item_date TIMESTAMP,
    no_update INTEGER DEFAULT 0,
    -- Publishing to PDS
    publish_status TEXT DEFAULT 'hold' CHECK(publish_status IN ('hold', 'pass', 'skip')),
    publish_account TEXT,
    -- Full-text search vector
    search_vector tsvector GENERATED ALWAYS AS (
        setweight(to_tsvector('english', coalesce(title, '')), 'A') ||
        setweight(to_tsvector('english', coalesce(description, '')), 'B') ||
        setweight(to_tsvector('english', coalesce(url, '')), 'C')
    ) STORED
);
CREATE INDEX IF NOT EXISTS idx_feeds_source_host ON feeds(source_host);
CREATE INDEX IF NOT EXISTS idx_feeds_publish_status ON feeds(publish_status);
CREATE INDEX IF NOT EXISTS idx_feeds_source_host_url ON feeds(source_host, url);
CREATE INDEX IF NOT EXISTS idx_feeds_tld ON feeds(tld);
CREATE INDEX IF NOT EXISTS idx_feeds_tld_source_host ON feeds(tld, source_host);
CREATE INDEX IF NOT EXISTS idx_feeds_source_host_trgm ON feeds USING GIN (LOWER(source_host) gin_trgm_ops);
CREATE INDEX IF NOT EXISTS idx_feeds_type ON feeds(type);
CREATE INDEX IF NOT EXISTS idx_feeds_category ON feeds(category);
CREATE INDEX IF NOT EXISTS idx_feeds_status ON feeds(status);
CREATE INDEX IF NOT EXISTS idx_feeds_discovered_at ON feeds(discovered_at);
CREATE INDEX IF NOT EXISTS idx_feeds_title ON feeds(title);
CREATE INDEX IF NOT EXISTS idx_feeds_search ON feeds USING GIN(search_vector);
-- idx_feeds_to_check created in migrations after column rename

CREATE TABLE IF NOT EXISTS items (
    id BIGSERIAL PRIMARY KEY,
    feed_url TEXT NOT NULL,
    guid TEXT,
    title TEXT,
    link TEXT,
    description TEXT,
    content TEXT,
    author TEXT,
    pub_date TIMESTAMP,
    discovered_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP,
    -- Media attachments
    enclosure_url TEXT,
    enclosure_type TEXT,
    enclosure_length BIGINT,
    image_urls TEXT, -- JSON array of image URLs
    tags TEXT, -- JSON array of category/tag strings
    -- Publishing to PDS
    published_at TIMESTAMP,
    published_uri TEXT,
    -- Full-text search vector
    search_vector tsvector GENERATED ALWAYS AS (
        setweight(to_tsvector('english', coalesce(title, '')), 'A') ||
        setweight(to_tsvector('english', coalesce(description, '')), 'B') ||
        setweight(to_tsvector('english', coalesce(content, '')), 'C') ||
        setweight(to_tsvector('english', coalesce(author, '')), 'D')
    ) STORED,
    UNIQUE(feed_url, guid)
);
CREATE INDEX IF NOT EXISTS idx_items_feed_url ON items(feed_url);
CREATE INDEX IF NOT EXISTS idx_items_pub_date ON items(pub_date DESC);
CREATE INDEX IF NOT EXISTS idx_items_link ON items(link);
CREATE INDEX IF NOT EXISTS idx_items_feed_url_pub_date ON items(feed_url, pub_date DESC);
CREATE INDEX IF NOT EXISTS idx_items_unpublished ON items(feed_url, published_at) WHERE published_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_items_search ON items USING GIN(search_vector);
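-- Illustrative: the PDS publisher can find unpublished items per feed via the
-- partial index above, e.g.
--   SELECT id, title, link FROM items WHERE feed_url = $1 AND published_at IS NULL;
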
-- URL Shortener tables
CREATE TABLE IF NOT EXISTS short_urls (
    code TEXT PRIMARY KEY,
    original_url TEXT NOT NULL,
    item_id BIGINT REFERENCES items(id),
    feed_url TEXT,
    created_at TIMESTAMP NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC'),
    click_count INTEGER DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_short_urls_original ON short_urls(original_url);
CREATE INDEX IF NOT EXISTS idx_short_urls_item_id ON short_urls(item_id);
CREATE INDEX IF NOT EXISTS idx_short_urls_feed_url ON short_urls(feed_url);

CREATE TABLE IF NOT EXISTS clicks (
    id BIGSERIAL PRIMARY KEY,
    short_code TEXT NOT NULL REFERENCES short_urls(code),
    clicked_at TIMESTAMP NOT NULL DEFAULT (NOW() AT TIME ZONE 'UTC'),
    referrer TEXT,
    user_agent TEXT,
    ip_hash TEXT,
    country TEXT
);
CREATE INDEX IF NOT EXISTS idx_clicks_short_code ON clicks(short_code);
CREATE INDEX IF NOT EXISTS idx_clicks_clicked_at ON clicks(clicked_at DESC);
-- OAuth sessions (persisted for login persistence across deploys)
CREATE TABLE IF NOT EXISTS oauth_sessions (
    id TEXT PRIMARY KEY,
    did TEXT NOT NULL,
    handle TEXT NOT NULL,
    created_at TIMESTAMP NOT NULL,
    expires_at TIMESTAMP NOT NULL,
    access_token TEXT,
    refresh_token TEXT,
    token_expiry TIMESTAMP,
    dpop_private_jwk TEXT,
    dpop_authserver_nonce TEXT,
    dpop_pds_nonce TEXT,
    pds_url TEXT,
    authserver_iss TEXT
);
CREATE INDEX IF NOT EXISTS idx_oauth_sessions_expires_at ON oauth_sessions(expires_at);
-- Trigger to normalize feed URLs on insert/update (strips https://, http://, www.)
CREATE OR REPLACE FUNCTION normalize_feed_url()
RETURNS TRIGGER AS $$
BEGIN
    NEW.url = regexp_replace(NEW.url, '^https?://', '');
    NEW.url = regexp_replace(NEW.url, '^www\.', '');
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS normalize_feed_url_trigger ON feeds;
CREATE TRIGGER normalize_feed_url_trigger
    BEFORE INSERT OR UPDATE ON feeds
    FOR EACH ROW
    EXECUTE FUNCTION normalize_feed_url();
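
-- Illustrative: inserting url = 'https://www.example.com/feed.xml' stores
-- 'example.com/feed.xml'.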
`
// DB wraps pgxpool.Pool with helper methods
type DB struct {
	*pgxpool.Pool
}

func OpenDatabase(connString string) (*DB, error) {
	fmt.Printf("Connecting to database...\n")

	// If connection string not provided, try environment variables
	if connString == "" {
		connString = os.Getenv("DATABASE_URL")
	}
	if connString == "" {
		// Build from individual env vars
		host := getEnvOrDefault("DB_HOST", "atproto-postgres")
		port := getEnvOrDefault("DB_PORT", "5432")
		user := getEnvOrDefault("DB_USER", "news_1440")
		dbname := getEnvOrDefault("DB_NAME", "news_1440")

		// Support Docker secrets (password file) or direct password
		password := os.Getenv("DB_PASSWORD")
		if password == "" {
			if passwordFile := os.Getenv("DB_PASSWORD_FILE"); passwordFile != "" {
				data, err := os.ReadFile(passwordFile)
				if err != nil {
					return nil, fmt.Errorf("failed to read password file: %v", err)
				}
				password = strings.TrimSpace(string(data))
			}
		}
		connString = fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable",
			user, url.QueryEscape(password), host, port, dbname)
	}
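	// With the defaults above this yields, illustratively:
	// postgres://news_1440:<escaped-password>@atproto-postgres:5432/news_1440?sslmode=disable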

	config, err := pgxpool.ParseConfig(connString)
	if err != nil {
		return nil, fmt.Errorf("failed to parse connection string: %v", err)
	}

	// Connection pool settings
	config.MaxConns = 10
	config.MinConns = 2
	config.MaxConnLifetime = 5 * time.Minute
	config.MaxConnIdleTime = 1 * time.Minute

	ctx := context.Background()
	pool, err := pgxpool.NewWithConfig(ctx, config)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to database: %v", err)
	}

	// Verify connection
	if err := pool.Ping(ctx); err != nil {
		pool.Close()
		return nil, fmt.Errorf("failed to ping database: %v", err)
	}
	fmt.Println(" Connected to PostgreSQL")

	db := &DB{pool}

	// Create schema
	if _, err := pool.Exec(ctx, schema); err != nil {
		pool.Close()
		return nil, fmt.Errorf("failed to create schema: %v", err)
	}

	// Best-effort migrations: errors from the statements below are intentionally
	// ignored, since each is idempotent or guarded by an existence check.

	// Migration: add miss_count column if not exists
	pool.Exec(ctx, "ALTER TABLE domains ADD COLUMN IF NOT EXISTS miss_count INTEGER NOT NULL DEFAULT 0")

	// Migration: add trigram extension and indexes for fast LIKE searches
	// Indexes must match the LOWER() used in queries
	pool.Exec(ctx, "CREATE EXTENSION IF NOT EXISTS pg_trgm")
	pool.Exec(ctx, "CREATE INDEX IF NOT EXISTS idx_domains_host_trgm ON domains USING gin (LOWER(host) gin_trgm_ops)")
	pool.Exec(ctx, "CREATE INDEX IF NOT EXISTS idx_feeds_source_host_trgm ON feeds USING gin (LOWER(source_host) gin_trgm_ops)")

	// Migration: rename feed columns for consistent terminology
	//   last_crawled_at -> last_checked_at (feed_check = checking feeds for new items)
	//   next_crawl_at   -> next_check_at
	// Check whether the old column names exist before renaming
	var colExists bool
	pool.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM information_schema.columns WHERE table_name='feeds' AND column_name='last_crawled_at')").Scan(&colExists)
	if colExists {
		pool.Exec(ctx, "ALTER TABLE feeds RENAME COLUMN last_crawled_at TO last_checked_at")
	}
	pool.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM information_schema.columns WHERE table_name='feeds' AND column_name='next_crawl_at')").Scan(&colExists)
	if colExists {
		pool.Exec(ctx, "ALTER TABLE feeds RENAME COLUMN next_crawl_at TO next_check_at")
	}

	// Create index on next_check_at (must run after the column rename)
	pool.Exec(ctx, "CREATE INDEX IF NOT EXISTS idx_feeds_to_check ON feeds(next_check_at, no_update DESC) WHERE status = 'pass'")
	// Drop the old index name if it exists
	pool.Exec(ctx, "DROP INDEX IF EXISTS idx_feeds_due_check")

	// Migration: convert TIMESTAMPTZ columns to TIMESTAMP (all times are GMT)
	// Helper: report whether a column is already plain TIMESTAMP, so an
	// already-migrated column skips the slow, table-rewriting ALTER below.
	isTimestamp := func(table, column string) bool {
		var dataType string
		pool.QueryRow(ctx, `
			SELECT data_type FROM information_schema.columns
			WHERE table_name = $1 AND column_name = $2
		`, table, column).Scan(&dataType)
		return dataType == "timestamp without time zone"
	}
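	// A not-yet-migrated TIMESTAMPTZ column reports "timestamp with time zone",
	// so it fails this check and is converted below.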

	// feeds table
	if !isTimestamp("feeds", "discovered_at") {
		pool.Exec(ctx, "ALTER TABLE feeds ALTER COLUMN discovered_at TYPE TIMESTAMP USING discovered_at AT TIME ZONE 'UTC'")
	}
	if !isTimestamp("feeds", "last_checked_at") {
		pool.Exec(ctx, "ALTER TABLE feeds ALTER COLUMN last_checked_at TYPE TIMESTAMP USING last_checked_at AT TIME ZONE 'UTC'")
	}
	if !isTimestamp("feeds", "next_check_at") {
		pool.Exec(ctx, "ALTER TABLE feeds ALTER COLUMN next_check_at TYPE TIMESTAMP USING next_check_at AT TIME ZONE 'UTC'")
	}
	if !isTimestamp("feeds", "last_build_date") {
		pool.Exec(ctx, "ALTER TABLE feeds ALTER COLUMN last_build_date TYPE TIMESTAMP USING last_build_date AT TIME ZONE 'UTC'")
	}
	if !isTimestamp("feeds", "last_error_at") {
		pool.Exec(ctx, "ALTER TABLE feeds ALTER COLUMN last_error_at TYPE TIMESTAMP USING last_error_at AT TIME ZONE 'UTC'")
	}
	if !isTimestamp("feeds", "oldest_item_date") {
		pool.Exec(ctx, "ALTER TABLE feeds ALTER COLUMN oldest_item_date TYPE TIMESTAMP USING oldest_item_date AT TIME ZONE 'UTC'")
	}
	if !isTimestamp("feeds", "newest_item_date") {
		pool.Exec(ctx, "ALTER TABLE feeds ALTER COLUMN newest_item_date TYPE TIMESTAMP USING newest_item_date AT TIME ZONE 'UTC'")
	}

	// items table
	if !isTimestamp("items", "pub_date") {
		pool.Exec(ctx, "ALTER TABLE items ALTER COLUMN pub_date TYPE TIMESTAMP USING pub_date AT TIME ZONE 'UTC'")
	}
	if !isTimestamp("items", "discovered_at") {
		pool.Exec(ctx, "ALTER TABLE items ALTER COLUMN discovered_at TYPE TIMESTAMP USING discovered_at AT TIME ZONE 'UTC'")
	}
	if !isTimestamp("items", "updated_at") {
		pool.Exec(ctx, "ALTER TABLE items ALTER COLUMN updated_at TYPE TIMESTAMP USING updated_at AT TIME ZONE 'UTC'")
	}
	if !isTimestamp("items", "published_at") {
		pool.Exec(ctx, "ALTER TABLE items ALTER COLUMN published_at TYPE TIMESTAMP USING published_at AT TIME ZONE 'UTC'")
	}

	// short_urls table
	if !isTimestamp("short_urls", "created_at") {
		pool.Exec(ctx, "ALTER TABLE short_urls ALTER COLUMN created_at TYPE TIMESTAMP USING created_at AT TIME ZONE 'UTC'")
	}

	// clicks table
	if !isTimestamp("clicks", "clicked_at") {
		pool.Exec(ctx, "ALTER TABLE clicks ALTER COLUMN clicked_at TYPE TIMESTAMP USING clicked_at AT TIME ZONE 'UTC'")
	}

	// oauth_sessions table
	if !isTimestamp("oauth_sessions", "created_at") {
		pool.Exec(ctx, "ALTER TABLE oauth_sessions ALTER COLUMN created_at TYPE TIMESTAMP USING created_at AT TIME ZONE 'UTC'")
	}
	if !isTimestamp("oauth_sessions", "expires_at") {
		pool.Exec(ctx, "ALTER TABLE oauth_sessions ALTER COLUMN expires_at TYPE TIMESTAMP USING expires_at AT TIME ZONE 'UTC'")
	}
	if !isTimestamp("oauth_sessions", "token_expiry") {
		pool.Exec(ctx, "ALTER TABLE oauth_sessions ALTER COLUMN token_expiry TYPE TIMESTAMP USING token_expiry AT TIME ZONE 'UTC'")
	}
fmt.Println(" Schema OK")
// Run stats and background index creation
go func() {
var domainCount, feedCount int
pool.QueryRow(context.Background(), "SELECT COUNT(*) FROM domains").Scan(&domainCount)
pool.QueryRow(context.Background(), "SELECT COUNT(*) FROM feeds").Scan(&feedCount)
fmt.Printf(" Existing data: %d domains, %d feeds\n", domainCount, feedCount)
fmt.Println(" Running ANALYZE...")
if _, err := pool.Exec(context.Background(), "ANALYZE"); err != nil {
fmt.Printf(" Warning: ANALYZE failed: %v\n", err)
} else {
fmt.Println(" ANALYZE complete")
}
// Create trigram index on items.title in background (CONCURRENTLY = no table lock)
// Check if index already exists first
var indexExists bool
pool.QueryRow(context.Background(),
"SELECT EXISTS(SELECT 1 FROM pg_indexes WHERE indexname = 'idx_items_title_trgm')").Scan(&indexExists)
if !indexExists {
fmt.Println(" Creating trigram index on items.title (background, may take a while)...")
if _, err := pool.Exec(context.Background(),
"CREATE INDEX CONCURRENTLY idx_items_title_trgm ON items USING gin (LOWER(title) gin_trgm_ops)"); err != nil {
fmt.Printf(" Warning: items title trigram index failed: %v\n", err)
} else {
fmt.Println(" Trigram index on items.title complete")
}
}
}()
return db, nil
}
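
// Usage sketch (illustrative; the real call site lives elsewhere in the crawler):
//
//	db, err := OpenDatabase(os.Getenv("DATABASE_URL"))
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer db.Close()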

func getEnvOrDefault(key, defaultVal string) string {
	if val := os.Getenv(key); val != "" {
		return val
	}
	return defaultVal
}

// QueryRow wraps pool.QueryRow for compatibility
func (db *DB) QueryRow(query string, args ...interface{}) pgx.Row {
	return db.Pool.QueryRow(context.Background(), query, args...)
}

// Query wraps pool.Query for compatibility
func (db *DB) Query(query string, args ...interface{}) (pgx.Rows, error) {
	return db.Pool.Query(context.Background(), query, args...)
}

// Exec wraps pool.Exec for compatibility, returning the number of rows affected
func (db *DB) Exec(query string, args ...interface{}) (int64, error) {
	result, err := db.Pool.Exec(context.Background(), query, args...)
	if err != nil {
		return 0, err
	}
	return result.RowsAffected(), nil
}
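
// Example (illustrative; the feed URL variable and status value are assumptions):
//
//	n, err := db.Exec("UPDATE feeds SET status = $1 WHERE url = $2", "skip", feedURL)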

// Begin starts a transaction
func (db *DB) Begin() (pgx.Tx, error) {
	return db.Pool.Begin(context.Background())
}

// Close closes the connection pool
func (db *DB) Close() error {
	db.Pool.Close()
	return nil
}

// NullableString returns nil for empty strings, otherwise a pointer to the string
func NullableString(s string) *string {
	if s == "" {
		return nil
	}
	return &s
}

// NullableTime returns nil for zero times, otherwise a pointer to the time
func NullableTime(t time.Time) *time.Time {
	if t.IsZero() {
		return nil
	}
	return &t
}
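
// These helpers keep optional columns NULL instead of storing "" or the zero
// time. Illustrative use (the bound variables are assumptions):
//
//	db.Exec("UPDATE feeds SET title = $1, last_build_date = $2 WHERE url = $3",
//		NullableString(title), NullableTime(buildDate), feedURL)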

// StringValue returns the empty string for nil, otherwise the dereferenced value
func StringValue(s *string) string {
	if s == nil {
		return ""
	}
	return *s
}

// TimeValue returns the zero time for nil, otherwise the dereferenced value
func TimeValue(t *time.Time) time.Time {
	if t == nil {
		return time.Time{}
	}
	return *t
}

// ToSearchQuery converts a user query to PostgreSQL tsquery format
func ToSearchQuery(query string) string {
	// Simple conversion: split on whitespace and join with &
	words := strings.Fields(query)
	if len(words) == 0 {
		return ""
	}
	return strings.Join(words, " & ")
}
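
// Example (illustrative): ToSearchQuery("solar power news") returns
// "solar & power & news", which can be bound into a query such as
//
//	WHERE search_vector @@ to_tsquery('english', $1)
//
// Note that tsquery operator characters typed by the user pass through unescaped.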