#!/usr/bin/python3 """ blocklist2nft """ import argparse import ipaddress import logging import re import subprocess import tempfile import requests blocklist_urls= [ "https://rules.emergingthreats.net/fwrules/emerging-Block-IPs.txt", "https://rules.emergingthreats.net/blockrules/compromised-ips.txt", "https://iplists.firehol.org/files/firehol_level1.netset" ] NFT_PRIORITY = -2 def nft_commands(v4_list, v6_list, priority): """ generate nft table commands """ return f""" add table inet blocklist2nft delete table inet blocklist2nft add table inet blocklist2nft {{ set addr-set-drop4 {{ type ipv4_addr flags interval elements = {{ {v4_list} }} }} set addr-set-drop6 {{ type ipv6_addr flags interval elements = {{ FEC0::/10, {v6_list} }} }} chain INPUT {{ type filter hook input priority filter {priority}; policy accept; ip saddr @addr-set-drop4 drop ip6 saddr @addr-set-drop6 drop }} }} """ ipv4_list = [] ipv6_list = [] def is_ip_address(a): """ is this an IP address block """ n = ipaddress.ip_network(a) return n def get_address_feed(feed): """ download and pack a feed """ try: with requests.get(feed, stream=True, timeout=30) as r: r.raise_for_status() for r in r.iter_lines(decode_unicode=True): if isinstance(r, bytes): r = r.decode('utf-8', errors="replace") if not re.match(r'^[0-9a-fA-F]', r): continue net = is_ip_address(r) if net is None: continue if net.version == 4: ipv4_list.append(net) if net.version == 6: print(r) ipv6_list.append(net) except requests.exceptions.ConnectionError as e: log.error(e) def main(): """" self-explanatory """ ipv4_list_str = "" ipv6_list_str = "" try: for feed in blocklist_urls: get_address_feed(feed) for net in ipaddress.collapse_addresses(ipv4_list): ipv4_list_str += f"{net}, \n" for net in ipaddress.collapse_addresses(ipv6_list): ipv6_list_str += f"{net}, \n" ipv4_list_str = ipv4_list_str[:-3] ipv6_list_str = ipv6_list_str[:-3] with tempfile.NamedTemporaryFile(delete=True) as nft_file: nft_contents = nft_commands(ipv4_list_str, ipv6_list_str, args.priority) nft_contents_bytes = nft_contents.encode("utf-8") nft_file.write(nft_contents_bytes) nft_result = subprocess.run( ["nft", "-f", nft_file.name], check=True, capture_output=True, text=True ) if nft_result.returncode != 0: print(f"nft call failed with code {nft_result.returncode}") print(nft_result.stderr) print(nft_result.stdout) except Exception as e: log.error(e) ap = argparse.ArgumentParser(description="Create an nftables blocklist from a URL data source") ap.add_argument("--priority", type=int, default=NFT_PRIORITY, help="nft chain hook priority (default -2)") ap.add_argument("--debug", action="store_true", help="Enable debug logging") args = ap.parse_args() log = logging.getLogger("blocklist2nft") lh = logging.StreamHandler() lf = logging.Formatter(fmt="%(levelname)s: %(message)s") lh.setFormatter(lf) log.addHandler(lh) log.setLevel(logging.INFO) if args.debug: log.setLevel(logging.DEBUG) if __name__ == '__main__': main()