Skip to content

Commit

Permalink
styling
Browse files Browse the repository at this point in the history
  • Loading branch information
joshschmelzle committed Dec 4, 2024
1 parent b80d482 commit f651887
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 18 deletions.
2 changes: 1 addition & 1 deletion ctx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@

""" a Wi-Fi client capability ctx for the WLAN Pi. """

__version__ = "1.0.1"
__version__ = "1.0.1"
20 changes: 13 additions & 7 deletions ctx/fakeap.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
import signal
import sys
from time import sleep
from typing import Dict

# suppress scapy warnings
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)

# third party imports
try:
from scapy.all import LLC, SNAP, Dot11, Dot11QoS, RadioTap, Raw, Scapy_Exception
from scapy.all import conf as scapyconf # type: ignore
from scapy.all import get_if_hwaddr
from scapy.arch.unix import get_if_raw_hwaddr
from scapy.all import RadioTap, Dot11, Dot11QoS, LLC, SNAP, Raw, Scapy_Exception, conf as scapyconf, get_if_hwaddr # type: ignore
except ModuleNotFoundError as error:
if error.name == "scapy":
print("required module scapy not found.")
Expand All @@ -41,16 +42,19 @@
DOT11_SUBTYPE_DATA = 0x00
DOT11_SUBTYPE_QOS_DATA = 0x08


class TxData(multiprocessing.Process):
"""Handle Tx of fake AP frames"""

def __init__(self, config):
super(TxData, self).__init__()
self.log = logging.getLogger(inspect.stack()[0][1].split("/")[-1])
self.log.debug("ctx pid: %s; parent pid: %s", os.getpid(), os.getppid())
self.log.debug("config passed to ctx: %s", config)
if not isinstance(config, dict):
raise ValueError("configuration received in the TxData process is not a dictionary")
raise ValueError(
"configuration received in the TxData process is not a dictionary"
)
self.config = config
self.interface: "str" = config.get("GENERAL").get("interface")

Expand Down Expand Up @@ -102,7 +106,7 @@ def __init__(self, config):
addr2=self.mac,
addr3=self.mac,
)

self.data_frame = RadioTap() / dot11 / Dot11QoS() / LLC() / SNAP()

self.log.info("starting QoS data frame transmissions")
Expand All @@ -120,7 +124,7 @@ def generate_random_data(self, min=64, max=512):
"""Generate random payload data."""
length = random.randint(min, max)
return os.urandom(length)

def every(self, interval, task) -> None:
"""Attempt to address beacon drift"""
while True:
Expand All @@ -129,7 +133,9 @@ def every(self, interval, task) -> None:

def tx_data(self) -> None:
"""Update and Tx QoS Data Frame"""
payload = self.generate_random_data(min=self.tx_payload_min, max=self.tx_payload_max)
payload = self.generate_random_data(
min=self.tx_payload_min, max=self.tx_payload_max
)
data_frame = self.data_frame / Raw(load=payload)
try:
self.l2socket.send(data_frame) # type: ignore
Expand Down
23 changes: 14 additions & 9 deletions ctx/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from dataclasses import dataclass
from typing import Any, Dict


__tools = [
"tcpdump",
"iw",
Expand Down Expand Up @@ -88,6 +87,7 @@ def channel(value: str) -> int:
return ch
raise argparse.ArgumentTypeError(f"{ch} is not a valid channel")


def interval(value: str) -> float:
"""Check if the given string can be converted to a float."""
interval = float(value)
Expand All @@ -96,13 +96,15 @@ def interval(value: str) -> float:
except ValueError:
raise argparse.ArgumentTypeError(f"{interval} is not a valid interval")


def payload_size(value: str) -> int:
"""Check if the value is an integer and between 1 and 4096."""
size = int(value)
if isinstance(size, int) and 1 <= size <= 4096:
return int(size)
raise argparse.ArgumentTypeError(f"{size} is not an integer between 1 and 4096")


def frequency(freq: str) -> int:
"""Check if the provided frequency is valid"""
try:
Expand All @@ -117,7 +119,9 @@ def frequency(freq: str) -> int:
if band[0] <= freq <= band[1]:
return freq

raise argparse.ArgumentTypeError(f"{freq} not found in these frequency ranges: {freq_ranges}")
raise argparse.ArgumentTypeError(
f"{freq} not found in these frequency ranges: {freq_ranges}"
)


def setup_parser() -> argparse.ArgumentParser:
Expand Down Expand Up @@ -269,13 +273,13 @@ def setup_config(args):

if "interface" not in config["GENERAL"]:
config["GENERAL"]["interface"] = "wlan0"

if "tx_interval" not in config["GENERAL"]:
config["GENERAL"]["tx_interval"] = 0.001

if "tx_payload_max" not in config["GENERAL"]:
config["GENERAL"]["tx_payload_max"] = 512

if "tx_payload_min" not in config["GENERAL"]:
config["GENERAL"]["tx_payload_min"] = 64

Expand Down Expand Up @@ -373,7 +377,7 @@ def validate(config) -> bool:
if freq:
log.debug("validating config for freq...")
frequency(freq)

intv = config.get("GENERAL").get("tx_interval")
if intv:
log.debug("validating config for tx_interval...")
Expand All @@ -383,12 +387,12 @@ def validate(config) -> bool:
if tx_payload_max:
log.debug("validating config for tx_payload_max...")
payload_size(tx_payload_max)

tx_payload_min = config.get("GENERAL").get("tx_payload_min")
if tx_payload_min:
log.debug("validating config for tx_payload_min...")
payload_size(tx_payload_min)

except ValueError:
log.error("%s", sys.exc_info())
sys.exit(signal.SIGABRT)
Expand Down Expand Up @@ -447,6 +451,7 @@ def get_frequency_bytes(channel: int) -> bytes:

class Base64Encoder(json.JSONEncoder):
"""A Base64 encoder for JSON"""

# example usage: json.dumps(bytes(frame), cls=Base64Encoder)

# pylint: disable=method-hidden
Expand All @@ -473,4 +478,4 @@ def generate_run_message(config: Dict) -> None:
f" - Payload is os.urandom(length) where length is a random integer between {config['GENERAL']['tx_payload_min']} and {config['GENERAL']['tx_payload_max']}"
)
print("#/~>")
print()
print()
2 changes: 2 additions & 0 deletions ctx/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from .constants import _20MHZ_FREQUENCY_CHANNEL_MAP
from .helpers import run_command


def flag_last_object(seq):
"""Treat the last object in an iterable differently"""
seq = iter(seq) # ensure seq is an iterator
Expand All @@ -32,6 +33,7 @@ def flag_last_object(seq):
_a = _b
yield _a, True


class InterfaceError(Exception):
"""Custom exception used when there are problems staging the interface for injection"""

Expand Down
4 changes: 3 additions & 1 deletion ctx/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ def start(args: argparse.Namespace):
txdata = mp.Process(
name="txdata",
target=TxData,
args=(config,), # What this looks like with one element because this param needs to be a tuple: args=(config,),
args=(
config,
), # What this looks like with one element because this param needs to be a tuple: args=(config,),
)
running_processes.append(txdata)
txdata.start()
Expand Down

0 comments on commit f651887

Please sign in to comment.