Files
primal 3b5c4ddeb2 Add item status (pass/fail) support
- Filter unpublished items by status='pass' in publish loop
- Add DeletePost function to remove posts from PDS
- Add /api/setItemStatus endpoint to mark items pass/fail
- When marking fail, deletes the Bluesky post if it was published

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-02 15:46:27 -05:00

212 lines
5.1 KiB
Go

package main
import (
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/1440news/commons"
)
func main() {
fmt.Println("Starting publisher service...")
// Open database connection
db, err := commons.OpenDatabase("")
if err != nil {
log.Fatalf("Failed to connect to database: %v", err)
}
defer db.Close()
// Load PDS configuration
pdsHost := os.Getenv("PDS_HOST")
if pdsHost == "" {
pdsHost = "https://pds.1440.news"
}
feedPassword := os.Getenv("FEED_PASSWORD")
if feedPassword == "" {
feedPassword = "feed1440!"
}
// Create publisher service
pub := NewPublisherService(db, pdsHost, feedPassword)
// Start HTTP server
go func() {
if err := pub.StartServer("0.0.0.0:4322"); err != nil {
log.Fatalf("Server error: %v", err)
}
}()
// Start publish loop
go pub.StartPublishLoop()
fmt.Printf("Publisher service running (PDS: %s)\n", pdsHost)
// Wait for shutdown signal
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
fmt.Println("Shutting down publisher service...")
}
// loadEnvFile loads environment variables from a file (e.g., pds.env)
func loadEnvFile(filename string) {
data, err := os.ReadFile(filename)
if err != nil {
return // File doesn't exist or can't be read
}
for _, line := range strings.Split(string(data), "\n") {
line = strings.TrimSpace(line)
if line == "" || strings.HasPrefix(line, "#") {
continue
}
parts := strings.SplitN(line, "=", 2)
if len(parts) == 2 {
key := strings.TrimSpace(parts[0])
value := strings.TrimSpace(parts[1])
if os.Getenv(key) == "" {
os.Setenv(key, value)
}
}
}
}
// PublisherService manages publishing items to AT Protocol PDS
type PublisherService struct {
db *commons.DB
publisher *Publisher
pdsHost string
feedPassword string
}
// NewPublisherService creates a new publisher service
func NewPublisherService(db *commons.DB, pdsHost, feedPassword string) *PublisherService {
return &PublisherService{
db: db,
publisher: NewPublisher(pdsHost),
pdsHost: pdsHost,
feedPassword: feedPassword,
}
}
// StartPublishLoop runs the automatic publishing loop
func (s *PublisherService) StartPublishLoop() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
// Run immediately on start
s.publishPendingItems()
for range ticker.C {
s.publishPendingItems()
}
}
// publishPendingItems publishes unpublished items for all enabled feeds
func (s *PublisherService) publishPendingItems() {
// Get all feeds with publish_status = 'pass'
rows, err := s.db.Query(`
SELECT url, publish_account
FROM feeds
WHERE publish_status = 'pass' AND publish_account IS NOT NULL
`)
if err != nil {
fmt.Printf("Publish loop: query error: %v\n", err)
return
}
defer rows.Close()
var feeds []struct {
URL string
Account string
}
for rows.Next() {
var url, account string
if err := rows.Scan(&url, &account); err != nil {
continue
}
feeds = append(feeds, struct {
URL string
Account string
}{url, account})
}
if len(feeds) == 0 {
return
}
fmt.Printf("Publish loop: checking %d feeds\n", len(feeds))
for _, feed := range feeds {
s.publishFeedItems(feed.URL, feed.Account)
}
}
// publishFeedItems publishes unpublished items for a single feed
func (s *PublisherService) publishFeedItems(feedURL, account string) {
// Get unpublished items (limit to 5 per cycle to avoid overwhelming)
items, err := s.GetUnpublishedItems(feedURL, 5)
if err != nil || len(items) == 0 {
return
}
// Authenticate with the feed account
session, err := s.publisher.CreateSession(account, s.feedPassword)
if err != nil {
fmt.Printf("Publish: auth failed for %s: %v\n", account, err)
return
}
for _, item := range items {
uri, err := s.publisher.PublishItem(session, item)
if err != nil {
fmt.Printf("Publish: failed to publish %s: %v\n", item.GUID, err)
continue
}
if err := s.MarkItemPublished(item.FeedURL, item.GUID, uri); err != nil {
fmt.Printf("Publish: failed to mark published %s: %v\n", item.GUID, err)
}
// Small delay between posts
time.Sleep(1100 * time.Millisecond)
}
}
// GetUnpublishedItems returns unpublished items for a feed (only items with status='pass')
func (s *PublisherService) GetUnpublishedItems(feedURL string, limit int) ([]*commons.Item, error) {
rows, err := s.db.Query(`
SELECT feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at,
enclosure_url, enclosure_type, enclosure_length, image_urls, tags,
status, published_at, published_uri
FROM items
WHERE feed_url = $1 AND published_at IS NULL AND status = 'pass'
ORDER BY pub_date ASC
LIMIT $2
`, feedURL, limit)
if err != nil {
return nil, err
}
defer rows.Close()
return scanItems(rows)
}
// MarkItemPublished marks an item as published with the given URI
func (s *PublisherService) MarkItemPublished(feedURL, guid, uri string) error {
_, err := s.db.Exec(`
UPDATE items SET published_at = NOW(), published_uri = $1 WHERE feed_url = $2 AND guid = $3
`, uri, feedURL, guid)
return err
}