diff options
| author | Jason McCormick <jason@mfamily.org> | 2026-01-20 19:05:46 -0500 |
|---|---|---|
| committer | Jason McCormick <jason@mfamily.org> | 2026-01-20 19:05:46 -0500 |
| commit | 0b090aa51de563049f6108c17b6b063cac5c15c6 (patch) | |
| tree | 980d15669782777fbf91de82e6391a4e031a2769 /blocklist2nft.py | |
added
Diffstat (limited to 'blocklist2nft.py')
| -rw-r--r-- | blocklist2nft.py | 130 |
1 files changed, 130 insertions, 0 deletions
diff --git a/blocklist2nft.py b/blocklist2nft.py new file mode 100644 index 0000000..fc5fc4a --- /dev/null +++ b/blocklist2nft.py @@ -0,0 +1,130 @@ +#!/usr/bin/python3 +""" blocklist2nft """ + +import argparse +import ipaddress +import logging +import re +import subprocess +import tempfile + +import requests + +blocklist_urls= [ + "https://rules.emergingthreats.net/fwrules/emerging-Block-IPs.txt", + "https://rules.emergingthreats.net/blockrules/compromised-ips.txt", + "https://iplists.firehol.org/files/firehol_level1.netset" + ] + +NFT_PRIORITY = -2 + +def nft_commands(v4_list, v6_list, priority): + """ generate nft table commands """ + return f""" +add table inet blocklist2nft +delete table inet blocklist2nft +add table inet blocklist2nft {{ + set addr-set-drop4 {{ + type ipv4_addr + flags interval + elements = {{ {v4_list} + }} + }} + set addr-set-drop6 {{ + type ipv6_addr + flags interval + elements = {{ + FEC0::/10, {v6_list} + }} + }} + chain INPUT {{ + type filter hook input priority filter {priority}; policy accept; + ip saddr @addr-set-drop4 drop + ip6 saddr @addr-set-drop6 drop + }} +}} + """ + +ipv4_list = [] +ipv6_list = [] + +def is_ip_address(a): + """ is this an IP address block """ + n = ipaddress.ip_network(a) + return n + +def get_address_feed(feed): + """ download and pack a feed """ + try: + with requests.get(feed, stream=True, timeout=30) as r: + r.raise_for_status() + for r in r.iter_lines(decode_unicode=True): + if isinstance(r, bytes): + r = r.decode('utf-8', errors="replace") + if not re.match(r'^[0-9a-fA-F]', r): + continue + net = is_ip_address(r) + if net is None: + continue + if net.version == 4: + ipv4_list.append(net) + if net.version == 6: + print(r) + ipv6_list.append(net) + except requests.exceptions.ConnectionError as e: + log.error(e) + +def main(): + """" self-explanatory """ + ipv4_list_str = "" + ipv6_list_str = "" + + try: + for feed in blocklist_urls: + get_address_feed(feed) + for net in ipaddress.collapse_addresses(ipv4_list): + ipv4_list_str += f"{net}, \n" + for net in ipaddress.collapse_addresses(ipv6_list): + ipv6_list_str += f"{net}, \n" + + ipv4_list_str = ipv4_list_str[:-3] + ipv6_list_str = ipv6_list_str[:-3] + + with tempfile.NamedTemporaryFile(delete=True) as nft_file: + nft_contents = nft_commands(ipv4_list_str, ipv6_list_str, args.priority) + nft_contents_bytes = nft_contents.encode("utf-8") + nft_file.write(nft_contents_bytes) + + nft_result = subprocess.run( + ["nft", "-f", nft_file.name], + check=True, + capture_output=True, + text=True + ) + if nft_result.returncode != 0: + print(f"nft call failed with code {nft_result.returncode}") + print(nft_result.stderr) + print(nft_result.stdout) + + except Exception as e: + log.error(e) + + +ap = argparse.ArgumentParser(description="Create an nftables blocklist from a URL data source") +ap.add_argument("--priority", type=int, + default=NFT_PRIORITY, help="nft chain hook priority (default -2)") +ap.add_argument("--debug", action="store_true", help="Enable debug logging") +args = ap.parse_args() + +log = logging.getLogger("blocklist2nft") +lh = logging.StreamHandler() +lf = logging.Formatter(fmt="%(levelname)s: %(message)s") +lh.setFormatter(lf) +log.addHandler(lh) + +log.setLevel(logging.INFO) +if args.debug: + log.setLevel(logging.DEBUG) + +if __name__ == '__main__': + main() |
