package main import ( "fmt" "log" "os" "os/signal" "strings" "syscall" "time" "github.com/1440news/commons" ) func main() { fmt.Println("Starting publisher service...") // Open database connection db, err := commons.OpenDatabase("") if err != nil { log.Fatalf("Failed to connect to database: %v", err) } defer db.Close() // Load PDS configuration pdsHost := os.Getenv("PDS_HOST") if pdsHost == "" { pdsHost = "https://pds.1440.news" } feedPassword := os.Getenv("FEED_PASSWORD") if feedPassword == "" { feedPassword = "feed1440!" } // Create publisher service pub := NewPublisherService(db, pdsHost, feedPassword) // Start HTTP server go func() { if err := pub.StartServer("0.0.0.0:4322"); err != nil { log.Fatalf("Server error: %v", err) } }() // Start publish loop go pub.StartPublishLoop() fmt.Printf("Publisher service running (PDS: %s)\n", pdsHost) // Wait for shutdown signal sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) <-sigChan fmt.Println("Shutting down publisher service...") } // loadEnvFile loads environment variables from a file (e.g., pds.env) func loadEnvFile(filename string) { data, err := os.ReadFile(filename) if err != nil { return // File doesn't exist or can't be read } for _, line := range strings.Split(string(data), "\n") { line = strings.TrimSpace(line) if line == "" || strings.HasPrefix(line, "#") { continue } parts := strings.SplitN(line, "=", 2) if len(parts) == 2 { key := strings.TrimSpace(parts[0]) value := strings.TrimSpace(parts[1]) if os.Getenv(key) == "" { os.Setenv(key, value) } } } } // PublisherService manages publishing items to AT Protocol PDS type PublisherService struct { db *commons.DB publisher *Publisher pdsHost string feedPassword string } // NewPublisherService creates a new publisher service func NewPublisherService(db *commons.DB, pdsHost, feedPassword string) *PublisherService { return &PublisherService{ db: db, publisher: NewPublisher(pdsHost), pdsHost: pdsHost, feedPassword: feedPassword, } } // StartPublishLoop runs the automatic publishing loop func (s *PublisherService) StartPublishLoop() { ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() // Run immediately on start s.publishPendingItems() for range ticker.C { s.publishPendingItems() } } // publishPendingItems publishes unpublished items for all enabled feeds func (s *PublisherService) publishPendingItems() { // Get all feeds with publish_status = 'pass' rows, err := s.db.Query(` SELECT url, publish_account FROM feeds WHERE publish_status = 'pass' AND publish_account IS NOT NULL `) if err != nil { fmt.Printf("Publish loop: query error: %v\n", err) return } defer rows.Close() var feeds []struct { URL string Account string } for rows.Next() { var url, account string if err := rows.Scan(&url, &account); err != nil { continue } feeds = append(feeds, struct { URL string Account string }{url, account}) } if len(feeds) == 0 { return } fmt.Printf("Publish loop: checking %d feeds\n", len(feeds)) for _, feed := range feeds { s.publishFeedItems(feed.URL, feed.Account) } } // publishFeedItems publishes unpublished items for a single feed func (s *PublisherService) publishFeedItems(feedURL, account string) { // Get unpublished items (limit to 5 per cycle to avoid overwhelming) items, err := s.GetUnpublishedItems(feedURL, 5) if err != nil || len(items) == 0 { return } // Authenticate with the feed account session, err := s.publisher.CreateSession(account, s.feedPassword) if err != nil { fmt.Printf("Publish: auth failed for %s: %v\n", account, err) return } for _, item := range items { uri, err := s.publisher.PublishItem(session, item) if err != nil { fmt.Printf("Publish: failed to publish %s: %v\n", item.GUID, err) continue } if err := s.MarkItemPublished(item.FeedURL, item.GUID, uri); err != nil { fmt.Printf("Publish: failed to mark published %s: %v\n", item.GUID, err) } // Small delay between posts time.Sleep(1100 * time.Millisecond) } } // GetUnpublishedItems returns unpublished items for a feed (only items with status='pass') func (s *PublisherService) GetUnpublishedItems(feedURL string, limit int) ([]*commons.Item, error) { rows, err := s.db.Query(` SELECT feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at, enclosure_url, enclosure_type, enclosure_length, image_urls, tags, status, published_at, published_uri FROM items WHERE feed_url = $1 AND published_at IS NULL AND status = 'pass' ORDER BY pub_date ASC LIMIT $2 `, feedURL, limit) if err != nil { return nil, err } defer rows.Close() return scanItems(rows) } // MarkItemPublished marks an item as published with the given URI func (s *PublisherService) MarkItemPublished(feedURL, guid, uri string) error { _, err := s.db.Exec(` UPDATE items SET published_at = NOW(), published_uri = $1 WHERE feed_url = $2 AND guid = $3 `, uri, feedURL, guid) return err }