diff options
| -rw-r--r-- | blocklist2nft.py | 130 | ||||
| -rw-r--r-- | blocklist2nft.service | 7 | ||||
| -rw-r--r-- | blocklist2nft.timer | 9 |
3 files changed, 146 insertions, 0 deletions
diff --git a/blocklist2nft.py b/blocklist2nft.py new file mode 100644 index 0000000..fc5fc4a --- /dev/null +++ b/blocklist2nft.py @@ -0,0 +1,130 @@ +#!/usr/bin/python3 +""" blocklist2nft """ + +import argparse +import ipaddress +import logging +import re +import subprocess +import tempfile + +import requests + +blocklist_urls= [ + "https://rules.emergingthreats.net/fwrules/emerging-Block-IPs.txt", + "https://rules.emergingthreats.net/blockrules/compromised-ips.txt", + "https://iplists.firehol.org/files/firehol_level1.netset" + ] + +NFT_PRIORITY = -2 + +def nft_commands(v4_list, v6_list, priority): + """ generate nft table commands """ + return f""" +add table inet blocklist2nft +delete table inet blocklist2nft +add table inet blocklist2nft {{ + set addr-set-drop4 {{ + type ipv4_addr + flags interval + elements = {{ {v4_list} + }} + }} + set addr-set-drop6 {{ + type ipv6_addr + flags interval + elements = {{ + FEC0::/10, {v6_list} + }} + }} + chain INPUT {{ + type filter hook input priority filter {priority}; policy accept; + ip saddr @addr-set-drop4 drop + ip6 saddr @addr-set-drop6 drop + }} +}} + """ + +ipv4_list = [] +ipv6_list = [] + +def is_ip_address(a): + """ is this an IP address block """ + n = ipaddress.ip_network(a) + return n + +def get_address_feed(feed): + """ download and pack a feed """ + try: + with requests.get(feed, stream=True, timeout=30) as r: + r.raise_for_status() + for r in r.iter_lines(decode_unicode=True): + if isinstance(r, bytes): + r = r.decode('utf-8', errors="replace") + if not re.match(r'^[0-9a-fA-F]', r): + continue + net = is_ip_address(r) + if net is None: + continue + if net.version == 4: + ipv4_list.append(net) + if net.version == 6: + print(r) + ipv6_list.append(net) + except requests.exceptions.ConnectionError as e: + log.error(e) + +def main(): + """" self-explanatory """ + ipv4_list_str = "" + ipv6_list_str = "" + + try: + for feed in blocklist_urls: + get_address_feed(feed) + for net in ipaddress.collapse_addresses(ipv4_list): + ipv4_list_str += f"{net}, \n" + for net in ipaddress.collapse_addresses(ipv6_list): + ipv6_list_str += f"{net}, \n" + + ipv4_list_str = ipv4_list_str[:-3] + ipv6_list_str = ipv6_list_str[:-3] + + with tempfile.NamedTemporaryFile(delete=True) as nft_file: + nft_contents = nft_commands(ipv4_list_str, ipv6_list_str, args.priority) + nft_contents_bytes = nft_contents.encode("utf-8") + nft_file.write(nft_contents_bytes) + + nft_result = subprocess.run( + ["nft", "-f", nft_file.name], + check=True, + capture_output=True, + text=True + ) + if nft_result.returncode != 0: + print(f"nft call failed with code {nft_result.returncode}") + print(nft_result.stderr) + print(nft_result.stdout) + + except Exception as e: + log.error(e) + + +ap = argparse.ArgumentParser(description="Create an nftables blocklist from a URL data source") +ap.add_argument("--priority", type=int, + default=NFT_PRIORITY, help="nft chain hook priority (default -2)") +ap.add_argument("--debug", action="store_true", help="Enable debug logging") +args = ap.parse_args() + +log = logging.getLogger("blocklist2nft") +lh = logging.StreamHandler() +lf = logging.Formatter(fmt="%(levelname)s: %(message)s") +lh.setFormatter(lf) +log.addHandler(lh) + +log.setLevel(logging.INFO) +if args.debug: + log.setLevel(logging.DEBUG) + +if __name__ == '__main__': + main() diff --git a/blocklist2nft.service b/blocklist2nft.service new file mode 100644 index 0000000..3cdc130 --- /dev/null +++ b/blocklist2nft.service @@ -0,0 +1,7 @@ +[Unit] +Description=Generate nftable rules from blocklists +Requires=network.target + +[Service] +Type=oneshot +ExecStart=/usr/bin/blocklist2nft
\ No newline at end of file diff --git a/blocklist2nft.timer b/blocklist2nft.timer new file mode 100644 index 0000000..162b82f --- /dev/null +++ b/blocklist2nft.timer @@ -0,0 +1,9 @@ +[Unit] +Description=Email pending admin tasks + +[Timer] +OnBootSec=30 +OnUnitInactiveSec=1h + +[Install] +WantedBy=timers.target
\ No newline at end of file |
