Skip to content

Commit

Permalink
CS144 Lab 4 assignment
Browse files Browse the repository at this point in the history
  • Loading branch information
keithw committed Oct 15, 2020
1 parent 710f804 commit 5dfca2f
Show file tree
Hide file tree
Showing 51 changed files with 5,472 additions and 11 deletions.
8 changes: 8 additions & 0 deletions apps/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1 +1,9 @@
add_library (stream_copy STATIC bidirectional_stream_copy.cc)

add_sponge_exec (udp_tcpdump ${LIBPCAP})
add_sponge_exec (tcp_native stream_copy)
add_sponge_exec (tun)
add_sponge_exec (tcp_udp stream_copy)
add_sponge_exec (tcp_ipv4 stream_copy)
add_sponge_exec (webget)
add_sponge_exec (tcp_benchmark)
91 changes: 91 additions & 0 deletions apps/bidirectional_stream_copy.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#include "bidirectional_stream_copy.hh"

#include "byte_stream.hh"
#include "eventloop.hh"

#include <algorithm>
#include <iostream>
#include <unistd.h>

using namespace std;

void bidirectional_stream_copy(Socket &socket) {
constexpr size_t max_copy_length = 65536;
constexpr size_t buffer_size = 1048576;

EventLoop _eventloop{};
FileDescriptor _input{STDIN_FILENO};
FileDescriptor _output{STDOUT_FILENO};
ByteStream _outbound{buffer_size};
ByteStream _inbound{buffer_size};
bool _outbound_shutdown{false};
bool _inbound_shutdown{false};

socket.set_blocking(false);
_input.set_blocking(false);
_output.set_blocking(false);

// rule 1: read from stdin into outbound byte stream
_eventloop.add_rule(
_input,
Direction::In,
[&] {
_outbound.write(_input.read(_outbound.remaining_capacity()));
if (_input.eof()) {
_outbound.end_input();
}
},
[&] { return (not _outbound.error()) and (_outbound.remaining_capacity() > 0) and (not _inbound.error()); },
[&] { _outbound.end_input(); });

// rule 2: read from outbound byte stream into socket
_eventloop.add_rule(socket,
Direction::Out,
[&] {
const size_t bytes_to_write = min(max_copy_length, _outbound.buffer_size());
const size_t bytes_written = socket.write(_outbound.peek_output(bytes_to_write), false);
_outbound.pop_output(bytes_written);
if (_outbound.eof()) {
socket.shutdown(SHUT_WR);
_outbound_shutdown = true;
}
},
[&] { return (not _outbound.buffer_empty()) or (_outbound.eof() and not _outbound_shutdown); },
[&] { _outbound.set_error(); });

// rule 3: read from socket into inbound byte stream
_eventloop.add_rule(
socket,
Direction::In,
[&] {
_inbound.write(socket.read(_inbound.remaining_capacity()));
if (socket.eof()) {
_inbound.end_input();
}
},
[&] { return (not _inbound.error()) and (_inbound.remaining_capacity() > 0) and (not _outbound.error()); },
[&] { _inbound.end_input(); });

// rule 4: read from inbound byte stream into stdout
_eventloop.add_rule(_output,
Direction::Out,
[&] {
const size_t bytes_to_write = min(max_copy_length, _inbound.buffer_size());
const size_t bytes_written = _output.write(_inbound.peek_output(bytes_to_write), false);
_inbound.pop_output(bytes_written);

if (_inbound.eof()) {
_output.close();
_inbound_shutdown = true;
}
},
[&] { return (not _inbound.buffer_empty()) or (_inbound.eof() and not _inbound_shutdown); },
[&] { _inbound.set_error(); });

// loop until completion
while (true) {
if (EventLoop::Result::Exit == _eventloop.wait_next_event(-1)) {
return;
}
}
}
9 changes: 9 additions & 0 deletions apps/bidirectional_stream_copy.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#ifndef SPONGE_APPS_BIDIRECTIONAL_STREAM_COPY_HH
#define SPONGE_APPS_BIDIRECTIONAL_STREAM_COPY_HH

#include "socket.hh"

//! Copy socket input/output to stdin/stdout until finished
void bidirectional_stream_copy(Socket &socket);

#endif // SPONGE_APPS_BIDIRECTIONAL_STREAM_COPY_HH
116 changes: 116 additions & 0 deletions apps/tcp_benchmark.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#include "tcp_connection.hh"

#include <chrono>
#include <cstdlib>
#include <iomanip>
#include <iostream>
#include <string>

using namespace std;
using namespace std::chrono;

constexpr size_t len = 100 * 1024 * 1024;

void move_segments(TCPConnection &x, TCPConnection &y, vector<TCPSegment> &segments, const bool reorder) {
while (not x.segments_out().empty()) {
segments.emplace_back(move(x.segments_out().front()));
x.segments_out().pop();
}
if (reorder) {
for (auto it = segments.rbegin(); it != segments.rend(); ++it) {
y.segment_received(move(*it));
}
} else {
for (auto it = segments.begin(); it != segments.end(); ++it) {
y.segment_received(move(*it));
}
}
segments.clear();
}

void main_loop(const bool reorder) {
TCPConfig config;
TCPConnection x{config}, y{config};

string string_to_send(len, 'x');
for (auto &ch : string_to_send) {
ch = rand();
}

Buffer bytes_to_send{string(string_to_send)};
x.connect();
y.end_input_stream();

bool x_closed = false;

string string_received;
string_received.reserve(len);

const auto first_time = high_resolution_clock::now();

auto loop = [&] {
// write input into x
while (bytes_to_send.size() and x.remaining_outbound_capacity()) {
const auto want = min(x.remaining_outbound_capacity(), bytes_to_send.size());
const auto written = x.write(string(bytes_to_send.str().substr(0, want)));
if (want != written) {
throw runtime_error("want = " + to_string(want) + ", written = " + to_string(written));
}
bytes_to_send.remove_prefix(written);
}

if (bytes_to_send.size() == 0 and not x_closed) {
x.end_input_stream();
x_closed = true;
}

// exchange segments between x and y but in reverse order
vector<TCPSegment> segments;
move_segments(x, y, segments, reorder);
move_segments(y, x, segments, false);

// read output from y
const auto available_output = y.inbound_stream().buffer_size();
if (available_output > 0) {
string_received.append(y.inbound_stream().read(available_output));
}

// time passes
x.tick(1000);
y.tick(1000);
};

while (not y.inbound_stream().eof()) {
loop();
}

if (string_received != string_to_send) {
throw runtime_error("strings sent vs. received don't match");
}

const auto final_time = high_resolution_clock::now();

const auto duration = duration_cast<nanoseconds>(final_time - first_time).count();

const auto gigabits_per_second = len * 8.0 / double(duration);

cout << fixed << setprecision(2);
cout << "CPU-limited throughput" << (reorder ? " with reordering: " : " : ") << gigabits_per_second
<< " Gbit/s\n";

while (x.active() or y.active()) {
loop();
}
}

int main() {
try {
main_loop(false);
main_loop(true);
} catch (const exception &e) {
cerr << e.what() << "\n";
return EXIT_FAILURE;
}

return EXIT_SUCCESS;
}
162 changes: 162 additions & 0 deletions apps/tcp_ipv4.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#include "bidirectional_stream_copy.hh"
#include "tcp_config.hh"
#include "tcp_sponge_socket.hh"
#include "tun.hh"

#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <random>
#include <string>
#include <tuple>

using namespace std;

constexpr const char *TUN_DFLT = "tun144";
const string LOCAL_ADDRESS_DFLT = "169.254.144.9";

static void show_usage(const char *argv0, const char *msg) {
cout << "Usage: " << argv0 << " [options] <host> <port>\n\n"
<< " Option Default\n"
<< " -- --\n\n"

<< " -l Server (listen) mode. (client mode)\n"
<< " In server mode, <host>:<port> is the address to bind.\n\n"

<< " -a <addr> Set source address (client mode only) " << LOCAL_ADDRESS_DFLT << "\n"
<< " -s <port> Set source port (client mode only) (random)\n\n"

<< " -w <winsz> Use a window of <winsz> bytes " << TCPConfig::MAX_PAYLOAD_SIZE
<< "\n\n"

<< " -t <tmout> Set rt_timeout to tmout " << TCPConfig::TIMEOUT_DFLT << "\n\n"

<< " -d <tundev> Connect to tun <tundev> " << TUN_DFLT << "\n\n"

<< " -Lu <loss> Set uplink loss to <rate> (float in 0..1) (no loss)\n"
<< " -Ld <loss> Set downlink loss to <rate> (float in 0..1) (no loss)\n\n"

<< " -h Show this message.\n\n";

if (msg != nullptr) {
cout << msg;
}
cout << endl;
}

static void check_argc(int argc, char **argv, int curr, const char *err) {
if (curr + 3 >= argc) {
show_usage(argv[0], err);
exit(1);
}
}

static tuple<TCPConfig, FdAdapterConfig, bool, char *> get_config(int argc, char **argv) {
TCPConfig c_fsm{};
FdAdapterConfig c_filt{};
char *tundev = nullptr;

int curr = 1;
bool listen = false;

string source_address = LOCAL_ADDRESS_DFLT;
string source_port = to_string(uint16_t(random_device()()));

while (argc - curr > 2) {
if (strncmp("-l", argv[curr], 3) == 0) {
listen = true;
curr += 1;

} else if (strncmp("-a", argv[curr], 3) == 0) {
check_argc(argc, argv, curr, "ERROR: -a requires one argument.");
source_address = argv[curr + 1];
curr += 2;

} else if (strncmp("-s", argv[curr], 3) == 0) {
check_argc(argc, argv, curr, "ERROR: -s requires one argument.");
source_port = argv[curr + 1];
curr += 2;

} else if (strncmp("-w", argv[curr], 3) == 0) {
check_argc(argc, argv, curr, "ERROR: -w requires one argument.");
c_fsm.recv_capacity = strtol(argv[curr + 1], nullptr, 0);
curr += 2;

} else if (strncmp("-t", argv[curr], 3) == 0) {
check_argc(argc, argv, curr, "ERROR: -t requires one argument.");
c_fsm.rt_timeout = strtol(argv[curr + 1], nullptr, 0);
curr += 2;

} else if (strncmp("-d", argv[curr], 3) == 0) {
check_argc(argc, argv, curr, "ERROR: -t requires one argument.");
tundev = argv[curr + 1];
curr += 2;

} else if (strncmp("-Lu", argv[curr], 3) == 0) {
check_argc(argc, argv, curr, "ERROR: -Lu requires one argument.");
float lossrate = strtof(argv[curr + 1], nullptr);
using LossRateUpT = decltype(c_filt.loss_rate_up);
c_filt.loss_rate_up =
static_cast<LossRateUpT>(static_cast<float>(numeric_limits<LossRateUpT>::max()) * lossrate);
curr += 2;

} else if (strncmp("-Ld", argv[curr], 3) == 0) {
check_argc(argc, argv, curr, "ERROR: -Lu requires one argument.");
float lossrate = strtof(argv[curr + 1], nullptr);
using LossRateDnT = decltype(c_filt.loss_rate_dn);
c_filt.loss_rate_dn =
static_cast<LossRateDnT>(static_cast<float>(numeric_limits<LossRateDnT>::max()) * lossrate);
curr += 2;

} else if (strncmp("-h", argv[curr], 3) == 0) {
show_usage(argv[0], nullptr);
exit(0);

} else {
show_usage(argv[0], string("ERROR: unrecognized option " + string(argv[curr])).c_str());
exit(1);
}
}

// parse positional command-line arguments
if (listen) {
c_filt.source = {"0", argv[curr + 1]};
if (c_filt.source.port() == 0) {
show_usage(argv[0], "ERROR: listen port cannot be zero in server mode.");
exit(1);
}
} else {
c_filt.destination = {argv[curr], argv[curr + 1]};
c_filt.source = {source_address, source_port};
}

return make_tuple(c_fsm, c_filt, listen, tundev);
}

int main(int argc, char **argv) {
try {
if (argc < 3) {
show_usage(argv[0], "ERROR: required arguments are missing.");
return EXIT_FAILURE;
}

auto [c_fsm, c_filt, listen, tun_dev_name] = get_config(argc, argv);
LossyTCPOverIPv4SpongeSocket tcp_socket(LossyTCPOverIPv4OverTunFdAdapter(
TCPOverIPv4OverTunFdAdapter(TunFD(tun_dev_name == nullptr ? TUN_DFLT : tun_dev_name))));

if (listen) {
tcp_socket.listen_and_accept(c_fsm, c_filt);
} else {
tcp_socket.connect(c_fsm, c_filt);
}

bidirectional_stream_copy(tcp_socket);
tcp_socket.wait_until_closed();
} catch (const exception &e) {
cerr << "Exception: " << e.what() << endl;
return EXIT_FAILURE;
}

return EXIT_SUCCESS;
}
Loading

0 comments on commit 5dfca2f

Please sign in to comment.