Skip to content

Commit

Permalink
projection segment merge fixes (#17460)
Browse files Browse the repository at this point in the history
changes:
* fix issue when merging projections from multiple-incremental persists which was hoping that some 'dim conversion' buffers were not closed, but they already were (by the merging iterator). fix involves selectively persisting these conversion buffers to temp files in the segment write out directory and mapping them and tying them to the segment level closer so that they are available after the lifetime of the parent merger
* modify auto column serializers to use segment write out directory for temp files instead of java.io.tmpdir
* fix queryable index projection to not put the time-like column as a dimension, instead only adding it as __time
* use smoosh for temp files so can safely write any Serializer to a temp smoosh
  • Loading branch information
clintropolis authored Nov 16, 2024
1 parent 7f335ff commit 24a1faf
Show file tree
Hide file tree
Showing 32 changed files with 669 additions and 271 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

public class SerializerUtils
{

public <T extends OutputStream> void writeString(T out, String name) throws IOException
{
byte[] nameBytes = StringUtils.toUtf8(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
Expand Down Expand Up @@ -87,6 +88,8 @@ public class AutoTypeColumnMerger implements DimensionMergerV9
private boolean isVariantType = false;
private byte variantTypeByte = 0x00;

private final File segmentBaseDir;

/**
* @param name column name
* @param outputName output smoosh file name. if this is a base table column, it will be the equivalent to
Expand All @@ -105,6 +108,7 @@ public AutoTypeColumnMerger(
@Nullable ColumnType castToType,
IndexSpec indexSpec,
SegmentWriteOutMedium segmentWriteOutMedium,
File segmentBaseDir,
Closer closer
)
{
Expand All @@ -114,6 +118,7 @@ public AutoTypeColumnMerger(
this.castToType = castToType;
this.indexSpec = indexSpec;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.segmentBaseDir = segmentBaseDir;
this.closer = closer;
}

Expand Down Expand Up @@ -265,7 +270,7 @@ public void writeMergedValueDictionary(List<IndexableAdapter> adapters) throws I
);
}

serializer.openDictionaryWriter();
serializer.openDictionaryWriter(segmentBaseDir);
serializer.serializeFields(mergedFields);

int stringCardinality;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import org.apache.druid.collections.bitmap.BitmapFactory;
import org.apache.druid.collections.bitmap.ImmutableBitmap;
import org.apache.druid.collections.bitmap.MutableBitmap;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.io.smoosh.FileSmoosher;
import org.apache.druid.java.util.common.io.smoosh.SmooshedFileMapper;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.filter.DruidPredicateFactory;
import org.apache.druid.query.filter.ValueMatcher;
Expand All @@ -51,13 +54,21 @@
import org.apache.druid.segment.data.V3CompressedVSizeColumnarMultiIntsSerializer;
import org.apache.druid.segment.data.VSizeColumnarIntsSerializer;
import org.apache.druid.segment.data.VSizeColumnarMultiIntsSerializer;
import org.apache.druid.segment.serde.ColumnSerializerUtils;
import org.apache.druid.segment.serde.Serializer;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -101,6 +112,16 @@ public abstract class DictionaryEncodedColumnMerger<T extends Comparable<T>> imp
@Nullable
protected T firstDictionaryValue;

protected File segmentBaseDir;

/**
* This becomes non-null if {@link #markAsParent()} is called indicating that this column is a base table 'parent'
* to some projection column, which requires persisting id conversion buffers to a temporary files. If there are no
* projections defined (or projections which reference this column) then id conversion buffers will be freed after
* calling {@link #writeIndexes(List)}
*/
@MonotonicNonNull
protected PersistedIdConversions persistedIdConversions;

public DictionaryEncodedColumnMerger(
String dimensionName,
Expand All @@ -109,6 +130,7 @@ public DictionaryEncodedColumnMerger(
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
File segmentBaseDir,
Closer closer
)
{
Expand All @@ -118,8 +140,8 @@ public DictionaryEncodedColumnMerger(
this.capabilities = capabilities;
this.segmentWriteOutMedium = segmentWriteOutMedium;
this.nullRowsBitmap = indexSpec.getBitmapSerdeFactory().getBitmapFactory().makeEmptyMutableBitmap();

this.progress = progress;
this.segmentBaseDir = segmentBaseDir;
this.closer = closer;
}

Expand All @@ -129,6 +151,19 @@ public DictionaryEncodedColumnMerger(
@Nullable
protected abstract T coerceValue(T value);

@Override
public void markAsParent()
{
final File tmpOutputFilesDir = new File(segmentBaseDir, "tmp_" + outputName + "_merger");
try {
FileUtils.mkdirp(tmpOutputFilesDir);
}
catch (IOException e) {
throw new RuntimeException(e);
}
persistedIdConversions = closer.register(new PersistedIdConversions(tmpOutputFilesDir));
}

@Override
public void writeMergedValueDictionary(List<IndexableAdapter> adapters) throws IOException
{
Expand Down Expand Up @@ -192,7 +227,18 @@ public void writeMergedValueDictionary(List<IndexableAdapter> adapters) throws I
writeDictionary(() -> dictionaryMergeIterator);
for (int i = 0; i < adapters.size(); i++) {
if (dimValueLookups[i] != null && dictionaryMergeIterator.needConversion(i)) {
dimConversions.set(i, dictionaryMergeIterator.conversions[i]);
final IntBuffer conversionBuffer;
if (persistedIdConversions != null) {
// if we are a projection parent column, persist the id mapping buffer so that child mergers have access
// to the mappings during serialization to adjust their dictionary ids as needed when serializing
conversionBuffer = persistedIdConversions.map(
dimensionName + "_idConversions_" + i,
dictionaryMergeIterator.conversions[i]
);
} else {
conversionBuffer = dictionaryMergeIterator.conversions[i];
}
dimConversions.set(i, conversionBuffer);
}
}
cardinality = dictionaryMergeIterator.getCardinality();
Expand Down Expand Up @@ -702,4 +748,117 @@ protected interface ExtendedIndexesMerger
void mergeIndexes(int dictId, MutableBitmap mergedIndexes) throws IOException;
void write() throws IOException;
}

protected static class IdConversionSerializer implements Serializer
{
private final IntBuffer buffer;
private final ByteBuffer scratch;

protected IdConversionSerializer(IntBuffer buffer)
{
this.buffer = buffer.asReadOnlyBuffer();
this.buffer.position(0);
this.scratch = ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.nativeOrder());
}

@Override
public long getSerializedSize()
{
return (long) buffer.capacity() * Integer.BYTES;
}

@Override
public void writeTo(WritableByteChannel channel, FileSmoosher smoosher) throws IOException
{
// currently no support for id conversion buffers larger than 2gb
buffer.position(0);
while (buffer.remaining() > 0) {
scratch.position(0);
scratch.putInt(buffer.get());
scratch.flip();
channel.write(scratch);
}
}
}

/**
* Closer of {@link PersistedIdConversion} and a parent path which they are stored in for easy cleanup when the
* segment is closed.
*/
protected static class PersistedIdConversions implements Closeable
{
private final File tempDir;
private final Closer closer;

protected PersistedIdConversions(File tempDir)
{
this.tempDir = tempDir;
this.closer = Closer.create();
}

@Nullable
public IntBuffer map(String name, IntBuffer intBuffer) throws IOException
{
final File bufferDir = new File(tempDir, name);
FileUtils.mkdirp(bufferDir);
final IdConversionSerializer serializer = new IdConversionSerializer(intBuffer);
return closer.register(new PersistedIdConversion(bufferDir, serializer)).getBuffer();
}

@Override
public void close() throws IOException
{
try {
closer.close();
}
finally {
FileUtils.deleteDirectory(tempDir);
}
}
}

/**
* Peristent dictionary id conversion mappings, artifacts created during segment merge which map old dictionary ids
* to new dictionary ids. These persistent mappings are only used when the id mapping needs a lifetime longer than
* the merge of the column itself, such as when the column being merged is a 'parent' column of a projection.
*
* @see DimensionMergerV9#markAsParent()
* @see DimensionMergerV9#attachParent(DimensionMergerV9, List)
*/
protected static class PersistedIdConversion implements Closeable
{
private final File idConversionFile;
private final SmooshedFileMapper bufferMapper;
private final IntBuffer buffer;

private boolean isClosed;

protected PersistedIdConversion(File idConversionDir, Serializer idConversionSerializer) throws IOException
{
this.idConversionFile = idConversionDir;
this.bufferMapper = ColumnSerializerUtils.mapSerializer(idConversionDir, idConversionSerializer, idConversionDir.getName());
final ByteBuffer mappedBuffer = bufferMapper.mapFile(idConversionDir.getName());
mappedBuffer.order(ByteOrder.nativeOrder());
this.buffer = mappedBuffer.asIntBuffer();
}

@Nullable
public IntBuffer getBuffer()
{
if (isClosed) {
return null;
}
return buffer;
}

@Override
public void close() throws IOException
{
if (!isClosed) {
isClosed = true;
bufferMapper.close();
FileUtils.deleteDirectory(idConversionFile);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
Expand All @@ -29,6 +30,7 @@
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;

import javax.annotation.Nullable;
import java.io.File;
import java.util.Comparator;

/**
Expand Down Expand Up @@ -100,6 +102,28 @@ default MultiValueHandling getMultivalueHandling()
*/
DimensionIndexer<EncodedType, EncodedKeyComponentType, ActualType> makeIndexer(boolean useMaxMemoryEstimates);

/**
* @deprecated use {@link #makeMerger(String, IndexSpec, SegmentWriteOutMedium, ColumnCapabilities, ProgressIndicator, File, Closer)}
*
* This method exists for backwards compatiblity with older versions of Druid since this is an unofficial extension
* point that must be implemented to create custom dimension types, and will be removed in a future release.
*/
@Deprecated
default DimensionMergerV9 makeMerger(
String outputName,
IndexSpec indexSpec,
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
Closer closer
)
{
throw DruidException.defensive(
"this method is no longer supported, use makeMerger(String, IndexSpec, SegmentWriteOutMedium, ColumnCapabilities, ProgressIndicator, File, Closer) instead"
);
}


/**
* Creates a new DimensionMergerV9, a per-dimension object responsible for merging indexes/row data across segments
* and building the on-disk representation of a dimension. For use with IndexMergerV9 only.
Expand All @@ -113,16 +137,32 @@ default MultiValueHandling getMultivalueHandling()
* needed
* @param capabilities The ColumnCapabilities of the dimension represented by this DimensionHandler
* @param progress ProgressIndicator used by the merging process
* @param segmentBaseDir segment write out path; temporary files may be created here, though should delete
* after merge is finished OR be registered with the Closer parameter
* @param closer Closer tied to segment completion. Anything which is not cleaned up inside of the
* merger after merge is complete should be registered with this closer. For example,
* resources which are required for final serialization of the column
* @return A new DimensionMergerV9 object.
*/
DimensionMergerV9 makeMerger(
default DimensionMergerV9 makeMerger(
String outputName,
IndexSpec indexSpec,
SegmentWriteOutMedium segmentWriteOutMedium,
ColumnCapabilities capabilities,
ProgressIndicator progress,
File segmentBaseDir,
Closer closer
);
)
{
return makeMerger(
outputName,
indexSpec,
segmentWriteOutMedium,
capabilities,
progress,
closer
);
}

/**
* Given an key component representing a single set of row value(s) for this dimension as an Object,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,27 @@ public interface DimensionMergerV9 extends DimensionMerger
*/
ColumnDescriptor makeColumnDescriptor();

/**
* Sets this merger as the "parent" of another merger for a "projection", allowing for this merger to preserve any
* state which might be required for the projection mergers to do their thing. This method MUST be called prior to
* performing any merge work. Typically, this method is only implemented if
* {@link #attachParent(DimensionMergerV9, List)} requires it.
*/
default void markAsParent()
{
// do nothing
}

/**
* Attaches the {@link DimensionMergerV9} of a "projection" parent column so that stuff like value dictionaries can
* be shared between parent and child
* be shared between parent and child. This method is called during merging instead of {@link #writeMergedValueDictionary(List)} if
* the parent column exists.
*
* @see IndexMergerV9#makeProjections
*/
default void attachParent(DimensionMergerV9 parent, List<IndexableAdapter> projectionAdapters) throws IOException
{
// do nothing
// by default fall through to writing merged dictionary
writeMergedValueDictionary(projectionAdapters);
}
}
Loading

0 comments on commit 24a1faf

Please sign in to comment.