Skip to content

Commit

Permalink
Segment Large CSV Batch Processes (#1475)
Browse files Browse the repository at this point in the history
* segment delete parent job

* Add start of test

* reassociate child oids segmented

* segment recreate child ptiff

* segment update parent objects job

* rubocop and spec

* remove comments

* Update variable naming

* Update numbers

* skip already processed rows

* rubocop

* alter job limit in specs

* Update variable name

* Extend expectation

* Add expectations

* adds skipped row expectations to spec

* save delete batch process

* add comment back

* Add additional test

* confirm hitting perform_later twice

* Add test

* Refactor variable

* fix spec

* reassociate spec csv and expectations

* Add fixture

* Update stubbing and expectations

* Update to use variable

* Use variable instead and clean up

* adds row skipping to already processed rows

* Change access mount

* Implement segment on create parent and clean up

* make spec more resilient

* Add guards

* Add return and cleanup

* more cleaning

---------

Co-authored-by: JP Engstrom <[email protected]>
Co-authored-by: K8Sewell <[email protected]>
Co-authored-by: JP Engstrom <[email protected]>
  • Loading branch information
4 people authored Feb 4, 2025
1 parent d07251d commit 2a311eb
Show file tree
Hide file tree
Showing 19 changed files with 311 additions and 36 deletions.
5 changes: 3 additions & 2 deletions app/jobs/create_new_parent_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ def default_priority
-50
end

def perform(batch_process)
batch_process.create_new_parent_csv
def perform(batch_process, start_index = 0)
index = batch_process.create_new_parent_csv(start_index)
CreateNewParentJob.perform_later(batch_process, index) if !index.nil? && index != -1 && index > BatchProcess::BATCH_LIMIT
end
end
5 changes: 3 additions & 2 deletions app/jobs/delete_parent_objects_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ def default_priority
-50
end

def perform(batch_process)
batch_process.delete_parent_objects
def perform(batch_process, start_index = 0)
index = batch_process.delete_parent_objects(start_index)
DeleteParentObjectsJob.perform_later(batch_process, index) if !index.nil? && index != -1 && index > BatchProcess::BATCH_LIMIT
end
end
5 changes: 3 additions & 2 deletions app/jobs/reassociate_child_oids_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ def default_priority
50
end

def perform(batch_process)
batch_process.reassociate_child_oids
def perform(batch_process, start_index = 0)
index = batch_process.reassociate_child_oids(start_index)
ReassociateChildOidsJob.perform_later(batch_process, index) if !index.nil? && index != -1 && index > BatchProcess::BATCH_LIMIT
rescue => e
batch_process.batch_processing_event("ReassociateChildOidsJob failed due to #{e.message}", "failed")
end
Expand Down
5 changes: 3 additions & 2 deletions app/jobs/recreate_child_oid_ptiffs_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ def default_priority
9
end

def perform(batch_process)
batch_process.recreate_child_oid_ptiffs
def perform(batch_process, start_index = 0)
index = batch_process.recreate_child_oid_ptiffs(start_index)
RecreateChildOidPtiffsJob.perform_later(batch_process, index) if !index.nil? && index != -1 && index > BatchProcess::BATCH_LIMIT
end
end
5 changes: 3 additions & 2 deletions app/jobs/update_parent_objects_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ def default_priority
50
end

def perform(batch_process)
batch_process.update_parent_objects
def perform(batch_process, start_index = 0)
index = batch_process.update_parent_objects(start_index)
UpdateParentObjectsJob.perform_later(batch_process, index) if !index.nil? && index != -1 && index > BatchProcess::BATCH_LIMIT
end
end
1 change: 1 addition & 0 deletions app/models/batch_process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class BatchProcess < ApplicationRecord # rubocop:disable Metrics/ClassLength
has_many :child_objects, through: :batch_connections, source_type: "ChildObject", source: :connectable

CSV_MAXIMUM_ENTRIES = 10_000
BATCH_LIMIT = 50

# SHARED BY ALL BATCH ACTIONS: ------------------------------------------------------------------- #

Expand Down
5 changes: 4 additions & 1 deletion app/models/concerns/create_parent_object.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ module CreateParentObject
# rubocop:disable Metrics/CyclomaticComplexity
# rubocop:disable Metrics/BlockLength
# rubocop:disable Layout/LineLength
def create_new_parent_csv
def create_new_parent_csv(start_index = 0)
self.admin_set = ''
sets = admin_set
parsed_csv.each_with_index do |row, index|
next if start_index > index
if row['digital_object_source'].present? && row['preservica_uri'].present? && !row['preservica_uri'].blank?
begin
parent_object = CsvRowParentService.new(row, index, current_ability, user).parent_object
Expand Down Expand Up @@ -116,7 +117,9 @@ def create_new_parent_csv
rescue StandardError => e
batch_processing_event("Skipping row [#{index + 2}] Unable to save parent: #{e.message}.", "Skipped Row")
end
return index + 1 if index + 1 - start_index > BatchProcess::BATCH_LIMIT
end
-1
end
# rubocop:enable Metrics/AbcSize
# rubocop:enable Metrics/MethodLength
Expand Down
7 changes: 6 additions & 1 deletion app/models/concerns/deletable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ module Deletable
# DELETE PARENT OBJECTS: ------------------------------------------------------------------------ #

# DELETES PARENT OBJECTS FROM INGESTED CSV
def delete_parent_objects
# rubocop:disable Metrics/MethodLength
def delete_parent_objects(start_index = 0)
self.admin_set = ''
sets = admin_set
parsed_csv.each_with_index do |row, index|
next if start_index > index
oid = row['oid']
action = row['action']
metadata_source = row['source']
Expand All @@ -22,8 +24,11 @@ def delete_parent_objects
setup_for_background_jobs(parent_object, metadata_source)
parent_object.destroy!
parent_object.processing_event("Parent #{parent_object.oid} has been deleted", 'deleted')
return index + 1 if index + 1 - start_index > BatchProcess::BATCH_LIMIT
end
-1
end
# rubocop:enable Metrics/MethodLength

# CHECKS TO SEE IF USER HAS ABILITY TO DELETE OBJECTS:
def deletable_parent_object(oid, index)
Expand Down
15 changes: 11 additions & 4 deletions app/models/concerns/reassociatable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ module Reassociatable
BLANK_VALUE = "_blank_"

# triggers the reassociate process
def reassociate_child_oids
def reassociate_child_oids(start_index = 0)
return unless batch_action == "reassociate child oids"
parents_needing_update, parent_destination_map = update_child_objects
parents_needing_update, parent_destination_map, index = update_child_objects(start_index)
update_related_parent_objects(parents_needing_update, parent_destination_map)
index
end

# finds which parents are needed to update
# rubocop:disable Metrics/AbcSize
# rubocop:disable Metrics/MethodLength
def update_child_objects
# rubocop:disable Metrics/CyclomaticComplexity
# rubocop:disable Metrics/PerceivedComplexity
def update_child_objects(start_index)
self.admin_set = ''
sets = admin_set
return unless batch_action == "reassociate child oids"
Expand All @@ -25,6 +28,7 @@ def update_child_objects
parent_destination_map = {}

parsed_csv.each_with_index do |row, index|
next if start_index > index
co = load_child(index, row["child_oid"].to_i)
po = load_parent(index, row["parent_oid"].to_i)
next unless co.present? && po.present?
Expand All @@ -45,11 +49,14 @@ def update_child_objects

values_to_update = check_headers(child_headers, row)
update_child_values(values_to_update, co, row, index)
return [parents_needing_update, parent_destination_map, index + 1] if index + 1 - start_index > BatchProcess::BATCH_LIMIT
end
[parents_needing_update, parent_destination_map]
[parents_needing_update, parent_destination_map, -1]
end
# rubocop:enable Metrics/AbcSize
# rubocop:enable Metrics/MethodLength
# rubocop:enable Metrics/CyclomaticComplexity
# rubocop:enable Metrics/PerceivedComplexity

# verifies headers are included. child headers found in csv_exportable:90
def check_headers(headers, row)
Expand Down
5 changes: 4 additions & 1 deletion app/models/concerns/recreate_child_ptiff.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ module RecreateChildPtiff
# rubocop:disable Metrics/MethodLength
# rubocop:disable Metrics/CyclomaticComplexity
# rubocop:disable Metrics/PerceivedComplexity
def recreate_child_oid_ptiffs
def recreate_child_oid_ptiffs(start_index = 0)
parents = Set[]
self.admin_set = ''
sets = admin_set
oids.each_with_index do |oid, index|
next if start_index > index
child_object = ChildObject.find_by_oid(oid.to_i)
unless child_object
batch_processing_event("Skipping row [#{index + 2}] with unknown Child: #{oid}", 'Skipped Row')
Expand All @@ -35,7 +36,9 @@ def recreate_child_oid_ptiffs
GeneratePtiffJob.perform_later(child_object, self) if file_size <= SetupMetadataJob::FIVE_HUNDRED_MB
attach_item(child_object)
child_object.processing_event("Ptiff Queued", "ptiff-queued")
return index + 1 if index + 1 - start_index > BatchProcess::BATCH_LIMIT
end
-1
end
# rubocop:enable Metrics/AbcSize
# rubocop:enable Metrics/MethodLength
Expand Down
11 changes: 7 additions & 4 deletions app/models/concerns/updatable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,12 @@ def update_child_objects_caption
end

# rubocop:disable Metrics/BlockLength
def update_parent_objects
def update_parent_objects(start_index = 0)
self.admin_set = ''
sets = admin_set
return unless batch_action == "update parent objects"
return unless batch_action == 'update parent objects'
parsed_csv.each_with_index do |row, index|
next if start_index > index
oid = row['oid'] unless ['oid'].nil?
redirect = row['redirect_to'] unless ['redirect_to'].nil?
parent_object = updatable_parent_object(oid, index)
Expand All @@ -87,10 +88,10 @@ def update_parent_objects
setup_for_background_jobs(parent_object, metadata_source)
parent_object.admin_set = admin_set unless admin_set.nil?

if row['visibility'] == "Open with Permission" && row['permission_set_key'].blank?
if row['visibility'] == 'Open with Permission' && row['permission_set_key'].blank?
batch_processing_event("Skipping row [#{index + 2}]. Process failed. Permission Set missing from CSV.", 'Skipped Row')
next
elsif row['visibility'] == "Open with Permission" && row['permission_set_key'] != parent_object&.permission_set&.key
elsif row['visibility'] == 'Open with Permission' && row['permission_set_key'] != parent_object&.permission_set&.key
permission_set = OpenWithPermission::PermissionSet.find_by(key: row['permission_set_key'])
if permission_set.nil?
batch_processing_event("Skipping row [#{index + 2}]. Process failed. Permission Set missing or nonexistent.", 'Skipped Row')
Expand All @@ -113,7 +114,9 @@ def update_parent_objects
sync_from_preservica if parent_object.digital_object_source == 'Preservica'

processing_event_for_parent(parent_object)
return index + 1 if index + 1 - start_index > BatchProcess::BATCH_LIMIT
end
-1
end
# rubocop:enable Metrics/CyclomaticComplexity
# rubocop:enable Metrics/PerceivedComplexity
Expand Down
5 changes: 5 additions & 0 deletions spec/fixtures/csv/create_many_parent_fixture_ids.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
oid,source,admin_set
2005512,ladybird,brbl
2005513,ladybird,brbl
2005514,ladybird,brbl
2005515,ladybird,brbl
5 changes: 5 additions & 0 deletions spec/fixtures/csv/delete_many_parent_fixture_ids.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
oid,action,source,admin_set
2005512,delete,ladybird,brbl
2005513,delete,ladybird,brbl
2005514,delete,ladybird,brbl
2005515,delete,ladybird,brbl
5 changes: 5 additions & 0 deletions spec/fixtures/csv/reassociate_many_child_objects.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
child_oid,parent_oid,order,parent_title,label,caption,viewing_hint
1030368,2005515,,,,,
1032318,2005514,,,,,
1030368,2002826,,,,,
1032318,2002826,,,,,
44 changes: 44 additions & 0 deletions spec/jobs/create_new_parent_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# frozen_string_literal: true

require 'rails_helper'

RSpec.describe CreateNewParentJob, type: :job, prep_metadata_sources: true, prep_admin_sets: true do
let(:admin_set) { AdminSet.find_by(key: 'brbl') }
let(:user) { FactoryBot.create(:user) }
let(:create_many) { Rack::Test::UploadedFile.new(Rails.root.join(fixture_path, "csv", "create_many_parent_fixture_ids.csv")) }
let(:create_batch_process) { FactoryBot.create(:batch_process, user: user, file: create_many) }
let(:bare_create_batch_process) { FactoryBot.create(:batch_process, user: user) }
let(:total_parent_object_count) { 4 }

before do
allow(GoodJob).to receive(:preserve_job_records).and_return(true)
ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :inline)
end

it 'increments the job queue by one' do
create_parent_job = described_class.perform_later(bare_create_batch_process)
expect(create_parent_job.instance_variable_get(:@successfully_enqueued)).to eq true
end

context 'with more than limit of batch objects' do
before do
BatchProcess::BATCH_LIMIT = 2
expect(ParentObject.all.count).to eq 0
user.add_role(:editor, admin_set)
login_as(:user)
expect(described_class).to receive(:perform_later).exactly(2).times.and_call_original
end

around do |example|
perform_enqueued_jobs do
example.run
end
end

it 'goes through all parents in batches once' do
create_batch_process.save
expect(ParentObject.all.count).to eq total_parent_object_count
expect(IngestEvent.where(reason: 'Processing has been queued').count).to eq total_parent_object_count
end
end
end
54 changes: 54 additions & 0 deletions spec/jobs/delete_parent_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# frozen_string_literal: true

require 'rails_helper'

RSpec.describe DeleteParentObjectsJob, type: :job, prep_metadata_sources: true, prep_admin_sets: true do
let(:admin_set) { AdminSet.find_by(key: 'brbl') }
let(:user) { FactoryBot.create(:user) }
let(:create_many) { Rack::Test::UploadedFile.new(Rails.root.join(fixture_path, "csv", "create_many_parent_fixture_ids.csv")) }
let(:delete_many) { Rack::Test::UploadedFile.new(Rails.root.join(fixture_path, "csv", "delete_many_parent_fixture_ids.csv")) }
let(:create_batch_process) { FactoryBot.create(:batch_process, user: user, file: create_many) }
let(:delete_batch_process) { FactoryBot.create(:batch_process, user: user, file: delete_many, batch_action: 'delete parent objects') }

before do
allow(GoodJob).to receive(:preserve_job_records).and_return(true)
ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :inline)
end

context 'with tests active job queue' do
it 'increments the job queue by one' do
delete_parent_job = described_class.perform_later
expect(delete_parent_job.instance_variable_get(:@successfully_enqueued)).to be true
end
end

context 'with more than limit parent objects' do
before do
BatchProcess::BATCH_LIMIT = 2
expect(ParentObject.all.count).to eq 0
user.add_role(:editor, admin_set)
login_as(:user)
create_batch_process.save
total_parent_object_count = 4
expect(ParentObject.all.count).to eq total_parent_object_count
expect(described_class).to receive(:perform_later).exactly(2).times.and_call_original
end

around do |example|
perform_enqueued_jobs do
example.run
end
end

it 'goes through all parents in batches once' do
delete_batch_process.save
expect(IngestEvent.where(status: 'deleted').and(IngestEvent.where(reason: 'Parent 2005512 has been deleted')).count).to eq 1
expect(IngestEvent.where(status: 'Skipped Row').and(IngestEvent.where(reason: 'Skipping row [2] with parent oid: 2005512 because it was not found in local database')).count).to eq 0
expect(IngestEvent.where(status: 'deleted').and(IngestEvent.where(reason: 'Parent 2005513 has been deleted')).count).to eq 1
expect(IngestEvent.where(status: 'Skipped Row').and(IngestEvent.where(reason: 'Skipping row [3] with parent oid: 2005513 because it was not found in local database')).count).to eq 0
expect(IngestEvent.where(status: 'deleted').and(IngestEvent.where(reason: 'Parent 2005514 has been deleted')).count).to eq 1
expect(IngestEvent.where(status: 'deleted').and(IngestEvent.where(reason: 'Parent 2005515 has been deleted')).count).to eq 1
expect(ParentObject.all.count).to eq 0
end
end
end
Loading

0 comments on commit 2a311eb

Please sign in to comment.