#!/usr/bin/python3 import os import sys import json import requests from datetime import datetime from OpenSSL import crypto ca_cert = 'ca_cert.pem' openvpn_pair = 'openvpn_pair.pem' client_cert = 'client_cert.pem' client_key = 'client_key.pem' providers_info = 'providers.json' providers = [ { 'name': 'riseup', 'domain_url': 'https://riseup.net', 'provider_path': '/provider.json', 'configs_path': '/1/configs.json' } ] # gateways = [ # { # "provider": "riseup", # "ip_address": "37.218.241.7", # "host": "cisne.riseup.net", # "location": { # "name": "Miami", # "country_code": "US" # }, # "protocols": [ # "tcp" # ], # "ports": [ # "1194" # ] # } # ] gateways = [] # openvpn_configurations = { # "riseup": { # "auth": "SHA1", # "cipher": "AES-128-CBC", # "keepalive": "10 30", # "tls-cipher": "DHE-RSA-AES128-SHA", # "tun-ipv6": True # } # } openvpn_configurations = {} # https://web.archive.org/web/20191001225633/http://www.zedwood.com/article/python-openssl-x509-parse-certificate def format_subject_issuer(x509Issuer): items = [] for item in x509Issuer.get_components(): items.append(str(item[1], 'utf-8')) return ', '.join(items) def format_asn1_date(d): return datetime.strptime(d.decode('ascii'), '%Y%m%d%H%M%SZ').strftime('%Y-%m-%d') def is_ca_cert_fingerprint_valid(x509_sha256, expected_ca_cert_fingerprint): sha256_prefix = 'SHA256: ' expected_ca_cert_fingerprint = expected_ca_cert_fingerprint.replace(sha256_prefix, '') x509_sha256_formatted = x509_sha256.replace(sha256_prefix, '').replace(':', '').lower() return x509_sha256_formatted == expected_ca_cert_fingerprint def is_provider_info_valid(provider_timestamp): datetime_format = '%Y-%m-%d %H:%M:%S.%f' datetime_now = str(datetime.now()) diff_in_days = (datetime.strptime(datetime_now, datetime_format) - datetime.strptime(provider_timestamp, datetime_format)).days if diff_in_days <= 1: return True else: return False def days_until_from_now(date): return (datetime.strptime(date, '%Y-%m-%d') - datetime.strptime(str(datetime.now()), '%Y-%m-%d %H:%M:%S.%f')).days def update_provider_info(provider, data): provider['api_uri'] = data['api_uri'] provider['api_version'] = data['api_version'] provider['ca_cert_fingerprint'] = data['ca_cert_fingerprint'] provider['ca_cert_uri'] = data['ca_cert_uri'] def fetch_and_save_ca_cert(ca_cert_uri, ca_cert_path): # Fetching for Calyx throws # [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate # verify=False for now if ca_cert_uri == 'https://calyx.net/ca.crt': receive = requests.get(ca_cert_uri, verify=False) else: receive = requests.get(ca_cert_uri, verify=False) with open(ca_cert_path, 'wb') as f: f.write(receive.content) def validate_cert_fingerprint(expected_ca_cert_fingerprint, ca_cert_path): x509 = crypto.load_certificate(crypto.FILETYPE_PEM, open(ca_cert_path).read()) x509_sha256_fingerprint = str(x509.digest('sha256'), 'utf-8') print('CA certificate issuer: {}'.format(format_subject_issuer(x509.get_issuer()))) print('CA certificate is valid from {} to {}'.format( format_asn1_date(x509.get_notBefore()), format_asn1_date(x509.get_notAfter()) )) return is_ca_cert_fingerprint_valid(x509_sha256_fingerprint, expected_ca_cert_fingerprint) def fetch_and_save_provider_info(providers, providers_info): for provider in providers: shouldVerify = None if provider['name'] == 'calyx': # Fetching for Calyx throws # [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate # verify=False for now shouldVerify = False else: shouldVerify = True receive = requests.get('{}{}'.format( provider['domain_url'], provider['provider_path'] ), verify=shouldVerify) data = receive.json() update_provider_info(provider, data) providers_dict = {} providers_dict['provider_info_last_checked'] = str(datetime.now()) providers_dict['providers'] = providers with open(providers_info, 'w') as f: providers_json = json.dumps(providers_dict) f.write(providers_json) def update_gateways(gateways, data, provider_name): for i, data_gateway in enumerate(data['gateways']): protocols = [] ports = [] for transport in data_gateway['capabilities']['transport']: if transport == 'openvpn': protocols = data_gateway['capabilities']['protocols'] ports = data_gateway['capabilities']['ports'] elif transport['type'] == 'openvpn': protocols = transport['protocols'] ports = transport['ports'] location_name = '' location_country_code = '' # Riseup specific due to inconsistent API (location key/name for NYC is opposite casing) if (bool(data['locations'])): if (provider_name == 'riseup' and data_gateway['location'] == 'new york city'): location_name = 'New York City' location_country_code = 'US' else: location_name = data['locations'][data_gateway['location']]['name'] location_country_code = data['locations'][data_gateway['location']]['country_code'] # Calyx specific due to empty location info if (provider_name == 'calyx' and data_gateway['host'] == 'vpn2.calyx.net'): location_name = 'New York City' location_country_code = 'US' gateways.append({ 'provider': provider_name, 'ip_address': data_gateway['ip_address'], 'host': data_gateway['host'], 'location': { 'name': location_name, 'country_code': location_country_code }, 'protocols': protocols, 'ports': ports }) return gateways def update_openvpn_configurations(openvpn_configurations, data, provider_name): openvpn_configurations[provider_name] = data return openvpn_configurations if os.path.exists(providers_info): with open(providers_info, 'r') as f: data = json.load(f) if is_provider_info_valid(data['provider_info_last_checked']): print('Re-using provider info...') providers = data['providers'] else: print('Updating provider info...') fetch_and_save_provider_info(providers, providers_info) else: # first run print('Fetching provider info...') fetch_and_save_provider_info(providers, providers_info) for provider in providers: ca_cert_path = os.path.join(provider['name'], ca_cert) if os.path.exists(ca_cert_path): print('Re-using CA certificate for {}...'.format(provider['name'])) else: print('Fetching CA certificate for {}...'.format(provider['name'])) os.makedirs(provider['name']) ca_cert_path = os.path.join(provider['name'], ca_cert) fetch_and_save_ca_cert(provider['ca_cert_uri'], ca_cert_path) print('Validating SHA256 fingerprints between CA certificate and provider info for {}...'.format( provider['name'] )) ca_cert_path = os.path.join(provider['name'], ca_cert) is_valid = validate_cert_fingerprint(provider['ca_cert_fingerprint'], ca_cert_path) if is_valid == False: print("CA certificate's SHA256 fingerprint does not match expected SHA256 fingerprint for {}, quitting...".format( provider['name'] )) raise SystemExit(0) print('Fingerprints match!') print('Fetching client certificate and private key for {}...'.format( provider['name'] )) client_cert_url = '{}/{}/cert'.format( provider['api_uri'], provider['api_version'] ) receive = requests.post(client_cert_url, verify=ca_cert_path) openvpn_pair_path = os.path.join(provider['name'], openvpn_pair) with open(openvpn_pair_path, 'wb') as f: f.write(receive.content) x509 = crypto.load_certificate(crypto.FILETYPE_PEM, open(openvpn_pair_path).read()) print('Client certificate issuer: {}'.format(format_subject_issuer(x509.get_issuer()))) print('Client certificate is valid from {} to {} and expires in {} days'.format( format_asn1_date(x509.get_notBefore()), format_asn1_date(x509.get_notAfter()), days_until_from_now(format_asn1_date(x509.get_notAfter())) )) print('Fetching encrypted internet proxy capabilities and gateways for {}...'.format( provider['name'] )) receive = requests.get('{}{}'.format( provider['api_uri'], provider['configs_path'] ), verify=ca_cert_path ) data = receive.json() receive = requests.get('{}{}'.format( provider['api_uri'], data['services']['eip'] ), verify=ca_cert_path ) data = receive.json() gateways = update_gateways( gateways, data, provider['name'] ) gateways = sorted(gateways, key=lambda k: k['location']['name']) openvpn_configurations = update_openvpn_configurations( openvpn_configurations, data['openvpn_configuration'], provider['name'] ) print('Splitting client certificate key pair file for {}...'.format( provider['name'] )) client_cert_path = os.path.join(provider['name'], client_cert) client_key_path = os.path.join(provider['name'], client_key) client_cert_file = open(client_cert_path, 'w') client_key_file = open(client_key_path, 'w') openvpn_pair_file = open(openvpn_pair_path, 'r') line = openvpn_pair_file.readline() while line != '-----END RSA PRIVATE KEY-----\n': client_key_file.write(line) line = openvpn_pair_file.readline() client_key_file.write(line) line = openvpn_pair_file.readline() while line: client_cert_file.write(line) line = openvpn_pair_file.readline() print('Ready!') ### Get user input print('\nServer:\n') for i, gateway in enumerate(gateways, start=1): print('{}. [{}] {}, {} ({} / {})'.format( i, gateway['provider'], gateway['location']['name'], gateway['location']['country_code'], gateway['host'], gateway['ip_address'] )) server_number_choice = int(input('\nEnter selection (#): ')) print('\nProtocol:\n') for i, protocol in enumerate(gateways[server_number_choice-1]['protocols'], start=1): print('{}. {}'.format(i, protocol.upper())) protocol_number_choice = int(input('\nEnter selection (#): ')) print('\nPort:\n') for i, port in enumerate(gateways[server_number_choice-1]['ports'], start=1): print('{}. {}'.format(i, port)) port_number_choice = int(input('\nEnter selection (#): ')) ovpn = 'bitmask-{}-{}-ip-{}-{}-{}.ovpn'.format( gateways[server_number_choice-1]['provider'], gateways[server_number_choice-1]['protocols'][protocol_number_choice-1], gateways[server_number_choice-1]['location']['name'].lower(), gateways[server_number_choice-1]['location']['country_code'].lower(), gateways[server_number_choice-1]['ports'][port_number_choice-1], ) bitmask_ovpns = 'bitmask_ovpns' ovpn_file_path = os.path.join(bitmask_ovpns, ovpn) if os.path.exists(bitmask_ovpns) is False: os.makedirs(bitmask_ovpns) print('\nGenerating OpenVPN configuration and writing to {}'.format(ovpn_file_path)) ovpn_file = open(ovpn_file_path, 'w') ovpn_file.write('client') ovpn_file.write('\n') ovpn_file.write('tls-client') ovpn_file.write('\n') ovpn_file.write('dev tun') ovpn_file.write('\n') ovpn_file.write('proto {}'.format( gateways[server_number_choice-1]['protocols'][protocol_number_choice-1] )) ovpn_file.write('\n') ovpn_file.write('remote {} {} # {} / {}, {}'.format( gateways[server_number_choice-1]['ip_address'], gateways[server_number_choice-1]['ports'][port_number_choice-1], gateways[server_number_choice-1]['host'], gateways[server_number_choice-1]['location']['name'], gateways[server_number_choice-1]['location']['country_code'] )) ovpn_file.write('\n') for k, v in openvpn_configurations[gateways[server_number_choice-1]['provider']].items(): if type(v) is bool: ovpn_file.write('{}'.format(k)) elif k == 'tls-cipher' and v == 'DHE-RSA-AES128-SHA': ovpn_file.write('{} {}'.format(k, 'TLS-DHE-RSA-WITH-AES-128-CBC-SHA')) else: ovpn_file.write('{} {}'.format(k, v)) ovpn_file.write('\n') ovpn_file.write('resolv-retry infinite') ovpn_file.write('\n') ovpn_file.write('nobind') ovpn_file.write('\n') ovpn_file.write('verb 3') ovpn_file.write('\n') ovpn_file.write('persist-key') ovpn_file.write('\n') ovpn_file.write('persist-tun') ovpn_file.write('\n') ovpn_file.write('reneg-sec 0') ovpn_file.write('\n') ovpn_file.write('pull') ovpn_file.write('\n') ovpn_file.write('auth-nocache') ovpn_file.write('\n') ovpn_file.write('script-security 2') ovpn_file.write('\n') ovpn_file.write('up /etc/openvpn/update-resolv-conf') ovpn_file.write('\n') ovpn_file.write('down /etc/openvpn/update-resolv-conf') ovpn_file.write('\n') ovpn_file.write('tls-version-min 1.2') ovpn_file.write('\n') ovpn_file.write('redirect-gateway ipv6') ovpn_file.write('\n') ovpn_file.write('remote-cert-tls server') ovpn_file.write('\n') ovpn_file.write('remote-cert-eku "TLS Web Server Authentication"') ovpn_file.write('\n') ovpn_file.write('verify-x509-name {} name'.format( gateways[server_number_choice-1]['host'] )) ovpn_file.write('\n') ovpn_file.write('') ovpn_file.write('\n') ca_cert_path = os.path.join( gateways[server_number_choice-1]['provider'], ca_cert ) ca_cert_file = open(ca_cert_path, 'r') line = ca_cert_file.readline() while line: ovpn_file.write(line) line = ca_cert_file.readline() ovpn_file.write('') ovpn_file.write('\n') ovpn_file.write('') ovpn_file.write('\n') client_cert_path = os.path.join( gateways[server_number_choice-1]['provider'], client_cert ) client_cert_file = open(client_cert_path, 'r') line = client_cert_file.readline() while line: ovpn_file.write(line) line = client_cert_file.readline() ovpn_file.write('') ovpn_file.write('\n') ovpn_file.write('') ovpn_file.write('\n') client_key_path = os.path.join( gateways[server_number_choice-1]['provider'], client_key ) client_key_file = open(client_key_path, 'r') line = client_key_file.readline() while line: ovpn_file.write(line) line = client_key_file.readline() ovpn_file.write('') print('Done!') print('Cleaning client certificate and private keys...') for provider in providers: client_cert_path = os.path.join(provider['name'], client_cert) client_key_path = os.path.join(provider['name'], client_key) openvpn_pair_path = os.path.join(provider['name'], openvpn_pair) os.remove(client_cert_path) os.remove(client_key_path) os.remove(openvpn_pair_path)