Add graceful shutdown for goroutines
- Add shutdownCh channel to signal goroutines to stop
- Check IsShuttingDown() in all main loops
- Wait 2 seconds for goroutines to finish before closing DB

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
+42
@@ -33,6 +33,7 @@ type Crawler struct {
|
|||||||
cachedStats *DashboardStats
|
cachedStats *DashboardStats
|
||||||
cachedAllDomains []DomainStat
|
cachedAllDomains []DomainStat
|
||||||
statsMu sync.RWMutex
|
statsMu sync.RWMutex
|
||||||
|
shutdownCh chan struct{} // closed on shutdown to signal goroutines
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCrawler(connString string) (*Crawler, error) {
|
func NewCrawler(connString string) (*Crawler, error) {
|
||||||
@@ -48,6 +49,7 @@ func NewCrawler(connString string) (*Crawler, error) {
|
|||||||
UserAgent: "FeedCrawler/1.0",
|
UserAgent: "FeedCrawler/1.0",
|
||||||
startTime: time.Now(),
|
startTime: time.Now(),
|
||||||
db: db,
|
db: db,
|
||||||
|
shutdownCh: make(chan struct{}),
|
||||||
client: &http.Client{
|
client: &http.Client{
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
CheckRedirect: func(req *http.Request, via []*http.Request) error {
|
CheckRedirect: func(req *http.Request, via []*http.Request) error {
|
||||||
@@ -60,7 +62,24 @@ func NewCrawler(connString string) (*Crawler, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsShuttingDown returns true if shutdown has been initiated
|
||||||
|
func (c *Crawler) IsShuttingDown() bool {
|
||||||
|
select {
|
||||||
|
case <-c.shutdownCh:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Crawler) Close() error {
|
func (c *Crawler) Close() error {
|
||||||
|
// Signal all goroutines to stop
|
||||||
|
close(c.shutdownCh)
|
||||||
|
|
||||||
|
// Give goroutines time to finish current operations
|
||||||
|
fmt.Println("Waiting for goroutines to finish...")
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
if c.db != nil {
|
if c.db != nil {
|
||||||
fmt.Println("Closing database...")
|
fmt.Println("Closing database...")
|
||||||
return c.db.Close()
|
return c.db.Close()
|
||||||
@@ -71,6 +90,9 @@ func (c *Crawler) Close() error {
|
|||||||
// StartStatsLoop updates cached stats every 10 seconds
|
// StartStatsLoop updates cached stats every 10 seconds
|
||||||
func (c *Crawler) StartStatsLoop() {
|
func (c *Crawler) StartStatsLoop() {
|
||||||
for {
|
for {
|
||||||
|
if c.IsShuttingDown() {
|
||||||
|
return
|
||||||
|
}
|
||||||
c.UpdateStats()
|
c.UpdateStats()
|
||||||
time.Sleep(10 * time.Second)
|
time.Sleep(10 * time.Second)
|
||||||
}
|
}
|
||||||
@@ -79,6 +101,9 @@ func (c *Crawler) StartStatsLoop() {
|
|||||||
// StartCleanupLoop runs item cleanup once per week
|
// StartCleanupLoop runs item cleanup once per week
|
||||||
func (c *Crawler) StartCleanupLoop() {
|
func (c *Crawler) StartCleanupLoop() {
|
||||||
for {
|
for {
|
||||||
|
if c.IsShuttingDown() {
|
||||||
|
return
|
||||||
|
}
|
||||||
deleted, err := c.CleanupOldItems()
|
deleted, err := c.CleanupOldItems()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("Cleanup error: %v\n", err)
|
fmt.Printf("Cleanup error: %v\n", err)
|
||||||
@@ -161,6 +186,10 @@ func (c *Crawler) StartPublishLoop() {
|
|||||||
c.RefreshAllProfiles(publisher, feedPassword)
|
c.RefreshAllProfiles(publisher, feedPassword)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
if c.IsShuttingDown() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Get up to 50 unpublished items from approved feeds, sorted by discovered_at ASC
|
// Get up to 50 unpublished items from approved feeds, sorted by discovered_at ASC
|
||||||
items, err := c.GetAllUnpublishedItems(50)
|
items, err := c.GetAllUnpublishedItems(50)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -176,6 +205,9 @@ func (c *Crawler) StartPublishLoop() {
|
|||||||
|
|
||||||
// Publish one item per second
|
// Publish one item per second
|
||||||
for _, item := range items {
|
for _, item := range items {
|
||||||
|
if c.IsShuttingDown() {
|
||||||
|
return
|
||||||
|
}
|
||||||
// Get or create session for this feed's account
|
// Get or create session for this feed's account
|
||||||
account := c.getAccountForFeed(item.FeedURL)
|
account := c.getAccountForFeed(item.FeedURL)
|
||||||
if account == "" {
|
if account == "" {
|
||||||
@@ -532,6 +564,11 @@ func (c *Crawler) StartDomainLoop() {
|
|||||||
|
|
||||||
const fetchSize = 1000
|
const fetchSize = 1000
|
||||||
for {
|
for {
|
||||||
|
if c.IsShuttingDown() {
|
||||||
|
close(workChan)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
domains, err := c.GetDomainsToProcess(fetchSize)
|
domains, err := c.GetDomainsToProcess(fetchSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("Error fetching domains to process: %v\n", err)
|
fmt.Printf("Error fetching domains to process: %v\n", err)
|
||||||
@@ -581,6 +618,11 @@ func (c *Crawler) StartFeedCheckLoop() {
|
|||||||
|
|
||||||
const fetchSize = 100
|
const fetchSize = 100
|
||||||
for {
|
for {
|
||||||
|
if c.IsShuttingDown() {
|
||||||
|
close(workChan)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
feeds, err := c.GetFeedsDueForCheck(fetchSize)
|
feeds, err := c.GetFeedsDueForCheck(fetchSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("Error fetching feeds: %v\n", err)
|
fmt.Printf("Error fetching feeds: %v\n", err)
|
||||||
|
|||||||
Reference in New Issue
Block a user