diff --git a/data-rescue/flow_switch_resolver.rb b/data-rescue/flow_switch_resolver.rb
new file mode 100644
index 00000000..066c233b
--- /dev/null
+++ b/data-rescue/flow_switch_resolver.rb
@@ -0,0 +1,54 @@
+# frozen_string_literal: true
+require 'csv'
+require 'pry'
+require 'json'
+
+# Source query for CSV_FILE:
+=begin
+select s.*,t.flow as "t_flow",t.id as "t_id" from submissions s
+left outer join transmissions t on s.id = t.submission_id
+where s.flow = 'docUpload'
+  and input_data->>'signature' is not null
+  and submitted_at is not null;
+=end
+CSV_FILE = File.expand_path('~/Documents/quarantine/flow_switch_pebt_to_docupload.csv')
+OUTPUT_FILE = File.expand_path('./output-flow-switch.sql')
+
+class FlowSwitchResolver
+  def initialize(data)
+    @data = data
+  end
+
+  def resolution_sql
+    return to_enum(:resolution_sql) unless block_given?
+
+    @data.each do |row|
+      yield <<~SQL
+        UPDATE submissions SET flow = 'pebt', updated_at = NOW() WHERE id = '#{row["id"]}';
+        UPDATE transmissions SET submitted_to_state_at = null, submitted_to_state_filename = null, updated_at = NOW() WHERE id = '#{row["t_id"]}';
+      SQL
+    end
+  end
+end
+
+class Outputter
+  def initialize(filename)
+    @filename = filename
+  end
+
+  def write_lines(enumerable)
+    count = 0
+    File.open(@filename, 'w') do |f|
+      f.puts "BEGIN;"
+      enumerable.each do |entry|
+        f.puts entry
+        count += 1
+      end
+      f.puts "COMMIT;"
+    end
+
+    puts "Wrote #{count} entries to #{@filename}"
+  end
+end
+
+resolver = FlowSwitchResolver.new(CSV.read(CSV_FILE, headers: :first_row)).resolution_sql
+Outputter.new(OUTPUT_FILE).write_lines(resolver)
diff --git a/data-rescue/merge_the_remainder.rb b/data-rescue/merge_the_remainder.rb
new file mode 100644
index 00000000..949f9ead
--- /dev/null
+++ b/data-rescue/merge_the_remainder.rb
@@ -0,0 +1,244 @@
+# frozen_string_literal: true
+require 'csv'
+require 'pry'
+require 'json'
+require 'damerau-levenshtein'
+require 'aws-sdk-s3'
+require 'tempfile' # used by download_files
+raise "need to specify AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY" unless ENV['AWS_ACCESS_KEY_ID'] && ENV['AWS_SECRET_ACCESS_KEY']
+CSV_FILE = File.expand_path('~/Documents/quarantine/all_data.csv')
+OUTPUT_FILE = File.expand_path('~/Documents/quarantine/manual_resolution_of_everything-2023-08-11.sql')
+
+# Logic copied from submissions_matcher.rb, except that merge conflicts prompt
+# the operator instead of raising.
+class SubmissionMerger
+  MERGE_STRATEGIES = {
+    # a = old value, b = new value
+    # Whatever is returned goes in the new field. If it returns `nil` the
+    # values conflict, and the operator is prompted for a replacement.
+    equality: ->(a, b) { b if a == b },
+    string: ->(a, b) { b if a.gsub(" ", "").downcase == b.gsub(" ", "").downcase },
+    uploads: ->(a, b) { JSON.generate(JSON.parse(a).concat(JSON.parse(b))) },
+    dont_overwrite: ->(a, _) { a },
+  }
+  # For some fields, we can afford to be a bit looser with equality checking / merge logic.
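+  # A hypothetical illustration (values invented): the string strategy ignores
+  # spacing and case, equality treats any difference as a conflict, and uploads
+  # concatenates the two JSON arrays:
+  #   MERGE_STRATEGIES[:string].call("Jane Doe", "jane  doe")    # => "jane  doe"
+  #   MERGE_STRATEGIES[:equality].call("a", "b")                 # => nil (conflict)
+  #   MERGE_STRATEGIES[:uploads].call('["a.pdf"]', '["b.pdf"]')  # => '["a.pdf","b.pdf"]'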
+  FIELD_MERGE_STRATEGIES = {
+    'firstName' => MERGE_STRATEGIES[:string],
+    'lastName' => MERGE_STRATEGIES[:string],
+    'signature' => MERGE_STRATEGIES[:string],
+    'residentialAddressCity' => MERGE_STRATEGIES[:string],
+    'residentialAddressStreetAddress1' => MERGE_STRATEGIES[:string],
+    'residentialAddressStreetAddress2' => MERGE_STRATEGIES[:string],
+
+    'identityFiles' => MERGE_STRATEGIES[:uploads],
+    'enrollmentFiles' => MERGE_STRATEGIES[:uploads],
+    'incomeFiles' => MERGE_STRATEGIES[:uploads],
+    'unearnedIncomeFiles' => MERGE_STRATEGIES[:uploads],
+
+    'students' => MERGE_STRATEGIES[:dont_overwrite],
+    'income' => MERGE_STRATEGIES[:dont_overwrite],
+    'household' => MERGE_STRATEGIES[:dont_overwrite],
+    'confirmationNumber' => MERGE_STRATEGIES[:dont_overwrite],
+    'feedbackText' => MERGE_STRATEGIES[:dont_overwrite],
+  }
+
+  def self.merge_records(submitted_record, unsubmitted_records)
+    unsubmitted_records.each do |row|
+      row['input_data'].each do |k, v|
+        old_val = submitted_record['input_data'][k]
+        if old_val.nil?
+          submitted_record['input_data'][k] = v
+        else
+          new_val = FIELD_MERGE_STRATEGIES.fetch(k, MERGE_STRATEGIES[:equality]).call(old_val, v)
+          if new_val.nil?
+            puts "Merge conflict in field #{k}. Old: #{old_val} / New: #{v}"
+            puts "What value should be used?"
+            new_val = $stdin.gets.strip
+          end
+          submitted_record['input_data'][k] = new_val
+        end
+      end
+    end
+
+    [submitted_record, unsubmitted_records.map { |record| record['submission_id'] }]
+  end
+end
+
+# Downloads every S3 object under the submission's prefix into tempfiles.
+# NOTE: only the first page of list_objects results is used (the block returns
+# on the first iteration), which is fine for submissions with few files.
+def download_files(submission_id)
+  s3 = Aws::S3::Client.new(region: 'us-west-1')
+  s3.list_objects(bucket: 'homeschool-pebt-production', prefix: submission_id).each do |response|
+    return response.contents.map do |s3_file|
+      extension = File.extname(s3_file['key'])
+      f = Tempfile.new(['download', extension])
+      s3.get_object(
+        bucket: 'homeschool-pebt-production',
+        key: s3_file['key']
+      ) do |chunk|
+        f.write chunk
+      end
+
+      f
+    end
+  end
+end
+
+# Lowercases, trims/collapses spaces, drops an (apparent) middle name, removes
+# the remaining spaces, and folds accented vowels, so that variant spellings of
+# the same name compare equal.
+def normalized_name(app)
+  if app['input_data']['signature']
+    app['input_data']['signature']
+      .downcase
+      .gsub(/^[ ]*|[ ]*$|(?<= )[ ]*/, '')
+      .gsub(/([a-z]+) [a-z]+ ([a-z]+)/, '\1 \2')
+      .gsub(" ", "").tr("áéíóú", "aeiou")
+  elsif app['input_data']['firstName']
+    (app['input_data']['firstName'].strip + ' ' + app['input_data']['lastName'].strip)
+      .downcase
+      .gsub(/([a-z]+) [a-z]+ ([a-z]+)/, '\1 \2')
+      .gsub(" ", "").tr("áéíóú", "aeiou")
+  end
+end
+
+class OutputFile
+  def initialize(filename)
+    @filename = filename
+  end
+
+  def already_processed_rows
+    return [] unless File.exist?(@filename)
+
+    File.readlines(@filename).map do |line|
+      line.match('-- id: (.*)') && $~[1]
+    end.compact
+  end
+
+  def write_sql(id, sql)
+    File.open(@filename, 'a') do |f|
+      f.puts "-- id: #{id}"
+      f.puts sql
+    end
+  end
+end
+
+class SubmissionsMatcher
+  def initialize(csv_file, output_file)
+    @data = CSV.open(csv_file, headers: :first_row).to_a
+    @data.each { |row| row['input_data'] = JSON.parse(row['input_data']) }
+    @output_file = output_file
+  end
+
+  def process
+    # Read the output file once up front rather than once per row.
+    already_processed = @output_file.already_processed_rows
+    incomplete_apps =
+      @data
+        .find_all { |r| r['last_transmission_failure_reason'] == 'skip_incomplete' }
+        .find_all { |r| !already_processed.include?(r['submission_id']) }
+    puts "Found #{incomplete_apps.length} incomplete apps to process..."
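+
+    # Interactive resolution loop: for each incomplete app, print a table of
+    # candidate duplicates, then act on the operator's choice. (Hypothetical
+    # session: pressing "m" then "1" merges with the first listed candidate
+    # and appends the resulting UPDATEs to the output file.)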
+    incomplete_apps.each do |app|
+      next if app['input_data'].keys == ['docUpload'] # handled separately by s3_review script
+
+      similar = similar_apps(app)
+      loop do
+        print_apps_table(similar, app)
+
+        puts "Options: [M]erge / [D]elete this app (if a duplicate succeeded) / [S]earch / Merge with [O]ther / [F]ile download"
+        case $stdin.gets.chomp.downcase
+        when 'm'
+          $stdout.write "Merge with which application? "
+          merge_idx = $stdin.gets.chomp.to_i
+          merge_app = similar[merge_idx - 1]
+          result, merged_submission_ids = SubmissionMerger.merge_records(app, [merge_app])
+          @output_file.write_sql(app['submission_id'], <<~SQL)
+            UPDATE submissions SET input_data = '#{JSON.generate(result['input_data']).gsub("'", "''")}' WHERE id = '#{result['submission_id']}';
+            UPDATE submissions SET merged_into_submission_id='#{result['submission_id']}' WHERE id in ('#{merged_submission_ids.join('\',\'')}');
+            UPDATE submissions SET updated_at = NOW() WHERE id = '#{result['submission_id']}' OR id in ('#{merged_submission_ids.join('\',\'')}');
+          SQL
+          break
+        when 'd'
+          @output_file.write_sql(app['submission_id'], <<~SQL)
+            DELETE FROM transmissions WHERE submission_id = '#{app['submission_id']}';
+            DELETE FROM user_files WHERE submission_id = '#{app['submission_id']}';
+            DELETE FROM submissions WHERE id = '#{app['submission_id']}';
+          SQL
+          break
+        when 's'
+          puts "\nMost closely matching 20 apps:"
+          similar = @data
+            .map { |o| [o, DamerauLevenshtein.distance(app['normalized_name'], o['normalized_name'])] unless o['normalized_name'].nil? }
+            .compact
+            .sort_by { |_app, rank| rank }
+            .first(20)
+            .map(&:first)
+        when 'o'
+          puts "Merge with which submission id? "
+          merge_id = $stdin.gets.chomp
+          merge_app = @data.find { |other| other['submission_id'] == merge_id }
+          if merge_app.nil?
+ puts "Could not find submission ID = #{merge_id}" + next + end + result, merged_submission_ids = SubmissionMerger.merge_records(app, [merge_app]) + @output_file.write_sql(app['submission_id'], <<~SQL) + UPDATE submissions SET input_data = '#{JSON.generate(result['input_data']).gsub("'", "''")}' WHERE id = '#{result['submission_id']}'; + UPDATE submissions SET merged_into_submission_id='#{result['submission_id']}' WHERE id in ('#{merged_submission_ids.join('\',\'')}'); + UPDATE submissions SET updated_at = NOW() WHERE id = '#{result['submission_id']}' OR id in ('#{merged_submission_ids.join('\',\'')}'); + SQL + break + when 'f' + files = download_files(app['submission_id']) + puts "Attached files:" + puts files.map(&:path) + end + end + end + end + + def similar_apps(app) + name = normalized_name(app) + + @data.find_all do |other| + other['normalized_name'] ||= normalized_name(other) + other['normalized_name'] == name && other['submission_id'] != app['submission_id'] + end + end + + FIELDS = ->(app) do + { + 'applicant_name' => [app['input_data'].fetch('firstName', nil), app['input_data'].fetch('lastName', nil)].compact.join(' '), + 'signature' => app['input_data']['signature'], + 'students' => app['input_data'].fetch('students', []).map { |s| [s['studentFirstName'], s['studentLastName']].compact.join(' ') }.join(', '), + 'submitted_at' => app['submitted_at'], + 'submitted_to_state_at' => app['submitted_to_state_at'], + 'num_fields' => app['input_data'].length.to_s, + 'id_prefix' => app['submission_id'][0..8] + } + end + def print_apps_table(apps, target) + fields = apps.map { |app| FIELDS.call(app) } + target_fields = FIELDS.call(target) + max_len_by_field = (fields.dup.append(target_fields)).flat_map(&:entries).each_with_object({}) { |(field_name, value), max_len| max_len[field_name] ||= field_name.length; max_len[field_name] = value.length if value && value.length > max_len[field_name] } + + # output header + row_length = 0 + row_length += $stdout.write("# ") + max_len_by_field.keys.each do |field_name| + row_length += $stdout.write(" | " + field_name.ljust(max_len_by_field[field_name])) + end + $stdout.write("\n") + $stdout.write('-' * row_length + "\n") + # output rows + fields.each_with_index.each do |app, i| + $stdout.write("%2d." % (i + 1)) + app.each do |field_name, value| + $stdout.write(" | " + (value || "").ljust(max_len_by_field[field_name])) + end + $stdout.write("\n") + end + $stdout.write("\n") + $stdout.write('--- INCOMPLETE APP: ' + '-' * (row_length - 21)) + $stdout.write("\n ") + FIELDS.call(target).each do |field_name, value| + $stdout.write(" | " + (value || "").ljust(max_len_by_field[field_name])) + end + $stdout.write("\n") + end +end + +puts "Loading data..." 
+output_file = OutputFile.new(OUTPUT_FILE)
+SubmissionsMatcher.new(CSV_FILE, output_file).process
diff --git a/data-rescue/query.sql b/data-rescue/query.sql
new file mode 100644
index 00000000..166a17ea
--- /dev/null
+++ b/data-rescue/query.sql
@@ -0,0 +1,68 @@
+-- the output of this goes into submissions_matcher.rb (as "toms_merge.csv")
+with incomplete_submissions as (
+  select *
+  from submissions s
+  where (
+    s.flow = 'pebt' AND (
+      s.input_data ->> 'hasMoreThanOneStudent' is null
+      OR s.input_data ->> 'firstName' is null
+      OR s.input_data ->> 'signature' is null
+    )
+    OR
+    s.flow = 'docUpload' AND (
+      s.input_data ->> 'firstName' is null
+      OR s.input_data ->> 'lastName' is null
+      OR s.input_data ->> 'applicationNumber' is null
+    )
+  )
+), submissions_with_name as (
+  select
+    id,
+    regexp_replace(
+      regexp_replace(
+        translate(
+          coalesce(
+            trim(lower(s.input_data ->> 'signature')),
+            lower(concat(trim(s.input_data ->> 'firstName'), ' ', trim(s.input_data ->> 'lastName'))))
+          , 'áéíóú-', 'aeiou ')
+        , '[\.]', '')
+      , '([a-z]*) [a-z]* ([a-z]*)', '\1 \2')
+      as normalized_name
+  from incomplete_submissions s
+)
+select
+  s.id,
+  normalized_name,
+  submitted_at,
+  updated_at,
+  flow,
+  input_data
+from submissions s
+inner join submissions_with_name on s.id = submissions_with_name.id
+where submissions_with_name.normalized_name <> ' '
+order by normalized_name;
+
+-- the output of this goes into s3_review.rb as docuplad_submissions_that_had_errored.csv
+select * from submissions s
+  inner join transmissions t on s.id = t.submission_id
+where t.last_transmission_failure_reason = 'skip_incomplete'
+  and s.input_data->>'docUpload' is not null
+  and s.flow = 'docUpload';
+
+-- the output of this goes into s3_review.rb as csv_of_all_completed_submissions_with_confirmationNumber.csv
+select s.input_data->>'firstName' as first_name, s.input_data->>'lastName' as last_name, s.input_data->>'confirmationNumber' as confirmation_number
+from submissions s
+where s.submitted_at is not null
+  and s.input_data->>'confirmationNumber' is not null;
+
+
+-- the output of this goes into merge_the_remainder.rb
+-- all_data
+SELECT
+  s.id as "submission_id",
+  s.submitted_at,
+  s.input_data,
+  t.submitted_to_state_at,
+  t.last_transmission_failure_reason
+FROM submissions s
+LEFT OUTER JOIN transmissions t on s.id = t.submission_id;
diff --git a/data-rescue/s3_review.rb b/data-rescue/s3_review.rb
new file mode 100644
index 00000000..b64776aa
--- /dev/null
+++ b/data-rescue/s3_review.rb
@@ -0,0 +1,95 @@
+require 'aws-sdk-s3'
+require 'pry'
+require 'csv'
+require 'json'     # used by append_query
+require 'tempfile' # used by download_files
+raise "need to specify AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY" unless ENV['AWS_ACCESS_KEY_ID'] && ENV['AWS_SECRET_ACCESS_KEY']
+
+# inputs:
+# 1. csv of docUpload-only submissions
+# 2. csv of all completed submissions - (firstName, lastName, confirmationNumber)
+DOCUPLOADS_ONLY = File.expand_path('~/Documents/quarantine/docuplad_submissions_that_had_errored.csv')
+COMPLETED_SUBMISSIONS = File.expand_path('~/Documents/quarantine/csv_of_all_completed_submissions_with_confirmationNumber.csv')
+OUTPUT_FILE = File.expand_path('~/Documents/quarantine/manual_laterdoc_investigation_2023-08-09.sql')
+
+# Downloads every S3 object under the submission's prefix into tempfiles.
+# NOTE: only the first page of list_objects results is used.
+def download_files(submission_id)
+  s3 = Aws::S3::Client.new(region: 'us-west-1')
+  s3.list_objects(bucket: 'homeschool-pebt-production', prefix: submission_id).each do |response|
+    return response.contents.map do |s3_file|
+      extension = File.extname(s3_file['key'])
+      f = Tempfile.new(['download', extension])
+      s3.get_object(
+        bucket: 'homeschool-pebt-production',
+        key: s3_file['key']
+      ) do |chunk|
+        f.write chunk
+      end
+
+      f
+    end
+  end
+end
+
+def match_docupload_to_submission_data
+  rows = CSV.read(DOCUPLOADS_ONLY, headers: :first_row)
+  to_skip = already_processed_submissions
+  rows.each do |row|
+    next if to_skip.include?(row['submission_id'])
+    files = download_files(row['submission_id'])
+
+    puts "Files: #{files.map(&:path).join("\n")}"
+    application_number = nil
+
+    while application_number.nil?
+      puts "Enter First name (q to skip): "
+      first_name = $stdin.gets.strip
+      break if first_name == 'q'
+
+      puts "Enter Last name: "
+      last_name = $stdin.gets.strip
+
+      application_number = find_matching_submission(first_name, last_name)
+    end
+
+    # On 'q', application_number is nil, so this appends a "-- no match" marker
+    # and the row is skipped on re-runs.
+    append_query(row['submission_id'], row['input_data'], first_name, last_name, application_number)
+  end
+end
+
+def find_matching_submission(first_name, last_name)
+  CSV.read(COMPLETED_SUBMISSIONS, headers: :first_row).each do |row|
+    return row['confirmation_number'] if normalize_name(row['first_name']) == normalize_name(first_name) && normalize_name(row['last_name']) == normalize_name(last_name)
+  end
+
+  nil
+end
+
+def normalize_name(name)
+  return nil if name.nil?
+  name.strip.downcase
+end
+
+def append_query(submission_id, input_data, first_name, last_name, application_number)
+  File.open(OUTPUT_FILE, 'a') do |f|
+    if application_number.nil?
+      f.puts "-- no match: #{submission_id}"
+    else
+      new_input_data = JSON.generate(JSON.parse(input_data).merge('firstName' => first_name, 'lastName' => last_name, 'applicationNumber' => application_number)).gsub("'", "''")
+      f.puts "-- found match: #{submission_id}"
+      f.puts "UPDATE submissions SET input_data = '#{new_input_data}' WHERE id='#{submission_id}';"
+    end
+  end
+end
+
+def already_processed_submissions
+  return [] unless File.exist?(OUTPUT_FILE)
+  File.readlines(OUTPUT_FILE).map do |line|
+    line.match('-- (found|no) match: (.*)') && $~[2]
+  end.compact
+end
+
+match_docupload_to_submission_data
+
+# for each docUpload-only submission:
+#   download the docs
+#   user reviews the docs
+#   the script asks for the firstName and lastName to attach to that laterdoc
+#   (tbd) search for applications with that name to find a confirmation number
+#   script outputs SQL to run to add those fields
diff --git a/data-rescue/submissions_matcher.rb b/data-rescue/submissions_matcher.rb
new file mode 100644
index 00000000..496f1bcf
--- /dev/null
+++ b/data-rescue/submissions_matcher.rb
@@ -0,0 +1,101 @@
+# frozen_string_literal: true
+require 'csv'
+require 'pry'
+require 'json'
+require 'date' # for DateTime.parse
+CSV_FILE = File.expand_path('~/Documents/quarantine/toms_merge.csv')
+
+class SubmissionsMatcher
+  MERGE_STRATEGIES = {
+    # a = old value, b = new value
+    # Whatever is returned goes in the new field. If it returns `nil` then the record is different and we raise an error.
+    equality: ->(a, b) { b if a == b },
+    string: ->(a, b) { b if a.gsub(" ", "").downcase == b.gsub(" ", "").downcase },
+    uploads: ->(a, b) { JSON.generate(JSON.parse(a).concat(JSON.parse(b))) },
+    dont_overwrite: ->(a, _) { a },
+  }
+  # For some fields, we can afford to be a bit looser with equality checking / merge logic.
+  FIELD_MERGE_STRATEGIES = {
+    'firstName' => MERGE_STRATEGIES[:string],
+    'lastName' => MERGE_STRATEGIES[:string],
+    'signature' => MERGE_STRATEGIES[:string],
+    'residentialAddressCity' => MERGE_STRATEGIES[:string],
+    'residentialAddressStreetAddress1' => MERGE_STRATEGIES[:string],
+    'residentialAddressStreetAddress2' => MERGE_STRATEGIES[:string],
+
+    'identityFiles' => MERGE_STRATEGIES[:uploads],
+    'enrollmentFiles' => MERGE_STRATEGIES[:uploads],
+    'incomeFiles' => MERGE_STRATEGIES[:uploads],
+    'unearnedIncomeFiles' => MERGE_STRATEGIES[:uploads],
+
+    'students' => MERGE_STRATEGIES[:dont_overwrite],
+    'income' => MERGE_STRATEGIES[:dont_overwrite],
+    'household' => MERGE_STRATEGIES[:dont_overwrite],
+    'confirmationNumber' => MERGE_STRATEGIES[:dont_overwrite],
+    'feedbackText' => MERGE_STRATEGIES[:dont_overwrite],
+  }
+
+  def initialize(csv_file)
+    @csv = CSV.open(csv_file, headers: :first_row)
+  end
+
+  def process
+    data = @csv.to_a
+    data.each { |row| row['input_data'] = JSON.parse(row['input_data']) }
+    data = data.group_by { |row| row['normalized_name'] }.keep_if { |_name, rows| rows.length > 1 }
+    puts "Read #{data.length} groups of people from CSV"
+
+    submitted_rows = Hash[data.filter { |_name, rows| rows.any? { |row| row['submitted_at'] } }]
+    puts "Found #{submitted_rows.length} groups with a submitted_at"
+
+    sql_commands = ['BEGIN;']
+    submitted_rows.each do |k, rows|
+      begin
+        merged_record, other_ids = merge_records(rows)
+        sql_commands << generate_sql_command(merged_record, other_ids)
+      rescue => ex
+        # Drop into a debugger on type errors (e.g. "no implicit conversion"),
+        # which usually mean a field changed shape between submissions.
+        binding.pry if ex.message.match?('implicit')
+        $stderr.puts "Error merging #{k}: #{ex.message}"
+      end
+    end
+    sql_commands << 'COMMIT;'
+
+    File.open('out.sql', 'w') do |f|
+      f.write(sql_commands.join("\n"))
+    end
+    puts "Wrote #{sql_commands.length} SQL commands to 'out.sql'"
+  end
+
+  def generate_sql_command(record, other_ids)
+    <<~SQL
+      UPDATE submissions SET input_data = '#{JSON.generate(record['input_data']).gsub("'", "''")}' WHERE id = '#{record['id']}';
+      UPDATE submissions SET merged_into_submission_id='#{record['id']}' WHERE id in ('#{other_ids.join('\',\'')}');
+      UPDATE submissions SET updated_at = NOW() WHERE id = '#{record['id']}' OR id in ('#{other_ids.join('\',\'')}');
+    SQL
+  end
+
+  def merge_records(rows)
+    # find the submitted record
+    submitted_record = rows.find { |row| row['submitted_at'] }
+
+    # merge, newest unsubmitted record first
+    unsubmitted_records = rows.find_all { |row| row['id'] != submitted_record['id'] }.sort_by { |row| DateTime.parse(row['updated_at']) }.reverse
+    unsubmitted_records.each do |row|
+      row['input_data'].each do |k, v|
+        old_val = submitted_record['input_data'][k]
+        if old_val.nil?
+          submitted_record['input_data'][k] = v
+        else
+          new_val = FIELD_MERGE_STRATEGIES.fetch(k, MERGE_STRATEGIES[:equality]).call(old_val, v)
+          if new_val.nil?
+            raise "key #{k} does not match (#{old_val.inspect} vs #{v.inspect})"
+          end
+          submitted_record['input_data'][k] = new_val
+        end
+      end
+    end
+
+    [submitted_record, unsubmitted_records.map { |record| record['id'] }]
+  end
+end
+
+SubmissionsMatcher.new(CSV_FILE).process
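+
+# A sketch of what out.sql ends up containing (ids and JSON are hypothetical):
+#   BEGIN;
+#   UPDATE submissions SET input_data = '{"firstName":"jane",...}' WHERE id = 'aaa111';
+#   UPDATE submissions SET merged_into_submission_id='aaa111' WHERE id in ('bbb222');
+#   UPDATE submissions SET updated_at = NOW() WHERE id = 'aaa111' OR id in ('bbb222');
+#   COMMIT;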