Files
publish/server.go
primal 3b5c4ddeb2 Add item status (pass/fail) support
- Filter unpublished items by status='pass' in publish loop
- Add DeletePost function to remove posts from PDS
- Add /api/setItemStatus endpoint to mark items pass/fail
- When marking fail, deletes the Bluesky post if it was published

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 15:46:27 -05:00

800 lines
23 KiB
Go

package main
import (
"encoding/json"
"fmt"
"net/http"
"os"
"strings"
"github.com/1440news/commons"
)
// StartServer registers the publisher's HTTP endpoints on a new ServeMux and
// serves them on addr. It blocks until the listener stops and returns the
// error from http.ListenAndServe (which never returns nil).
func (s *PublisherService) StartServer(addr string) error {
	mux := http.NewServeMux()

	// Liveness probe: always answers 200 with body "OK".
	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		w.Write([]byte("OK"))
	})

	// Publishing / account-management APIs, registered in this order.
	routes := []struct {
		pattern string
		handler http.HandlerFunc
	}{
		{"/api/enablePublish", s.handleEnablePublish},
		{"/api/disablePublish", s.handleDisablePublish},
		{"/api/setPublishStatus", s.handleSetPublishStatus},
		{"/api/publishEnabled", s.handlePublishEnabled},
		{"/api/publishDenied", s.handlePublishDenied},
		{"/api/publishCandidates", s.handlePublishCandidates},
		{"/api/unpublishedItems", s.handleUnpublishedItems},
		{"/api/testPublish", s.handleTestPublish},
		{"/api/publishFeed", s.handlePublishFeed},
		{"/api/publishFeedFull", s.handlePublishFeedFull},
		{"/api/createAccount", s.handleCreateAccount},
		{"/api/updateProfile", s.handleUpdateProfile},
		{"/api/deriveHandle", s.handleDeriveHandle},
		{"/api/resetAllPublishing", s.handleResetAllPublishing},
		{"/api/refreshProfiles", s.handleRefreshProfiles},
		{"/api/setItemStatus", s.handleSetItemStatus},
	}
	for _, rt := range routes {
		mux.HandleFunc(rt.pattern, rt.handler)
	}

	fmt.Printf("Publisher API running at http://%s\n", addr)
	return http.ListenAndServe(addr, mux)
}
// handleEnablePublish switches a feed's publish status to "pass".
//
// Query parameters:
//   - url (required): feed URL; normalized with commons.NormalizeURL.
//   - account (optional): publishing handle; derived from the normalized URL
//     when omitted. A 400 is returned if no handle can be derived.
//
// The JSON response echoes status, url and account, plus the feed's current
// unpublished item count (a count lookup error is ignored).
func (s *PublisherService) handleEnablePublish(w http.ResponseWriter, r *http.Request) {
	q := r.URL.Query()
	rawURL := q.Get("url")
	if rawURL == "" {
		http.Error(w, "url parameter required", http.StatusBadRequest)
		return
	}
	feedURL := commons.NormalizeURL(rawURL)

	account := q.Get("account")
	if account == "" {
		account = commons.DeriveHandleFromFeed(feedURL)
	}
	if account == "" {
		http.Error(w, "could not derive account handle from URL", http.StatusBadRequest)
		return
	}

	if err := s.SetPublishStatus(feedURL, "pass", account); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Informational only; an error here does not fail the request.
	pending, _ := s.GetUnpublishedItemCount(feedURL)

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{
		"account":           account,
		"status":            "pass",
		"unpublished_items": pending,
		"url":               feedURL,
	})
}
// handleDisablePublish sets a feed's publish status to "skip" and clears its
// publish account (an empty account is passed to SetPublishStatus).
//
// Query parameter url (required) is normalized before the update; the JSON
// response echoes the new status and the normalized url.
func (s *PublisherService) handleDisablePublish(w http.ResponseWriter, r *http.Request) {
	raw := r.URL.Query().Get("url")
	if raw == "" {
		http.Error(w, "url parameter required", http.StatusBadRequest)
		return
	}

	normalized := commons.NormalizeURL(raw)
	if err := s.SetPublishStatus(normalized, "skip", ""); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{"status": "skip", "url": normalized})
}
// handleSetPublishStatus sets a feed's publish status to one of
// "pass", "hold", "skip" or "drop".
//
// Query parameters:
//   - url (required): feed URL; normalized with commons.NormalizeURL.
//   - status (required): one of the four values above, else 400.
//   - account (optional): stored as the feed's publish account. For "pass"
//     it is derived from the normalized URL when omitted.
//
// A feed is never switched to "pass" without an account to publish under.
// If no handle can be derived, the request is rejected with 400, the same
// check handleEnablePublish applies. The JSON response echoes url and
// status, plus account for "pass".
func (s *PublisherService) handleSetPublishStatus(w http.ResponseWriter, r *http.Request) {
	q := r.URL.Query()
	feedURL := q.Get("url")
	status := q.Get("status")
	account := q.Get("account")
	if feedURL == "" {
		http.Error(w, "url parameter required", http.StatusBadRequest)
		return
	}
	switch status {
	case "pass", "hold", "skip", "drop":
	default:
		http.Error(w, "status must be 'pass', 'hold', 'skip', or 'drop'", http.StatusBadRequest)
		return
	}

	feedURL = commons.NormalizeURL(feedURL)
	result := map[string]interface{}{
		"url":    feedURL,
		"status": status,
	}
	if status == "pass" {
		if account == "" {
			account = commons.DeriveHandleFromFeed(feedURL)
		}
		// Without this check the feed would be marked 'pass' with a NULL
		// publish_account, leaving nothing to publish under.
		if account == "" {
			http.Error(w, "could not derive account handle from URL", http.StatusBadRequest)
			return
		}
		result["account"] = account
	}

	if err := s.SetPublishStatus(feedURL, status, account); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(result)
}
// handlePublishEnabled lists every feed whose publish status is "pass",
// together with its publish account and current unpublished item count.
// Per-feed count lookup errors are ignored (the count is reported as
// returned). An empty result is encoded as [] rather than null.
func (s *PublisherService) handlePublishEnabled(w http.ResponseWriter, r *http.Request) {
	type FeedPublishInfo struct {
		URL              string `json:"url"`
		Title            string `json:"title"`
		Account          string `json:"account"`
		UnpublishedCount int    `json:"unpublished_count"`
	}

	feeds, err := s.GetFeedsByPublishStatus("pass")
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Non-nil even when empty so the JSON is [] rather than null.
	enabled := make([]FeedPublishInfo, 0, len(feeds))
	for _, feed := range feeds {
		pending, _ := s.GetUnpublishedItemCount(feed.URL)
		enabled = append(enabled, FeedPublishInfo{
			URL:              feed.URL,
			Title:            feed.Title,
			Account:          feed.PublishAccount,
			UnpublishedCount: pending,
		})
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(enabled)
}
// handlePublishDenied lists every feed whose publish status is "skip" as
// {url, title, source_host}. An empty result is encoded as [] rather than null.
func (s *PublisherService) handlePublishDenied(w http.ResponseWriter, r *http.Request) {
	type FeedDeniedInfo struct {
		URL        string `json:"url"`
		Title      string `json:"title"`
		SourceHost string `json:"source_host"`
	}

	feeds, err := s.GetFeedsByPublishStatus("skip")
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	denied := make([]FeedDeniedInfo, 0, len(feeds))
	for _, feed := range feeds {
		denied = append(denied, FeedDeniedInfo{
			URL:        feed.URL,
			Title:      feed.Title,
			SourceHost: feed.DomainHost,
		})
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(denied)
}
// handlePublishCandidates lists feeds returned by GetPublishCandidates
// (publish_status 'hold', item_count > 0, status 'pass', most items first).
// Each entry includes the handle commons.DeriveHandleFromFeed would assign.
//
// Query parameter limit: a positive integer, default 50, capped at 200.
// An empty result is encoded as [] rather than null.
func (s *PublisherService) handlePublishCandidates(w http.ResponseWriter, r *http.Request) {
	// Only a positive integer overrides the default. Zero, negative or
	// malformed values fall back to 50. A negative value would otherwise
	// reach `LIMIT $1` unchanged, where depending on the database it is
	// either rejected or treated as "no limit", bypassing the 200 cap.
	limit := 50
	if raw := r.URL.Query().Get("limit"); raw != "" {
		var n int
		if _, err := fmt.Sscanf(raw, "%d", &n); err == nil && n > 0 {
			limit = n
		}
		if limit > 200 {
			limit = 200
		}
	}

	feeds, err := s.GetPublishCandidates(limit)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	type CandidateInfo struct {
		URL           string `json:"url"`
		Title         string `json:"title"`
		Category      string `json:"category"`
		SourceHost    string `json:"source_host"`
		ItemCount     int    `json:"item_count"`
		DerivedHandle string `json:"derived_handle"`
	}
	// Non-nil even when empty so the JSON is [] rather than null.
	result := make([]CandidateInfo, 0, len(feeds))
	for _, f := range feeds {
		result = append(result, CandidateInfo{
			URL:           f.URL,
			Title:         f.Title,
			Category:      f.Category,
			SourceHost:    f.DomainHost,
			ItemCount:     f.ItemCount,
			DerivedHandle: commons.DeriveHandleFromFeed(f.URL),
		})
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(result)
}
// handleUnpublishedItems returns up to limit items from GetUnpublishedItems
// for one feed.
//
// Query parameters:
//   - url (required): feed URL, passed to GetUnpublishedItems as given.
//   - limit (optional): a positive integer, default 50, capped at 200.
//
// An empty result is encoded as [] rather than null.
func (s *PublisherService) handleUnpublishedItems(w http.ResponseWriter, r *http.Request) {
	q := r.URL.Query()
	feedURL := q.Get("url")
	if feedURL == "" {
		http.Error(w, "url parameter required", http.StatusBadRequest)
		return
	}

	// Only a positive integer overrides the default. Zero, negative or
	// malformed values fall back to 50 instead of being handed to the
	// query unchanged, where a negative limit can error or bypass the cap.
	limit := 50
	if raw := q.Get("limit"); raw != "" {
		var n int
		if _, err := fmt.Sscanf(raw, "%d", &n); err == nil && n > 0 {
			limit = n
		}
		if limit > 200 {
			limit = 200
		}
	}

	items, err := s.GetUnpublishedItems(feedURL, limit)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if items == nil {
		items = []*commons.Item{}
	}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(items)
}
// handleTestPublish publishes one stored item with caller-supplied
// credentials and records the resulting post URI via MarkItemPublished.
//
// Query parameters: feedUrl, guid, handle and password (all required), and
// pds (optional; defaults to the service's configured PDS host).
// The response includes the post URI and the rkey from GenerateRkey. The
// rkey is computed from the item's PubDate, or DiscoveredAt when PubDate is
// zero.
//
// NOTE(review): the password travels in the query string and can therefore
// appear in access logs.
func (s *PublisherService) handleTestPublish(w http.ResponseWriter, r *http.Request) {
	q := r.URL.Query()
	feedURL, guid := q.Get("feedUrl"), q.Get("guid")
	handle, password := q.Get("handle"), q.Get("password")

	if feedURL == "" || guid == "" {
		http.Error(w, "feedUrl and guid parameters required", http.StatusBadRequest)
		return
	}
	if handle == "" || password == "" {
		http.Error(w, "handle and password parameters required", http.StatusBadRequest)
		return
	}

	host := q.Get("pds")
	if host == "" {
		host = s.pdsHost
	}

	item, err := s.GetItemByGUID(feedURL, guid)
	if err != nil {
		http.Error(w, "item not found: "+err.Error(), http.StatusNotFound)
		return
	}

	pub := NewPublisher(host)
	session, err := pub.CreateSession(handle, password)
	if err != nil {
		http.Error(w, "auth failed: "+err.Error(), http.StatusUnauthorized)
		return
	}
	postURI, err := pub.PublishItem(session, item)
	if err != nil {
		http.Error(w, "publish failed: "+err.Error(), http.StatusInternalServerError)
		return
	}
	s.MarkItemPublished(item.FeedURL, item.GUID, postURI)

	keyTime := item.PubDate
	if keyTime.IsZero() {
		keyTime = item.DiscoveredAt
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{
		"feedUrl": item.FeedURL,
		"guid":    item.GUID,
		"rkey":    GenerateRkey(item.GUID, keyTime),
		"status":  "published",
		"title":   item.Title,
		"uri":     postURI,
	})
}
// handlePublishFeed is a placeholder endpoint; it always responds with
// {"status":"not_implemented"}.
func (s *PublisherService) handlePublishFeed(w http.ResponseWriter, r *http.Request) {
	reply := struct {
		Status string `json:"status"`
	}{Status: "not_implemented"}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(reply)
}
// handlePublishFeedFull is a placeholder endpoint; it always responds with
// {"status":"not_implemented"}.
func (s *PublisherService) handlePublishFeedFull(w http.ResponseWriter, r *http.Request) {
	reply := struct {
		Status string `json:"status"`
	}{Status: "not_implemented"}
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(reply)
}
// handleCreateAccount creates an account on a PDS, then tries to have it
// followed via FollowAsDirectory. A follow failure is only logged.
//
// Query parameters:
//   - handle, password (required)
//   - pds (optional; defaults to the service's PDS host)
//   - email (optional; defaults to handle + "@1440.news")
//   - inviteCode (optional)
//   - pdsAdminPassword (optional): when given without inviteCode, an invite
//     code is created first with CreateInviteCode(adminPassword, 1).
//
// NOTE(review): secrets are passed in the query string.
func (s *PublisherService) handleCreateAccount(w http.ResponseWriter, r *http.Request) {
	q := r.URL.Query()
	handle := q.Get("handle")
	password := q.Get("password")
	if handle == "" || password == "" {
		http.Error(w, "handle and password parameters required", http.StatusBadRequest)
		return
	}

	host := q.Get("pds")
	if host == "" {
		host = s.pdsHost
	}
	email := q.Get("email")
	if email == "" {
		email = handle + "@1440.news"
	}
	invite := q.Get("inviteCode")
	adminPassword := q.Get("pdsAdminPassword")

	pub := NewPublisher(host)
	if invite == "" && adminPassword != "" {
		code, err := pub.CreateInviteCode(adminPassword, 1)
		if err != nil {
			http.Error(w, "create invite failed: "+err.Error(), http.StatusInternalServerError)
			return
		}
		invite = code
	}

	session, err := pub.CreateAccount(handle, email, password, invite)
	if err != nil {
		http.Error(w, "create account failed: "+err.Error(), http.StatusInternalServerError)
		return
	}
	// Best effort: the account exists even if the directory follow fails.
	if followErr := pub.FollowAsDirectory(session.DID); followErr != nil {
		fmt.Printf("API: directory follow failed for %s: %v\n", handle, followErr)
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{
		"did":    session.DID,
		"handle": session.Handle,
		"status": "created",
	})
}
// handleUpdateProfile logs in as an account and updates its profile.
//
// Query parameters:
//   - handle, password (required)
//   - pds (optional; defaults to the service's PDS host)
//   - displayName, description (optional; passed through as given)
//   - faviconUrl (optional): fetched with FetchFaviconBytes and uploaded as
//     the avatar blob. Fetch failure → 400, upload failure → 500.
//
// The response reports whether an avatar was set.
func (s *PublisherService) handleUpdateProfile(w http.ResponseWriter, r *http.Request) {
	q := r.URL.Query()
	handle, password := q.Get("handle"), q.Get("password")
	if handle == "" || password == "" {
		http.Error(w, "handle and password parameters required", http.StatusBadRequest)
		return
	}

	host := q.Get("pds")
	if host == "" {
		host = s.pdsHost
	}
	displayName := q.Get("displayName")
	description := q.Get("description")
	iconURL := q.Get("faviconUrl")

	pub := NewPublisher(host)
	session, err := pub.CreateSession(handle, password)
	if err != nil {
		http.Error(w, "auth failed: "+err.Error(), http.StatusUnauthorized)
		return
	}

	var avatar *BlobRef
	if iconURL != "" {
		iconBytes, iconType, fetchErr := FetchFaviconBytes(iconURL)
		if fetchErr != nil {
			http.Error(w, "fetch favicon failed: "+fetchErr.Error(), http.StatusBadRequest)
			return
		}
		if avatar, err = pub.UploadBlob(session, iconBytes, iconType); err != nil {
			http.Error(w, "upload favicon failed: "+err.Error(), http.StatusInternalServerError)
			return
		}
	}

	if err = pub.UpdateProfile(session, displayName, description, avatar); err != nil {
		http.Error(w, "update profile failed: "+err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{
		"displayName": displayName,
		"handle":      handle,
		"hasAvatar":   avatar != nil,
		"status":      "updated",
	})
}
// handleDeriveHandle previews the account handle that would be assigned to a
// feed when it is enabled without an explicit account.
//
// Query parameter url (required). The URL is normalized with
// commons.NormalizeURL before deriving, the same way handleEnablePublish and
// SetPublishStatus do. The preview therefore matches the handle those paths
// would actually store. The response echoes the normalized url and the
// derived handle (empty when none can be derived).
func (s *PublisherService) handleDeriveHandle(w http.ResponseWriter, r *http.Request) {
	feedURL := r.URL.Query().Get("url")
	if feedURL == "" {
		http.Error(w, "url parameter required", http.StatusBadRequest)
		return
	}
	feedURL = commons.NormalizeURL(feedURL)
	handle := commons.DeriveHandleFromFeed(feedURL)
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{
		"url":    feedURL,
		"handle": handle,
	})
}
// handleResetAllPublishing clears all publishing state:
//   - publish_account is set to NULL on every feed that has one,
//   - published_at is set to NULL on every published item,
//   - every feed with a non-NULL publish_status is moved back to 'hold'.
//
// The response reports the number of rows each statement affected (-1 when
// the driver cannot report it).
//
// NOTE(review): the three UPDATEs are not wrapped in a transaction, so a
// failure part-way leaves the earlier statements applied. The endpoint also
// accepts any HTTP method. Both are left unchanged here.
func (s *PublisherService) handleResetAllPublishing(w http.ResponseWriter, r *http.Request) {
	accountsCleared, err := s.db.Exec(`UPDATE feeds SET publish_account = NULL WHERE publish_account IS NOT NULL`)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	itemsCleared, err := s.db.Exec(`UPDATE items SET published_at = NULL WHERE published_at IS NOT NULL`)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	statusReset, err := s.db.Exec(`UPDATE feeds SET publish_status = 'hold' WHERE publish_status IS NOT NULL`)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	// The Exec results themselves are opaque driver objects (they JSON-encode
	// as {}), so report their affected-row counts instead.
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{
		"success":          true,
		"accounts_cleared": execRowsAffected(accountsCleared),
		"items_cleared":    execRowsAffected(itemsCleared),
		"status_reset":     execRowsAffected(statusReset),
	})
}

// execRowsAffected extracts an affected-row count from the value returned by
// s.db.Exec. The concrete type of s.db is not visible in this file, so both
// common shapes are accepted: database/sql's RowsAffected() (int64, error)
// and pgx's RowsAffected() int64, plus a bare integer count. It returns -1
// when no count is available.
func execRowsAffected(res interface{}) int64 {
	switch v := res.(type) {
	case int64:
		return v
	case int:
		return int64(v)
	case interface{ RowsAffected() (int64, error) }:
		n, err := v.RowsAffected()
		if err != nil {
			return -1
		}
		return n
	case interface{ RowsAffected() int64 }:
		return v.RowsAffected()
	default:
		return -1
	}
}
// handleRefreshProfiles runs RefreshAllProfiles synchronously against a PDS.
//
// Query parameters: password (required; the shared feed-account password)
// and pds (optional; defaults to the service's PDS host). RefreshAllProfiles
// reports nothing back, so the response is always {"success": true, ...}
// once it returns.
func (s *PublisherService) handleRefreshProfiles(w http.ResponseWriter, r *http.Request) {
	q := r.URL.Query()
	password := q.Get("password")
	if password == "" {
		http.Error(w, "password parameter required", http.StatusBadRequest)
		return
	}

	host := q.Get("pds")
	if host == "" {
		host = s.pdsHost
	}
	s.RefreshAllProfiles(NewPublisher(host), password)

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{
		"message": "profiles refreshed",
		"success": true,
	})
}
// handleSetItemStatus marks a single item as 'pass' or 'fail'.
//
// Query parameters: feedUrl and guid (required) identify the item; status
// must be "pass" or "fail".
//
// When an item with a recorded published_uri is failed:
//  1. the feed's publish_account is looked up (500 if missing),
//  2. a session is opened with s.feedPassword (401 on failure),
//  3. the post is deleted. A delete error is logged and returned as
//     "delete_warning" rather than failing the request, since the post may
//     already be gone,
//  4. published_at / published_uri are cleared (a DB error is only logged).
//
// Finally the item's status column is updated. The response echoes feedUrl,
// guid and status, plus post_deleted or delete_warning when applicable.
func (s *PublisherService) handleSetItemStatus(w http.ResponseWriter, r *http.Request) {
	q := r.URL.Query()
	feedURL, guid, status := q.Get("feedUrl"), q.Get("guid"), q.Get("status")

	switch {
	case feedURL == "" || guid == "":
		http.Error(w, "feedUrl and guid parameters required", http.StatusBadRequest)
		return
	case status != "pass" && status != "fail":
		http.Error(w, "status must be 'pass' or 'fail'", http.StatusBadRequest)
		return
	}

	// Load the item so we know whether it has a live post to remove.
	item, err := s.GetItemByGUID(feedURL, guid)
	if err != nil {
		http.Error(w, "item not found: "+err.Error(), http.StatusNotFound)
		return
	}

	response := map[string]interface{}{
		"feedUrl": feedURL,
		"guid":    guid,
		"status":  status,
	}

	if status == "fail" && item.PublishedUri != "" {
		postURI := item.PublishedUri

		var account string
		lookupErr := s.db.QueryRow(`SELECT publish_account FROM feeds WHERE url = $1`, feedURL).Scan(&account)
		if lookupErr != nil || account == "" {
			http.Error(w, "could not find publish account for feed", http.StatusInternalServerError)
			return
		}

		session, authErr := s.publisher.CreateSession(account, s.feedPassword)
		if authErr != nil {
			http.Error(w, "auth failed: "+authErr.Error(), http.StatusUnauthorized)
			return
		}

		if delErr := s.publisher.DeletePost(session, postURI); delErr == nil {
			response["post_deleted"] = true
		} else {
			fmt.Printf("Warning: failed to delete post %s: %v\n", postURI, delErr)
			response["delete_warning"] = delErr.Error()
		}

		_, clearErr := s.db.Exec(`UPDATE items SET published_at = NULL, published_uri = NULL WHERE feed_url = $1 AND guid = $2`, feedURL, guid)
		if clearErr != nil {
			fmt.Printf("Warning: failed to clear published fields: %v\n", clearErr)
		}
	}

	if _, err = s.db.Exec(`UPDATE items SET status = $1 WHERE feed_url = $2 AND guid = $3`, status, feedURL, guid); err != nil {
		http.Error(w, "failed to update status: "+err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(response)
}
// Database helper methods

// SetPublishStatus writes publish_status and publish_account for the feed
// whose url equals the normalized feedURL. When status is "pass" and
// account is empty, the account is derived from the normalized URL. The
// account is passed through commons.NullableString (presumably storing NULL
// for ""; confirm in commons). A feed that does not exist is not reported
// as an error.
func (s *PublisherService) SetPublishStatus(feedURL, status, account string) error {
	normalized := commons.NormalizeURL(feedURL)
	if account == "" && status == "pass" {
		account = commons.DeriveHandleFromFeed(normalized)
	}
	_, err := s.db.Exec(`
UPDATE feeds SET publish_status = $1, publish_account = $2 WHERE url = $3
`, status, commons.NullableString(account), normalized)
	return err
}
// GetFeedsByPublishStatus returns every feed whose publish_status equals
// status, selecting the 25-column projection that scanFeeds decodes.
func (s *PublisherService) GetFeedsByPublishStatus(status string) ([]*commons.Feed, error) {
	const query = `
SELECT url, type, category, title, description, language, site_url,
discovered_at, last_checked_at, next_check_at, last_build_date,
etag, last_modified,
status, last_error, last_error_at,
source_url, domain_host, domain_tld,
item_count, oldest_item_date, newest_item_date,
no_update,
publish_status, publish_account
FROM feeds
WHERE publish_status = $1
`
	rows, err := s.db.Query(query, status)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	return scanFeeds(rows)
}
// GetPublishCandidates returns up to limit feeds with publish_status 'hold',
// item_count > 0 and status 'pass', ordered by item_count descending. The
// rows use the same column projection scanFeeds expects.
func (s *PublisherService) GetPublishCandidates(limit int) ([]*commons.Feed, error) {
	const query = `
SELECT url, type, category, title, description, language, site_url,
discovered_at, last_checked_at, next_check_at, last_build_date,
etag, last_modified,
status, last_error, last_error_at,
source_url, domain_host, domain_tld,
item_count, oldest_item_date, newest_item_date,
no_update,
publish_status, publish_account
FROM feeds
WHERE publish_status = 'hold' AND item_count > 0 AND status = 'pass'
ORDER BY item_count DESC
LIMIT $1
`
	rows, err := s.db.Query(query, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	return scanFeeds(rows)
}
// GetUnpublishedItemCount counts a feed's items that are approved
// (status = 'pass') but have no published_at yet. feedURL is matched as
// given, without normalization.
func (s *PublisherService) GetUnpublishedItemCount(feedURL string) (int, error) {
	row := s.db.QueryRow(`
SELECT COUNT(*) FROM items WHERE feed_url = $1 AND published_at IS NULL AND status = 'pass'
`, feedURL)
	var pending int
	err := row.Scan(&pending)
	return pending, err
}
// GetItemByGUID loads the item identified by (feedURL, guid) via scanItems.
// It returns the query/scan error if any, or an "item not found" error when
// no row matches. If several rows match, the first one is returned.
func (s *PublisherService) GetItemByGUID(feedURL, guid string) (*commons.Item, error) {
	rows, err := s.db.Query(`
SELECT feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at,
enclosure_url, enclosure_type, enclosure_length, image_urls, tags,
status, published_at, published_uri
FROM items
WHERE feed_url = $1 AND guid = $2
`, feedURL, guid)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	matches, err := scanItems(rows)
	switch {
	case err != nil:
		return nil, err
	case len(matches) == 0:
		return nil, fmt.Errorf("item not found")
	default:
		return matches[0], nil
	}
}
// RefreshAllProfiles re-applies the profile of every feed that has a publish
// account. It logs in to each account on publisher's PDS with feedPassword.
// The method is best-effort: query, scan, login and update failures are
// logged and processing continues with the next account.
//
// For each account:
//   - display name: the feed title, or the account when there is no title,
//     clipped to 64 characters.
//   - description: "https://<feed url>", a blank line, then the HTML-stripped
//     feed description (or "News feed via 1440.news"), clipped to 256
//     characters.
//   - avatar: the favicon of the feed's site_url, falling back to its
//     domain_host. Avatar fetch/upload failures are silently ignored.
//
// Clipping counts runes, not bytes. Slicing bytes could split a multi-byte
// UTF-8 character and produce an invalid string; rune counts also never
// exceed the grapheme-based limits Bluesky applies to these fields.
func (s *PublisherService) RefreshAllProfiles(publisher *Publisher, feedPassword string) {
	type profileFeed struct {
		url, account                            string
		title, description, siteURL, sourceHost *string
	}

	rows, err := s.db.Query(`
SELECT url, title, description, site_url, domain_host as source_host, publish_account
FROM feeds
WHERE publish_account IS NOT NULL AND publish_account <> ''
`)
	if err != nil {
		fmt.Printf("RefreshProfiles: query error: %v\n", err)
		return
	}
	defer rows.Close()

	// Drain the result set before any network work so it is not held open
	// across the per-account login / favicon / upload / update calls.
	var feeds []profileFeed
	for rows.Next() {
		var f profileFeed
		if err := rows.Scan(&f.url, &f.title, &f.description, &f.siteURL, &f.sourceHost, &f.account); err != nil {
			fmt.Printf("RefreshProfiles: scan error: %v\n", err)
			continue
		}
		feeds = append(feeds, f)
	}
	if err := rows.Err(); err != nil {
		// Best-effort: still refresh whatever rows were read.
		fmt.Printf("RefreshProfiles: row iteration error: %v\n", err)
	}

	// clip shortens text to at most limit runes, ending in "..." when cut.
	clip := func(text string, limit int) string {
		runes := []rune(text)
		if len(runes) <= limit {
			return text
		}
		return string(runes[:limit-3]) + "..."
	}

	for _, f := range feeds {
		session, err := publisher.CreateSession(f.account, feedPassword)
		if err != nil {
			fmt.Printf("RefreshProfiles: login failed for %s: %v\n", f.account, err)
			continue
		}

		displayName := commons.StringValue(f.title)
		if displayName == "" {
			displayName = f.account
		}
		desc := stripHTML(commons.StringValue(f.description))
		if desc == "" {
			desc = "News feed via 1440.news"
		}
		// Stored feed URLs are prefixed with https:// here, which assumes
		// they are kept without a scheme — NOTE(review): confirm.
		desc = "https://" + f.url + "\n\n" + desc
		displayName = clip(displayName, 64)
		desc = clip(desc, 256)

		var avatar *BlobRef
		faviconSource := commons.StringValue(f.siteURL)
		if faviconSource == "" {
			faviconSource = commons.StringValue(f.sourceHost)
		}
		if faviconSource != "" {
			faviconData, mimeType, err := FetchFaviconBytes(faviconSource)
			if err == nil && len(faviconData) > 0 {
				// Best effort: a failed upload just leaves the avatar unset.
				avatar, _ = publisher.UploadBlob(session, faviconData, mimeType)
			}
		}

		if err := publisher.UpdateProfile(session, displayName, desc, avatar); err != nil {
			fmt.Printf("RefreshProfiles: update failed for %s: %v\n", f.account, err)
		} else {
			fmt.Printf("RefreshProfiles: updated %s\n", f.account)
		}
	}
}
// scanFeeds decodes rows produced by the feed projection used in
// GetFeedsByPublishStatus and GetPublishCandidates into commons.Feed values.
// That projection has 25 columns, url through publish_account, in that
// order.
//
// Nullable text columns are scanned through *string and stored as "" when
// NULL. The timestamp columns (last_checked_at, next_check_at,
// last_build_date, last_error_at, oldest/newest_item_date) are scanned but
// not copied onto the Feed.
//
// A row that fails to scan is skipped, so one bad row does not fail the
// whole listing. Skipped rows are logged once, with their count and the
// first error, so the loss is visible. The returned error is rows.Err().
func scanFeeds(rows interface{ Next() bool; Scan(...interface{}) error; Err() error }) ([]*commons.Feed, error) {
	var (
		feeds     []*commons.Feed
		skipped   int
		firstSkip error
	)
	for rows.Next() {
		feed := &commons.Feed{}
		var feedType, category, title, description, language, siteURL *string
		var lastCheckedAt, nextCheckAt, lastBuildDate *interface{}
		var etag, lastModified, lastError, sourceURL, domainTLD *string
		var lastErrorAt *interface{}
		var oldestItemDate, newestItemDate *interface{}
		var publishStatus, publishAccount *string
		err := rows.Scan(
			&feed.URL, &feedType, &category, &title, &description, &language, &siteURL,
			&feed.DiscoveredAt, &lastCheckedAt, &nextCheckAt, &lastBuildDate,
			&etag, &lastModified,
			&feed.Status, &lastError, &lastErrorAt,
			&sourceURL, &feed.DomainHost, &domainTLD,
			&feed.ItemCount, &oldestItemDate, &newestItemDate,
			&feed.NoUpdate,
			&publishStatus, &publishAccount,
		)
		if err != nil {
			if skipped == 0 {
				firstSkip = err
			}
			skipped++
			continue
		}
		feed.Type = commons.StringValue(feedType)
		feed.Category = commons.StringValue(category)
		feed.Title = commons.StringValue(title)
		feed.Description = commons.StringValue(description)
		feed.Language = commons.StringValue(language)
		feed.SiteURL = commons.StringValue(siteURL)
		feed.ETag = commons.StringValue(etag)
		feed.LastModified = commons.StringValue(lastModified)
		feed.LastError = commons.StringValue(lastError)
		feed.SourceURL = commons.StringValue(sourceURL)
		feed.DomainTLD = commons.StringValue(domainTLD)
		feed.PublishStatus = commons.StringValue(publishStatus)
		feed.PublishAccount = commons.StringValue(publishAccount)
		feeds = append(feeds, feed)
	}
	if skipped > 0 {
		fmt.Printf("scanFeeds: skipped %d row(s) that failed to scan; first error: %v\n", skipped, firstSkip)
	}
	return feeds, rows.Err()
}
// init loads optional PDS settings from ./pds.env into the process
// environment (see loadPDSEnvFile for the accepted syntax).
func init() {
	loadPDSEnvFile("pds.env")
}

// loadPDSEnvFile reads KEY=VALUE lines from path and sets each one with
// os.Setenv, unless the variable already has a non-empty value. Settings
// from the real environment therefore take precedence over the file. A
// missing or unreadable file is silently ignored.
//
// Syntax accepted per line (after trimming surrounding whitespace):
//   - blank lines and lines starting with '#' are skipped;
//   - an optional leading "export " is dropped;
//   - the key is the text before the first '=' and the value everything
//     after it, so values may themselves contain '=';
//   - one matching pair of enclosing double or single quotes around the
//     value is removed;
//   - lines with no '=' or an empty key are ignored.
func loadPDSEnvFile(path string) {
	data, err := os.ReadFile(path)
	if err != nil {
		return
	}
	for _, line := range strings.Split(string(data), "\n") {
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		line = strings.TrimSpace(strings.TrimPrefix(line, "export "))

		parts := strings.SplitN(line, "=", 2)
		if len(parts) != 2 {
			continue
		}
		key := strings.TrimSpace(parts[0])
		if key == "" {
			continue
		}
		value := strings.TrimSpace(parts[1])
		if n := len(value); n >= 2 && (value[0] == '"' || value[0] == '\'') && value[n-1] == value[0] {
			value = value[1 : n-1]
		}

		if os.Getenv(key) == "" {
			if err := os.Setenv(key, value); err != nil {
				fmt.Printf("%s: could not set %s: %v\n", path, key, err)
			}
		}
	}
}