Files
crawler/db.go
primal 26de5d3753 Add status column to items table
Items now have a status column ('pass' or 'fail', default 'pass') to
control publishing eligibility. Includes migration for existing databases.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 15:46:33 -05:00

450 lines
16 KiB
Go

package main
import (
"context"
"fmt"
"net/url"
"os"
"strings"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
// schema is the DDL that OpenDatabase executes when the domains table is not
// found. It assumes the tld_enum type already exists (see the note at the top
// of the SQL). pg_trgm is installed first because the trigram GIN index on
// domains needs its gin_trgm_ops operator class. That index is built on
// LOWER(host) so it matches the LOWER() used by host searches and the
// same-named idx_domains_host_trgm migration in OpenDatabase. Otherwise the
// IF NOT EXISTS in that migration would silently keep a case-sensitive index.
const schema = `
-- Note: tld_enum type created separately from tlds.txt (1530 TLDs)
-- CREATE TYPE tld_enum AS ENUM ('aaa', 'aarp', ... 'zw');
-- New TLDs synced daily from IANA: https://data.iana.org/TLD/tlds-alpha-by-domain.txt
-- Trigram support; must precede the trigram indexes below
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE TABLE IF NOT EXISTS domains (
host TEXT NOT NULL,
tld tld_enum NOT NULL,
status TEXT NOT NULL DEFAULT 'hold', -- hold, pass, skip, dead (dead = retired TLD)
crawled_at TIMESTAMP NOT NULL DEFAULT '0001-01-01 00:00:00', -- domain_check: 0=unchecked, +1s=checked; feed_crawl: real timestamp
feeds_found INTEGER DEFAULT 0,
last_error TEXT,
miss_count INTEGER NOT NULL DEFAULT 0, -- consecutive errors, set to hold at 100
PRIMARY KEY (host, tld)
);
CREATE INDEX IF NOT EXISTS idx_domains_status ON domains(status);
CREATE INDEX IF NOT EXISTS idx_domains_tld ON domains(tld);
CREATE INDEX IF NOT EXISTS idx_domains_feeds_found ON domains(feeds_found DESC) WHERE feeds_found > 0;
CREATE INDEX IF NOT EXISTS idx_domains_to_crawl ON domains(crawled_at) WHERE crawled_at < '0001-01-02';
CREATE INDEX IF NOT EXISTS idx_domains_host_trgm ON domains USING GIN(LOWER(host) gin_trgm_ops);
CREATE TABLE IF NOT EXISTS feeds (
url TEXT PRIMARY KEY,
domain_host TEXT NOT NULL,
domain_tld tld_enum NOT NULL,
type TEXT,
category TEXT DEFAULT 'main',
title TEXT,
description TEXT,
language TEXT,
site_url TEXT,
source_url TEXT,
discovered_at TIMESTAMP NOT NULL DEFAULT NOW(),
last_checked_at TIMESTAMP, -- feed_check: when last checked for new items
next_check_at TIMESTAMP, -- feed_check: when to next check
last_build_date TIMESTAMP,
etag TEXT,
last_modified TEXT,
status TEXT NOT NULL DEFAULT 'pass',
last_error TEXT,
last_error_at TIMESTAMP,
item_count INTEGER NOT NULL DEFAULT 0,
oldest_item_date TIMESTAMP,
newest_item_date TIMESTAMP,
no_update INTEGER NOT NULL DEFAULT 0,
-- Publishing to PDS
publish_status TEXT NOT NULL DEFAULT 'hold',
publish_account TEXT,
FOREIGN KEY (domain_host, domain_tld) REFERENCES domains(host, tld)
);
-- Indexes will be added as needed based on query patterns
CREATE TABLE IF NOT EXISTS items (
guid TEXT NOT NULL,
feed_url TEXT NOT NULL REFERENCES feeds(url) ON DELETE CASCADE,
title TEXT,
link TEXT,
description TEXT,
content TEXT,
author TEXT,
pub_date TIMESTAMP,
discovered_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP,
-- Media attachments
enclosure_url TEXT,
enclosure_type TEXT,
enclosure_length BIGINT,
image_urls JSONB,
tags JSONB,
-- Item status: 'pass' (default, eligible for publishing), 'fail' (rejected)
status TEXT NOT NULL DEFAULT 'pass',
-- Publishing to PDS
published_at TIMESTAMP,
published_uri TEXT,
PRIMARY KEY (guid, feed_url)
);
-- Indexes will be added as needed based on query patterns
-- OAuth sessions
CREATE TABLE IF NOT EXISTS oauth_sessions (
id TEXT PRIMARY KEY,
did TEXT NOT NULL,
handle TEXT NOT NULL,
access_token TEXT,
refresh_token TEXT,
token_type TEXT NOT NULL DEFAULT 'DPoP',
expires_at TIMESTAMP NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
dpop_private_jwk TEXT,
dpop_authserver_nonce TEXT,
dpop_pds_nonce TEXT,
pds_url TEXT,
authserver_iss TEXT,
token_expiry TIMESTAMP
);
-- Trigger to normalize feed URLs on insert/update (strips https://, http://, www.)
CREATE OR REPLACE FUNCTION normalize_feed_url()
RETURNS TRIGGER AS $$
BEGIN
NEW.url = regexp_replace(NEW.url, '^https?://', '');
NEW.url = regexp_replace(NEW.url, '^www\.', '');
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS normalize_feed_url_trigger ON feeds;
CREATE TRIGGER normalize_feed_url_trigger
BEFORE INSERT OR UPDATE ON feeds
FOR EACH ROW
EXECUTE FUNCTION normalize_feed_url();
`
// DB wraps pgxpool.Pool with helper methods.
//
// The pool is embedded, so the *pgxpool.Pool methods are promoted onto DB.
// DB also declares its own QueryRow, Query, Exec, Begin and Close. These take
// no context.Context (they use context.Background()) and therefore shadow
// the pool's methods of the same names.
type DB struct {
*pgxpool.Pool
}
// OpenDatabase connects to PostgreSQL and returns a ready-to-use DB.
//
// The connection string is resolved in this order:
//   - connString, when non-empty;
//   - the DATABASE_URL environment variable;
//   - a postgres:// URL (sslmode=disable) built from DB_HOST, DB_PORT,
//     DB_USER and DB_NAME (each with a built-in default). The password is
//     DB_PASSWORD, or, when that is empty, the trimmed contents of the file
//     named by DB_PASSWORD_FILE.
//
// After connecting and pinging, it creates the schema if no domains table is
// visible and applies a series of idempotent migrations. It then starts a
// background goroutine that logs row counts, runs ANALYZE and builds the
// items.title trigram index with CREATE INDEX CONCURRENTLY.
//
// The following failures are returned, wrapped with %w:
//   - reading the password file;
//   - parsing the connection string;
//   - connecting or pinging;
//   - checking for the schema or creating it.
//
// The pool is closed on any failure after it was opened. Individual
// migration statements are best-effort: a failure is logged as a warning
// and startup continues.
func OpenDatabase(connString string) (*DB, error) {
	fmt.Printf("Connecting to database...\n")
	// If connection string not provided, try environment variables
	if connString == "" {
		connString = os.Getenv("DATABASE_URL")
	}
	if connString == "" {
		// Build from individual env vars
		host := getEnvOrDefault("DB_HOST", "atproto-postgres")
		port := getEnvOrDefault("DB_PORT", "5432")
		user := getEnvOrDefault("DB_USER", "dba_1440_news")
		dbname := getEnvOrDefault("DB_NAME", "db_1440_news")
		// Support Docker secrets (password file) or direct password
		password := os.Getenv("DB_PASSWORD")
		if password == "" {
			if passwordFile := os.Getenv("DB_PASSWORD_FILE"); passwordFile != "" {
				data, err := os.ReadFile(passwordFile)
				if err != nil {
					return nil, fmt.Errorf("failed to read password file: %w", err)
				}
				password = strings.TrimSpace(string(data))
			}
		}
		// Let net/url escape the userinfo. url.QueryEscape applies
		// query-string rules (space -> "+"), and userinfo unescaping does not
		// turn "+" back into a space. The user name was not escaped at all.
		u := url.URL{
			Scheme:   "postgres",
			User:     url.UserPassword(user, password),
			Host:     host + ":" + port,
			Path:     "/" + dbname,
			RawQuery: "sslmode=disable",
		}
		connString = u.String()
	}
	config, err := pgxpool.ParseConfig(connString)
	if err != nil {
		return nil, fmt.Errorf("failed to parse connection string: %w", err)
	}
	// Connection pool settings
	config.MaxConns = 10
	config.MinConns = 0 // Don't pre-create connections to avoid schema race conditions
	config.MaxConnLifetime = 5 * time.Minute
	config.MaxConnIdleTime = 1 * time.Minute
	ctx := context.Background()
	pool, err := pgxpool.NewWithConfig(ctx, config)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to database: %w", err)
	}
	// Verify connection
	if err := pool.Ping(ctx); err != nil {
		pool.Close()
		return nil, fmt.Errorf("failed to ping database: %w", err)
	}
	fmt.Println(" Connected to PostgreSQL")
	db := &DB{pool}

	// migrate runs one best-effort migration statement and logs (rather than
	// silently drops) any failure.
	migrate := func(stmt string) {
		if _, err := pool.Exec(ctx, stmt); err != nil {
			fmt.Printf(" Warning: migration failed: %s: %v\n", stmt, err)
		}
	}
	// tableExists reports whether an unqualified reference to table resolves
	// via search_path. An unfiltered information_schema.tables lookup is not
	// usable here: information_schema itself has a view named "domains", so
	// such a lookup always reports true and the schema would never be created.
	tableExists := func(table string) (bool, error) {
		var exists bool
		err := pool.QueryRow(ctx, "SELECT to_regclass($1) IS NOT NULL", table).Scan(&exists)
		return exists, err
	}
	// columnType returns information_schema's data_type for table.column, or
	// "" when the column does not exist or the lookup fails.
	columnType := func(table, column string) string {
		var dataType string
		if err := pool.QueryRow(ctx, `
			SELECT data_type FROM information_schema.columns
			WHERE table_name = $1 AND column_name = $2
		`, table, column).Scan(&dataType); err != nil {
			return ""
		}
		return dataType
	}

	// pg_trgm must exist before the schema's gin_trgm_ops index is built.
	migrate("CREATE EXTENSION IF NOT EXISTS pg_trgm")
	// Create schema only if tables don't exist
	hasDomains, err := tableExists("domains")
	if err != nil {
		pool.Close()
		return nil, fmt.Errorf("failed to check for existing schema: %w", err)
	}
	if !hasDomains {
		if _, err := pool.Exec(ctx, schema); err != nil {
			pool.Close()
			return nil, fmt.Errorf("failed to create schema: %w", err)
		}
	}

	// Migration: add miss_count column if not exists
	migrate("ALTER TABLE domains ADD COLUMN IF NOT EXISTS miss_count INTEGER NOT NULL DEFAULT 0")
	// Migration: trigram indexes for fast LIKE searches.
	// Indexes must match LOWER() used in queries.
	migrate("CREATE INDEX IF NOT EXISTS idx_domains_host_trgm ON domains USING gin (LOWER(host) gin_trgm_ops)")
	migrate("CREATE INDEX IF NOT EXISTS idx_feeds_domain_host_trgm ON feeds USING gin (LOWER(domain_host) gin_trgm_ops)")

	// Migration: rename old 'sessions' table to 'oauth_sessions'
	oldSessions, errOld := tableExists("sessions")
	newSessions, errNew := tableExists("oauth_sessions")
	if errOld == nil && errNew == nil && oldSessions && !newSessions {
		migrate("ALTER TABLE sessions RENAME TO oauth_sessions")
	}
	// Add token_expiry column if missing (used by OAuth library)
	migrate("ALTER TABLE oauth_sessions ADD COLUMN IF NOT EXISTS token_expiry TIMESTAMP")
	// Make access_token nullable (session created before tokens obtained)
	migrate("ALTER TABLE oauth_sessions ALTER COLUMN access_token DROP NOT NULL")
	// Add missing OAuth session columns
	migrate("ALTER TABLE oauth_sessions ADD COLUMN IF NOT EXISTS dpop_authserver_nonce TEXT")
	migrate("ALTER TABLE oauth_sessions ADD COLUMN IF NOT EXISTS dpop_pds_nonce TEXT")
	migrate("ALTER TABLE oauth_sessions ADD COLUMN IF NOT EXISTS pds_url TEXT")
	migrate("ALTER TABLE oauth_sessions ADD COLUMN IF NOT EXISTS authserver_iss TEXT")
	// Drop old dpop_nonce column if it exists
	migrate("ALTER TABLE oauth_sessions DROP COLUMN IF EXISTS dpop_nonce")

	// Migration: rename feed columns for consistent terminology
	// last_crawled_at -> last_checked_at (feed_check = checking feeds for new items)
	// next_crawl_at -> next_check_at
	if columnType("feeds", "last_crawled_at") != "" {
		migrate("ALTER TABLE feeds RENAME COLUMN last_crawled_at TO last_checked_at")
	}
	if columnType("feeds", "next_crawl_at") != "" {
		migrate("ALTER TABLE feeds RENAME COLUMN next_crawl_at TO next_check_at")
	}
	// Create index on next_check_at (must be after column rename)
	migrate("CREATE INDEX IF NOT EXISTS idx_feeds_to_check ON feeds(next_check_at, no_update DESC) WHERE status = 'pass'")
	// Drop old index name if it exists
	migrate("DROP INDEX IF EXISTS idx_feeds_due_check")

	// Migration: convert TIMESTAMPTZ to TIMESTAMP (all times are GMT).
	// Only columns that are currently timestamptz are converted. Columns that
	// are already TIMESTAMP, or that do not exist (e.g. short_urls/clicks on a
	// database without those tables), are skipped rather than sent an ALTER
	// that cannot succeed.
	tzColumns := []struct{ table, column string }{
		{"feeds", "discovered_at"},
		{"feeds", "last_checked_at"},
		{"feeds", "next_check_at"},
		{"feeds", "last_build_date"},
		{"feeds", "last_error_at"},
		{"feeds", "oldest_item_date"},
		{"feeds", "newest_item_date"},
		{"items", "pub_date"},
		{"items", "discovered_at"},
		{"items", "updated_at"},
		{"items", "published_at"},
		{"short_urls", "created_at"},
		{"clicks", "clicked_at"},
		{"oauth_sessions", "created_at"},
		{"oauth_sessions", "expires_at"},
		{"oauth_sessions", "token_expiry"},
	}
	for _, c := range tzColumns {
		if columnType(c.table, c.column) == "timestamp with time zone" {
			// Identifiers come from the constant list above, never from input.
			migrate(fmt.Sprintf("ALTER TABLE %s ALTER COLUMN %s TYPE TIMESTAMP USING %s AT TIME ZONE 'UTC'",
				c.table, c.column, c.column))
		}
	}

	// Migration: rename item_id to item_guid in short_urls table (items now use composite PK)
	if columnType("short_urls", "item_id") != "" {
		// Drop the column and add item_guid instead (can't convert int64 to text meaningfully)
		migrate("ALTER TABLE short_urls DROP COLUMN IF EXISTS item_id")
		migrate("ALTER TABLE short_urls ADD COLUMN IF NOT EXISTS item_guid TEXT")
	}
	// Migration: add status column to items table (pass/fail for publishing)
	migrate("ALTER TABLE items ADD COLUMN IF NOT EXISTS status TEXT NOT NULL DEFAULT 'pass'")
	fmt.Println(" Schema OK")

	// Run stats and background index creation
	go func() {
		bg := context.Background()
		var domainCount, feedCount int
		pool.QueryRow(bg, "SELECT COUNT(*) FROM domains").Scan(&domainCount)
		pool.QueryRow(bg, "SELECT COUNT(*) FROM feeds").Scan(&feedCount)
		fmt.Printf(" Existing data: %d domains, %d feeds\n", domainCount, feedCount)
		fmt.Println(" Running ANALYZE...")
		if _, err := pool.Exec(bg, "ANALYZE"); err != nil {
			fmt.Printf(" Warning: ANALYZE failed: %v\n", err)
		} else {
			fmt.Println(" ANALYZE complete")
		}

		// Create trigram index on items.title (CONCURRENTLY = no table lock).
		// A failed CREATE INDEX CONCURRENTLY leaves an INVALID index behind.
		// Check validity, not just existence, so such a leftover is dropped
		// and rebuilt instead of being skipped on every later startup.
		var exists, valid bool
		if err := pool.QueryRow(bg,
			"SELECT count(*) > 0, COALESCE(bool_and(indisvalid), false) FROM pg_index WHERE indexrelid = to_regclass('idx_items_title_trgm')",
		).Scan(&exists, &valid); err != nil {
			fmt.Printf(" Warning: checking items title trigram index failed: %v\n", err)
		}
		if exists && valid {
			return
		}
		if exists {
			if _, err := pool.Exec(bg, "DROP INDEX CONCURRENTLY IF EXISTS idx_items_title_trgm"); err != nil {
				fmt.Printf(" Warning: dropping invalid items title trigram index failed: %v\n", err)
				return
			}
		}
		fmt.Println(" Creating trigram index on items.title (background, may take a while)...")
		if _, err := pool.Exec(bg,
			"CREATE INDEX CONCURRENTLY idx_items_title_trgm ON items USING gin (LOWER(title) gin_trgm_ops)"); err != nil {
			fmt.Printf(" Warning: items title trigram index failed: %v\n", err)
		} else {
			fmt.Println(" Trigram index on items.title complete")
		}
	}()
	return db, nil
}
// getEnvOrDefault returns the value of the environment variable key.
// It falls back to defaultVal when the variable is unset or set to the
// empty string.
func getEnvOrDefault(key, defaultVal string) string {
	value := os.Getenv(key)
	if value == "" {
		value = defaultVal
	}
	return value
}
// QueryRow runs query with args on the embedded pool using
// context.Background() and returns the resulting pgx.Row unchanged.
func (db *DB) QueryRow(query string, args ...interface{}) pgx.Row {
	ctx := context.Background()
	row := db.Pool.QueryRow(ctx, query, args...)
	return row
}
// Query runs query with args on the embedded pool using
// context.Background(), passing the pool's rows and error straight through.
func (db *DB) Query(query string, args ...interface{}) (pgx.Rows, error) {
	ctx := context.Background()
	rows, err := db.Pool.Query(ctx, query, args...)
	return rows, err
}
// Exec runs a statement on the embedded pool using context.Background().
// It reports the number of rows affected, or 0 together with the error
// when execution fails.
func (db *DB) Exec(query string, args ...interface{}) (int64, error) {
	var affected int64
	tag, err := db.Pool.Exec(context.Background(), query, args...)
	if err == nil {
		affected = tag.RowsAffected()
	}
	return affected, err
}
// Begin opens a transaction on the embedded pool using context.Background().
func (db *DB) Begin() (pgx.Tx, error) {
	ctx := context.Background()
	tx, err := db.Pool.Begin(ctx)
	return tx, err
}
// Close shuts down the embedded connection pool. It always returns nil;
// the error result only exists to give DB an error-returning Close method.
func (db *DB) Close() error {
	pool := db.Pool
	pool.Close()
	return nil
}
// NullableString maps the empty string to nil. Any other value yields a
// pointer to a copy of s.
func NullableString(s string) *string {
	if s != "" {
		return &s
	}
	return nil
}
// NullableTime maps the zero time.Time (as reported by IsZero) to nil.
// Any other instant yields a pointer to a copy of t.
func NullableTime(t time.Time) *time.Time {
	if !t.IsZero() {
		return &t
	}
	return nil
}
// StringValue dereferences s, treating a nil pointer as the empty string.
func StringValue(s *string) string {
	var out string
	if s != nil {
		out = *s
	}
	return out
}
// TimeValue dereferences t, treating a nil pointer as the zero time.Time.
func TimeValue(t *time.Time) time.Time {
	if t != nil {
		return *t
	}
	var zero time.Time
	return zero
}
// ToSearchQuery converts a user query to PostgreSQL tsquery format by ANDing
// every word together, e.g. "solar eclipse" -> "solar & eclipse".
//
// Characters that are operators or quoting in tsquery syntax
// (& | ! ( ) : * ' < > \) are treated as word separators. Free-form user
// input therefore cannot inject operators or produce a malformed query:
// "cats & dogs" stays "cats & dogs" instead of becoming "cats & & & dogs",
// and "don't" becomes "don & t" instead of leaving an unbalanced quote.
// It returns "" when the input contains no words.
func ToSearchQuery(query string) string {
	const tsquerySpecials = "&|!():*'<>\\"
	// Turn special characters into spaces so they split words rather than
	// reaching the tsquery parser as syntax.
	cleaned := strings.Map(func(r rune) rune {
		if strings.ContainsRune(tsquerySpecials, r) {
			return ' '
		}
		return r
	}, query)
	words := strings.Fields(cleaned)
	if len(words) == 0 {
		return ""
	}
	return strings.Join(words, " & ")
}