- Rename module from publisher to publish - Change all shared.* references to commons.* - Use commons.Item, commons.Feed, etc from shared library Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
212 lines
5.0 KiB
Go
212 lines
5.0 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/1440news/commons"
|
|
)
|
|
|
|
func main() {
|
|
fmt.Println("Starting publisher service...")
|
|
|
|
// Open database connection
|
|
db, err := commons.OpenDatabase("")
|
|
if err != nil {
|
|
log.Fatalf("Failed to connect to database: %v", err)
|
|
}
|
|
defer db.Close()
|
|
|
|
// Load PDS configuration
|
|
pdsHost := os.Getenv("PDS_HOST")
|
|
if pdsHost == "" {
|
|
pdsHost = "https://pds.1440.news"
|
|
}
|
|
|
|
feedPassword := os.Getenv("FEED_PASSWORD")
|
|
if feedPassword == "" {
|
|
feedPassword = "feed1440!"
|
|
}
|
|
|
|
// Create publisher service
|
|
pub := NewPublisherService(db, pdsHost, feedPassword)
|
|
|
|
// Start HTTP server
|
|
go func() {
|
|
if err := pub.StartServer("0.0.0.0:4322"); err != nil {
|
|
log.Fatalf("Server error: %v", err)
|
|
}
|
|
}()
|
|
|
|
// Start publish loop
|
|
go pub.StartPublishLoop()
|
|
|
|
fmt.Printf("Publisher service running (PDS: %s)\n", pdsHost)
|
|
|
|
// Wait for shutdown signal
|
|
sigChan := make(chan os.Signal, 1)
|
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
|
<-sigChan
|
|
|
|
fmt.Println("Shutting down publisher service...")
|
|
}
|
|
|
|
// loadEnvFile loads environment variables from a file (e.g., pds.env)
|
|
func loadEnvFile(filename string) {
|
|
data, err := os.ReadFile(filename)
|
|
if err != nil {
|
|
return // File doesn't exist or can't be read
|
|
}
|
|
|
|
for _, line := range strings.Split(string(data), "\n") {
|
|
line = strings.TrimSpace(line)
|
|
if line == "" || strings.HasPrefix(line, "#") {
|
|
continue
|
|
}
|
|
|
|
parts := strings.SplitN(line, "=", 2)
|
|
if len(parts) == 2 {
|
|
key := strings.TrimSpace(parts[0])
|
|
value := strings.TrimSpace(parts[1])
|
|
if os.Getenv(key) == "" {
|
|
os.Setenv(key, value)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// PublisherService manages publishing items to AT Protocol PDS
|
|
type PublisherService struct {
|
|
db *commons.DB
|
|
publisher *Publisher
|
|
pdsHost string
|
|
feedPassword string
|
|
}
|
|
|
|
// NewPublisherService creates a new publisher service
|
|
func NewPublisherService(db *commons.DB, pdsHost, feedPassword string) *PublisherService {
|
|
return &PublisherService{
|
|
db: db,
|
|
publisher: NewPublisher(pdsHost),
|
|
pdsHost: pdsHost,
|
|
feedPassword: feedPassword,
|
|
}
|
|
}
|
|
|
|
// StartPublishLoop runs the automatic publishing loop
|
|
func (s *PublisherService) StartPublishLoop() {
|
|
ticker := time.NewTicker(5 * time.Minute)
|
|
defer ticker.Stop()
|
|
|
|
// Run immediately on start
|
|
s.publishPendingItems()
|
|
|
|
for range ticker.C {
|
|
s.publishPendingItems()
|
|
}
|
|
}
|
|
|
|
// publishPendingItems publishes unpublished items for all enabled feeds
|
|
func (s *PublisherService) publishPendingItems() {
|
|
// Get all feeds with publish_status = 'pass'
|
|
rows, err := s.db.Query(`
|
|
SELECT url, publish_account
|
|
FROM feeds
|
|
WHERE publish_status = 'pass' AND publish_account IS NOT NULL
|
|
`)
|
|
if err != nil {
|
|
fmt.Printf("Publish loop: query error: %v\n", err)
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
|
|
var feeds []struct {
|
|
URL string
|
|
Account string
|
|
}
|
|
|
|
for rows.Next() {
|
|
var url, account string
|
|
if err := rows.Scan(&url, &account); err != nil {
|
|
continue
|
|
}
|
|
feeds = append(feeds, struct {
|
|
URL string
|
|
Account string
|
|
}{url, account})
|
|
}
|
|
|
|
if len(feeds) == 0 {
|
|
return
|
|
}
|
|
|
|
fmt.Printf("Publish loop: checking %d feeds\n", len(feeds))
|
|
|
|
for _, feed := range feeds {
|
|
s.publishFeedItems(feed.URL, feed.Account)
|
|
}
|
|
}
|
|
|
|
// publishFeedItems publishes unpublished items for a single feed
|
|
func (s *PublisherService) publishFeedItems(feedURL, account string) {
|
|
// Get unpublished items (limit to 5 per cycle to avoid overwhelming)
|
|
items, err := s.GetUnpublishedItems(feedURL, 5)
|
|
if err != nil || len(items) == 0 {
|
|
return
|
|
}
|
|
|
|
// Authenticate with the feed account
|
|
session, err := s.publisher.CreateSession(account, s.feedPassword)
|
|
if err != nil {
|
|
fmt.Printf("Publish: auth failed for %s: %v\n", account, err)
|
|
return
|
|
}
|
|
|
|
for _, item := range items {
|
|
uri, err := s.publisher.PublishItem(session, item)
|
|
if err != nil {
|
|
fmt.Printf("Publish: failed to publish %s: %v\n", item.GUID, err)
|
|
continue
|
|
}
|
|
|
|
if err := s.MarkItemPublished(item.FeedURL, item.GUID, uri); err != nil {
|
|
fmt.Printf("Publish: failed to mark published %s: %v\n", item.GUID, err)
|
|
}
|
|
|
|
// Small delay between posts
|
|
time.Sleep(1100 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
// GetUnpublishedItems returns unpublished items for a feed
|
|
func (s *PublisherService) GetUnpublishedItems(feedURL string, limit int) ([]*commons.Item, error) {
|
|
rows, err := s.db.Query(`
|
|
SELECT feed_url, guid, title, link, description, content, author, pub_date, discovered_at, updated_at,
|
|
enclosure_url, enclosure_type, enclosure_length, image_urls, tags,
|
|
published_at, published_uri
|
|
FROM items
|
|
WHERE feed_url = $1 AND published_at IS NULL
|
|
ORDER BY pub_date ASC
|
|
LIMIT $2
|
|
`, feedURL, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
return scanItems(rows)
|
|
}
|
|
|
|
// MarkItemPublished marks an item as published with the given URI
|
|
func (s *PublisherService) MarkItemPublished(feedURL, guid, uri string) error {
|
|
_, err := s.db.Exec(`
|
|
UPDATE items SET published_at = NOW(), published_uri = $1 WHERE feed_url = $2 AND guid = $3
|
|
`, uri, feedURL, guid)
|
|
return err
|
|
}
|