Skip to content

Commit

Permalink
nbworker add feature to reroute y-flow for switch/link evacuation pro…
Browse files Browse the repository at this point in the history
…cess
  • Loading branch information
IvanChupin committed Nov 14, 2024
1 parent 6a85e5d commit 3fbeb00
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,14 @@ public FlowPath(@NonNull PathId pathId, @NonNull Switch srcSwitch, @NonNull Swit
long latency, long bandwidth,
boolean ignoreBandwidth, FlowPathStatus status, List<PathSegment> segments,
Set<FlowApplication> applications,
String sharedBandwidthGroupId, HaFlowPath haFlowPath) {
String sharedBandwidthGroupId, HaFlowPath haFlowPath, Flow flow) {
data = FlowPathDataImpl.builder().pathId(pathId).srcSwitch(srcSwitch).destSwitch(destSwitch)
.cookie(cookie).meterId(meterId).ingressMirrorGroupId(ingressMirrorGroupId)
.latency(latency).bandwidth(bandwidth)
.ignoreBandwidth(ignoreBandwidth).status(status)
.applications(applications)
.sharedBandwidthGroupId(sharedBandwidthGroupId).haFlowPath(haFlowPath)
.flow(flow)
.build();
// The reference is used to link path segments back to the path. See {@link #setSegments(List)}.
((FlowPathDataImpl) data).flowPath = this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import org.openkilda.messaging.Message;
import org.openkilda.messaging.MessageData;
import org.openkilda.messaging.command.flow.FlowRerouteRequest;
import org.openkilda.messaging.command.BaseRerouteRequest;
import org.openkilda.messaging.command.switches.SwitchValidateRequest;
import org.openkilda.messaging.error.ErrorData;
import org.openkilda.wfm.CommandContext;
Expand All @@ -38,7 +38,7 @@ protected void handleInput(Tuple input) throws Exception {
CommandContext commandContext = pullContext(input);
Message message = wrap(commandContext, payload);

if (payload instanceof FlowRerouteRequest) {
if (payload instanceof BaseRerouteRequest) {
getOutput().emit(input.getSourceStreamId(), input, new Values(message));
} else if (payload instanceof SwitchValidateRequest) {
getOutput().emit(input.getSourceStreamId(), input,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.openkilda.messaging.command.flow.FlowRequest;
import org.openkilda.messaging.command.flow.FlowRerouteRequest;
import org.openkilda.messaging.command.haflow.HaFlowRerouteRequest;
import org.openkilda.messaging.command.yflow.YFlowRerouteRequest;
import org.openkilda.messaging.error.ErrorType;
import org.openkilda.messaging.error.InvalidFlowException;
import org.openkilda.messaging.error.MessageException;
Expand Down Expand Up @@ -83,6 +84,7 @@
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import net.jodah.failsafe.RetryPolicy;
import org.apache.commons.lang3.StringUtils;

import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -713,10 +715,20 @@ public List<BaseRerouteRequest> makeRerouteRequests(
}
}
} else {
if (processed.add(flow.getFlowId())) {
FlowRerouteRequest request = new FlowRerouteRequest(
flow.getFlowId(), false, false, affectedIslEndpoints, reason, false);
results.add(request);
if (StringUtils.isNotBlank(flow.getYFlowId())) {
if (yFlowRepository.exists(flow.getYFlowId())) {
if (processed.add(flow.getYFlowId())) {
YFlowRerouteRequest req = new YFlowRerouteRequest(flow.getYFlowId(), affectedIslEndpoints,
reason, false);
results.add(req);
}
}
} else {
if (processed.add(flow.getFlowId())) {
FlowRerouteRequest request = new FlowRerouteRequest(
flow.getFlowId(), false, false, affectedIslEndpoints, reason, false);
results.add(request);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.openkilda.messaging.command.BaseRerouteRequest;
import org.openkilda.messaging.command.flow.FlowRequest;
import org.openkilda.messaging.command.flow.FlowRerouteRequest;
import org.openkilda.messaging.command.yflow.YFlowRerouteRequest;
import org.openkilda.messaging.error.InvalidFlowException;
import org.openkilda.messaging.info.InfoData;
import org.openkilda.messaging.model.FlowPatch;
Expand All @@ -39,12 +42,15 @@
import org.openkilda.model.Switch;
import org.openkilda.model.SwitchId;
import org.openkilda.model.SwitchStatus;
import org.openkilda.model.YFlow;
import org.openkilda.model.YFlow.SharedEndpoint;
import org.openkilda.model.cookie.FlowSegmentCookie;
import org.openkilda.persistence.inmemory.InMemoryGraphBasedTest;
import org.openkilda.persistence.repositories.FlowPathRepository;
import org.openkilda.persistence.repositories.FlowRepository;
import org.openkilda.persistence.repositories.PathSegmentRepository;
import org.openkilda.persistence.repositories.SwitchRepository;
import org.openkilda.persistence.repositories.YFlowRepository;
import org.openkilda.wfm.error.FlowNotFoundException;
import org.openkilda.wfm.error.SwitchNotFoundException;
import org.openkilda.wfm.share.flow.TestFlowBuilder;
Expand All @@ -71,6 +77,7 @@ public class FlowOperationsServiceTest extends InMemoryGraphBasedTest {
private static final String FLOW_ID_1 = "flow_1";
private static final String FLOW_ID_2 = "flow_2";
private static final String FLOW_ID_3 = "flow_3";
private static final String Y_FLOW_ID_1 = "y_flow_1";
private static final PathId FORWARD_PATH_1 = new PathId("forward_path_1");
private static final PathId FORWARD_PATH_2 = new PathId("forward_path_2");
private static final PathId FORWARD_PATH_3 = new PathId("forward_path_3");
Expand All @@ -91,6 +98,7 @@ public class FlowOperationsServiceTest extends InMemoryGraphBasedTest {

private static FlowOperationsService flowOperationsService;
private static FlowRepository flowRepository;
private static YFlowRepository yFlowRepository;
private static FlowPathRepository flowPathRepository;
private static PathSegmentRepository pathSegmentRepository;
private static SwitchRepository switchRepository;
Expand All @@ -103,6 +111,7 @@ public class FlowOperationsServiceTest extends InMemoryGraphBasedTest {
@BeforeAll
public static void setUpOnce() {
flowRepository = persistenceManager.getRepositoryFactory().createFlowRepository();
yFlowRepository = persistenceManager.getRepositoryFactory().createYFlowRepository();
flowPathRepository = persistenceManager.getRepositoryFactory().createFlowPathRepository();
pathSegmentRepository = persistenceManager.getRepositoryFactory().createPathSegmentRepository();
switchRepository = persistenceManager.getRepositoryFactory().createSwitchRepository();
Expand Down Expand Up @@ -789,7 +798,7 @@ public void whenFlowWithMaxLatency_patchFlowWithLatencyTier2OnlyTest()
}

@Test
void whenPartialUpdate_dumpBeforeAndDumpAfterIsSaved() throws FlowNotFoundException, InvalidFlowException {
public void whenPartialUpdate_dumpBeforeAndDumpAfterIsSaved() throws FlowNotFoundException, InvalidFlowException {
Flow createdFlow = createFlow(FLOW_ID_1, switchA, 1, switchC, 2,
FORWARD_PATH_1, REVERSE_PATH_1, switchB, false,
100_500L, 0L);
Expand All @@ -815,7 +824,7 @@ void whenPartialUpdate_dumpBeforeAndDumpAfterIsSaved() throws FlowNotFoundExcept
}

@Test
void whenFullUpdateIsRequired_historyActionIsSaved() throws FlowNotFoundException, InvalidFlowException {
public void whenFullUpdateIsRequired_historyActionIsSaved() throws FlowNotFoundException, InvalidFlowException {
Flow createdFlow = createFlow(FLOW_ID_1, switchA, 1, switchC, 2,
FORWARD_PATH_1, REVERSE_PATH_1, switchB, false,
100_500L, 0L);
Expand All @@ -839,6 +848,91 @@ void whenFullUpdateIsRequired_historyActionIsSaved() throws FlowNotFoundExceptio
assertEquals(action, carrier.getHistoryHolderList().get(2).getFlowHistoryData().getAction());
}

@Test
public void makeRerouteRequests() {
YFlow yFlow = buildYFlow(Y_FLOW_ID_1, switchA, 1, switchD);
yFlowRepository.add(yFlow);

Flow ySubflow1 = buildFlow(null, Y_FLOW_ID_1, switchA, 1, 10,
switchB, 2, 11, "subFlow1", yFlow);
FlowPath yFlowForwardPath1 = FlowPath.builder()
.pathId(new PathId("subPath1"))
.srcSwitch(switchA)
.destSwitch(switchB)
.flow(ySubflow1)
.build();

Flow ySubflow2 = buildFlow(null, Y_FLOW_ID_1, switchA, 1, 20,
switchC, 2, 22, "subFlow2", yFlow);
FlowPath yFlowForwardPath2 = FlowPath.builder()
.pathId(new PathId("subPath2"))
.srcSwitch(switchA)
.destSwitch(switchB)
.flow(ySubflow2)
.build();

Flow flow1 = buildFlow(FLOW_ID_1, null, switchA, 1, 100, switchB,
2, 111, "regularFlow", null);
FlowPath flowPath1 = FlowPath.builder()
.pathId(new PathId("path1"))
.srcSwitch(switchA)
.destSwitch(switchB)
.flow(flow1)
.build();

List<FlowPath> flowPaths = Arrays.asList(yFlowForwardPath1, flowPath1, yFlowForwardPath2);

// 3 flow path: 1 for regular flow and 2 for y-flow
List<BaseRerouteRequest> actualResult =
flowOperationsService.makeRerouteRequests(flowPaths, new HashSet<>(), "Great reason to reroute");

Assertions.assertEquals(2, actualResult.size());
Assertions.assertInstanceOf(YFlowRerouteRequest.class, actualResult.get(0));
Assertions.assertEquals(Y_FLOW_ID_1, actualResult.get(0).getFlowId());
Assertions.assertInstanceOf(FlowRerouteRequest.class, actualResult.get(1));
Assertions.assertEquals(FLOW_ID_1, actualResult.get(1).getFlowId());

// y-flow does not exist in the repository
transactionManager.doInTransaction(() -> yFlowRepository.remove(yFlow));
actualResult = flowOperationsService.makeRerouteRequests(flowPaths, new HashSet<>(), "Great reason to reroute");
Assertions.assertEquals(1, actualResult.size());
Assertions.assertInstanceOf(FlowRerouteRequest.class, actualResult.get(0));
Assertions.assertEquals(FLOW_ID_1, actualResult.get(0).getFlowId());
}

private YFlow buildYFlow(String yFlowId, Switch sharedEndpoint, int portNumber, Switch yPoint) {
return YFlow.builder()
.yFlowId(yFlowId)
.sharedEndpoint(new SharedEndpoint(sharedEndpoint.getSwitchId(), portNumber))
.yPoint(yPoint.getSwitchId())
.status(FlowStatus.UP)
.build();
}

private Flow buildFlow(String flowId, String yflowId, Switch srcSwitch, int srcPort, int srcVlan, Switch destSwitch,
int destPort, int destVlan, String desc, YFlow yflow) {
TestFlowBuilder builder = new TestFlowBuilder()
.yFlow(yflow)
.srcSwitch(srcSwitch)
.srcPort(srcPort)
.srcVlan(srcVlan)
.destSwitch(destSwitch)
.destPort(destPort)
.destVlan(destVlan)
.encapsulationType(FlowEncapsulationType.TRANSIT_VLAN)
.pathComputationStrategy(PathComputationStrategy.COST)
.description(desc)
.status(FlowStatus.UP);

if (flowId != null) {
builder.flowId(flowId);
}
if (yflowId != null) {
builder.yFlowId(yflowId);
}
return builder.build();
}

private void assertFlows(Collection<Flow> actualFlows, String... expectedFlowIds) {
assertEquals(expectedFlowIds.length, actualFlows.size());
assertEquals(new HashSet<>(Arrays.asList(expectedFlowIds)),
Expand Down

0 comments on commit 3fbeb00

Please sign in to comment.