Skip to content

Commit

Permalink
Merge pull request ballerina-platform#42478 from lochana-chathura/fix…
Browse files Browse the repository at this point in the history
…-worker

[Master] Fix worker receive when the receiver successfully completes prior to the sync send
  • Loading branch information
lochana-chathura authored Apr 16, 2024
2 parents 7024961 + e958607 commit b2c6457
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ public enum DiagnosticErrorCode implements DiagnosticCode {
INVALID_USE_OF_EXPERIMENTAL_FEATURE("BCE3843", "invalid.use.of.experimental.feature"),
MULTIPLE_RECEIVE_COMPATIBLE_TYPE_NOT_FOUND("BCE3844", "multiple.receive.compatible.type.not.found"),
DUPLICATE_KEY_IN_MULTIPLE_RECEIVE("BCE3845", "duplicate.key.in.multiple.receive"),
WORKER_RECEIVE_AFTER_NON_ERROR_RETURN("BCE3846", "worker.receive.after.non.error.return"),

// LangLib related error codes.
TYPE_PARAM_OUTSIDE_LANG_MODULE("BCE3900", "type.param.outside.lang.module"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1966,12 +1966,14 @@ private boolean isReceiveAllowedLocation(BLangFunctionBody enclInvokableBody, BL
}

private boolean isSendAllowedContext(BLangNode bLangNode) {
return isReceiveAllowedContext(bLangNode) || bLangNode.getKind() == NodeKind.IF;
return isReceiveAllowedContext(bLangNode) ||
bLangNode.getKind() == NodeKind.IF ||
bLangNode.getKind() == NodeKind.ON_FAIL;
}

private boolean isReceiveAllowedContext(BLangNode bLangNode) {
return switch (bLangNode.getKind()) {
case BLOCK_FUNCTION_BODY, BLOCK, ON_FAIL, DO_STMT -> true;
case BLOCK_FUNCTION_BODY, BLOCK, DO_STMT -> true;
default -> false;
};
}
Expand Down Expand Up @@ -2168,35 +2170,36 @@ public void visit(BLangMultipleWorkerReceive multipleWorkerReceive, AnalyzerData
@Override
public void visit(BLangWorkerReceive workerReceiveNode, AnalyzerData data) {
// Validate worker receive
validateActionParentNode(workerReceiveNode.pos, workerReceiveNode);
Location workerReceivePos = workerReceiveNode.pos;
validateActionParentNode(workerReceivePos, workerReceiveNode);
BSymbol sender =
symResolver.lookupSymbolInMainSpace(data.env, names.fromIdNode(workerReceiveNode.workerIdentifier));
if ((sender.tag & SymTag.VARIABLE) != SymTag.VARIABLE) {
sender = symTable.notFoundSymbol;
}
verifyPeerCommunication(workerReceiveNode.pos, sender, workerReceiveNode.workerIdentifier.value, data.env);
verifyPeerCommunication(workerReceivePos, sender, workerReceiveNode.workerIdentifier.value, data.env);

WorkerActionSystem was = data.workerActionSystemStack.peek();

if (data.withinLockBlock) {
this.dlog.error(workerReceiveNode.pos,
this.dlog.error(workerReceivePos,
DiagnosticErrorCode.WORKER_RECEIVE_ACTION_NOT_ALLOWED_IN_LOCK_STATEMENT);
was.hasErrors = true;
}

String workerName = workerReceiveNode.workerIdentifier.getValue();
if (data.withinQuery || (!isReceiveAllowedLocation(data.env.enclInvokable.body, data.env.node) &&
!data.inInternallyDefinedBlockStmt)) {
this.dlog.error(workerReceiveNode.pos, DiagnosticErrorCode.INVALID_WORKER_RECEIVE_POSITION);
this.dlog.error(workerReceivePos, DiagnosticErrorCode.INVALID_WORKER_RECEIVE_POSITION);
was.hasErrors = true;
}

if (!this.workerExists(workerReceiveNode.workerType, workerName, data.env)) {
this.dlog.error(workerReceiveNode.pos, DiagnosticErrorCode.UNDEFINED_WORKER, workerName);
this.dlog.error(workerReceivePos, DiagnosticErrorCode.UNDEFINED_WORKER, workerName);
was.hasErrors = true;
}

workerReceiveNode.matchingSendsError = createAccumulatedErrorTypeForMatchingSyncSend(data);
workerReceiveNode.matchingSendsError = createAccumulatedErrorTypeForMatchingSyncSend(data, workerReceivePos);
was.addWorkerAction(workerReceiveNode);
}

Expand Down Expand Up @@ -2234,12 +2237,23 @@ private void verifyPeerCommunication(Location pos, BSymbol otherWorker, String o
}
}

public BType createAccumulatedErrorTypeForMatchingSyncSend(AnalyzerData data) {
public BType createAccumulatedErrorTypeForMatchingSyncSend(AnalyzerData data, Location workerReceivePos) {
LinkedHashSet<BType> returnTypesUpToNow = data.returnTypes.peek();
LinkedHashSet<BType> returnTypeAndSendType = new LinkedHashSet<>();

boolean hasNonErrorReturn = false;
for (BType returnType : returnTypesUpToNow) {
addErrorTypesToSet(returnType, returnTypeAndSendType);
if (hasNonErrorType(returnType)) {
hasNonErrorReturn = true;
} else {
returnTypeAndSendType.add(returnType);
}
}

if (hasNonErrorReturn) {
dlog.error(workerReceivePos, DiagnosticErrorCode.WORKER_RECEIVE_AFTER_NON_ERROR_RETURN);
}

returnTypeAndSendType.add(symTable.nilType);
if (returnTypeAndSendType.size() > 1) {
return BUnionType.create(null, returnTypeAndSendType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ error.self.reference.var=\
error.worker.send.after.return=\
invalid worker send statement position, can not be used after a non-error return

error.worker.receive.after.return=\
error.worker.receive.after.non.error.return=\
invalid worker receive statement position, can not be used after a non-error return

error.unsupported.worker.send.position=\
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public void testSendReceiveAllowedSyntacticPositions() {
"possible deadlocks", 74, 17);
validateError(result, index++, receiveNotAllowedError, 79, 15);
validateError(result, index++, receiveNotAllowedError, 81, 21);
validateError(result, index++, receiveNotAllowedError, 93, 13);
validateError(result, index++, receiveNotAllowedError, 94, 16);
validateError(result, index++, receiveNotAllowedError, 94, 19);
Assert.assertEquals(result.getErrorCount(), index);
}

Expand Down Expand Up @@ -95,9 +98,16 @@ public void invalidWorkReceiveWithoutWorker() {

@Test
public void invalidReceiveBeforeWorkers() {
CompileResult result = BCompileUtil.compile("test-src/workers/invalid-receive-before-workers.bal");
Assert.assertEquals(result.getErrorCount(), 1);
validateError(result, 0, "undefined worker 'w1'", 2, 11);
CompileResult result = BCompileUtil.compile("test-src/workers/invalid-receive-with-workers.bal");
int index = 0;
validateError(result, index++, "undefined worker 'w1'", 18, 11);
validateError(result, index++, "invalid worker receive statement position, can not be used after a non-error " +
"return", 40, 17);
validateError(result, index++, "invalid worker receive statement position, can not be used after a non-error " +
"return", 59, 20);
validateError(result, index++, "invalid worker receive statement position, can not be used after a non-error " +
"return", 59, 23);
Assert.assertEquals(result.getErrorCount(), index);
}

@Test
Expand Down Expand Up @@ -128,6 +138,16 @@ public void testSendReceiveFailureType() {
validateError(result, index++, "incompatible types: expected 'int', found '(ErrorA|ErrorB|int)'", 71, 15);
validateError(result, index++, "incompatible types: expected 'int', " +
"found '(ErrorA|ErrorB|int|ballerina/lang.error:0.0.0:NoMessage)'", 86, 15);
validateError(result, index++, "invalid worker receive statement position, can not be used after a non-error " +
"return", 99, 14);
validateError(result, index++, "invalid worker receive statement position, can not be used after a non-error " +
"return", 104, 14);
validateError(result, index++, "invalid worker receive statement position, can not be used after a non-error " +
"return", 109, 14);
validateError(result, index++, "invalid worker receive statement position, can not be used after a non-error " +
"return", 110, 14);
validateError(result, index++, "invalid worker receive statement position, can not be used after a non-error " +
"return", 111, 14);
validateError(result, index++, "incompatible types: expected '()', found 'ErrorA?'", 119, 14);
validateError(result, index++, "incompatible types: expected '()', found '(ErrorA|ErrorB)?'", 120, 14);
Assert.assertEquals(result.getErrorCount(), index);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2024 WSO2 LLC. (http://www.wso2.com).
//
// WSO2 LLC. licenses this file to you under the Apache License,
// Version 2.0 (the "License"); you may not use this file except
// in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

function invalidWorkerReceiveBeforeWorker() {
int _ = <- w1;
worker w1 {
int i = 1;
i -> function;
}
}

function workerReceiveAfterNonErrorReturn1() {
worker w1 {
1 ->> w2;
2 ->> w2;
}

worker w2 {
boolean b = true;

int _ = <- w1;

if b {
return;
}

int _ = <- w1;
}

wait w1;
}

function workerReceiveAfterNonErrorReturn2() {
worker w1 {
1 ->> w2;
2 ->> w2;
}

worker w2 {
boolean b = true;

if b {
return;
}

int _ = <- w1|w1;
}

wait w1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,12 @@ public function testReceiveAllowedLocations(boolean b) returns error? {
_ = <- function; // OK
}
} on fail {
_ = <- function; // OK
_ = <- function; // error: position not allowed
_ = <- w2|function; // error: position not allowed
}

worker w2 {
1 -> w1;
}

1 -> w1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ function workerConditionalSendTest() {
2 -> w5;
}
}

worker w5 returns int|errorLib:NoMessage {
int|errorLib:NoMessage d = <- w4;
return d;
Expand Down Expand Up @@ -240,8 +240,8 @@ function sameWorkerSendMultiplePath1() {
worker w1 {
if foo {
1 -> w2;
}
2 -> w2;
}
2 -> w2;
}

worker w2 returns int|errorLib:NoMessage {
Expand All @@ -259,8 +259,8 @@ function sameWorkerSendMultiplePath2() {
worker w1 {
if foo {
1 -> w2;
}
2 -> w2;
}
2 -> w2;
}

worker w2 returns int|errorLib:NoMessage {
Expand All @@ -282,8 +282,8 @@ function sameWorkerSendMultiplePathError1() {
return error("Error in worker 1");
}
1 -> w2;
}
2 -> w2;
}
2 -> w2;
}

worker w2 returns int|error? {
Expand All @@ -303,11 +303,11 @@ function sameWorkerSendMultiplePathError2() {
int value = 10;
if foo {
1 -> w2;
}
}
if value == 10 {
return error("Error in worker 1");
}
2 -> w2;
2 -> w2;
}

worker w2 returns int|error {
Expand All @@ -334,15 +334,15 @@ function sameWorkerSendMultiplePathError3() {
return error("Error in worker 1");
}
1 -> w2;
}
2 -> w2;
}
2 -> w2;
}

worker w2 returns int|error? {
int|error a = <- w1 | w1;
return a;
}

map<error|int?> mapResult = wait { a : w1, b: w2};
test:assertTrue(mapResult["a"] is error, "Invalid error result");
test:assertTrue(mapResult["b"] is error, "Invalid error result");
Expand All @@ -359,11 +359,11 @@ function sameWorkerSendMultiplePathError4() {
int value = 10;
if foo {
1 -> w2;
}
}
if value == 10 {
return error("Error in worker 1");
}
2 -> w2;
2 -> w2;
}

worker w2 returns int|error {
Expand All @@ -387,18 +387,16 @@ function multipleReceiveConditional() {
} else {
2-> w3;
}
int y = <- w3;
return y;
return 3;
}

worker w2 returns int|errorLib:NoMessage {
int|errorLib:NoMessage y = <- w1;
return y;
}

worker w3 returns int|errorLib:NoMessage {
int|errorLib:NoMessage y = <- w1;
3 -> w1;
return y;
}

Expand Down

0 comments on commit b2c6457

Please sign in to comment.