-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathqueue.clj
148 lines (127 loc) · 5.02 KB
/
queue.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
(ns tech.queue
(:require [tech.io.url :as url]
[tech.queue.protocols :as q-proto]
[tech.queue.providers :as providers]
[tech.queue.worker :as worker]
[tech.queue.time :as time]
[clojure.core.async.impl.protocols :as async-protocols]
[clojure.core.async :as async]
[clojure.tools.logging :as log]
[tech.parallel :as parallel]))
;; Baseline options applied to provider/queue resolution in this namespace.
;; They are merged *underneath* the per-call options (per-call keys win), and
;; can be rebound with `binding` because the var is dynamic. Defaults to empty.
(def ^:dynamic *static-queue-options* {})
;; Dynamic hook that turns parsed url parts plus per-call options into a queue.
;; The default implementation:
;;   1. layers the per-call options over *static-queue-options*,
;;   2. asks the protocol layer for the provider matching the url parts,
;;   3. gets (or creates) the queue named by the last segment of the url path.
;; Rebind this var to substitute a different resolution strategy.
(def ^:dynamic *url-parts->queue*
  (fn [url-parts options]
    (let [merged-options (merge *static-queue-options* options)
          provider (q-proto/url-parts->provider url-parts merged-options)
          queue-name (-> url-parts :path last keyword)]
      (q-proto/get-or-create-queue! provider queue-name merged-options))))
;; Private anaphoric macro shared by the public queue functions below.
;; It parses `url` with url/url->parts, resolves the queue by calling the
;; dynamic *url-parts->queue* hook with the parsed parts and `options`, and then
;; evaluates `body` inside that scope.
;;
;; The symbols `url-parts` and `queue` are introduced on purpose without
;; gensym (via ~'), so `body` can refer to them directly. Callers must not
;; expect their own locals named `url-parts` or `queue` to survive inside body.
(defmacro ^:private lookup-provider
[url options & body]
`(let [~'url-parts (url/url->parts ~url)
~'queue (*url-parts->queue* ~'url-parts ~options)]
~@body))
(defn url->queue
  "Resolve `url` to a queue object.

  The url is parsed with url/url->parts and handed, together with `options`,
  to the dynamic *url-parts->queue* hook; whatever that hook returns is the
  result."
  [url options]
  (*url-parts->queue* (url/url->parts url) options))
(defn put!
  "Place a msg in the queue identified by `url`.

  Arguments:
    url     - queue url; parsed and resolved to a queue via *url-parts->queue*.
    msg     - the message to enqueue; must be a map.
    options - optional map passed both to queue resolution and to the
              underlying protocol put!.

  Throws ex-info (\"Queue messages must be maps\", data {:msg msg}) when `msg`
  is not a map. The check runs before the queue is resolved so that an invalid
  message never triggers provider lookup or get-or-create side effects."
  [url msg & [options]]
  ;; Validate first: resolving the queue may create it as a side effect.
  (when-not (map? msg)
    (throw (ex-info "Queue messages must be maps" {:msg msg})))
  (lookup-provider
   url options
   (q-proto/put! queue msg options)))
(defn take!
  "Retrieve a task from the queue identified by `url`.

  `options` (optional) is used both to resolve the queue and as the options
  argument of the underlying protocol take!. The protocol's return value is
  passed through unchanged; other code in this namespace treats a `:timeout`
  result as \"no task was available\"."
  [url & [options]]
  (let [target-queue (url->queue url options)]
    (q-proto/take! target-queue options)))
(defn task->msg
  "Convert a `task` taken from the queue at `url` into its message.

  `options` (optional) only influences how the queue is resolved; it is not
  forwarded to the protocol-level conversion."
  [url task & [options]]
  (-> (url->queue url options)
      (q-proto/task->msg task)))
(defn msg->birthdate
  "Return the birthdate of `msg`, as reported by tech.queue.time/msg->birthdate.
  Queues always place birthdates on messages if they are not already on them.

  `url` and `options` are accepted for signature symmetry with the other queue
  functions but are not consulted."
  [url msg & [options]]
  (-> msg
      time/msg->birthdate))
(defn complete!
  "Mark `task` as completed on the queue identified by `url`.

  Taking a task only hides it temporarily; it is removed from the queue for
  good only once it has been completed successfully. `options` (optional) is
  used for queue resolution and forwarded to the protocol-level complete!."
  [url task & [options]]
  (let [target-queue (url->queue url options)]
    (q-proto/complete! target-queue task options)))
(defn stats
  "Fetch information about the queue identified by `url`. The result is a map
  containing:
  :in-flight (required, integer) - Number of items in flight in the queue.

  `options` (optional) is used for queue resolution and forwarded to the
  protocol-level stats call."
  [url & [options]]
  (-> (url->queue url options)
      (q-proto/stats options)))
(defn delete-queue!
  "Delete the queue identified by `url`. Not supported across all queue
  providers.

  Arguments:
    url     - queue url; the last segment of its parsed path names the queue.
    options - optional map. Merged over *static-queue-options* to locate the
              provider, and passed as-is to the protocol-level delete-queue!.

  Returns whatever the provider's delete-queue! returns."
  [url & [options]]
  (let [url-parts (url/url->parts url)
        ;; The path lives on the *parsed* url parts. Looking up :path on the
        ;; raw url yields nil, which made the queue name nil as well.
        queue-name (keyword (last (:path url-parts)))
        provider (q-proto/url-parts->provider
                  url-parts
                  (merge *static-queue-options* options))]
    (q-proto/delete-queue! provider queue-name options)))
(defn queue->task-seq-details
  "Convert a queue to a task sequence. Processors must call complete! on each
  task in order to remove it from the queue.

  A background reader thread repeatedly calls take! on `url` and pushes every
  result other than :timeout onto an internal channel, which is exposed as a
  lazy sequence.

  Arguments:
    url     - queue url; resolved once up front so that an unresolvable queue
              fails before the reader thread is started.
    options - options map (may be nil) handed to queue resolution and to every
              take!. The :receive-message-wait-time-seconds key is destructured
              (default 10) but is not otherwise used inside this function.

  Returns a map:
    :input-seq   - lazy sequence of tasks read from the queue.
    :shutdown-fn - zero-argument fn that closes the channel and then blocks
                   until the reader thread has exited.

  An exception inside the reader thread terminates it (this is by design). The
  channel is closed whenever the reader exits, so consumers of :input-seq see
  the end of the sequence instead of blocking forever on a dead producer."
  [url {:keys [receive-message-wait-time-seconds]
        :or {receive-message-wait-time-seconds 10}
        :as options}]
  (lookup-provider
   url options
   (let [input-chan (async/chan)
         reader-thread
         (async/thread
           (try
             ;; Keep taking until somebody closes the channel.
             (loop []
               (let [take-result (take! url options)]
                 (when-not (= :timeout take-result)
                   ;; Blocks until a consumer is ready; returns immediately
                   ;; (with false) once the channel has been closed.
                   (async/>!! input-chan take-result))
                 (when-not (async-protocols/closed? input-chan)
                   (recur))))
             (catch Throwable e
               ;; Throwable first, message second: logs the stack trace too.
               (log/error e "queue reader thread failed"))
             (finally
               ;; close! is idempotent; after a failure this is what lets
               ;; downstream consumers terminate.
               (async/close! input-chan)))
           (log/warn "queue thread exiting"))
         input-seq (parallel/async-channel-to-lazy-seq input-chan)]
     {:shutdown-fn (fn []
                     (async/close! input-chan)
                     (async/<!! reader-thread))
      :input-seq input-seq})))
(defn pmap-queue
  "Produce an infinite sequence of results obtained by running `processing-fn`
  over the messages read from the queue at `url`, using `num-threads` workers.

  For each task: the task is converted to its message, `processing-fn` is
  applied to that message, the task is completed, and the processing result is
  emitted. If any of those steps throws, the queue reader is shut down and the
  exception is rethrown. Tasks that fail to process will be reprocessed
  according to the visibility timeout member of options. See protocols.clj.
  Result will be infinite unless the queue itself gets deleted."
  [url num-threads processing-fn & [options]]
  (let [details (queue->task-seq-details url options)
        stop-reader! (:shutdown-fn details)
        tasks (:input-seq details)
        run-task (fn [task]
                   (try
                     (let [msg (task->msg url task options)
                           outcome (processing-fn msg)]
                       (complete! url task options)
                       outcome)
                     (catch Throwable e
                       (stop-reader!)
                       (throw e))))]
    (parallel/queued-pmap num-threads run-task tasks)))
(defn seq->queue!
  "Enqueue every element of `data-seq` onto the queue at `url`, one put! per
  element, in order. Blocks until the whole sequence has been consumed; any
  exception raised by put! propagates to the caller. Returns :ok."
  [url options data-seq]
  (doseq [item data-seq]
    (put! url item options))
  :ok)