main.go
package main

import (
	"context"
	"encoding/json"
	"log"
	"sync"
	"time"

	"github.com/Techeer-Hogwarts/crawling/cmd"
	"github.com/Techeer-Hogwarts/crawling/cmd/blogs"
	"github.com/Techeer-Hogwarts/crawling/cmd/rabbitmq"
	"github.com/Techeer-Hogwarts/crawling/cmd/redisInteractor"

	"github.com/rabbitmq/amqp091-go"
	"github.com/redis/go-redis/v9"
)
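// main wires up the crawling pipeline: it opens a RabbitMQ connection and channel,
// creates a Redis client, declares the crawl_queue, and starts a fixed pool of
// worker goroutines that consume and process messages until the delivery channel closes.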
func main() {
	newConnection := rabbitmq.NewConnection()
	defer newConnection.Close()

	newRedisClient, err := redisInteractor.NewClient()
	if err != nil {
		log.Fatalf("Failed to create a new Redis client: %v", err)
	}
	redisContext := context.Background()

	newChannel := rabbitmq.NewChannel(newConnection)
	defer newChannel.Close()

	queue1 := rabbitmq.DeclareQueue(newChannel, "crawl_queue")
	consumedMessages := rabbitmq.ConsumeMessages(newChannel, queue1.Name)

	const numWorkers = 5 // Number of concurrent workers
	var wg sync.WaitGroup
	wg.Add(numWorkers) // Add the number of workers to the WaitGroup

	for i := 0; i < numWorkers; i++ {
		log.Printf("Starting worker %d", i)
		go func(workerID int) { // Each worker is a goroutine
			defer wg.Done()
			for msg := range consumedMessages { // Continuously process messages
				log.Printf("Worker %d processing message: %s", workerID, msg.Body)
				processMessage(msg, redisContext, newRedisClient)
			}
		}(i)
	}
	wg.Wait()
}
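// processMessage handles a single crawl request: it decodes the message body into
// a BlogRequest, validates and sanitizes the target URL, crawls the blog according
// to the message type, stores the result in Redis under the message ID, and
// publishes a completion notification.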
func processMessage(msg amqp091.Delivery, redisContext context.Context, newRedisClient *redis.Client) {
	var blogRequest blogs.BlogRequest
	err := json.Unmarshal(msg.Body, &blogRequest)
	if err != nil {
		log.Printf("Failed to unmarshal message: %v", err)
		return
	}

	// The message type selects the crawl job: signUp_blog_fetch, blogs_daily_update, or shared_post_fetch.
	crawlingType := msg.Type

	url, host, err := cmd.ValidateAndSanitizeURL(string(blogRequest.Data))
	if err != nil {
		log.Printf("Invalid or unsafe URL: %v", err)
		return
	}
	log.Printf("Processing URL: %v", url)

	blogRequest.UserID = cmd.ExtractUserID(msg.MessageId)

	blogPosts, err := cmd.CrawlBlog(url, host, crawlingType)
	if err != nil {
		log.Printf("Failed to crawl blog: %v, userID: %v", err, blogRequest.UserID)
		return
	}
	blogPosts.UserID = blogRequest.UserID

	// responseJSON, _ := json.MarshalIndent(blogPosts, "", "  ")
	// fmt.Println(string(responseJSON))

	err = redisInteractor.SetData(redisContext, newRedisClient, msg.MessageId, blogPosts)
	if err != nil {
		log.Printf("Failed to set data: %v", err)
		return
	}

	err = redisInteractor.NotifyCompletion(redisContext, newRedisClient, msg.MessageId, crawlingType)
	if err != nil {
		log.Printf("Failed to notify completion: %v", err)
		return
	}
	log.Printf("Successfully processed and stored blog data. Time: %v", time.Now())
}