Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dataset upload fix #46

Merged
merged 3 commits into from
Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 16 additions & 72 deletions Model/src/main/java/org/gusdb/wdk/model/answer/AnswerValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
import org.gusdb.wdk.model.query.spec.QueryInstanceSpec;
import org.gusdb.wdk.model.question.Question;
import org.gusdb.wdk.model.record.Field;
import org.gusdb.wdk.model.record.PrimaryKeyDefinition;
import org.gusdb.wdk.model.record.PrimaryKeyIterator;
import org.gusdb.wdk.model.record.ResultSetPrimaryKeyIterator;
import org.gusdb.wdk.model.record.RecordClass;
import org.gusdb.wdk.model.record.RecordInstance;
import org.gusdb.wdk.model.record.TableField;
Expand Down Expand Up @@ -886,70 +889,31 @@ public void setSortingMap(Map<String, Boolean> sortingMap) {
_sortedIdSql = null;
}

/**
* This method is redundant with getAllIds(), consider deprecate either one of them.
*
* @return returns a list of all primary key values.
*/
public Object[][] getPrimaryKeyValues() throws WdkModelException {
String[] columns = _answerSpec.getQuestion().getRecordClass().getPrimaryKeyDefinition().getColumnRefs();
List<Object[]> buffer = new ArrayList<>();

Optional<AnswerFilterInstance> legacyFilter = _answerSpec.getLegacyFilter();
try (ResultList resultList =
legacyFilter.isPresent() ?
legacyFilter.get().getResults(this) :
_idsQueryInstance.getResults()) {
while (resultList.next()) {
Object[] pkValues = new String[columns.length];
for (int columnIndex = 0; columnIndex < columns.length; columnIndex++) {
pkValues[columnIndex] = resultList.get(columns[columnIndex]);
}
buffer.add(pkValues);
}
Object[][] ids = new String[buffer.size()][columns.length];
buffer.toArray(ids);
return ids;
}
}

private void reset() {
_sortedIdSql = null;
_checksum = null;
_resultSizeFactory.clear();
}

/**
* Get a list of all the primary key tuples of all the records in the answer. It is a shortcut of iterating
* through all the pages and get the primary keys.
* Creates a closable iterator of IDs for this answer
*
* NOTE! caller must close the return value to avoid resource leaks.
*
* This method is redundant with getPrimaryKeyValues(), consider deprecate either one of them.
* @return an iterator of all the primary key tuples of all the records in the answer
* @throws WdkModelException if unable to execute ID query
*/
public List<String[]> getAllIds() throws WdkModelException {
String idSql = getSortedIdSql();
String[] pkColumns = _answerSpec.getQuestion().getRecordClass().getPrimaryKeyDefinition().getColumnRefs();
List<String[]> pkValues = new ArrayList<>();
WdkModel wdkModel = _answerSpec.getQuestion().getWdkModel();
DataSource dataSource = wdkModel.getAppDb().getDataSource();
ResultSet resultSet = null;
public PrimaryKeyIterator getAllIds() throws WdkModelException {
try {
resultSet = SqlUtils.executeQuery(dataSource, idSql, _idsQueryInstance.getQuery().getFullName() + "__all-ids");
while (resultSet.next()) {
String[] values = new String[pkColumns.length];
for (int i = 0; i < pkColumns.length; i++) {
Object value = resultSet.getObject(pkColumns[i]);
values[i] = (value == null) ? null : value.toString();
}
pkValues.add(values);
}
}
catch (SQLException ex) {
throw new WdkModelException(ex);
PrimaryKeyDefinition pkDef = _answerSpec.getQuestion().getRecordClass().getPrimaryKeyDefinition();
DataSource dataSource = _wdkModel.getAppDb().getDataSource();
String idSql = getSortedIdSql();
String queryDescriptor = _idsQueryInstance.getQuery().getFullName() + "__all-ids";
return new ResultSetPrimaryKeyIterator(pkDef, SqlUtils.executeQuery(dataSource, idSql, queryDescriptor));
}
finally {
SqlUtils.closeResultSetAndStatement(resultSet, null);
catch (SQLException e) {
throw new WdkModelException("Unable to execute ID query", e);
}
return pkValues;
}

public void setPageIndex(int startIndex, int endIndex) {
Expand Down Expand Up @@ -977,26 +941,6 @@ public JSONObject getFilterSummaryJson(String filterName) throws WdkUserExceptio
}
}

/**
* Returns one big string containing all IDs in this answer value's result in
* the following format: each '\n'-delimited line contains one record, whose
* primary keys are joined and delimited by a comma.
*
* @return list of all record IDs
*/
public String getAllIdsAsString() throws WdkModelException {
List<String[]> pkValues = getAllIds();
StringBuilder buffer = new StringBuilder();
for (String[] pkValue : pkValues) {
if (buffer.length() > 0) buffer.append("\n");
for (int i = 0; i < pkValue.length; i++) {
if (i > 0) buffer.append(", ");
buffer.append(pkValue[i]);
}
}
return buffer.toString();
}

private final static String ID_QUERY_HANDLE = "pidq";
private final static String QUERY_HANDLE = "inq";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
import static org.gusdb.fgputil.functional.Functions.mapToList;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

import org.gusdb.fgputil.EncryptionUtil;
import org.gusdb.fgputil.FormatUtil;
import org.gusdb.fgputil.ListBuilder;
import org.gusdb.fgputil.MapBuilder;
import org.gusdb.fgputil.db.platform.DBPlatform;
import org.gusdb.fgputil.validation.ValidObjectFactory.RunnableObj;
Expand All @@ -20,6 +19,7 @@
import org.gusdb.wdk.model.answer.ResultSizeFactory;
import org.gusdb.wdk.model.answer.spec.AnswerSpec;
import org.gusdb.wdk.model.record.DynamicRecordInstance;
import org.gusdb.wdk.model.record.PrimaryKeyIterator;
import org.gusdb.wdk.model.record.RecordClass;
import org.gusdb.wdk.model.record.RecordInstance;
import org.gusdb.wdk.model.user.User;
Expand Down Expand Up @@ -98,15 +98,35 @@ public String getChecksum() throws WdkModelException {
}

@Override
public List<String[]> getAllIds() throws WdkModelException {
public PrimaryKeyIterator getAllIds() throws WdkModelException {
String[] pkArray = new String[_pkMap.size()];
String[] pkColNames = _recordClass.getPrimaryKeyDefinition().getColumnRefs();
if (pkArray.length != pkColNames.length)
throw new WdkModelException("Incoming primary key array does not match recordclass PK column ref array");
for (int i = 0; i < pkColNames.length; i++) {
pkArray[i] = (String)_pkMap.get(pkColNames[i]);
}
return new ListBuilder<String[]>().add(pkArray).toList();
return new PrimaryKeyIterator() {

private boolean valueReturned = false;

@Override
public boolean hasNext() {
return !valueReturned;
}

@Override
public String[] next() {
if (valueReturned) throw new NoSuchElementException();
valueReturned = true;
return pkArray;
}

@Override
public void close() {
// nothing to do here
}
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@
import java.io.Reader;

public abstract class DatasetContents {

private static final int BUF_SIZE = 8192;

// constants used to estimate number of records
public static final int ESTIMATED_CHARS_PER_ID = 10;
public static final int ESTIMATED_BYTES_PER_ID = 15;

protected final String fileName;

protected DatasetContents(String fileName) {
Expand All @@ -19,6 +24,8 @@ public String getUploadFileName() {
return fileName;
}

public abstract long getEstimatedRowCount();

@SuppressWarnings("ThrowFromFinallyBlock")
public String truncate(final int len) throws WdkModelException {
Reader reader = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -296,7 +298,7 @@ private Dataset createOrGetDataset(
);

insertDatasetValues(connection, datasetId, parser.iterator(content),
parser.datasetContentWidth(content));
parser.datasetContentWidth(content), content.getEstimatedRowCount());
connection.commit();

// create and insert user dataset.
Expand Down Expand Up @@ -533,24 +535,33 @@ private void insertDatasetValues(
final Connection connection,
final long datasetId,
final DatasetIterator data,
final int length
final int numDataColumns,
final long estimatedRowCount
) throws SQLException, WdkModelException, WdkUserException {
String sql = buildDatasetValuesInsertQuery(length);
String sql = buildDatasetValuesInsertQuery(numDataColumns);
LOG.info("Built the following insert SQL: " + sql);
int idAllocationBatchSize = calculateIdAllocationBatchSize(estimatedRowCount);
Queue<Long> datasetValueIdQueue = new LinkedList<>();
try (PreparedStatement psInsert = connection.prepareStatement(sql)) {
long batchRow = 0;
long rowOrderNumber = 1;
while (data.hasNext()) {
String[] value = data.next();

// get a new value id.
long datasetValueId = _userDb.getPlatform()
.getNextId(_userDb.getDataSource(), _userSchema, TABLE_DATASET_VALUES);
// get a new value id
if (datasetValueIdQueue.isEmpty()) {
datasetValueIdQueue.addAll(
_userDb.getPlatform().getNextNIds(
_userDb.getDataSource(),
_userSchema,
TABLE_DATASET_VALUES,
idAllocationBatchSize));
}

psInsert.setLong(1, datasetValueId);
psInsert.setLong(1, datasetValueIdQueue.poll());
psInsert.setLong(2, datasetId);
psInsert.setLong(3, rowOrderNumber);
for (int j = 0; j < length; j++) {
for (int j = 0; j < numDataColumns; j++) {
psInsert.setString(j + 4, value[j]);
}
psInsert.addBatch();
Expand All @@ -567,6 +578,17 @@ private void insertDatasetValues(
}
}

private int calculateIdAllocationBatchSize(long estimatedRowCount) {
// (0,10] = 1
if (estimatedRowCount <= 10) return 1;
// (10,100] = 10
if (estimatedRowCount <= 100) return 10;
// (100,1000] = 25
if (estimatedRowCount <= 1000) return 25;
// (1000,Inf) = 250
return 250;
}

private void validateValue(final String[] row) throws WdkUserException {
// check the number of columns
if (row.length > MAX_VALUE_COLUMNS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,28 @@ public class DatasetFileContents extends DatasetContents {
*/
private String checksum;

/**
* Number of records expected in this dataset file
* (some files are written knowing how many records are contained within)
*/
private final Long numRecords;

public DatasetFileContents(
final String fileName,
final File contents
) {
this(fileName, contents, null);
}

public DatasetFileContents(
final String fileName,
final File contents,
final Long numRecords) {
super(fileName);
LOG.info("Created new DatasetFileContents object pointing at file: " + contents.getAbsolutePath());
this.contents = contents;
this.owned = false;
this.numRecords = numRecords;
}

DatasetFileContents(
Expand All @@ -63,6 +77,7 @@ public DatasetFileContents(
tmp.deleteOnExit();
this.owned = true;
this.contents = tmp;
this.numRecords = null;
}

/**
Expand Down Expand Up @@ -126,4 +141,11 @@ private static String genChecksum(final File file) {
throw new WdkRuntimeException(e);
}
}

@Override
public long getEstimatedRowCount() {
return numRecords != null
? numRecords
: (contents.length() / ESTIMATED_BYTES_PER_ID) + 1; // round up
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.List;

public class DatasetListContents extends DatasetContents {

private final List<String> idList;
private String checksum;

Expand All @@ -15,6 +16,7 @@ public DatasetListContents(final List<String> idList) {
this.idList = idList;
}


@Override
public String getChecksum() {
if (checksum != null)
Expand Down Expand Up @@ -113,4 +115,9 @@ private void inc() {
done = true;
}
}

@Override
public long getEstimatedRowCount() {
return idList.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.gusdb.fgputil.EncryptionUtil;

public class DatasetStringContents extends DatasetContents {

private final String contents;

public DatasetStringContents(final String fileName, final String contents) {
Expand All @@ -22,4 +23,9 @@ public String getChecksum() {
public Reader getContentReader() {
return new StringReader(contents);
}

@Override
public long getEstimatedRowCount() {
return (contents.length() / ESTIMATED_CHARS_PER_ID) + 1; // round up
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.gusdb.wdk.model.record;

import java.util.Iterator;

public interface PrimaryKeyIterator extends Iterator<String[]>, AutoCloseable {

// no additional methods

}
Loading