package main import ( "context" "fmt" "net/url" "os" "strings" "time" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" ) const schema = ` -- Note: tld_enum type created separately from tlds.txt (1530 TLDs) -- CREATE TYPE tld_enum AS ENUM ('aaa', 'aarp', ... 'zw'); -- New TLDs synced daily from IANA: https://data.iana.org/TLD/tlds-alpha-by-domain.txt CREATE TABLE IF NOT EXISTS domains ( host TEXT NOT NULL, tld tld_enum NOT NULL, status TEXT NOT NULL DEFAULT 'hold', -- hold, pass, skip, dead (dead = retired TLD) crawled_at TIMESTAMP NOT NULL DEFAULT '0001-01-01 00:00:00', -- domain_check: 0=unchecked, +1s=checked; feed_crawl: real timestamp feeds_found INTEGER DEFAULT 0, last_error TEXT, miss_count INTEGER NOT NULL DEFAULT 0, -- consecutive errors, set to hold at 100 PRIMARY KEY (host, tld) ); CREATE INDEX IF NOT EXISTS idx_domains_status ON domains(status); CREATE INDEX IF NOT EXISTS idx_domains_tld ON domains(tld); CREATE INDEX IF NOT EXISTS idx_domains_feeds_found ON domains(feeds_found DESC) WHERE feeds_found > 0; CREATE INDEX IF NOT EXISTS idx_domains_to_crawl ON domains(crawled_at) WHERE crawled_at < '0001-01-02'; CREATE INDEX IF NOT EXISTS idx_domains_host_trgm ON domains USING GIN(host gin_trgm_ops); CREATE TABLE IF NOT EXISTS feeds ( url TEXT PRIMARY KEY, domain_host TEXT NOT NULL, domain_tld tld_enum NOT NULL, type TEXT, category TEXT DEFAULT 'main', title TEXT, description TEXT, language TEXT, site_url TEXT, source_url TEXT, discovered_at TIMESTAMP NOT NULL DEFAULT NOW(), last_checked_at TIMESTAMP, -- feed_check: when last checked for new items next_check_at TIMESTAMP, -- feed_check: when to next check last_build_date TIMESTAMP, etag TEXT, last_modified TEXT, status TEXT NOT NULL DEFAULT 'pass', last_error TEXT, last_error_at TIMESTAMP, item_count INTEGER NOT NULL DEFAULT 0, oldest_item_date TIMESTAMP, newest_item_date TIMESTAMP, no_update INTEGER NOT NULL DEFAULT 0, -- Publishing to PDS publish_status TEXT NOT NULL DEFAULT 'hold', publish_account TEXT, FOREIGN KEY (domain_host, domain_tld) REFERENCES domains(host, tld) ); -- Indexes will be added as needed based on query patterns CREATE TABLE IF NOT EXISTS items ( guid TEXT NOT NULL, feed_url TEXT NOT NULL REFERENCES feeds(url) ON DELETE CASCADE, title TEXT, link TEXT, description TEXT, content TEXT, author TEXT, pub_date TIMESTAMP, discovered_at TIMESTAMP NOT NULL DEFAULT NOW(), updated_at TIMESTAMP, -- Media attachments enclosure_url TEXT, enclosure_type TEXT, enclosure_length BIGINT, image_urls JSONB, tags JSONB, -- Publishing to PDS published_at TIMESTAMP, published_uri TEXT, PRIMARY KEY (guid, feed_url) ); -- Indexes will be added as needed based on query patterns -- OAuth sessions CREATE TABLE IF NOT EXISTS oauth_sessions ( id TEXT PRIMARY KEY, did TEXT NOT NULL, handle TEXT NOT NULL, access_token TEXT, refresh_token TEXT, token_type TEXT NOT NULL DEFAULT 'DPoP', expires_at TIMESTAMP NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW(), dpop_private_jwk TEXT, dpop_authserver_nonce TEXT, dpop_pds_nonce TEXT, pds_url TEXT, authserver_iss TEXT, token_expiry TIMESTAMP ); -- Trigger to normalize feed URLs on insert/update (strips https://, http://, www.) CREATE OR REPLACE FUNCTION normalize_feed_url() RETURNS TRIGGER AS $$ BEGIN NEW.url = regexp_replace(NEW.url, '^https?://', ''); NEW.url = regexp_replace(NEW.url, '^www\.', ''); RETURN NEW; END; $$ LANGUAGE plpgsql; DROP TRIGGER IF EXISTS normalize_feed_url_trigger ON feeds; CREATE TRIGGER normalize_feed_url_trigger BEFORE INSERT OR UPDATE ON feeds FOR EACH ROW EXECUTE FUNCTION normalize_feed_url(); ` // DB wraps pgxpool.Pool with helper methods type DB struct { *pgxpool.Pool } func OpenDatabase(connString string) (*DB, error) { fmt.Printf("Connecting to database...\n") // If connection string not provided, try environment variables if connString == "" { connString = os.Getenv("DATABASE_URL") } if connString == "" { // Build from individual env vars host := getEnvOrDefault("DB_HOST", "atproto-postgres") port := getEnvOrDefault("DB_PORT", "5432") user := getEnvOrDefault("DB_USER", "dba_1440_news") dbname := getEnvOrDefault("DB_NAME", "db_1440_news") // Support Docker secrets (password file) or direct password password := os.Getenv("DB_PASSWORD") if password == "" { if passwordFile := os.Getenv("DB_PASSWORD_FILE"); passwordFile != "" { data, err := os.ReadFile(passwordFile) if err != nil { return nil, fmt.Errorf("failed to read password file: %v", err) } password = strings.TrimSpace(string(data)) } } connString = fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", user, url.QueryEscape(password), host, port, dbname) } config, err := pgxpool.ParseConfig(connString) if err != nil { return nil, fmt.Errorf("failed to parse connection string: %v", err) } // Connection pool settings config.MaxConns = 10 config.MinConns = 0 // Don't pre-create connections to avoid schema race conditions config.MaxConnLifetime = 5 * time.Minute config.MaxConnIdleTime = 1 * time.Minute ctx := context.Background() pool, err := pgxpool.NewWithConfig(ctx, config) if err != nil { return nil, fmt.Errorf("failed to connect to database: %v", err) } // Verify connection if err := pool.Ping(ctx); err != nil { pool.Close() return nil, fmt.Errorf("failed to ping database: %v", err) } fmt.Println(" Connected to PostgreSQL") db := &DB{pool} // Check if schema already exists (check for domains table) var tableExists bool pool.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_name = 'domains')").Scan(&tableExists) if !tableExists { // Create schema only if tables don't exist if _, err := pool.Exec(ctx, schema); err != nil { pool.Close() return nil, fmt.Errorf("failed to create schema: %v", err) } } fmt.Println(" Schema OK") // Migration: add miss_count column if not exists pool.Exec(ctx, "ALTER TABLE domains ADD COLUMN IF NOT EXISTS miss_count INTEGER NOT NULL DEFAULT 0") // Migration: add trigram extension and indexes for fast LIKE searches // Indexes must match LOWER() used in queries pool.Exec(ctx, "CREATE EXTENSION IF NOT EXISTS pg_trgm") pool.Exec(ctx, "CREATE INDEX IF NOT EXISTS idx_domains_host_trgm ON domains USING gin (LOWER(host) gin_trgm_ops)") pool.Exec(ctx, "CREATE INDEX IF NOT EXISTS idx_feeds_domain_host_trgm ON feeds USING gin (LOWER(domain_host) gin_trgm_ops)") // Migration: rename old 'sessions' table to 'oauth_sessions' var oldSessionsExists, newSessionsExists bool pool.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_name = 'sessions')").Scan(&oldSessionsExists) pool.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_name = 'oauth_sessions')").Scan(&newSessionsExists) if oldSessionsExists && !newSessionsExists { pool.Exec(ctx, "ALTER TABLE sessions RENAME TO oauth_sessions") } // Add token_expiry column if missing (used by OAuth library) pool.Exec(ctx, "ALTER TABLE oauth_sessions ADD COLUMN IF NOT EXISTS token_expiry TIMESTAMP") // Make access_token nullable (session created before tokens obtained) pool.Exec(ctx, "ALTER TABLE oauth_sessions ALTER COLUMN access_token DROP NOT NULL") // Add missing OAuth session columns pool.Exec(ctx, "ALTER TABLE oauth_sessions ADD COLUMN IF NOT EXISTS dpop_authserver_nonce TEXT") pool.Exec(ctx, "ALTER TABLE oauth_sessions ADD COLUMN IF NOT EXISTS dpop_pds_nonce TEXT") pool.Exec(ctx, "ALTER TABLE oauth_sessions ADD COLUMN IF NOT EXISTS pds_url TEXT") pool.Exec(ctx, "ALTER TABLE oauth_sessions ADD COLUMN IF NOT EXISTS authserver_iss TEXT") // Drop old dpop_nonce column if it exists pool.Exec(ctx, "ALTER TABLE oauth_sessions DROP COLUMN IF EXISTS dpop_nonce") // Migration: rename feed columns for consistent terminology // last_crawled_at -> last_checked_at (feed_check = checking feeds for new items) // next_crawl_at -> next_check_at // Check if old column names exist before renaming var colExists bool pool.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM information_schema.columns WHERE table_name='feeds' AND column_name='last_crawled_at')").Scan(&colExists) if colExists { pool.Exec(ctx, "ALTER TABLE feeds RENAME COLUMN last_crawled_at TO last_checked_at") } pool.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM information_schema.columns WHERE table_name='feeds' AND column_name='next_crawl_at')").Scan(&colExists) if colExists { pool.Exec(ctx, "ALTER TABLE feeds RENAME COLUMN next_crawl_at TO next_check_at") } // Create index on next_check_at (must be after column rename) pool.Exec(ctx, "CREATE INDEX IF NOT EXISTS idx_feeds_to_check ON feeds(next_check_at, no_update DESC) WHERE status = 'pass'") // Drop old index name if it exists pool.Exec(ctx, "DROP INDEX IF EXISTS idx_feeds_due_check") // Migration: convert TIMESTAMPTZ to TIMESTAMP (all times are GMT) // Helper to check if column is already TIMESTAMP (skip if already migrated) isTimestamp := func(table, column string) bool { var dataType string pool.QueryRow(ctx, ` SELECT data_type FROM information_schema.columns WHERE table_name = $1 AND column_name = $2 `, table, column).Scan(&dataType) return dataType == "timestamp without time zone" } // feeds table if !isTimestamp("feeds", "discovered_at") { pool.Exec(ctx, "ALTER TABLE feeds ALTER COLUMN discovered_at TYPE TIMESTAMP USING discovered_at AT TIME ZONE 'UTC'") } if !isTimestamp("feeds", "last_checked_at") { pool.Exec(ctx, "ALTER TABLE feeds ALTER COLUMN last_checked_at TYPE TIMESTAMP USING last_checked_at AT TIME ZONE 'UTC'") } if !isTimestamp("feeds", "next_check_at") { pool.Exec(ctx, "ALTER TABLE feeds ALTER COLUMN next_check_at TYPE TIMESTAMP USING next_check_at AT TIME ZONE 'UTC'") } if !isTimestamp("feeds", "last_build_date") { pool.Exec(ctx, "ALTER TABLE feeds ALTER COLUMN last_build_date TYPE TIMESTAMP USING last_build_date AT TIME ZONE 'UTC'") } if !isTimestamp("feeds", "last_error_at") { pool.Exec(ctx, "ALTER TABLE feeds ALTER COLUMN last_error_at TYPE TIMESTAMP USING last_error_at AT TIME ZONE 'UTC'") } if !isTimestamp("feeds", "oldest_item_date") { pool.Exec(ctx, "ALTER TABLE feeds ALTER COLUMN oldest_item_date TYPE TIMESTAMP USING oldest_item_date AT TIME ZONE 'UTC'") } if !isTimestamp("feeds", "newest_item_date") { pool.Exec(ctx, "ALTER TABLE feeds ALTER COLUMN newest_item_date TYPE TIMESTAMP USING newest_item_date AT TIME ZONE 'UTC'") } // items table if !isTimestamp("items", "pub_date") { pool.Exec(ctx, "ALTER TABLE items ALTER COLUMN pub_date TYPE TIMESTAMP USING pub_date AT TIME ZONE 'UTC'") } if !isTimestamp("items", "discovered_at") { pool.Exec(ctx, "ALTER TABLE items ALTER COLUMN discovered_at TYPE TIMESTAMP USING discovered_at AT TIME ZONE 'UTC'") } if !isTimestamp("items", "updated_at") { pool.Exec(ctx, "ALTER TABLE items ALTER COLUMN updated_at TYPE TIMESTAMP USING updated_at AT TIME ZONE 'UTC'") } if !isTimestamp("items", "published_at") { pool.Exec(ctx, "ALTER TABLE items ALTER COLUMN published_at TYPE TIMESTAMP USING published_at AT TIME ZONE 'UTC'") } // short_urls table if !isTimestamp("short_urls", "created_at") { pool.Exec(ctx, "ALTER TABLE short_urls ALTER COLUMN created_at TYPE TIMESTAMP USING created_at AT TIME ZONE 'UTC'") } // clicks table if !isTimestamp("clicks", "clicked_at") { pool.Exec(ctx, "ALTER TABLE clicks ALTER COLUMN clicked_at TYPE TIMESTAMP USING clicked_at AT TIME ZONE 'UTC'") } // oauth_sessions table if !isTimestamp("oauth_sessions", "created_at") { pool.Exec(ctx, "ALTER TABLE oauth_sessions ALTER COLUMN created_at TYPE TIMESTAMP USING created_at AT TIME ZONE 'UTC'") } if !isTimestamp("oauth_sessions", "expires_at") { pool.Exec(ctx, "ALTER TABLE oauth_sessions ALTER COLUMN expires_at TYPE TIMESTAMP USING expires_at AT TIME ZONE 'UTC'") } if !isTimestamp("oauth_sessions", "token_expiry") { pool.Exec(ctx, "ALTER TABLE oauth_sessions ALTER COLUMN token_expiry TYPE TIMESTAMP USING token_expiry AT TIME ZONE 'UTC'") } // Migration: rename item_id to item_guid in short_urls table (items now use composite PK) pool.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM information_schema.columns WHERE table_name='short_urls' AND column_name='item_id')").Scan(&colExists) if colExists { // Drop the column and add item_guid instead (can't convert int64 to text meaningfully) pool.Exec(ctx, "ALTER TABLE short_urls DROP COLUMN IF EXISTS item_id") pool.Exec(ctx, "ALTER TABLE short_urls ADD COLUMN IF NOT EXISTS item_guid TEXT") } fmt.Println(" Schema OK") // Run stats and background index creation go func() { var domainCount, feedCount int pool.QueryRow(context.Background(), "SELECT COUNT(*) FROM domains").Scan(&domainCount) pool.QueryRow(context.Background(), "SELECT COUNT(*) FROM feeds").Scan(&feedCount) fmt.Printf(" Existing data: %d domains, %d feeds\n", domainCount, feedCount) fmt.Println(" Running ANALYZE...") if _, err := pool.Exec(context.Background(), "ANALYZE"); err != nil { fmt.Printf(" Warning: ANALYZE failed: %v\n", err) } else { fmt.Println(" ANALYZE complete") } // Create trigram index on items.title in background (CONCURRENTLY = no table lock) // Check if index already exists first var indexExists bool pool.QueryRow(context.Background(), "SELECT EXISTS(SELECT 1 FROM pg_indexes WHERE indexname = 'idx_items_title_trgm')").Scan(&indexExists) if !indexExists { fmt.Println(" Creating trigram index on items.title (background, may take a while)...") if _, err := pool.Exec(context.Background(), "CREATE INDEX CONCURRENTLY idx_items_title_trgm ON items USING gin (LOWER(title) gin_trgm_ops)"); err != nil { fmt.Printf(" Warning: items title trigram index failed: %v\n", err) } else { fmt.Println(" Trigram index on items.title complete") } } }() return db, nil } func getEnvOrDefault(key, defaultVal string) string { if val := os.Getenv(key); val != "" { return val } return defaultVal } // QueryRow wraps pool.QueryRow for compatibility func (db *DB) QueryRow(query string, args ...interface{}) pgx.Row { return db.Pool.QueryRow(context.Background(), query, args...) } // Query wraps pool.Query for compatibility func (db *DB) Query(query string, args ...interface{}) (pgx.Rows, error) { return db.Pool.Query(context.Background(), query, args...) } // Exec wraps pool.Exec for compatibility func (db *DB) Exec(query string, args ...interface{}) (int64, error) { result, err := db.Pool.Exec(context.Background(), query, args...) if err != nil { return 0, err } return result.RowsAffected(), nil } // Begin starts a transaction func (db *DB) Begin() (pgx.Tx, error) { return db.Pool.Begin(context.Background()) } // Close closes the connection pool func (db *DB) Close() error { db.Pool.Close() return nil } // NullableString returns nil for empty strings, otherwise the string pointer func NullableString(s string) *string { if s == "" { return nil } return &s } // NullableTime returns nil for zero times, otherwise the time pointer func NullableTime(t time.Time) *time.Time { if t.IsZero() { return nil } return &t } // StringValue returns empty string for nil, otherwise the dereferenced value func StringValue(s *string) string { if s == nil { return "" } return *s } // TimeValue returns zero time for nil, otherwise the dereferenced value func TimeValue(t *time.Time) time.Time { if t == nil { return time.Time{} } return *t } // ToSearchQuery converts a user query to PostgreSQL tsquery format func ToSearchQuery(query string) string { // Simple conversion: split on spaces and join with & words := strings.Fields(query) if len(words) == 0 { return "" } return strings.Join(words, " & ") }