package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/1440news/commons"
)

// feedSelectColumns is the column list shared by every query whose rows are
// handed to scanFeeds. The order must match the Scan call in scanFeeds.
const feedSelectColumns = `url, type, category, title, description, language, site_url,
			discovered_at, last_checked_at, next_check_at, last_build_date,
			etag, last_modified, status, last_error, last_error_at,
			source_url, domain_host, domain_tld, item_count,
			oldest_item_date, newest_item_date, no_update,
			publish_status, publish_account`

// writeJSONResponse sets the JSON content type and encodes v as the response
// body with the default 200 status.
func writeJSONResponse(w http.ResponseWriter, v interface{}) {
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(v); err != nil {
		// The status line has already been sent, so the failure can only be logged.
		fmt.Printf("API: encoding JSON response: %v\n", err)
	}
}

// listLimitFromQuery reads the optional "limit" query parameter.
//
// It returns 50 when the parameter is absent, unparsable or negative, and
// caps the value at 200.
func listLimitFromQuery(r *http.Request) int {
	const (
		defaultLimit = 50
		maxLimit     = 200
	)

	raw := r.URL.Query().Get("limit")
	if raw == "" {
		return defaultLimit
	}

	var n int
	if _, err := fmt.Sscanf(raw, "%d", &n); err != nil || n < 0 {
		// A negative value would otherwise be passed straight into a LIMIT clause.
		return defaultLimit
	}
	if n > maxLimit {
		return maxLimit
	}
	return n
}

// StartServer starts the HTTP server for the publisher API.
//
// It registers every route on a fresh ServeMux and then blocks serving addr.
// The returned error is whatever the underlying http.Server reports.
//
// NOTE(review): none of the routes check the HTTP method or authenticate the
// caller, and several read credentials ("password", "pdsAdminPassword") from
// the query string. Confirm this API is only reachable from a trusted network.
func (s *PublisherService) StartServer(addr string) error {
	mux := http.NewServeMux()

	// Health check
	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		// Nothing useful can be done if the probing client has gone away.
		_, _ = w.Write([]byte("OK"))
	})

	// Publishing APIs
	mux.HandleFunc("/api/enablePublish", s.handleEnablePublish)
	mux.HandleFunc("/api/disablePublish", s.handleDisablePublish)
	mux.HandleFunc("/api/setPublishStatus", s.handleSetPublishStatus)
	mux.HandleFunc("/api/publishEnabled", s.handlePublishEnabled)
	mux.HandleFunc("/api/publishDenied", s.handlePublishDenied)
	mux.HandleFunc("/api/publishCandidates", s.handlePublishCandidates)
	mux.HandleFunc("/api/unpublishedItems", s.handleUnpublishedItems)
	mux.HandleFunc("/api/testPublish", s.handleTestPublish)
	mux.HandleFunc("/api/publishFeed", s.handlePublishFeed)
	mux.HandleFunc("/api/publishFeedFull", s.handlePublishFeedFull)
	mux.HandleFunc("/api/createAccount", s.handleCreateAccount)
	mux.HandleFunc("/api/updateProfile", s.handleUpdateProfile)
	mux.HandleFunc("/api/deriveHandle", s.handleDeriveHandle)
	mux.HandleFunc("/api/resetAllPublishing", s.handleResetAllPublishing)
	mux.HandleFunc("/api/refreshProfiles", s.handleRefreshProfiles)
	mux.HandleFunc("/api/setItemStatus", s.handleSetItemStatus)

	fmt.Printf("Publisher API running at http://%s\n", addr)

	// Only the header read is bounded: some handlers (e.g. refreshProfiles)
	// do a lot of work per request, so no write timeout is imposed.
	srv := &http.Server{
		Addr:              addr,
		Handler:           mux,
		ReadHeaderTimeout: 10 * time.Second,
	}
	return srv.ListenAndServe()
}

// handleEnablePublish sets a feed's publish status to "pass".
//
// Query parameters: "url" (required) and "account" (optional; derived from
// the normalized feed URL when empty). Responds 400 when the URL is missing
// or no account handle can be derived.
func (s *PublisherService) handleEnablePublish(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	feedURL := query.Get("url")
	account := query.Get("account")

	if feedURL == "" {
		http.Error(w, "url parameter required", http.StatusBadRequest)
		return
	}
	feedURL = commons.NormalizeURL(feedURL)

	if account == "" {
		account = commons.DeriveHandleFromFeed(feedURL)
		if account == "" {
			http.Error(w, "could not derive account handle from URL", http.StatusBadRequest)
			return
		}
	}

	if err := s.SetPublishStatus(feedURL, "pass", account); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// The count is informational only; on error it is reported as 0.
	count, _ := s.GetUnpublishedItemCount(feedURL)

	writeJSONResponse(w, map[string]interface{}{
		"status":            "pass",
		"url":               feedURL,
		"account":           account,
		"unpublished_items": count,
	})
}

// handleDisablePublish sets a feed's publish status to "skip" and passes an
// empty account to SetPublishStatus. Query parameter: "url" (required).
func (s *PublisherService) handleDisablePublish(w http.ResponseWriter, r *http.Request) {
	feedURL := r.URL.Query().Get("url")
	if feedURL == "" {
		http.Error(w, "url parameter required", http.StatusBadRequest)
		return
	}
	feedURL = commons.NormalizeURL(feedURL)

	if err := s.SetPublishStatus(feedURL, "skip", ""); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	writeJSONResponse(w, map[string]interface{}{
		"status": "skip",
		"url":    feedURL,
	})
}

// handleSetPublishStatus sets a feed's publish status to one of "pass",
// "hold", "skip" or "drop".
//
// Query parameters: "url" (required), "status" (required) and "account"
// (optional). For "pass" an empty account is derived from the normalized feed
// URL; as in handleEnablePublish, the request is rejected with 400 when no
// handle can be derived.
func (s *PublisherService) handleSetPublishStatus(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	feedURL := query.Get("url")
	status := query.Get("status")
	account := query.Get("account")

	if feedURL == "" {
		http.Error(w, "url parameter required", http.StatusBadRequest)
		return
	}

	switch status {
	case "pass", "skip", "hold", "drop":
	default:
		http.Error(w, "status must be 'pass', 'hold', 'skip', or 'drop'", http.StatusBadRequest)
		return
	}

	feedURL = commons.NormalizeURL(feedURL)

	result := map[string]interface{}{
		"url":    feedURL,
		"status": status,
	}

	if status == "pass" {
		if account == "" {
			account = commons.DeriveHandleFromFeed(feedURL)
			if account == "" {
				http.Error(w, "could not derive account handle from URL", http.StatusBadRequest)
				return
			}
		}
		result["account"] = account
	}

	if err := s.SetPublishStatus(feedURL, status, account); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	writeJSONResponse(w, result)
}

// handlePublishEnabled lists the feeds whose publish status is "pass",
// together with each feed's unpublished item count.
func (s *PublisherService) handlePublishEnabled(w http.ResponseWriter, r *http.Request) {
	feeds, err := s.GetFeedsByPublishStatus("pass")
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	type FeedPublishInfo struct {
		URL              string `json:"url"`
		Title            string `json:"title"`
		Account          string `json:"account"`
		UnpublishedCount int    `json:"unpublished_count"`
	}

	// Non-nil so that an empty result encodes as [] rather than null.
	result := make([]FeedPublishInfo, 0, len(feeds))
	for _, f := range feeds {
		// The count is informational only; on error it is reported as 0.
		count, _ := s.GetUnpublishedItemCount(f.URL)
		result = append(result, FeedPublishInfo{
			URL:              f.URL,
			Title:            f.Title,
			Account:          f.PublishAccount,
			UnpublishedCount: count,
		})
	}

	writeJSONResponse(w, result)
}

// handlePublishDenied lists the feeds whose publish status is "skip".
func (s *PublisherService) handlePublishDenied(w http.ResponseWriter, r *http.Request) {
	feeds, err := s.GetFeedsByPublishStatus("skip")
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	type FeedDeniedInfo struct {
		URL        string `json:"url"`
		Title      string `json:"title"`
		SourceHost string `json:"source_host"`
	}

	// Non-nil so that an empty result encodes as [] rather than null.
	result := make([]FeedDeniedInfo, 0, len(feeds))
	for _, f := range feeds {
		result = append(result, FeedDeniedInfo{
			URL:        f.URL,
			Title:      f.Title,
			SourceHost: f.DomainHost,
		})
	}

	writeJSONResponse(w, result)
}

// handlePublishCandidates lists feeds returned by GetPublishCandidates, each
// with the handle that would be derived from its URL.
//
// Query parameter: "limit" (optional; see listLimitFromQuery).
func (s *PublisherService) handlePublishCandidates(w http.ResponseWriter, r *http.Request) {
	feeds, err := s.GetPublishCandidates(listLimitFromQuery(r))
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	type CandidateInfo struct {
		URL           string `json:"url"`
		Title         string `json:"title"`
		Category      string `json:"category"`
		SourceHost    string `json:"source_host"`
		ItemCount     int    `json:"item_count"`
		DerivedHandle string `json:"derived_handle"`
	}

	// Non-nil so that an empty result encodes as [] rather than null.
	result := make([]CandidateInfo, 0, len(feeds))
	for _, f := range feeds {
		result = append(result, CandidateInfo{
			URL:           f.URL,
			Title:         f.Title,
			Category:      f.Category,
			SourceHost:    f.DomainHost,
			ItemCount:     f.ItemCount,
			DerivedHandle: commons.DeriveHandleFromFeed(f.URL),
		})
	}

	writeJSONResponse(w, result)
}

// handleUnpublishedItems lists unpublished items for one feed.
//
// Query parameters: "url" (required) and "limit" (optional; see
// listLimitFromQuery).
//
// NOTE(review): unlike the publish-status handlers, the URL is passed on
// without commons.NormalizeURL — confirm whether GetUnpublishedItems
// normalizes it itself.
func (s *PublisherService) handleUnpublishedItems(w http.ResponseWriter, r *http.Request) {
	feedURL := r.URL.Query().Get("url")
	if feedURL == "" {
		http.Error(w, "url parameter required", http.StatusBadRequest)
		return
	}

	items, err := s.GetUnpublishedItems(feedURL, listLimitFromQuery(r))
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Encode an empty result as [] rather than null.
	if items == nil {
		items = []*commons.Item{}
	}

	writeJSONResponse(w, items)
}

// handleTestPublish publishes a single stored item using caller-supplied
// credentials and then marks the item as published.
//
// Query parameters: "feedUrl", "guid", "handle", "password" (all required)
// and "pds" (optional; defaults to the service's PDS host).
func (s *PublisherService) handleTestPublish(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	feedURL := query.Get("feedUrl")
	guidParam := query.Get("guid")
	handle := query.Get("handle")
	password := query.Get("password")
	pdsHost := query.Get("pds")

	if feedURL == "" || guidParam == "" {
		http.Error(w, "feedUrl and guid parameters required", http.StatusBadRequest)
		return
	}
	if handle == "" || password == "" {
		http.Error(w, "handle and password parameters required", http.StatusBadRequest)
		return
	}
	if pdsHost == "" {
		pdsHost = s.pdsHost
	}

	// Any lookup failure, including a query error, is reported as 404.
	item, err := s.GetItemByGUID(feedURL, guidParam)
	if err != nil {
		http.Error(w, "item not found: "+err.Error(), http.StatusNotFound)
		return
	}

	publisher := NewPublisher(pdsHost)
	session, err := publisher.CreateSession(handle, password)
	if err != nil {
		http.Error(w, "auth failed: "+err.Error(), http.StatusUnauthorized)
		return
	}

	uri, err := publisher.PublishItem(session, item)
	if err != nil {
		http.Error(w, "publish failed: "+err.Error(), http.StatusInternalServerError)
		return
	}

	// NOTE(review): any result of MarkItemPublished is discarded here — confirm
	// whether it reports an error that should be surfaced.
	s.MarkItemPublished(item.FeedURL, item.GUID, uri)

	// The reported rkey uses the publication date, or the discovery time when
	// the item has no publication date.
	rkeyTime := item.PubDate
	if rkeyTime.IsZero() {
		rkeyTime = item.DiscoveredAt
	}

	writeJSONResponse(w, map[string]interface{}{
		"status":  "published",
		"uri":     uri,
		"feedUrl": item.FeedURL,
		"guid":    item.GUID,
		"title":   item.Title,
		"rkey":    GenerateRkey(item.GUID, rkeyTime),
	})
}

// handlePublishFeed is a placeholder that always reports "not_implemented".
func (s *PublisherService) handlePublishFeed(w http.ResponseWriter, r *http.Request) {
	writeJSONResponse(w, map[string]interface{}{
		"status": "not_implemented",
	})
}

// handlePublishFeedFull is a placeholder that always reports "not_implemented".
func (s *PublisherService) handlePublishFeedFull(w http.ResponseWriter, r *http.Request) {
	writeJSONResponse(w, map[string]interface{}{
		"status": "not_implemented",
	})
}

// handleCreateAccount creates an account on the PDS and then attempts the
// directory follow for the new DID.
//
// Query parameters: "handle" and "password" (required); "email" (defaults to
// handle + "@1440.news"); "pds" (defaults to the service's PDS host);
// "inviteCode"; and "pdsAdminPassword", which is used to create a single-use
// invite code when no invite code is supplied.
func (s *PublisherService) handleCreateAccount(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	handle := query.Get("handle")
	email := query.Get("email")
	password := query.Get("password")
	pdsHost := query.Get("pds")
	inviteCode := query.Get("inviteCode")
	pdsAdminPassword := query.Get("pdsAdminPassword")

	if handle == "" || password == "" {
		http.Error(w, "handle and password parameters required", http.StatusBadRequest)
		return
	}
	if pdsHost == "" {
		pdsHost = s.pdsHost
	}
	if email == "" {
		email = handle + "@1440.news"
	}

	publisher := NewPublisher(pdsHost)

	if pdsAdminPassword != "" && inviteCode == "" {
		code, err := publisher.CreateInviteCode(pdsAdminPassword, 1)
		if err != nil {
			http.Error(w, "create invite failed: "+err.Error(), http.StatusInternalServerError)
			return
		}
		inviteCode = code
	}

	session, err := publisher.CreateAccount(handle, email, password, inviteCode)
	if err != nil {
		http.Error(w, "create account failed: "+err.Error(), http.StatusInternalServerError)
		return
	}

	// A failed directory follow is logged but does not fail the request.
	if err := publisher.FollowAsDirectory(session.DID); err != nil {
		fmt.Printf("API: directory follow failed for %s: %v\n", handle, err)
	}

	writeJSONResponse(w, map[string]interface{}{
		"status": "created",
		"handle": session.Handle,
		"did":    session.DID,
	})
}

// handleUpdateProfile logs in with caller-supplied credentials and updates the
// account's profile, optionally uploading a favicon as the avatar.
//
// Query parameters: "handle" and "password" (required); "pds" (defaults to the
// service's PDS host); "displayName"; "description"; "faviconUrl".
func (s *PublisherService) handleUpdateProfile(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	handle := query.Get("handle")
	password := query.Get("password")
	pdsHost := query.Get("pds")
	displayName := query.Get("displayName")
	description := query.Get("description")
	faviconURL := query.Get("faviconUrl")

	if handle == "" || password == "" {
		http.Error(w, "handle and password parameters required", http.StatusBadRequest)
		return
	}
	if pdsHost == "" {
		pdsHost = s.pdsHost
	}

	publisher := NewPublisher(pdsHost)
	session, err := publisher.CreateSession(handle, password)
	if err != nil {
		http.Error(w, "auth failed: "+err.Error(), http.StatusUnauthorized)
		return
	}

	var avatar *BlobRef
	if faviconURL != "" {
		faviconData, mimeType, err := FetchFaviconBytes(faviconURL)
		if err != nil {
			http.Error(w, "fetch favicon failed: "+err.Error(), http.StatusBadRequest)
			return
		}
		avatar, err = publisher.UploadBlob(session, faviconData, mimeType)
		if err != nil {
			http.Error(w, "upload favicon failed: "+err.Error(), http.StatusInternalServerError)
			return
		}
	}

	if err := publisher.UpdateProfile(session, displayName, description, avatar); err != nil {
		http.Error(w, "update profile failed: "+err.Error(), http.StatusInternalServerError)
		return
	}

	writeJSONResponse(w, map[string]interface{}{
		"status":      "updated",
		"handle":      handle,
		"displayName": displayName,
		"hasAvatar":   avatar != nil,
	})
}

// handleDeriveHandle returns the handle derived from the "url" query
// parameter. The URL is used exactly as supplied.
func (s *PublisherService) handleDeriveHandle(w http.ResponseWriter, r *http.Request) {
	feedURL := r.URL.Query().Get("url")
	if feedURL == "" {
		http.Error(w, "url parameter required", http.StatusBadRequest)
		return
	}

	writeJSONResponse(w, map[string]interface{}{
		"url":    feedURL,
		"handle": commons.DeriveHandleFromFeed(feedURL),
	})
}

// handleResetAllPublishing clears every feed's publish account, clears
// published_at on every item, and sets every non-NULL publish status back to
// "hold". The value returned by each Exec is echoed in the response.
//
// NOTE(review): the three statements run independently, so a failure part-way
// through leaves the reset half applied — consider a transaction.
func (s *PublisherService) handleResetAllPublishing(w http.ResponseWriter, r *http.Request) {
	accountsCleared, err := s.db.Exec(`UPDATE feeds SET publish_account = NULL WHERE publish_account IS NOT NULL`)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	itemsCleared, err := s.db.Exec(`UPDATE items SET published_at = NULL WHERE published_at IS NOT NULL`)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	statusReset, err := s.db.Exec(`UPDATE feeds SET publish_status = 'hold' WHERE publish_status IS NOT NULL`)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	writeJSONResponse(w, map[string]interface{}{
		"success":          true,
		"accounts_cleared": accountsCleared,
		"items_cleared":    itemsCleared,
		"status_reset":     statusReset,
	})
}

// handleRefreshProfiles runs RefreshAllProfiles synchronously and then reports
// success. Per-account failures are only logged by RefreshAllProfiles.
//
// Query parameters: "password" (required) and "pds" (defaults to the
// service's PDS host).
func (s *PublisherService) handleRefreshProfiles(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	password := query.Get("password")
	pdsHost := query.Get("pds")

	if password == "" {
		http.Error(w, "password parameter required", http.StatusBadRequest)
		return
	}
	if pdsHost == "" {
		pdsHost = s.pdsHost
	}

	s.RefreshAllProfiles(NewPublisher(pdsHost), password)

	writeJSONResponse(w, map[string]interface{}{
		"success": true,
		"message": "profiles refreshed",
	})
}

// handleSetItemStatus sets an item's status to 'pass' or 'fail'.
// If setting to 'fail' and the item was published, deletes the Bluesky post
// and clears the item's published fields.
//
// Query parameters: "feedUrl", "guid" and "status" (all required).
func (s *PublisherService) handleSetItemStatus(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	feedURL := query.Get("feedUrl")
	guid := query.Get("guid")
	status := query.Get("status")

	if feedURL == "" || guid == "" {
		http.Error(w, "feedUrl and guid parameters required", http.StatusBadRequest)
		return
	}
	if status != "pass" && status != "fail" {
		http.Error(w, "status must be 'pass' or 'fail'", http.StatusBadRequest)
		return
	}

	// Get the item to check if it was published
	item, err := s.GetItemByGUID(feedURL, guid)
	if err != nil {
		http.Error(w, "item not found: "+err.Error(), http.StatusNotFound)
		return
	}

	result := map[string]interface{}{
		"feedUrl": feedURL,
		"guid":    guid,
		"status":  status,
	}

	// If setting to fail and item was published, delete the post
	if status == "fail" && item.PublishedUri != "" {
		// Get the feed's publish account to authenticate
		var account string
		scanErr := s.db.QueryRow(`SELECT publish_account FROM feeds WHERE url = $1`, feedURL).Scan(&account)
		if scanErr != nil || account == "" {
			http.Error(w, "could not find publish account for feed", http.StatusInternalServerError)
			return
		}

		// Authenticate and delete the post
		session, err := s.publisher.CreateSession(account, s.feedPassword)
		if err != nil {
			http.Error(w, "auth failed: "+err.Error(), http.StatusUnauthorized)
			return
		}

		if err := s.publisher.DeletePost(session, item.PublishedUri); err != nil {
			// Log but don't fail - the post might already be deleted
			fmt.Printf("Warning: failed to delete post %s: %v\n", item.PublishedUri, err)
			result["delete_warning"] = err.Error()
		} else {
			result["post_deleted"] = true
		}

		// Clear the published fields; a failure here is logged, not fatal.
		if _, err := s.db.Exec(`UPDATE items SET published_at = NULL, published_uri = NULL WHERE feed_url = $1 AND guid = $2`, feedURL, guid); err != nil {
			fmt.Printf("Warning: failed to clear published fields: %v\n", err)
		}
	}

	// Update the item status
	if _, err := s.db.Exec(`UPDATE items SET status = $1 WHERE feed_url = $2 AND guid = $3`, status, feedURL, guid); err != nil {
		http.Error(w, "failed to update status: "+err.Error(), http.StatusInternalServerError)
		return
	}

	writeJSONResponse(w, result)
}

// Database helper methods

// SetPublishStatus updates publish_status and publish_account for the feed
// whose url equals the normalized feedURL.
//
// For status "pass" an empty account is replaced by the handle derived from
// the feed URL. The account is passed through commons.NullableString before
// being stored.
func (s *PublisherService) SetPublishStatus(feedURL, status, account string) error {
	feedURL = commons.NormalizeURL(feedURL)

	if status == "pass" && account == "" {
		account = commons.DeriveHandleFromFeed(feedURL)
	}

	_, err := s.db.Exec(`
		UPDATE feeds SET publish_status = $1, publish_account = $2
		WHERE url = $3
	`, status, commons.NullableString(account), feedURL)
	return err
}

// GetFeedsByPublishStatus returns every feed whose publish_status equals status.
func (s *PublisherService) GetFeedsByPublishStatus(status string) ([]*commons.Feed, error) {
	rows, err := s.db.Query(`
		SELECT `+feedSelectColumns+`
		FROM feeds
		WHERE publish_status = $1
	`, status)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	return scanFeeds(rows)
}

// GetPublishCandidates returns up to limit feeds that are on publish hold,
// have at least one item and have status 'pass', ordered by item count
// descending.
func (s *PublisherService) GetPublishCandidates(limit int) ([]*commons.Feed, error) {
	rows, err := s.db.Query(`
		SELECT `+feedSelectColumns+`
		FROM feeds
		WHERE publish_status = 'hold' AND item_count > 0 AND status = 'pass'
		ORDER BY item_count DESC
		LIMIT $1
	`, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	return scanFeeds(rows)
}

// GetUnpublishedItemCount counts the feed's items that have status 'pass' and
// no published_at value.
func (s *PublisherService) GetUnpublishedItemCount(feedURL string) (int, error) {
	var count int
	err := s.db.QueryRow(`
		SELECT COUNT(*) FROM items
		WHERE feed_url = $1 AND published_at IS NULL AND status = 'pass'
	`, feedURL).Scan(&count)
	return count, err
}

// GetItemByGUID returns the first item matching feedURL and guid, or an
// "item not found" error when there is none.
func (s *PublisherService) GetItemByGUID(feedURL, guid string) (*commons.Item, error) {
	rows, err := s.db.Query(`
		SELECT feed_url, guid, title, link, description, content, author,
			pub_date, discovered_at, updated_at,
			enclosure_url, enclosure_type, enclosure_length,
			image_urls, tags, status, published_at, published_uri
		FROM items
		WHERE feed_url = $1 AND guid = $2
	`, feedURL, guid)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	found, err := scanItems(rows)
	if err != nil {
		return nil, err
	}
	if len(found) == 0 {
		return nil, fmt.Errorf("item not found")
	}
	return found[0], nil
}

// RefreshAllProfiles updates the profile of every feed that has a publish
// account, logging in to each account with feedPassword.
//
// The display name is the feed title (falling back to the account handle) and
// the description is the feed URL followed by the HTML-stripped feed
// description. Failures are logged per account and never abort the loop.
func (s *PublisherService) RefreshAllProfiles(publisher *Publisher, feedPassword string) {
	rows, err := s.db.Query(`
		SELECT url, title, description, site_url, domain_host as source_host, publish_account
		FROM feeds
		WHERE publish_account IS NOT NULL AND publish_account <> ''
	`)
	if err != nil {
		fmt.Printf("RefreshProfiles: query error: %v\n", err)
		return
	}
	defer rows.Close()

	// shorten cuts text to at most limit runes, ending in "..." when cut.
	// Counting runes instead of bytes keeps multi-byte characters intact.
	shorten := func(text string, limit int) string {
		runes := []rune(text)
		if len(runes) <= limit {
			return text
		}
		return string(runes[:limit-3]) + "..."
	}

	for rows.Next() {
		var feedURL, account string
		var title, description, siteURL, sourceHost *string
		if err := rows.Scan(&feedURL, &title, &description, &siteURL, &sourceHost, &account); err != nil {
			fmt.Printf("RefreshProfiles: scan error: %v\n", err)
			continue
		}

		session, err := publisher.CreateSession(account, feedPassword)
		if err != nil {
			fmt.Printf("RefreshProfiles: login failed for %s: %v\n", account, err)
			continue
		}

		displayName := commons.StringValue(title)
		if displayName == "" {
			displayName = account
		}

		desc := stripHTML(commons.StringValue(description))
		if desc == "" {
			desc = "News feed via 1440.news"
		}
		desc = "https://" + feedURL + "\n\n" + desc

		displayName = shorten(displayName, 64)
		desc = shorten(desc, 256)

		// The avatar is best effort: the profile is still updated without one.
		var avatar *BlobRef
		faviconSource := commons.StringValue(siteURL)
		if faviconSource == "" {
			faviconSource = commons.StringValue(sourceHost)
		}
		if faviconSource != "" {
			faviconData, mimeType, err := FetchFaviconBytes(faviconSource)
			if err == nil && len(faviconData) > 0 {
				blob, uploadErr := publisher.UploadBlob(session, faviconData, mimeType)
				if uploadErr != nil {
					fmt.Printf("RefreshProfiles: avatar upload failed for %s: %v\n", account, uploadErr)
				} else {
					avatar = blob
				}
			}
		}

		if err := publisher.UpdateProfile(session, displayName, desc, avatar); err != nil {
			fmt.Printf("RefreshProfiles: update failed for %s: %v\n", account, err)
		} else {
			fmt.Printf("RefreshProfiles: updated %s\n", account)
		}
	}

	if err := rows.Err(); err != nil {
		fmt.Printf("RefreshProfiles: row iteration error: %v\n", err)
	}
}

// scanFeeds reads every remaining row into a commons.Feed.
//
// The rows must carry the columns of feedSelectColumns in that order. A row
// that fails to scan is logged and skipped. The returned error is rows.Err().
//
// NOTE(review): the nullable timestamp columns (last_checked_at,
// next_check_at, last_build_date, last_error_at, oldest_item_date,
// newest_item_date) are scanned but never copied onto the Feed — confirm
// whether that is intentional.
func scanFeeds(rows interface {
	Next() bool
	Scan(...interface{}) error
	Err() error
}) ([]*commons.Feed, error) {
	var feeds []*commons.Feed

	for rows.Next() {
		feed := &commons.Feed{}
		var feedType, category, title, description, language, siteURL *string
		var lastCheckedAt, nextCheckAt, lastBuildDate *interface{}
		var etag, lastModified, lastError, sourceURL, domainTLD *string
		var lastErrorAt *interface{}
		var oldestItemDate, newestItemDate *interface{}
		var publishStatus, publishAccount *string

		err := rows.Scan(
			&feed.URL, &feedType, &category, &title, &description, &language, &siteURL,
			&feed.DiscoveredAt, &lastCheckedAt, &nextCheckAt, &lastBuildDate,
			&etag, &lastModified, &feed.Status, &lastError, &lastErrorAt,
			&sourceURL, &feed.DomainHost, &domainTLD, &feed.ItemCount,
			&oldestItemDate, &newestItemDate, &feed.NoUpdate,
			&publishStatus, &publishAccount,
		)
		if err != nil {
			// Best effort: one bad row must not hide the rest of the result.
			fmt.Printf("scanFeeds: skipping row: %v\n", err)
			continue
		}

		feed.Type = commons.StringValue(feedType)
		feed.Category = commons.StringValue(category)
		feed.Title = commons.StringValue(title)
		feed.Description = commons.StringValue(description)
		feed.Language = commons.StringValue(language)
		feed.SiteURL = commons.StringValue(siteURL)
		feed.ETag = commons.StringValue(etag)
		feed.LastModified = commons.StringValue(lastModified)
		feed.LastError = commons.StringValue(lastError)
		feed.SourceURL = commons.StringValue(sourceURL)
		feed.DomainTLD = commons.StringValue(domainTLD)
		feed.PublishStatus = commons.StringValue(publishStatus)
		feed.PublishAccount = commons.StringValue(publishAccount)

		feeds = append(feeds, feed)
	}

	return feeds, rows.Err()
}

// init loads KEY=VALUE pairs from pds.env in the working directory, if the
// file can be read. Blank lines and lines starting with '#' are ignored, and
// a key is only set when the environment has no non-empty value for it.
func init() {
	// Try to load pds.env if it exists
	data, err := os.ReadFile("pds.env")
	if err != nil {
		return
	}

	for _, line := range strings.Split(string(data), "\n") {
		line = strings.TrimSpace(line)
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}

		parts := strings.SplitN(line, "=", 2)
		if len(parts) != 2 {
			continue
		}

		key := strings.TrimSpace(parts[0])
		value := strings.TrimSpace(parts[1])
		if os.Getenv(key) != "" {
			continue
		}
		if err := os.Setenv(key, value); err != nil {
			fmt.Printf("pds.env: cannot set %q: %v\n", key, err)
		}
	}
}