Skip to content

Commit

Permalink
Improve no_router test (#829)
Browse files Browse the repository at this point in the history
- Move common code to separate function
- Add stderr check
- Add liveliness
- Fix error code check
  • Loading branch information
sashacmc authored Dec 17, 2024
1 parent 896abd8 commit 9396627
Showing 1 changed file with 41 additions and 132 deletions.
173 changes: 41 additions & 132 deletions tests/no_router.py
Original file line number Diff line number Diff line change
@@ -1,155 +1,64 @@
import argparse
import os
from signal import SIGINT
import subprocess
import sys
import time

# Specify the directory for the binaries
DIR_EXAMPLES = "build/examples"
EXPECTED_STATUS = 255
EXPECTED_OUTPUT = """Opening session...
Unable to open session!"""


def test_all():
print("*** Pub & sub test ***")
test_status = 0

# Expected output & status
z_expected_status = 255
z_expected_output = '''Opening session...
Unable to open session!'''

print("Start subscriber")
# Start z_sub
z_sub_command = f"stdbuf -oL -eL ./{DIR_EXAMPLES}/z_sub"
z_sub_process = subprocess.Popen(
z_sub_command, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
# Wait for z_sub to finish
z_sub_process.wait()

print("Start publisher")
# Start z_pub
z_pub_command = f"stdbuf -oL -eL ./{DIR_EXAMPLES}/z_pub"
z_pub_process = subprocess.Popen(
z_pub_command, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
# Wait for z_pub to finish
z_pub_process.wait()

print("Start queryable")
# Start z_queryable
z_queryable_command = f"stdbuf -oL -eL ./{DIR_EXAMPLES}/z_queryable"
z_queryable_process = subprocess.Popen(
z_queryable_command,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
# Wait for z_queryable to finish
z_queryable_process.wait()

print("Start query")
# Start z_query
z_query_command = f"stdbuf -oL -eL ./{DIR_EXAMPLES}/z_get"
z_query_process = subprocess.Popen(
z_query_command,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
# Wait for z_query to finish
z_query_process.wait()

print("Check publisher status & output")
# Check the exit status of z_pub
z_pub_status = z_pub_process.returncode
if z_pub_status == z_expected_status:
print("z_pub status valid")
else:
print(f"z_pub status invalid, expected: {z_expected_status}, received: {z_pub_status}")
test_status = 1
def run_binary_test(binary_name, expected_status, expected_output):
print(f"*** Testing {binary_name} ***")
command = [f"./{DIR_EXAMPLES}/{binary_name}"]

# Check output of z_pub
z_pub_output = z_pub_process.stdout.read()
if z_expected_output in z_pub_output:
print("z_pub output valid")
else:
print("z_pub output invalid:")
print(f"Expected: \"{z_expected_output}\"")
print(f"Received: \"{z_pub_output}\"")
test_status = 1

print("Check subscriber status & output")
# Check the exit status of z_sub
z_sub_status = z_sub_process.returncode
if z_sub_status == z_expected_status:
print("z_sub status valid")
else:
print(f"z_sub status invalid, expected: {z_expected_status}, received: {z_sub_status}")
test_status = 1
process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
process.wait()

# Check output of z_sub
z_sub_output = z_sub_process.stdout.read()
if z_expected_output in z_sub_output:
print("z_sub output valid")
else:
print("z_sub output invalid:")
print(f"Expected: \"{z_expected_output}\"")
print(f"Received: \"{z_sub_output}\"")
test_status = 1

print("Check query status & output")
# Check the exit status of z_query
z_query_status = z_query_process.returncode
if z_query_status == z_expected_status:
print("z_query status valid")
# Check output
output = process.stdout.read()
if expected_output in output:
print(f"{binary_name} output valid")
else:
print(f"z_query status invalid, expected: {z_expected_status}," f" received: {z_query_status}")
test_status = 1
print(f'{binary_name} output invalid:\nExpected: "{expected_output}"\nReceived: "{output}"')
return False

# Check output of z_query
z_query_output = z_query_process.stdout.read()
if z_expected_output in z_query_output:
print("z_query output valid")
else:
print("z_query output invalid:")
print(f'Expected: "{z_expected_output}"')
print(f'Received: "{z_query_output}"')
test_status = 1

print("Check queryable status & output")
# Check the exit status of z_queryable
z_queryable_status = z_queryable_process.returncode
if z_queryable_status == z_expected_status:
print("z_queryable status valid")
# Check errors
errors = process.stderr.read().strip()
if errors == "":
print(f"{binary_name} no error reported")
else:
print(f"z_queryable status invalid, expected: {z_expected_status}," f" received: {z_queryable_status}")
test_status = 1
print(f'{binary_name} errors reported:\n"{errors}"')
return False

# Check output of z_queryable
z_queryable_output = z_queryable_process.stdout.read()
if z_expected_output in z_queryable_output:
print("z_queryable output valid")
# Check exit status
status = process.returncode
if status == expected_status:
print(f"{binary_name} status valid")
else:
print("z_queryable output invalid:")
print(f'Expected: "{z_expected_output}"')
print(f'Received: "{z_queryable_output}"')
test_status = 1
print(f"{binary_name} status invalid, expected: {expected_status}, received: {status}")
return False

return True


def test_all():
binaries = ["z_sub", "z_pub", "z_queryable", "z_get", "z_liveliness", "z_get_liveliness", "z_sub_liveliness"]

all_tests_passed = True
for binary in binaries:
if not run_binary_test(binary, EXPECTED_STATUS, EXPECTED_OUTPUT):
all_tests_passed = False

# Return value
return test_status
return all_tests_passed


if __name__ == "__main__":
EXIT_STATUS = 0

# Test all examples
if test_all() == 1:
# Run all tests
if not test_all():
EXIT_STATUS = 1

# Exit
# Exit with final status
sys.exit(EXIT_STATUS)

0 comments on commit 9396627

Please sign in to comment.