From 2344881229dfa0f00ebf112653e1d48bb99999b1 Mon Sep 17 00:00:00 2001 From: "Jason D. McCormick" Date: Tue, 20 Jan 2026 19:16:02 -0500 Subject: initial add --- blocklist2nft.py | 130 ------------------------------------------------------- 1 file changed, 130 deletions(-) delete mode 100644 blocklist2nft.py (limited to 'blocklist2nft.py') diff --git a/blocklist2nft.py b/blocklist2nft.py deleted file mode 100644 index fc5fc4a..0000000 --- a/blocklist2nft.py +++ /dev/null @@ -1,130 +0,0 @@ -#!/usr/bin/python3 -""" blocklist2nft """ - -import argparse -import ipaddress -import logging -import re -import subprocess -import tempfile - -import requests - -blocklist_urls= [ - "https://rules.emergingthreats.net/fwrules/emerging-Block-IPs.txt", - "https://rules.emergingthreats.net/blockrules/compromised-ips.txt", - "https://iplists.firehol.org/files/firehol_level1.netset" - ] - -NFT_PRIORITY = -2 - -def nft_commands(v4_list, v6_list, priority): - """ generate nft table commands """ - return f""" -add table inet blocklist2nft -delete table inet blocklist2nft -add table inet blocklist2nft {{ - set addr-set-drop4 {{ - type ipv4_addr - flags interval - elements = {{ {v4_list} - }} - }} - set addr-set-drop6 {{ - type ipv6_addr - flags interval - elements = {{ - FEC0::/10, {v6_list} - }} - }} - chain INPUT {{ - type filter hook input priority filter {priority}; policy accept; - ip saddr @addr-set-drop4 drop - ip6 saddr @addr-set-drop6 drop - }} -}} - """ - -ipv4_list = [] -ipv6_list = [] - -def is_ip_address(a): - """ is this an IP address block """ - n = ipaddress.ip_network(a) - return n - -def get_address_feed(feed): - """ download and pack a feed """ - try: - with requests.get(feed, stream=True, timeout=30) as r: - r.raise_for_status() - for r in r.iter_lines(decode_unicode=True): - if isinstance(r, bytes): - r = r.decode('utf-8', errors="replace") - if not re.match(r'^[0-9a-fA-F]', r): - continue - net = is_ip_address(r) - if net is None: - continue - if net.version == 4: - ipv4_list.append(net) - if net.version == 6: - print(r) - ipv6_list.append(net) - except requests.exceptions.ConnectionError as e: - log.error(e) - -def main(): - """" self-explanatory """ - ipv4_list_str = "" - ipv6_list_str = "" - - try: - for feed in blocklist_urls: - get_address_feed(feed) - for net in ipaddress.collapse_addresses(ipv4_list): - ipv4_list_str += f"{net}, \n" - for net in ipaddress.collapse_addresses(ipv6_list): - ipv6_list_str += f"{net}, \n" - - ipv4_list_str = ipv4_list_str[:-3] - ipv6_list_str = ipv6_list_str[:-3] - - with tempfile.NamedTemporaryFile(delete=True) as nft_file: - nft_contents = nft_commands(ipv4_list_str, ipv6_list_str, args.priority) - nft_contents_bytes = nft_contents.encode("utf-8") - nft_file.write(nft_contents_bytes) - - nft_result = subprocess.run( - ["nft", "-f", nft_file.name], - check=True, - capture_output=True, - text=True - ) - if nft_result.returncode != 0: - print(f"nft call failed with code {nft_result.returncode}") - print(nft_result.stderr) - print(nft_result.stdout) - - except Exception as e: - log.error(e) - - -ap = argparse.ArgumentParser(description="Create an nftables blocklist from a URL data source") -ap.add_argument("--priority", type=int, - default=NFT_PRIORITY, help="nft chain hook priority (default -2)") -ap.add_argument("--debug", action="store_true", help="Enable debug logging") -args = ap.parse_args() - -log = logging.getLogger("blocklist2nft") -lh = logging.StreamHandler() -lf = logging.Formatter(fmt="%(levelname)s: %(message)s") -lh.setFormatter(lf) -log.addHandler(lh) - -log.setLevel(logging.INFO) -if args.debug: - log.setLevel(logging.DEBUG) - -if __name__ == '__main__': - main() -- cgit v1.2.3