Refactor large Go files into focused modules

Split dashboard.go (3,528 lines) into:
- routes.go: HTTP route registration
- api_domains.go: Domain API handlers
- api_feeds.go: Feed API handlers
- api_publish.go: Publishing API handlers
- api_search.go: Search API handlers
- templates.go: HTML templates
- dashboard.go: Stats functions only (235 lines)

Split publisher.go (1,502 lines) into:
- pds_auth.go: Authentication and account management
- pds_records.go: Record operations (upload, update, delete)
- handle.go: Handle derivation from feed URLs
- image.go: Image processing and favicon fetching
- publisher.go: Core types and PublishItem (439 lines)

Split feed.go (1,137 lines) into:
- item.go: Item struct and DB operations
- feed_check.go: Feed checking and processing
- feed.go: Feed struct and DB operations (565 lines)

Also includes domain import batch size increase (1k -> 100k).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
primal
2026-01-29 22:25:02 -05:00
parent 3999e96f26
commit 1066f42189
17 changed files with 5106 additions and 4957 deletions
-572
View File
@@ -1,15 +1,9 @@
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"regexp"
"strings"
"sync/atomic"
"time"
"github.com/jackc/pgx/v5"
@@ -95,37 +89,6 @@ func classifyFeedByTitle(title string, currentCategory string) string {
return currentCategory
}
// Enclosure represents a media attachment (audio, video, image)
type Enclosure struct {
URL string `json:"url"`
Type string `json:"type"` // MIME type (audio/mpeg, image/jpeg, etc.)
Length int64 `json:"length"` // Size in bytes
}
// Item represents an individual entry/article from a feed
type Item struct {
ID int64 `json:"id,omitempty"`
FeedURL string `json:"feed_url"`
GUID string `json:"guid,omitempty"`
Title string `json:"title,omitempty"`
Link string `json:"link,omitempty"`
Description string `json:"description,omitempty"`
Content string `json:"content,omitempty"`
Author string `json:"author,omitempty"`
PubDate time.Time `json:"pub_date,omitempty"`
DiscoveredAt time.Time `json:"discovered_at"`
UpdatedAt time.Time `json:"updated_at,omitempty"`
// Media attachments
Enclosure *Enclosure `json:"enclosure,omitempty"` // Primary enclosure (podcast audio, etc.)
ImageURLs []string `json:"image_urls,omitempty"` // Image URLs extracted from content
Tags []string `json:"tags,omitempty"` // Category/tag strings from feed
// Publishing to PDS
PublishedAt time.Time `json:"published_at,omitempty"`
PublishedUri string `json:"published_uri,omitempty"`
}
// Feed represents a discovered RSS/Atom feed with metadata
type Feed struct {
URL string `json:"url"`
@@ -537,505 +500,6 @@ func scanFeeds(rows pgx.Rows) ([]*Feed, error) {
return feeds, rows.Err()
}
// saveItem stores an item in PostgreSQL (upsert by feed_url + guid)
func (c *Crawler) saveItem(item *Item) error {
// Serialize enclosure fields
var enclosureUrl, enclosureType *string
var enclosureLength *int64
if item.Enclosure != nil {
enclosureUrl = NullableString(item.Enclosure.URL)
enclosureType = NullableString(item.Enclosure.Type)
if item.Enclosure.Length > 0 {
enclosureLength = &item.Enclosure.Length
}
}
// Serialize imageUrls as JSON
var imageUrlsJSON *string
if len(item.ImageURLs) > 0 {
if data, err := json.Marshal(item.ImageURLs); err == nil {
s := string(data)
imageUrlsJSON = &s
}
}
// Serialize tags as JSON
var tagsJSON *string
if len(item.Tags) > 0 {
if data, err := json.Marshal(item.Tags); err == nil {
s := string(data)
tagsJSON = &s
}
}
_, err := c.db.Exec(`
INSERT INTO items (feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at,
enclosure_url, enclosure_type, enclosure_length, image_urls, tags)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
ON CONFLICT(feed_url, guid) DO UPDATE SET
title = EXCLUDED.title,
link = EXCLUDED.link,
description = EXCLUDED.description,
content = EXCLUDED.content,
author = EXCLUDED.author,
pub_date = EXCLUDED.pub_date,
updated_at = EXCLUDED.updated_at,
enclosure_url = EXCLUDED.enclosure_url,
enclosure_type = EXCLUDED.enclosure_type,
enclosure_length = EXCLUDED.enclosure_length,
image_urls = EXCLUDED.image_urls,
tags = EXCLUDED.tags
`,
item.FeedURL, item.GUID, NullableString(item.Title), NullableString(item.Link),
NullableString(item.Description), NullableString(item.Content), NullableString(item.Author),
NullableTime(item.PubDate), item.DiscoveredAt, NullableTime(item.UpdatedAt),
enclosureUrl, enclosureType, enclosureLength, imageUrlsJSON, tagsJSON,
)
return err
}
// saveItems stores multiple items efficiently
func (c *Crawler) saveItems(items []*Item) error {
if len(items) == 0 {
return nil
}
tx, err := c.db.Begin()
if err != nil {
return err
}
defer tx.Rollback(context.Background())
for _, item := range items {
if item == nil || item.GUID == "" {
continue // Skip nil items or items without GUID
}
// Serialize enclosure fields
var enclosureUrl, enclosureType *string
var enclosureLength *int64
if item.Enclosure != nil {
enclosureUrl = NullableString(item.Enclosure.URL)
enclosureType = NullableString(item.Enclosure.Type)
if item.Enclosure.Length > 0 {
enclosureLength = &item.Enclosure.Length
}
}
// Serialize imageUrls as JSON
var imageUrlsJSON *string
if len(item.ImageURLs) > 0 {
if data, err := json.Marshal(item.ImageURLs); err == nil {
s := string(data)
imageUrlsJSON = &s
}
}
// Serialize tags as JSON
var tagsJSON *string
if len(item.Tags) > 0 {
if data, err := json.Marshal(item.Tags); err == nil {
s := string(data)
tagsJSON = &s
}
}
_, err := tx.Exec(context.Background(), `
INSERT INTO items (feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at,
enclosure_url, enclosure_type, enclosure_length, image_urls, tags)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
ON CONFLICT(feed_url, guid) DO UPDATE SET
title = EXCLUDED.title,
link = EXCLUDED.link,
description = EXCLUDED.description,
content = EXCLUDED.content,
author = EXCLUDED.author,
pub_date = EXCLUDED.pub_date,
updated_at = EXCLUDED.updated_at,
enclosure_url = EXCLUDED.enclosure_url,
enclosure_type = EXCLUDED.enclosure_type,
enclosure_length = EXCLUDED.enclosure_length,
image_urls = EXCLUDED.image_urls,
tags = EXCLUDED.tags
`,
item.FeedURL, item.GUID, NullableString(item.Title), NullableString(item.Link),
NullableString(item.Description), NullableString(item.Content), NullableString(item.Author),
NullableTime(item.PubDate), item.DiscoveredAt, NullableTime(item.UpdatedAt),
enclosureUrl, enclosureType, enclosureLength, imageUrlsJSON, tagsJSON,
)
if err != nil {
continue // Skip failed items
}
}
return tx.Commit(context.Background())
}
// GetItemsByFeed returns all items for a specific feed
func (c *Crawler) GetItemsByFeed(feedURL string, limit int) ([]*Item, error) {
rows, err := c.db.Query(`
SELECT id, feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at,
enclosure_url, enclosure_type, enclosure_length, image_urls, tags,
published_at, published_uri
FROM items
WHERE feed_url = $1
ORDER BY pub_date DESC
LIMIT $2
`, feedURL, limit)
if err != nil {
return nil, err
}
defer rows.Close()
return scanItems(rows)
}
// SearchItems performs a full-text search on items
func (c *Crawler) SearchItems(query string, limit int) ([]*Item, error) {
tsquery := ToSearchQuery(query)
rows, err := c.db.Query(`
SELECT id, feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at,
enclosure_url, enclosure_type, enclosure_length, image_urls, tags,
published_at, published_uri
FROM items
WHERE search_vector @@ to_tsquery('english', $1)
ORDER BY ts_rank(search_vector, to_tsquery('english', $1)) DESC, pub_date DESC
LIMIT $2
`, tsquery, limit)
if err != nil {
return nil, err
}
defer rows.Close()
return scanItems(rows)
}
// scanItems is a helper to scan multiple item rows
func scanItems(rows pgx.Rows) ([]*Item, error) {
var items []*Item
for rows.Next() {
item := &Item{}
var guid, title, link, description, content, author *string
var pubDate, updatedAt, publishedAt *time.Time
var enclosureUrl, enclosureType *string
var enclosureLength *int64
var imageUrlsJSON, tagsJSON *string
var publishedUri *string
if err := rows.Scan(
&item.ID, &item.FeedURL, &guid, &title, &link,
&description, &content, &author, &pubDate,
&item.DiscoveredAt, &updatedAt,
&enclosureUrl, &enclosureType, &enclosureLength, &imageUrlsJSON, &tagsJSON,
&publishedAt, &publishedUri,
); err != nil {
continue
}
item.GUID = StringValue(guid)
item.Title = StringValue(title)
item.Link = StringValue(link)
item.Description = StringValue(description)
item.Content = StringValue(content)
item.Author = StringValue(author)
item.PubDate = TimeValue(pubDate)
item.UpdatedAt = TimeValue(updatedAt)
// Parse enclosure
if enclosureUrl != nil && *enclosureUrl != "" {
item.Enclosure = &Enclosure{
URL: *enclosureUrl,
Type: StringValue(enclosureType),
}
if enclosureLength != nil {
item.Enclosure.Length = *enclosureLength
}
}
// Parse imageUrls JSON
if imageUrlsJSON != nil && *imageUrlsJSON != "" {
var urls []string
if err := json.Unmarshal([]byte(*imageUrlsJSON), &urls); err == nil {
item.ImageURLs = urls
}
}
// Parse tags JSON
if tagsJSON != nil && *tagsJSON != "" {
var tags []string
if err := json.Unmarshal([]byte(*tagsJSON), &tags); err == nil {
item.Tags = tags
}
}
item.PublishedAt = TimeValue(publishedAt)
item.PublishedUri = StringValue(publishedUri)
items = append(items, item)
}
return items, rows.Err()
}
// CleanupOldItems removes items older than 12 months
func (c *Crawler) CleanupOldItems() (int64, error) {
cutoff := time.Now().AddDate(-1, 0, 0) // 12 months ago
result, err := c.db.Exec(`
DELETE FROM items WHERE pub_date < $1 AND pub_date IS NOT NULL
`, cutoff)
if err != nil {
return 0, err
}
return result, nil
}
// processFeed parses and stores a feed with full metadata
func (c *Crawler) processFeed(feedURL, sourceHost, body string, headers http.Header) {
// Fast path: check without lock
if c.feedExists(feedURL) {
return
}
c.feedsMu.Lock()
defer c.feedsMu.Unlock()
// Double-check after acquiring lock
if c.feedExists(feedURL) {
return
}
feedType := c.detectFeedType(body)
now := time.Now()
feed := &Feed{
URL: normalizeURL(feedURL),
Type: feedType,
Category: classifyFeed(feedURL),
DiscoveredAt: now,
LastCrawledAt: now,
Status: "active",
SourceHost: sourceHost,
TLD: getTLD(sourceHost),
ETag: headers.Get("ETag"),
LastModified: headers.Get("Last-Modified"),
}
// Parse feed-specific metadata and items
var items []*Item
switch feedType {
case "rss":
items = c.parseRSSMetadata(body, feed)
case "atom":
items = c.parseAtomMetadata(body, feed)
case "json":
items = c.parseJSONFeedMetadata(body, feed)
}
// Refine category based on parsed title (e.g., "Comments on:")
feed.Category = classifyFeedByTitle(feed.Title, feed.Category)
// Calculate next crawl time
feed.NextCrawlAt = c.calculateNextCrawl(feed)
if err := c.saveFeed(feed); err != nil {
return
}
// Save items
if len(items) > 0 {
c.saveItems(items)
}
}
// addFeed adds a discovered feed URL (not yet fetched)
func (c *Crawler) addFeed(feedURL, feedType, sourceHost, sourceURL string) {
// Fast path: check without lock
if c.feedExists(feedURL) {
return
}
c.feedsMu.Lock()
defer c.feedsMu.Unlock()
// Double-check after acquiring lock
if c.feedExists(feedURL) {
return
}
now := time.Now()
normalizedURL := normalizeURL(feedURL)
feed := &Feed{
URL: normalizedURL,
Type: feedType,
Category: classifyFeed(feedURL),
DiscoveredAt: now,
Status: "active",
SourceURL: normalizeURL(sourceURL),
SourceHost: sourceHost,
TLD: getTLD(sourceHost),
NextCrawlAt: now, // Should be crawled immediately
}
if err := c.saveFeed(feed); err != nil {
return
}
}
// CheckFeed performs a conditional request to check if a feed has been updated
// Returns: changed (bool), error
func (c *Crawler) CheckFeed(feed *Feed) (bool, error) {
atomic.AddInt32(&c.feedsChecked, 1)
// Try different scheme/www combinations since we store URLs without scheme
urlVariants := []string{
"https://" + feed.URL,
"http://" + feed.URL,
"https://www." + feed.URL,
"http://www." + feed.URL,
}
var resp *http.Response
var err error
var successURL string
for _, tryURL := range urlVariants {
req, reqErr := http.NewRequest("GET", tryURL, nil)
if reqErr != nil {
continue
}
req.Header.Set("User-Agent", c.UserAgent)
// Add conditional headers if we have them
if feed.ETag != "" {
req.Header.Set("If-None-Match", feed.ETag)
}
if feed.LastModified != "" {
req.Header.Set("If-Modified-Since", feed.LastModified)
}
resp, err = c.client.Do(req)
if err == nil {
successURL = tryURL
break
}
}
_ = successURL // May be used later for logging/debugging
// If no request succeeded, resp will be nil
if resp == nil {
if err == nil {
err = fmt.Errorf("all URL variants failed")
}
now := time.Now()
feed.LastCrawledAt = now
feed.ErrorCount++
feed.NoUpdate++
feed.NextCrawlAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
feed.LastError = err.Error()
feed.LastErrorAt = now
feed.Status = "error"
// Auto-hold feeds that fail 100+ times
if feed.ErrorCount >= 100 && feed.PublishStatus == "pass" {
feed.PublishStatus = "hold"
fmt.Printf("Feed auto-held after %d errors: %s\n", feed.ErrorCount, feed.URL)
}
c.saveFeed(feed)
return false, err
}
defer resp.Body.Close()
now := time.Now()
feed.LastCrawledAt = now
// 304 Not Modified - feed hasn't changed
if resp.StatusCode == http.StatusNotModified {
feed.NoUpdate++
// Adaptive backoff: 100s base + 100s per consecutive no-change
feed.NextCrawlAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
feed.ErrorCount = 0
feed.LastError = ""
feed.Status = "active"
c.saveFeed(feed)
return false, nil
}
// Non-200 response
if resp.StatusCode != http.StatusOK {
feed.ErrorCount++
feed.NoUpdate++
feed.NextCrawlAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
feed.LastError = resp.Status
feed.LastErrorAt = now
if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusGone {
feed.Status = "dead"
} else {
feed.Status = "error"
}
// Auto-hold feeds that fail 100+ times
if feed.ErrorCount >= 100 && feed.PublishStatus == "pass" {
feed.PublishStatus = "hold"
fmt.Printf("Feed auto-held after %d errors: %s\n", feed.ErrorCount, feed.URL)
}
c.saveFeed(feed)
return false, nil
}
// 200 OK - feed has new content
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
feed.ErrorCount++
feed.NoUpdate++
feed.NextCrawlAt = now.Add(time.Duration(100+100*feed.NoUpdate) * time.Second)
feed.LastError = err.Error()
feed.LastErrorAt = now
feed.Status = "error"
// Auto-hold feeds that fail 100+ times
if feed.ErrorCount >= 100 && feed.PublishStatus == "pass" {
feed.PublishStatus = "hold"
fmt.Printf("Feed auto-held after %d errors: %s\n", feed.ErrorCount, feed.URL)
}
c.saveFeed(feed)
return false, err
}
body := string(bodyBytes)
// Update cache headers
feed.ETag = resp.Header.Get("ETag")
feed.LastModified = resp.Header.Get("Last-Modified")
// Re-detect type and parse metadata
feedType := c.detectFeedType(body)
feed.Type = feedType
var items []*Item
switch feedType {
case "rss":
items = c.parseRSSMetadata(body, feed)
case "atom":
items = c.parseAtomMetadata(body, feed)
case "json":
items = c.parseJSONFeedMetadata(body, feed)
}
// Content changed - reset backoff
feed.NoUpdate = 0
feed.NextCrawlAt = now.Add(100 * time.Second)
feed.ErrorCount = 0
feed.LastError = ""
feed.Status = "active"
c.saveFeed(feed)
// Save items
if len(items) > 0 {
c.saveItems(items)
}
return true, nil
}
// SetPublishStatus sets the publish status for a feed ('hold', 'pass', 'skip')
// If status is 'pass', the account handle is also set (auto-derived if empty)
func (c *Crawler) SetPublishStatus(feedURL, status, account string) error {
@@ -1099,39 +563,3 @@ func (c *Crawler) GetPublishCandidates(limit int) ([]*Feed, error) {
return scanFeeds(rows)
}
// GetUnpublishedItems returns items for a feed that haven't been published yet
func (c *Crawler) GetUnpublishedItems(feedURL string, limit int) ([]*Item, error) {
rows, err := c.db.Query(`
SELECT id, feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at,
enclosure_url, enclosure_type, enclosure_length, image_urls, tags,
published_at, published_uri
FROM items
WHERE feed_url = $1 AND published_at IS NULL
ORDER BY pub_date ASC
LIMIT $2
`, feedURL, limit)
if err != nil {
return nil, err
}
defer rows.Close()
return scanItems(rows)
}
// MarkItemPublished marks an item as published with the given URI
func (c *Crawler) MarkItemPublished(itemID int64, uri string) error {
_, err := c.db.Exec(`
UPDATE items SET published_at = NOW(), published_uri = $1 WHERE id = $2
`, uri, itemID)
return err
}
// GetUnpublishedItemCount returns the count of unpublished items for a feed
func (c *Crawler) GetUnpublishedItemCount(feedURL string) (int, error) {
var count int
err := c.db.QueryRow(`
SELECT COUNT(*) FROM items WHERE feed_url = $1 AND published_at IS NULL
`, feedURL).Scan(&count)
return count, err
}