"""Add and remove tagged static routes on a router over SSH.

Routes are created with ``ip route <ip> <interface> auto <tag>`` and removed
with ``no ip route <ip> <interface> <tag>``.  The tag is a ``!``-prefixed
comment used to find this tool's routes again in ``more running-config``.

Configuration comes from environment variables:

* ``ROUTER_ADDRESS``  - router host (default ``192.168.0.1``)
* ``ROUTER_USER``     - SSH user (default ``admin``)
* ``ROUTER_PASSWORD`` - SSH password
* ``INTERFACE_NAME``  - interface the routes point at (required)
"""
import argparse
import contextvars
import os

import dns.resolver
import paramiko

router_ip = os.getenv("ROUTER_ADDRESS", "192.168.0.1")
router_username = os.getenv("ROUTER_USER", "admin")
router_password = os.getenv("ROUTER_PASSWORD")
interface_name = os.getenv("INTERFACE_NAME")

# Neither variable has a default: ``.get()`` raises LookupError until
# ``main()`` has called ``.set()`` on it.
auto_vpn_route_comment = contextvars.ContextVar('auto_vpn_route_comment')
ssh_client = contextvars.ContextVar('ssh_client')


def get_ips_for_domain_name(domain_name):
    """Return the textual ``A``-record addresses for *domain_name*.

    Resolver exceptions (e.g. ``dns.resolver.NoAnswer``) propagate to the
    caller.
    """
    resolver = dns.resolver.Resolver()
    result = resolver.resolve(domain_name, 'A')
    return [ip.to_text() for ip in result]


def exec_command(command):
    """Run *command* on the connected router and return its stdout lines.

    Each returned line is stripped of surrounding whitespace.  stderr is not
    read.
    """
    ssh_stdin, ssh_stdout, ssh_stderr = ssh_client.get().exec_command(command)
    ssh_stdin.close()
    # Strip extra line breaks from the output.
    return [line.strip() for line in ssh_stdout.readlines()]


def get_current_settings():
    """Return the output lines of ``more running-config``."""
    return exec_command("more running-config")


def get_current_rules():
    """Return the running-config lines that contain the current route tag."""
    tag = auto_vpn_route_comment.get()
    return [rule for rule in get_current_settings() if tag in rule]


def get_all_rules_by_interface():
    """Return the running-config lines that mention ``interface_name``."""
    return [rule for rule in get_current_settings() if interface_name in rule]


def add_vpn_route(ip):
    """Add a tagged route for *ip* through ``interface_name``.

    Returns the router's output joined into one string, or ``False`` when
    *ip* is ``127.0.0.1``, which is never routed.
    """
    if ip == "127.0.0.1":
        return False
    command = f"ip route {ip} {interface_name} auto {auto_vpn_route_comment.get()}"
    return "".join(exec_command(command)).strip()


def delete_vpn_route(ip):
    """Remove the tagged route for *ip* and return the router's output."""
    command = f"no ip route {ip} {interface_name} {auto_vpn_route_comment.get()}"
    return "".join(exec_command(command)).strip()


def _route_comment_for(ip_list_path):
    """Return the route tag: ``!<file name>`` for an IP list, else ``!auto-vpn``."""
    if ip_list_path:
        return f"!{os.path.basename(ip_list_path)}"
    return "!auto-vpn"


def _add_routes_from_ip_file(path):
    """Add one route per non-blank line of the IP list file at *path*."""
    print("processing ips list")
    if not os.path.exists(path):
        print("ip list file not found")
        return
    print(f"using file: {path}")
    with open(path) as f:
        for line in f:
            ip = line.strip()
            if not ip:
                # A blank line would otherwise produce a malformed command.
                continue
            print(ip)
            print(add_vpn_route(ip))
    print(os.path.basename(path))


def _add_routes_from_domains_file(path):
    """Resolve every non-blank domain listed in *path* and route its addresses."""
    print("processing domains list")
    if not os.path.exists(path):
        print("Please, create 'domains.txt' file filled with list of domain names to resolve")
        return
    with open(path) as f:
        for line in f:
            domain = line.strip()
            if not domain:
                continue
            print(domain + ":")
            try:
                ips_list = get_ips_for_domain_name(domain)
            except dns.resolver.NoAnswer:
                # Reset explicitly so the previous domain's addresses are
                # never reused (and the name is never unbound).
                ips_list = []
            print(ips_list)
            for ip in ips_list:
                print(add_vpn_route(ip))


def main():
    """Parse the command line, connect to the router and update its routes.

    With ``--drop`` the routes carrying the current tag are removed first.
    Routes are then added from the ``--ip-list`` file when given, otherwise
    from the domains listed in ``domains.txt``.
    """
    parser = argparse.ArgumentParser(
        prog='Keenetic auto vpn',
        description='',
        epilog='')
    parser.add_argument('-i', '--ip-list')
    parser.add_argument('-d', '--drop',
                        help='Whether drop or not old rules by that tag',
                        action='store_true')
    # NOTE(review): this option expects a value and is not read anywhere in
    # this file - confirm the intended behaviour before wiring it up.
    parser.add_argument('-D', '--drop-all',
                        help='Drops ALL current custom routes to given interface')
    args = parser.parse_args()

    if not interface_name:
        # Every command and rule filter needs the interface name.
        parser.error("INTERFACE_NAME environment variable is not set")

    # The tag must be known before --drop runs: get_current_rules() and
    # delete_vpn_route() both read it.
    auto_vpn_route_comment.set(_route_comment_for(args.ip_list))

    ssh = paramiko.SSHClient()
    try:
        # Load SSH host keys.
        ssh.load_system_host_keys()
        # Add SSH host key automatically if needed.
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification.
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # Connect to router using username/password authentication.
        ssh.connect(router_ip,
                    username=router_username,
                    password=router_password,
                    look_for_keys=False)
        ssh_client.set(ssh)

        print(get_all_rules_by_interface())

        if args.drop:
            for line in get_current_rules():
                # The address is the third space-separated token of the
                # config line.
                ip = line.split(" ")[2]
                print(delete_vpn_route(ip))

        if args.ip_list:
            _add_routes_from_ip_file(args.ip_list)
        else:
            _add_routes_from_domains_file('domains.txt')
    finally:
        # Close connection, even when a step above failed.
        ssh.close()


if __name__ == '__main__':
    main()