Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
georgew5656 committed Nov 25, 2024
1 parent 64e3851 commit 1c05c2f
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
throw DruidException
.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.SERVICE_UNAVAILABLE)
.build("Cannot append segments to [%s] right now." +
.build("Cannot append segments to [%s] right now. " +
"There might be another task waiting to publish its segments. Check the overlord logs for details.",
dataSourceToInsert
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,45 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.overlord.LockResult;
import org.apache.druid.indexing.overlord.ObjectMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.TimeChunkLockRequest;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.assertj.core.api.Assertions;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;

public class SegmentTransactionalInsertActionTest
import javax.annotation.Nullable;
import java.util.Map;

@RunWith(EasyMockRunner.class)
public class SegmentTransactionalInsertActionTest extends EasyMockSupport
{
@Rule
public TaskActionTestKit actionTestKit = new TaskActionTestKit();
Expand Down Expand Up @@ -86,6 +106,12 @@ public class SegmentTransactionalInsertActionTest
1024
);

@Mock
private TaskActionToolbox taskActionToolbox;

@Mock
SupervisorManager supervisorManager;

private LockResult acquireTimeChunkLock(TaskLockType lockType, Task task, Interval interval, long timeoutMs)
throws InterruptedException
{
Expand Down Expand Up @@ -175,4 +201,87 @@ public void testFailBadVersion() throws Exception
);
Assert.assertTrue(exception.getMessage().contains("are not covered by locks"));
}

@Test
public void testStreamingTaskNotPublishable() throws Exception
{
// Mocking the config classes because they have a lot of logic in their constructors that we don't really want here.
SeekableStreamIndexTaskTuningConfig taskTuningConfig = EasyMock.createMock(SeekableStreamIndexTaskTuningConfig.class);
SeekableStreamIndexTaskIOConfig taskIOConfig = EasyMock.createMock(SeekableStreamIndexTaskIOConfig.class);

final SeekableStreamIndexTask streamingTask = new TestSeekableStreamIndexTask(
"id1",
null,
DataSchema.builder().withDataSource(DATA_SOURCE).build(),
taskTuningConfig,
taskIOConfig,
ImmutableMap.of(),
"0"
);

EasyMock.expect(taskActionToolbox.getSupervisorManager()).andReturn(supervisorManager);
EasyMock.expect(taskActionToolbox.getTaskLockbox()).andReturn(actionTestKit.getTaskLockbox());
EasyMock.expect(supervisorManager.canPublishSegments(EasyMock.anyString(), EasyMock.anyInt(), EasyMock.anyString()))
.andReturn(false);

actionTestKit.getTaskLockbox().add(streamingTask);
acquireTimeChunkLock(TaskLockType.EXCLUSIVE, streamingTask, INTERVAL, 5000);
replayAll();

DruidException druidException = Assert.assertThrows(DruidException.class, () -> SegmentTransactionalInsertAction.appendAction(
ImmutableSet.of(SEGMENT1),
new ObjectMetadata(null),
new ObjectMetadata(ImmutableList.of(1)),
null
).perform(
streamingTask,
taskActionToolbox
));
verifyAll();

Assert.assertEquals(503, druidException.getStatusCode());
}

private static class TestSeekableStreamIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity>
{
public TestSeekableStreamIndexTask(
String id,
@Nullable TaskResource taskResource,
DataSchema dataSchema,
SeekableStreamIndexTaskTuningConfig tuningConfig,
SeekableStreamIndexTaskIOConfig<String, String> ioConfig,
@Nullable Map<String, Object> context,
@Nullable String groupId
)
{
super(
id,
taskResource,
dataSchema,
tuningConfig,
ioConfig,
context,
groupId
);
}

@Override
protected SeekableStreamIndexTaskRunner<String, String, ByteEntity> createTaskRunner()
{
return null;
}

@Override
protected RecordSupplier<String, String, ByteEntity> newTaskRecordSupplier(final TaskToolbox toolbox)
{
return null;
}

@Override
public String getType()
{
return "test";
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,52 @@ public void testHandoffTaskGroupsEarlyOnNonStreamSupervisor()
verifyAll();
}

@Test
public void testCanPublishSegments_returnsFalse()
{
String taskId = "id1";
String supervisorId = "supervisor-id";
Integer groupId = 1;
Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
supervisorId, new TestSupervisorSpec(supervisorId, supervisor1)
);

EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
EasyMock.expect(supervisor1.canPublishSegments(groupId, taskId)).andReturn(false);

replayAll();

manager.start();

Assert.assertFalse(manager.canPublishSegments(supervisorId, groupId, taskId));

verifyAll();
}

@Test
public void testCanPublishSegments_throwsException_returnsTrue()
{
String taskId = "id1";
String supervisorId = "supervisor-id";
Integer groupId = 1;
Map<String, SupervisorSpec> existingSpecs = ImmutableMap.of(
supervisorId, new TestSupervisorSpec(supervisorId, supervisor1)
);

EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
EasyMock.expect(supervisor1.canPublishSegments(groupId, taskId)).andThrow(new RuntimeException());

replayAll();

manager.start();

Assert.assertTrue(manager.canPublishSegments(supervisorId, groupId, taskId));

verifyAll();
}

@Test
public void testStartAlreadyStarted()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,69 @@ public int getActiveTaskGroupsCount()
ex.getMessage()
);
}

@Test
public void testDefaultCanPublishSegments()
{
// Create an instance of stream supervisor without overriding handoffTaskGroupsEarly().
final StreamSupervisor streamSupervisor = new StreamSupervisor()
{

@Override
public void start()
{

}

@Override
public void stop(boolean stopGracefully)
{

}

@Override
public SupervisorReport getStatus()
{
return null;
}

@Override
public SupervisorStateManager.State getState()
{
return null;
}

@Override
public void reset(@Nullable DataSourceMetadata dataSourceMetadata)
{

}

@Override
public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
{

}

@Override
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
{

}

@Override
public LagStats computeLagStats()
{
return null;
}

@Override
public int getActiveTaskGroupsCount()
{
return 0;
}
};

Assert.assertTrue(streamSupervisor.canPublishSegments(1, "taskId"));
}
}

0 comments on commit 1c05c2f

Please sign in to comment.