package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/net/html"
)

// Crawler discovers feeds by crawling domains, checks known feeds, and
// publishes new items to a PDS.
type Crawler struct {
	MaxDepth        int
	MaxPagesPerHost int
	Timeout         time.Duration
	UserAgent       string

	visited sync.Map
	feedsMu sync.Mutex
	client  *http.Client

	hostsProcessed int32
	feedsChecked   int32
	startTime      time.Time
	db             *DB

	displayedCrawlRate int
	displayedCheckRate int
	domainsImported    int32

	cachedStats      *DashboardStats
	cachedAllDomains []DomainStat
	statsMu          sync.RWMutex
}

// NewCrawler opens the database and returns a Crawler with default limits and
// a shared HTTP client.
func NewCrawler(connString string) (*Crawler, error) {
	db, err := OpenDatabase(connString)
	if err != nil {
		return nil, fmt.Errorf("failed to open database: %v", err)
	}
	return &Crawler{
		MaxDepth:        10,
		MaxPagesPerHost: 10,
		Timeout:         10 * time.Second,
		UserAgent:       "FeedCrawler/1.0",
		startTime:       time.Now(),
		db:              db,
		client: &http.Client{
			Timeout: 10 * time.Second,
			CheckRedirect: func(req *http.Request, via []*http.Request) error {
				if len(via) >= 10 {
					return fmt.Errorf("stopped after 10 redirects")
				}
				return nil
			},
		},
	}, nil
}

// Close closes the crawler's database connection.
func (c *Crawler) Close() error {
	if c.db != nil {
		fmt.Println("Closing database...")
		return c.db.Close()
	}
	return nil
}

// StartStatsLoop updates cached stats every 10 seconds
func (c *Crawler) StartStatsLoop() {
	for {
		c.UpdateStats()
		time.Sleep(10 * time.Second)
	}
}

// StartCleanupLoop runs item cleanup once per week
func (c *Crawler) StartCleanupLoop() {
	for {
		deleted, err := c.CleanupOldItems()
		if err != nil {
			fmt.Printf("Cleanup error: %v\n", err)
		} else if deleted > 0 {
			fmt.Printf("Cleanup: removed %d old items\n", deleted)
		}
		time.Sleep(7 * 24 * time.Hour)
	}
}

// StartMaintenanceLoop performs periodic database maintenance
func (c *Crawler) StartMaintenanceLoop() {
	vacuumTicker := time.NewTicker(24 * time.Hour)
	analyzeTicker := time.NewTicker(1 * time.Hour)
	defer vacuumTicker.Stop()
	defer analyzeTicker.Stop()

	for {
		select {
		case <-analyzeTicker.C:
			// Update statistics for query planner
			if _, err := c.db.Exec("ANALYZE"); err != nil {
				fmt.Printf("ANALYZE error: %v\n", err)
			}
		case <-vacuumTicker.C:
			// Reclaim dead tuple space (VACUUM is lighter than VACUUM FULL)
			fmt.Println("Running VACUUM...")
			if _, err := c.db.Exec("VACUUM"); err != nil {
				fmt.Printf("VACUUM error: %v\n", err)
			} else {
				fmt.Println("VACUUM complete")
			}
		}
	}
}

// StartPublishLoop automatically publishes unpublished items for approved feeds.
// Grabs up to 50 items sorted by discovered_at, publishes one per second, then repeats.
func (c *Crawler) StartPublishLoop() {
	// Load PDS credentials from environment or pds.env file
	pdsHost := os.Getenv("PDS_HOST")
	pdsAdminPassword := os.Getenv("PDS_ADMIN_PASSWORD")
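	// If the environment variables are unset, credentials may instead come from a
	// pds.env file in the working directory. Blank lines and lines starting with
	// "#" are ignored; each remaining line is KEY=VALUE, for example (values here
	// are illustrative placeholders, not real credentials):
	//
	//	PDS_HOST=pds.example.com
	//	PDS_ADMIN_PASSWORD=changeme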
	if pdsHost == "" || pdsAdminPassword == "" {
		if data, err := os.ReadFile("pds.env"); err == nil {
			for _, line := range strings.Split(string(data), "\n") {
				line = strings.TrimSpace(line)
				if strings.HasPrefix(line, "#") || line == "" {
					continue
				}
				parts := strings.SplitN(line, "=", 2)
				if len(parts) == 2 {
					key := strings.TrimSpace(parts[0])
					value := strings.TrimSpace(parts[1])
					switch key {
					case "PDS_HOST":
						pdsHost = value
					case "PDS_ADMIN_PASSWORD":
						pdsAdminPassword = value
					}
				}
			}
		}
	}
	if pdsHost == "" || pdsAdminPassword == "" {
		fmt.Println("Publish loop: PDS credentials not configured, skipping")
		return
	}
	fmt.Printf("Publish loop: starting with PDS %s\n", pdsHost)
	feedPassword := "feed1440!"

	// Cache sessions per account
	sessions := make(map[string]*PDSSession)
	publisher := NewPublisher(pdsHost)

	// Refresh existing account profiles on startup
	c.RefreshAllProfiles(publisher, feedPassword)

	for {
		// Get up to 50 unpublished items from approved feeds, sorted by discovered_at ASC
		items, err := c.GetAllUnpublishedItems(50)
		if err != nil {
			fmt.Printf("Publish loop error: %v\n", err)
			time.Sleep(1 * time.Second)
			continue
		}
		if len(items) == 0 {
			time.Sleep(1 * time.Second)
			continue
		}

		// Publish one item per second
		for _, item := range items {
			// Get or create session for this feed's account
			account := c.getAccountForFeed(item.FeedURL)
			if account == "" {
				time.Sleep(1 * time.Second)
				continue
			}
			session, ok := sessions[account]
			if !ok {
				// Try to log in
				session, err = publisher.CreateSession(account, feedPassword)
				if err != nil {
					// Account might not exist - try to create it
					inviteCode, err := publisher.CreateInviteCode(pdsAdminPassword, 1)
					if err != nil {
						fmt.Printf("Publish: failed to create invite for %s: %v\n", account, err)
						time.Sleep(1 * time.Second)
						continue
					}
					email := account + "@1440.news"
					session, err = publisher.CreateAccount(account, email, feedPassword, inviteCode)
					if err != nil {
						fmt.Printf("Publish: failed to create account %s: %v\n", account, err)
						time.Sleep(1 * time.Second)
						continue
					}
					fmt.Printf("Publish: created account %s\n", account)
					c.db.Exec("UPDATE feeds SET publish_account = $1 WHERE url = $2", account, item.FeedURL)

					// Set up profile for new account
					feedInfo := c.getFeedInfo(item.FeedURL)
					if feedInfo != nil {
						displayName := feedInfo.Title
						if displayName == "" {
							displayName = account
						}
						// Build description with feed URL (strip HTML tags)
						description := stripHTML(feedInfo.Description)
						if description == "" {
							description = "News feed via 1440.news"
						}
						// Add feed URL as first line of description
						feedURLFull := "https://" + item.FeedURL
						description = feedURLFull + "\n\n" + description
						// Truncate if needed
						if len(displayName) > 64 {
							displayName = displayName[:61] + "..."
						}
						if len(description) > 256 {
							description = description[:253] + "..."
						}

						// Fetch and upload favicon as avatar
						var avatar *BlobRef
						faviconSource := feedInfo.SiteURL
						if faviconSource == "" {
							// Fallback to deriving from feed URL
							faviconSource = feedInfo.SourceHost
						}
						if faviconSource != "" {
							faviconURL := publisher.FetchFavicon(faviconSource)
							if faviconURL != "" {
								avatar = publisher.fetchAndUploadImage(session, faviconURL)
							}
						}
						if err := publisher.UpdateProfile(session, displayName, description, avatar); err != nil {
							fmt.Printf("Publish: failed to set profile for %s: %v\n", account, err)
						} else {
							fmt.Printf("Publish: set profile for %s\n", account)
						}

						// Have directory account follow this new account
						if err := publisher.FollowAsDirectory(session.DID); err != nil {
							fmt.Printf("Publish: directory follow failed for %s: %v\n", account, err)
						} else {
							fmt.Printf("Publish: directory now following %s\n", account)
						}
					}
				}
				sessions[account] = session
			}

			// Shorten URLs before publishing
			itemToPublish := item
			if item.Link != "" {
				if shortURL, err := c.GetShortURLForPost(item.Link, &item.ID, item.FeedURL); err == nil {
					fmt.Printf("Publish: shortened %s -> %s\n", item.Link[:min(40, len(item.Link))], shortURL)
					itemToPublish.Link = shortURL
				} else {
					fmt.Printf("Publish: short URL failed for %s: %v\n", item.Link[:min(40, len(item.Link))], err)
				}
			}

			// Publish the item
			uri, err := publisher.PublishItem(session, &itemToPublish)
			if err != nil {
				fmt.Printf("Publish: failed item %d: %v\n", item.ID, err)
				// Clear session cache on auth errors
				if strings.Contains(err.Error(), "401") || strings.Contains(err.Error(), "auth") {
					delete(sessions, account)
				}
			} else {
				c.MarkItemPublished(item.ID, uri)
				fmt.Printf("Publish: %s -> %s\n", item.Title[:min(40, len(item.Title))], account)
			}
			time.Sleep(1 * time.Second)
		}
		time.Sleep(1 * time.Second)
	}
}

// getAccountForFeed returns the publish account for a feed URL
func (c *Crawler) getAccountForFeed(feedURL string) string {
	var account *string
	err := c.db.QueryRow(`
		SELECT publish_account FROM feeds
		WHERE url = $1 AND publish_status = 'pass' AND status = 'active'
	`, feedURL).Scan(&account)
	if err != nil || account == nil || *account == "" {
		// Derive handle from feed URL
		return DeriveHandleFromFeed(feedURL)
	}
	return *account
}

// FeedInfo holds basic feed metadata for profile setup
type FeedInfo struct {
	Title       string
	Description string
	SiteURL     string
	SourceHost  string
}

// getFeedInfo returns feed metadata for profile setup
func (c *Crawler) getFeedInfo(feedURL string) *FeedInfo {
	var title, description, siteURL, sourceHost *string
	err := c.db.QueryRow(`
		SELECT title, description, site_url, source_host FROM feeds WHERE url = $1
	`, feedURL).Scan(&title, &description, &siteURL, &sourceHost)
	if err != nil {
		return nil
	}
	return &FeedInfo{
		Title:       StringValue(title),
		Description: StringValue(description),
		SiteURL:     StringValue(siteURL),
		SourceHost:  StringValue(sourceHost),
	}
}

// RefreshAllProfiles updates profiles for all existing accounts with feed URLs
func (c *Crawler) RefreshAllProfiles(publisher *Publisher, feedPassword string) {
	rows, err := c.db.Query(`
		SELECT url, title, description, site_url, source_host, publish_account
		FROM feeds
		WHERE publish_account IS NOT NULL AND publish_account <> ''
	`)
	if err != nil {
		fmt.Printf("RefreshProfiles: query error: %v\n", err)
		return
	}
	defer rows.Close()

	for rows.Next() {
		var feedURL, account string
		var title, description, siteURL, sourceHost *string
		if err := rows.Scan(&feedURL, &title, &description, &siteURL, &sourceHost, &account); err != nil {
			continue
		}

		// Login to account
		session, err := publisher.CreateSession(account, feedPassword)
		if err != nil {
			fmt.Printf("RefreshProfiles: login failed for %s: %v\n", account, err)
			continue
		}

		// Build profile
		displayName := StringValue(title)
		if displayName == "" {
			displayName = account
		}
		desc := stripHTML(StringValue(description))
		if desc == "" {
			desc = "News feed via 1440.news"
		}
		// Add feed URL as first line
		feedURLFull := "https://" + feedURL
		desc = feedURLFull + "\n\n" + desc
		// Truncate if needed
		if len(displayName) > 64 {
			displayName = displayName[:61] + "..."
		}
		if len(desc) > 256 {
			desc = desc[:253] + "..."
		}

		// Fetch and upload favicon as avatar
		var avatar *BlobRef
		faviconSource := StringValue(siteURL)
		if faviconSource == "" {
			// Fallback to source host
			faviconSource = StringValue(sourceHost)
		}
		if faviconSource != "" {
			faviconURL := publisher.FetchFavicon(faviconSource)
			if faviconURL != "" {
				avatar = publisher.fetchAndUploadImage(session, faviconURL)
			}
		}
		if err := publisher.UpdateProfile(session, displayName, desc, avatar); err != nil {
			fmt.Printf("RefreshProfiles: update failed for %s: %v\n", account, err)
		} else {
			fmt.Printf("RefreshProfiles: updated %s\n", account)
		}
	}
}

// GetAllUnpublishedItems returns unpublished items from all approved feeds
func (c *Crawler) GetAllUnpublishedItems(limit int) ([]Item, error) {
	rows, err := c.db.Query(`
		SELECT i.id, i.feed_url, i.guid, i.title, i.link, i.description, i.content, i.author,
			i.pub_date, i.discovered_at, i.image_urls, i.tags,
			i.enclosure_url, i.enclosure_type, i.enclosure_length
		FROM items i
		JOIN feeds f ON i.feed_url = f.url
		WHERE f.publish_status = 'pass' AND f.status = 'active' AND i.published_at IS NULL
		ORDER BY i.discovered_at ASC
		LIMIT $1
	`, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var items []Item
	for rows.Next() {
		var item Item
		var guid, title, link, description, content, author, imageURLsJSON, tagsJSON *string
		var pubDate, discoveredAt *time.Time
		var enclosureURL, enclosureType *string
		var enclosureLength *int64
		err := rows.Scan(&item.ID, &item.FeedURL, &guid, &title, &link, &description, &content, &author,
			&pubDate, &discoveredAt, &imageURLsJSON, &tagsJSON,
			&enclosureURL, &enclosureType, &enclosureLength)
		if err != nil {
			continue
		}
		item.GUID = StringValue(guid)
		item.Title = StringValue(title)
		item.Link = StringValue(link)
		item.Description = StringValue(description)
		item.Content = StringValue(content)
		item.Author = StringValue(author)
		item.PubDate = TimeValue(pubDate)
		item.DiscoveredAt = TimeValue(discoveredAt)
		// Parse image URLs from JSON array
		if imageURLsJSON != nil && *imageURLsJSON != "" {
			json.Unmarshal([]byte(*imageURLsJSON), &item.ImageURLs)
		}
		// Parse tags from JSON array
		if tagsJSON != nil && *tagsJSON != "" {
			json.Unmarshal([]byte(*tagsJSON), &item.Tags)
		}
		// Parse enclosure
		if enclosureURL != nil && *enclosureURL != "" {
			item.Enclosure = &Enclosure{
				URL:  *enclosureURL,
				Type: StringValue(enclosureType),
			}
			if enclosureLength != nil {
				item.Enclosure.Length = *enclosureLength
			}
		}
		items = append(items, item)
	}
	return items, nil
}
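
// Note: image_urls and tags are decoded above as JSON-encoded arrays
// (presumably arrays of strings, e.g. ["https://example.com/a.jpg"]); the
// exact column format is an assumption inferred from how the values are
// unmarshalled, not confirmed by a schema in this file.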

// StartDomainCheckLoop runs HEAD requests on approved domains to verify they're reachable
func (c *Crawler) StartDomainCheckLoop() {
	numWorkers := 100

	// Buffered channel for domain work
	workChan := make(chan *Domain, 100)

	// Start workers
	for i := 0; i < numWorkers; i++ {
		go func() {
			for domain := range workChan {
				// Do HEAD request to verify domain is reachable
				checkErr := c.checkDomain(domain.Host)
				errStr := ""
				if checkErr != nil {
					errStr = checkErr.Error()
				}
				if err := c.markDomainChecked(domain.Host, errStr); err != nil {
					fmt.Printf("Error marking domain %s as checked: %v\n", domain.Host, err)
				}
			}
		}()
	}

	const fetchSize = 100
	for {
		domains, err := c.GetDomainsToCheck(fetchSize)
		if err != nil {
			fmt.Printf("Error fetching domains to check: %v\n", err)
		}
		if len(domains) == 0 {
			time.Sleep(1 * time.Second)
			continue
		}
		fmt.Printf("%s domain-check: %d domains to verify\n", time.Now().Format("15:04:05"), len(domains))
		for _, domain := range domains {
			workChan <- domain
		}
		time.Sleep(1 * time.Second)
	}
}

// checkDomain performs a HEAD request to verify a domain is reachable
func (c *Crawler) checkDomain(host string) error {
	url := "https://" + host
	req, err := http.NewRequest("HEAD", url, nil)
	if err != nil {
		return err
	}
	req.Header.Set("User-Agent", c.UserAgent)
	resp, err := c.client.Do(req)
	if err != nil {
		// Try HTTP fallback
		url = "http://" + host
		req, err = http.NewRequest("HEAD", url, nil)
		if err != nil {
			return err
		}
		req.Header.Set("User-Agent", c.UserAgent)
		resp, err = c.client.Do(req)
		if err != nil {
			return err
		}
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 400 {
		return fmt.Errorf("HTTP %d", resp.StatusCode)
	}
	return nil
}

// StartCrawlLoop runs the domain crawling loop independently (crawls checked domains)
func (c *Crawler) StartCrawlLoop() {
	numWorkers := 100

	// Buffered channel for domain work
	workChan := make(chan *Domain, 100)

	// Start workers
	for i := 0; i < numWorkers; i++ {
		go func() {
			for domain := range workChan {
				feedsFound, crawlErr := c.crawlHost(domain.Host)
				errStr := ""
				if crawlErr != nil {
					errStr = crawlErr.Error()
				}
				if err := c.markDomainCrawled(domain.Host, feedsFound, errStr); err != nil {
					fmt.Printf("Error marking domain %s as crawled: %v\n", domain.Host, err)
				}
			}
		}()
	}

	const fetchSize = 100
	for {
		domains, err := c.GetDomainsToCrawl(fetchSize)
		if err != nil {
			fmt.Printf("Error fetching domains to crawl: %v\n", err)
		}
		if len(domains) == 0 {
			c.displayedCrawlRate = 0
			time.Sleep(1 * time.Second)
			continue
		}
		fmt.Printf("%s crawl: %d domains to crawl\n", time.Now().Format("15:04:05"), len(domains))
		for _, domain := range domains {
			workChan <- domain
		}
		time.Sleep(1 * time.Second)
	}
}

// StartCheckLoop runs the feed checking loop independently
func (c *Crawler) StartCheckLoop() {
	numWorkers := 100

	// Buffered channel for feed work
	workChan := make(chan *Feed, 100)

	// Start workers
	for i := 0; i < numWorkers; i++ {
		go func() {
			for feed := range workChan {
				c.CheckFeed(feed)
			}
		}()
	}

	const fetchSize = 100
	for {
		feeds, err := c.GetFeedsDueForCheck(fetchSize)
		if err != nil {
			fmt.Printf("Error fetching feeds: %v\n", err)
		}
		if len(feeds) == 0 {
			c.displayedCheckRate = 0
			time.Sleep(1 * time.Second)
			continue
		}
		fmt.Printf("%s check: %d feeds to check\n", time.Now().Format("15:04:05"), len(feeds))
		for _, feed := range feeds {
			workChan <- feed
		}
		time.Sleep(1 * time.Second)
	}
}

// crawlHost crawls a single host (HTTPS first, then HTTP) and returns the
// number of feeds recorded for it.
func (c *Crawler) crawlHost(host string) (feedsFound int, err error) {
	atomic.AddInt32(&c.hostsProcessed, 1)
	localVisited := make(map[string]bool)
	pagesVisited := 0

	// Try HTTPS first, fall back to HTTP if no pages were visited
	c.crawlPage("https://"+host, host, 0, localVisited, &pagesVisited)
	if pagesVisited == 0 {
		c.crawlPage("http://"+host, host, 0, localVisited, &pagesVisited)
	}

	// Count feeds found for this specific host
	feedsFound, _ = c.GetFeedCountByHost(host)
	if pagesVisited == 0 {
		return feedsFound, fmt.Errorf("could not connect")
	}
	return feedsFound, nil
}

// crawlPage fetches a single page, records any feeds it references, and
// recurses into its links until MaxDepth or MaxPagesPerHost is reached.
func (c *Crawler) crawlPage(pageURL, sourceHost string, depth int, localVisited map[string]bool, pagesVisited *int) {
	if *pagesVisited >= c.MaxPagesPerHost || depth > c.MaxDepth {
		return
	}
	if localVisited[pageURL] {
		return
	}
	if _, visited := c.visited.LoadOrStore(pageURL, true); visited {
		return
	}
	localVisited[pageURL] = true
	*pagesVisited++

	body, contentType, headers, err := c.fetchPage(pageURL)
	if err != nil {
		return
	}

	if c.isFeedContent(body, contentType) {
		c.processFeed(pageURL, sourceHost, body, headers)
		return
	}

	doc, err := html.Parse(strings.NewReader(body))
	if err != nil {
		return
	}

	feedLinks := c.extractFeedLinks(doc, pageURL)
	for _, fl := range feedLinks {
		c.addFeed(fl.URL, fl.Type, sourceHost, pageURL)
	}

	anchorFeeds := c.extractAnchorFeeds(doc, pageURL)
	for _, fl := range anchorFeeds {
		c.addFeed(fl.URL, fl.Type, sourceHost, pageURL)
	}

	if depth < c.MaxDepth {
		links := c.extractLinks(doc, pageURL)
		for _, link := range links {
			if shouldCrawl(link, pageURL) {
				c.crawlPage(link, sourceHost, depth+1, localVisited, pagesVisited)
			}
		}
	}
}

// fetchPage performs a GET request and returns the response body, content type,
// and headers.
func (c *Crawler) fetchPage(pageURL string) (string, string, http.Header, error) {
	req, err := http.NewRequest("GET", pageURL, nil)
	if err != nil {
		return "", "", nil, err
	}
	req.Header.Set("User-Agent", c.UserAgent)
	resp, err := c.client.Do(req)
	if err != nil {
		return "", "", nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", "", nil, fmt.Errorf("status code: %d", resp.StatusCode)
	}
	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", "", nil, err
	}
	contentType := resp.Header.Get("Content-Type")
	return string(bodyBytes), contentType, resp.Header, nil
}
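
// The Start*Loop methods above run forever and are presumably launched once at
// startup, each on its own goroutine. A minimal wiring sketch (hypothetical:
// the real entry point lives elsewhere, and the DATABASE_URL variable name is
// an assumption, not something defined in this file):
//
//	c, err := NewCrawler(os.Getenv("DATABASE_URL"))
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer c.Close()
//	go c.StartStatsLoop()
//	go c.StartCleanupLoop()
//	go c.StartMaintenanceLoop()
//	go c.StartDomainCheckLoop()
//	go c.StartCrawlLoop()
//	go c.StartPublishLoop()
//	c.StartCheckLoop() // blocks forever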