diff --git a/src/onyx/peer/task_lifecycle.clj b/src/onyx/peer/task_lifecycle.clj index d830a2c21..3ec6550e3 100644 --- a/src/onyx/peer/task_lifecycle.clj +++ b/src/onyx/peer/task_lifecycle.clj @@ -441,19 +441,29 @@ (advance (update-event! state f))))) (defn recover-input [state] - (let [{:keys [recover-coordinates recovered?] :as context} (get-context state) - input-pipeline (get-input-pipeline state)] - (when-not recovered? - (let [event (get-event state) - stored (res/recover-input event recover-coordinates)] - (info (:onyx.core/log-prefix event) "Recover pipeline checkpoint:" stored) - (p/recover! input-pipeline (t/replica-version state) stored))) - (if (p/synced? input-pipeline (t/epoch state)) - (-> state - (set-context! nil) - (advance)) - ;; ensure we don't try to recover input again before synced - (set-context! state (assoc context :recovered? true))))) + (let [{:keys [recover-coordinates restored-fut] :as context} (get-context state) + input-pipeline (get-input-pipeline state) + restored-fut (if restored-fut + restored-fut + (future + (res/recover-input (get-event state) recover-coordinates)))] + (cond (and (future-done? restored-fut) + (p/synced? input-pipeline (t/epoch state))) + (let [stored @restored-fut] + (info (:onyx.core/log-prefix (get-event state)) "Recover pipeline checkpoint:" stored) + (p/recover! input-pipeline (t/replica-version state) stored) + (-> state + (set-context! nil) + (advance))) + + (:start-time context) + (if (> (System/nanoTime) (+ (:start-time context) (* 700 1000000))) + (throw (ex-info "Past recover tolerance time." {})) + state) + + :else + ;; ensure we don't try to recover input again before synced and the checkpoint is fetched + (set-context! state (assoc context :restored-fut restored-fut :start-time (System/nanoTime)))))) (defn cleanup-previous-state-store! 
[state] (when-let [state-store (get-state-store state)] @@ -466,37 +476,42 @@ (defn recover-state [state] (let [{:keys [onyx.core/task-id onyx.core/peer-opts] :as event} (get-event state) - _ (cleanup-previous-state-store! state) - state-serializers (onyx.state.serializers.utils/event->state-serializers event) - db-name (db-name event) - state-store (db/create-db peer-opts db-name state-serializers) - _ (set-state-store! state state-store) - {:keys [recover-coordinates]} (get-context state) - recovered-windows (res/recover-windows event state-store recover-coordinates)] - (-> state - (set-windows-state! recovered-windows) - (ws/assign-windows (onyx.types/new-state-event :recovered - event - (t/replica-version state) - (t/epoch state))) - (advance)))) - -(defn recover-output [state] - (let [{:keys [recover-coordinates recovered?] :as context} (get-context state) - pipeline (get-output-pipeline state)] - (when-not recovered? - (let [event (get-event state) - ;; output recovery is only supported with onyx/n-peers set - ;; as we can't currently scale slot recovery up and down - stored (res/recover-output event recover-coordinates)] - (info (:onyx.core/log-prefix event) "Recover output pipeline checkpoint:" stored) - (p/recover! pipeline (t/replica-version state) stored))) - (if (p/synced? pipeline (t/epoch state)) + {:keys [recover-coordinates restored-fut] :as context} (get-context state) + restored-fut (if restored-fut + restored-fut + (let [_ (cleanup-previous-state-store! state) + state-serializers (onyx.state.serializers.utils/event->state-serializers event) + db-name (db-name event) + state-store (db/create-db peer-opts db-name state-serializers) + _ (set-state-store! state state-store)] + (future (res/recover-windows event state-store recover-coordinates))))] + (if (future-done? restored-fut) (-> state + (set-windows-state! 
@restored-fut) + (ws/assign-windows (onyx.types/new-state-event :recovered + event + (t/replica-version state) + (t/epoch state))) (set-context! nil) (advance)) - ;; ensure we don't try to recover output again before synced - (set-context! state (assoc context :recovered? true))))) + (set-context! state (assoc context :restored-fut restored-fut))))) + +(defn recover-output [state] + (let [{:keys [recover-coordinates restored-fut] :as context} (get-context state) + output-pipeline (get-output-pipeline state) + restored-fut (if restored-fut + restored-fut + (future (res/recover-output (get-event state) recover-coordinates)))] + (if (and (future-done? restored-fut) + (p/synced? output-pipeline (t/epoch state))) + (let [stored @restored-fut] + (info (:onyx.core/log-prefix (get-event state)) "Recover output pipeline checkpoint:" stored) + (p/recover! output-pipeline (t/replica-version state) stored) + (-> state + (set-context! nil) + (advance))) + ;; ensure we don't try to recover output again before synced and the checkpoint is fetched + (set-context! state (assoc context :restored-fut restored-fut))))) (defn event->pub-liveness [event] (ms->ns (arg-or-default :onyx.peer/publisher-liveness-timeout-ms @@ -852,6 +867,7 @@ (when messenger (component/stop messenger)) (when coordinator (coordinator/stop coordinator scheduler-event)) (some-> event :onyx.core/storage cp/stop) + (some-> this get-context :restored-fut future-cancel) (some-> input-pipeline (p/stop event)) (some-> output-pipeline (p/stop event)) this) @@ -1058,6 +1074,7 @@ this) (goto-recover! [this] (.set idx recover-idx) + (some-> this get-context :restored-fut future-cancel) (-> this (set-context! 
nil) (reset-event!))) diff --git a/test/onyx/windowing/basic_windowing_crash_test.clj b/test/onyx/windowing/basic_windowing_crash_test.clj index d12355adc..7bf2a5798 100644 --- a/test/onyx/windowing/basic_windowing_crash_test.clj +++ b/test/onyx/windowing/basic_windowing_crash_test.clj @@ -173,11 +173,9 @@ (let [{:keys [job-id]} (onyx.api/submit-job peer-config job) _ (onyx.test-helper/feedback-exception! peer-config job-id) results (take-segments! @out-chan 5)] - (println "COORDS" (onyx.api/job-snapshot-coordinates peer-config id job-id)) + (println "Snapshot coordinates:" (onyx.api/job-snapshot-coordinates peer-config id job-id)) + (println "GC checkpoints.") (onyx.api/gc-checkpoints peer-config id job-id) - (println "Try again" ) + (println "GC checkpoints a second time.") (onyx.api/gc-checkpoints peer-config id job-id) - ;; FIXME: improve test by pulling in old coordinates, recovering and then starting gc again. - ;; TODO, allow gc to take part over tenancy history. - (is (= expected-windows (output->final-counts @test-state))))))))