Skip to content

Commit

Permalink
add test for iptables logging
Browse files Browse the repository at this point in the history
  • Loading branch information
blotus committed Sep 23, 2024
1 parent ecd74f9 commit 5569514
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 1 deletion.
15 changes: 15 additions & 0 deletions test/backends/iptables/crowdsec-firewall-bouncer-logging.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
mode: iptables
update_frequency: 0.1s
log_mode: stdout
log_dir: ./
log_level: info
api_url: http://127.0.0.1:8081/
api_key: 1237adaf7a1724ac68a3288828820a67
disable_ipv6: false
deny_action: DROP
deny_log: true
deny_log_prefix: "blocked by crowdsec"
supported_decisions_types:
- ban
iptables_chains:
- INPUT
50 changes: 49 additions & 1 deletion test/backends/iptables/test_iptables.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@
from time import sleep

from test.backends.mock_lapi import MockLAPI
from test.backends.utils import generate_n_decisions, run_cmd
from test.backends.utils import generate_n_decisions, run_cmd, new_decision


SCRIPT_DIR = Path(os.path.dirname(os.path.realpath(__file__)))
PROJECT_ROOT = SCRIPT_DIR.parent.parent.parent
BINARY_PATH = PROJECT_ROOT.joinpath("crowdsec-firewall-bouncer")
CONFIG_PATH = SCRIPT_DIR.joinpath("crowdsec-firewall-bouncer.yaml")
CONFIG_PATH_LOGGING = SCRIPT_DIR.joinpath("crowdsec-firewall-bouncer-logging.yaml")

SET_NAME_IPV4 = "crowdsec-blacklists-0"
SET_NAME_IPV6 = "crowdsec6-blacklists-0"

RULES_CHAIN_NAME = "CROWDSEC_CHAIN"
LOGGING_CHAIN_NAME = "CROWDSEC_LOG"
CHAIN_NAME = "INPUT"

class TestIPTables(unittest.TestCase):
Expand Down Expand Up @@ -175,3 +177,49 @@ def get_set_elements(set_name, with_timeout=False):
to_add = member.find("elem").text
elements.add(to_add)
return elements


class TestIPTablesLogging(unittest.TestCase):
def setUp(self):
self.fb = subprocess.Popen([BINARY_PATH, "-c", CONFIG_PATH])
self.lapi = MockLAPI()
self.lapi.start()
return super().setUp()

def tearDown(self):
self.fb.kill()
self.fb.wait()
self.lapi.stop()

def testLogging(self):
#We use 1.1.1.1 because we want to see some dropped packets in the logs
#We know this IP responds to ping, and the response will be dropped by the firewall
d = new_decision("1.1.1.1")
self.lapi.ds.insert_decisions([d])
sleep(3)

#Check if our logging chain is in place

output = run_cmd("iptables", "-L", LOGGING_CHAIN_NAME)
rules = [line for line in output.split("\n") if LOGGING_CHAIN_NAME in line]

#2 rules: one logging, one generic drop
self.assertEqual(len(rules), 2)

#Check if the logging chain is called from the main chain

output = run_cmd("iptables", "-L", CHAIN_NAME)

rules = [line for line in output.split("\n") if LOGGING_CHAIN_NAME in line]

self.assertEqual(len(rules), 1)

#Now, try to ping the IP

run_cmd("ping", "-c", "1", "1.1.1.1") #We don't care about the output, we just want to trigger the rule

#Check if the firewall has logged the dropped response

output = run_cmd("dmesg")

assert 'blocked by crowdsec' in output
10 changes: 10 additions & 0 deletions test/backends/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,13 @@ def generate_n_decisions(n: int, action="ban", dup_count=0, ipv4=True, duration=
decisions += decisions[: n % unique_decision_count]
decisions *= n // unique_decision_count
return decisions

def new_decision(ip: str):
return {
"value": ip,
"scope": "ip",
"type": "ban",
"origin": "script",
"duration": "4h",
"reason": "for testing",
}

0 comments on commit 5569514

Please sign in to comment.