summaryrefslogtreecommitdiff
path: root/blocklist2nft.py
diff options
context:
space:
mode:
Diffstat (limited to 'blocklist2nft.py')
-rw-r--r--blocklist2nft.py130
1 files changed, 0 insertions, 130 deletions
diff --git a/blocklist2nft.py b/blocklist2nft.py
deleted file mode 100644
index fc5fc4a..0000000
--- a/blocklist2nft.py
+++ /dev/null
@@ -1,130 +0,0 @@
-#!/usr/bin/python3
-""" blocklist2nft """
-
-import argparse
-import ipaddress
-import logging
-import re
-import subprocess
-import tempfile
-
-import requests
-
-blocklist_urls= [
- "https://rules.emergingthreats.net/fwrules/emerging-Block-IPs.txt",
- "https://rules.emergingthreats.net/blockrules/compromised-ips.txt",
- "https://iplists.firehol.org/files/firehol_level1.netset"
- ]
-
-NFT_PRIORITY = -2
-
-def nft_commands(v4_list, v6_list, priority):
- """ generate nft table commands """
- return f"""
-add table inet blocklist2nft
-delete table inet blocklist2nft
-add table inet blocklist2nft {{
- set addr-set-drop4 {{
- type ipv4_addr
- flags interval
- elements = {{ {v4_list}
- }}
- }}
- set addr-set-drop6 {{
- type ipv6_addr
- flags interval
- elements = {{
- FEC0::/10, {v6_list}
- }}
- }}
- chain INPUT {{
- type filter hook input priority filter {priority}; policy accept;
- ip saddr @addr-set-drop4 drop
- ip6 saddr @addr-set-drop6 drop
- }}
-}}
- """
-
-ipv4_list = []
-ipv6_list = []
-
-def is_ip_address(a):
- """ is this an IP address block """
- n = ipaddress.ip_network(a)
- return n
-
-def get_address_feed(feed):
- """ download and pack a feed """
- try:
- with requests.get(feed, stream=True, timeout=30) as r:
- r.raise_for_status()
- for r in r.iter_lines(decode_unicode=True):
- if isinstance(r, bytes):
- r = r.decode('utf-8', errors="replace")
- if not re.match(r'^[0-9a-fA-F]', r):
- continue
- net = is_ip_address(r)
- if net is None:
- continue
- if net.version == 4:
- ipv4_list.append(net)
- if net.version == 6:
- print(r)
- ipv6_list.append(net)
- except requests.exceptions.ConnectionError as e:
- log.error(e)
-
-def main():
- """" self-explanatory """
- ipv4_list_str = ""
- ipv6_list_str = ""
-
- try:
- for feed in blocklist_urls:
- get_address_feed(feed)
- for net in ipaddress.collapse_addresses(ipv4_list):
- ipv4_list_str += f"{net}, \n"
- for net in ipaddress.collapse_addresses(ipv6_list):
- ipv6_list_str += f"{net}, \n"
-
- ipv4_list_str = ipv4_list_str[:-3]
- ipv6_list_str = ipv6_list_str[:-3]
-
- with tempfile.NamedTemporaryFile(delete=True) as nft_file:
- nft_contents = nft_commands(ipv4_list_str, ipv6_list_str, args.priority)
- nft_contents_bytes = nft_contents.encode("utf-8")
- nft_file.write(nft_contents_bytes)
-
- nft_result = subprocess.run(
- ["nft", "-f", nft_file.name],
- check=True,
- capture_output=True,
- text=True
- )
- if nft_result.returncode != 0:
- print(f"nft call failed with code {nft_result.returncode}")
- print(nft_result.stderr)
- print(nft_result.stdout)
-
- except Exception as e:
- log.error(e)
-
-
-ap = argparse.ArgumentParser(description="Create an nftables blocklist from a URL data source")
-ap.add_argument("--priority", type=int,
- default=NFT_PRIORITY, help="nft chain hook priority (default -2)")
-ap.add_argument("--debug", action="store_true", help="Enable debug logging")
-args = ap.parse_args()
-
-log = logging.getLogger("blocklist2nft")
-lh = logging.StreamHandler()
-lf = logging.Formatter(fmt="%(levelname)s: %(message)s")
-lh.setFormatter(lf)
-log.addHandler(lh)
-
-log.setLevel(logging.INFO)
-if args.debug:
- log.setLevel(logging.DEBUG)
-
-if __name__ == '__main__':
- main()