mirror of
https://github.com/mynodebtc/mynode.git
synced 2024-11-15 18:02:49 +00:00
1337 lines
41 KiB
Python
1337 lines
41 KiB
Python
from config import *
|
|
from threading import Timer
|
|
from werkzeug.routing import RequestRedirect
|
|
from flask import flash, redirect
|
|
from utilities import *
|
|
from enable_disable_functions import *
|
|
from systemctl_info import *
|
|
import time
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import random
|
|
import string
|
|
import re
|
|
import psutil
|
|
|
|
try:
|
|
import qrcode
|
|
except:
|
|
pass
|
|
try:
|
|
import subprocess32
|
|
except:
|
|
pass
|
|
|
|
# Module-level state shared by the helpers below
local_ip = "unknown"
cached_data = {}      # keyed cache, e.g. "serial", "device_type", "get_throttled_data"
warning_data = {}
ui_settings = None    # filled in by read_ui_settings() / write_ui_settings()
|
|
|
|
#==================================
|
|
# Manage Device
|
|
#==================================
|
|
def reboot_device():
    """Flush disks, run the critical-service stop script, then reboot."""
    for command in ("sync",
                    "/usr/bin/mynode_stop_critical_services.sh",
                    "reboot"):
        os.system(command)
|
|
|
|
def shutdown_device():
    """Flush disks, run the critical-service stop script, then power off."""
    for command in ("sync",
                    "/usr/bin/mynode_stop_critical_services.sh",
                    "shutdown -h now"):
        os.system(command)
|
|
|
|
def is_shutting_down():
    """Return True when the /tmp/shutting_down marker file exists."""
    marker = "/tmp/shutting_down"
    return os.path.isfile(marker)
|
|
|
|
def factory_reset():
    """Wipe local subsystem data, disable optional services, reset the password and reboot."""
    # Try and make sure the drive is r/w before deleting anything
    os.system("mount -o remount,rw /mnt/hdd")

    # Clear data owned by each subsystem: QuickSync, LND, Tor
    delete_quicksync_data()
    delete_lnd_data()
    reset_tor()

    # Disable optional services
    for service in ("electrs", "lndhub", "btcrpcexplorer", "vpn"):
        os.system("systemctl disable {} --no-pager".format(service))

    # Removing the marker triggers the drive to be reformatted on reboot
    os.system("rm -f /mnt/hdd/.mynode")

    # Reset password
    os.system("/usr/bin/mynode_chpasswd.sh bolt")

    reboot_device()
|
|
|
|
def check_and_mark_reboot_action(tmp_marker):
    """Prevent an action from being re-triggered by a page refresh.

    If /tmp/<tmp_marker> already exists, flash an error and raise a redirect
    to "/"; otherwise create the marker file.
    """
    marker_path = "/tmp/{}".format(tmp_marker)
    if os.path.isfile(marker_path):
        flash(u'Refresh prevented - action already triggered', category="error")
        raise RequestRedirect("/")
    touch(marker_path)
|
|
|
|
def reload_throttled_data():
    """Load /tmp/get_throttled_data (when present) into cached_data."""
    global cached_data
    source_file = "/tmp/get_throttled_data"
    if not os.path.isfile(source_file):
        return
    cached_data["get_throttled_data"] = to_string(get_file_contents(source_file))
|
|
|
|
def get_throttled_data():
    """Decode the cached throttling value into a dict of 0/1 flags.

    The cached string is parsed as hexadecimal and each flag is one bit of
    that value. When nothing is cached, only RAW_DATA="MISSING" is returned.
    """
    global cached_data
    if "get_throttled_data" not in cached_data:
        return {"RAW_DATA": "MISSING"}

    raw = cached_data["get_throttled_data"]
    bits = int(raw, 16)

    # (flag name, bit mask) pairs; the HAS_* flags live in the upper bits
    flag_masks = (
        ("UNDERVOLTED", 0x1),
        ("CAPPED", 0x2),
        ("THROTTLED", 0x4),
        ("SOFT_TEMPLIMIT", 0x8),
        ("HAS_UNDERVOLTED", 0x10000),
        ("HAS_CAPPED", 0x20000),
        ("HAS_THROTTLED", 0x40000),
        ("HAS_SOFT_TEMPLIMIT", 0x80000),
    )
    result = {"RAW_DATA": raw}
    for flag_name, mask in flag_masks:
        result[flag_name] = 1 if bits & mask else 0
    return result
|
|
|
|
#==================================
|
|
# Manage Versions and Upgrades
|
|
#==================================
|
|
def get_current_version():
    """Return the contents of /usr/share/mynode/version, or "error" if unreadable."""
    try:
        with open("/usr/share/mynode/version", "r") as version_file:
            return version_file.read().strip()
    except:
        return "error"
|
|
|
|
def get_current_beta_version():
    """Return the installed beta version, or "beta_not_installed" if the file is unreadable."""
    try:
        with open("/usr/share/mynode/beta_version", "r") as version_file:
            return version_file.read().strip()
    except:
        return "beta_not_installed"
|
|
|
|
def update_latest_version():
    """Run the script that fetches the latest version info; always returns True."""
    fetch_script = "/usr/bin/mynode_get_latest_version.sh"
    os.system(fetch_script)
    return True
|
|
|
|
def get_latest_version():
    """Return the latest known version, falling back to the current version.

    The fallback is used when the latest_version file is empty or unreadable.
    """
    try:
        with open("/usr/share/mynode/latest_version", "r") as version_file:
            latest = version_file.read().strip()
        if latest == "":
            latest = get_current_version()
    except:
        latest = get_current_version()
    return latest
|
|
|
|
def get_latest_beta_version():
    """Return the latest beta version string, or "" if the file is unreadable."""
    try:
        with open("/usr/share/mynode/latest_beta_version", "r") as version_file:
            return version_file.read().strip()
    except:
        return ""
|
|
|
|
def mark_upgrade_started():
    """Create the /tmp/upgrade_started marker file."""
    marker = "/tmp/upgrade_started"
    touch(marker)
|
|
|
|
def is_upgrade_running():
    """Return True when the /tmp/upgrade_started marker file exists."""
    marker = "/tmp/upgrade_started"
    return os.path.isfile(marker)
|
|
|
|
def upgrade_device():
    """Run the upgrade script once (retrying on failure), then sync and reboot.

    Does nothing when an upgrade has already been marked as started. The
    script's combined output is tee'd into a per-version log file and into
    upgrade_log_latest.txt.
    """
    if is_upgrade_running():
        return
    mark_upgrade_started()

    # Upgrade
    os.system("mkdir -p /home/admin/upgrade_logs")
    os.system("cp -f /usr/bin/mynode_upgrade.sh /usr/bin/mynode_upgrade_running.sh")
    file1 = "/home/admin/upgrade_logs/upgrade_log_from_{}_upgrade.txt".format(get_current_version())
    file2 = "/home/admin/upgrade_logs/upgrade_log_latest.txt"

    # Without pipefail the shell reports tee's exit status (normally 0), so a
    # failed upgrade script was never detected and the retry never ran.
    # NOTE(review): assumes /bin/bash is installed on the device - confirm.
    cmd = "set -o pipefail; /usr/bin/mynode_upgrade_running.sh 2>&1 | tee {} {}".format(file1, file2)
    ret = subprocess.call(cmd, shell=True, executable="/bin/bash")
    if ret != 0:
        # Try one more time....
        subprocess.call(cmd, shell=True, executable="/bin/bash")

    # Sync
    os.system("sync")
    time.sleep(1)

    # Reboot
    reboot_device()
|
|
|
|
def upgrade_device_beta():
    """Run the upgrade script in "beta" mode (retrying on failure), then sync and reboot.

    Does nothing when an upgrade has already been marked as started. The
    script's combined output is tee'd into a per-version log file and into
    upgrade_log_latest.txt.
    """
    if is_upgrade_running():
        return
    mark_upgrade_started()

    # Upgrade
    os.system("mkdir -p /home/admin/upgrade_logs")
    os.system("cp -f /usr/bin/mynode_upgrade.sh /usr/bin/mynode_upgrade_running.sh")
    file1 = "/home/admin/upgrade_logs/upgrade_log_from_{}_upgrade.txt".format(get_current_version())
    file2 = "/home/admin/upgrade_logs/upgrade_log_latest.txt"

    # Without pipefail the shell reports tee's exit status (normally 0), so a
    # failed upgrade script was never detected and the retry never ran.
    # NOTE(review): assumes /bin/bash is installed on the device - confirm.
    cmd = "set -o pipefail; /usr/bin/mynode_upgrade_running.sh beta 2>&1 | tee {} {}".format(file1, file2)
    ret = subprocess.call(cmd, shell=True, executable="/bin/bash")
    if ret != 0:
        # Try one more time....
        subprocess.call(cmd, shell=True, executable="/bin/bash")

    # Sync
    os.system("sync")
    time.sleep(1)

    # Reboot
    reboot_device()
|
|
|
|
def did_upgrade_fail():
    """Return True when the upgrade_error marker exists in the settings folder."""
    marker = "/mnt/hdd/mynode/settings/upgrade_error"
    return os.path.isfile(marker)
|
|
|
|
def cleanup_log(log):
    """Return *log* with any product_key=/PRODUCT_KEY= value masked by asterisks."""
    key_pattern = "(product_key|PRODUCT_KEY)=[0-9A-Z]+"
    masked = "product_key=************"
    return re.sub(key_pattern, masked, log)
|
|
|
|
def get_recent_upgrade_log():
    """Return the latest upgrade log with product keys masked."""
    latest_log_path = "/home/admin/upgrade_logs/upgrade_log_latest.txt"
    contents = to_string(get_file_contents(latest_log_path))
    return cleanup_log(contents)
|
|
|
|
def get_all_upgrade_logs():
    """Return a list of dicts (id, name, date, log) describing the upgrade logs.

    The latest log is listed first as "Latest Upgrade", followed by every
    regular file in the log folder. Any error stops collection and returns
    whatever was gathered so far.
    """
    folder = "/home/admin/upgrade_logs/"
    entries = []

    try:
        # Add latest upgrade log
        latest_path = os.path.join(folder, "upgrade_log_latest.txt")
        if os.path.isfile(latest_path):
            mtime = os.path.getmtime(latest_path)
            entries.append({
                "id": len(entries),
                "name": "Latest Upgrade",
                "date": time.strftime('%Y-%m-%d', time.localtime(mtime)),
                "log": get_recent_upgrade_log(),
            })

        # Add file logs
        if os.path.isdir(folder):
            for filename in os.listdir(folder):
                path = os.path.join(folder, filename)
                if not os.path.isfile(path):
                    continue
                mtime = os.path.getmtime(path)
                entries.append({
                    "id": len(entries),
                    "name": filename,
                    "date": time.strftime('%Y-%m-%d', time.localtime(mtime)),
                    "log": cleanup_log(to_string(get_file_contents(path))),
                })
    except Exception:
        pass

    return entries
|
|
|
|
def has_checkin_error():
    """Return True when the /tmp/check_in_error marker file exists."""
    marker = "/tmp/check_in_error"
    return os.path.isfile(marker)
|
|
|
|
|
|
#==================================
|
|
# Reseller Info
|
|
#==================================
|
|
def is_device_from_reseller():
    """Return True when the reseller marker file exists under /opt/mynode/custom."""
    marker = "/opt/mynode/custom/reseller"
    return os.path.isfile(marker)
|
|
|
|
|
|
#==================================
|
|
# Device Info
|
|
#==================================
|
|
def get_system_uptime():
    """Return uptime formatted as "<d> days <h> hour(s) <m> minute(s)" from /proc/uptime."""
    awk_command = 'awk \'{print int($1/86400)" days "int($1%86400/3600)" hour(s) "int(($1%3600)/60)" minute(s)"}\' /proc/uptime'
    raw_output = subprocess.check_output(awk_command, shell=True)
    return to_string(raw_output).strip()
|
|
|
|
def get_system_uptime_in_seconds():
    """Return the first field of /proc/uptime truncated to whole seconds."""
    awk_command = 'awk \'{print $1}\' /proc/uptime'
    raw_output = to_string(subprocess.check_output(awk_command, shell=True))
    seconds = float(raw_output.strip())
    return int(seconds)
|
|
|
|
def get_system_time_in_ms():
    """Return the current epoch time rounded to whole milliseconds."""
    now_ms = time.time() * 1000
    return int(round(now_ms))
|
|
|
|
def get_system_date():
    """Return the stripped output of the `date` command."""
    raw_output = subprocess.check_output('date', shell=True)
    return to_string(raw_output).strip()
|
|
|
|
def get_device_serial():
    """Return the output of `mynode-get-device-serial`, caching it in cached_data."""
    global cached_data
    if "serial" not in cached_data:
        raw_output = subprocess.check_output("mynode-get-device-serial", shell=True)
        cached_data["serial"] = to_string(raw_output).strip()
    return cached_data["serial"]
|
|
|
|
def get_device_type():
    """Return the output of `mynode-get-device-type`, caching it in cached_data."""
    global cached_data
    if "device_type" not in cached_data:
        raw_output = subprocess.check_output("mynode-get-device-type", shell=True)
        cached_data["device_type"] = to_string(raw_output.strip())
    return cached_data["device_type"]
|
|
|
|
def get_device_arch():
    """Return the output of `uname -m`, caching it in cached_data."""
    global cached_data
    if "device_arch" not in cached_data:
        raw_output = subprocess.check_output("uname -m", shell=True)
        cached_data["device_arch"] = to_string(raw_output.decode("utf-8").strip())
    return cached_data["device_arch"]
|
|
|
|
def get_device_ram():
    """Return the total-memory column of `free --giga` as a string, cached in cached_data."""
    global cached_data
    if "ram" not in cached_data:
        command = "free --giga | grep Mem | awk '{print $2}'"
        raw_output = subprocess.check_output(command, shell=True)
        cached_data["ram"] = to_string(raw_output.strip())
    return cached_data["ram"]
|
|
|
|
def get_device_temp():
    """Return the thermal_zone0 temperature as a string with one decimal place.

    The sysfs value is divided by 1000 before formatting. The result is
    cached under "device_temp"; "..." is returned when the temperature
    cannot be read.
    """
    if is_cached("device_temp", 60):
        return get_cached_data("device_temp")

    device_temp = "..."
    try:
        results = to_string(subprocess.check_output("cat /sys/class/thermal/thermal_zone0/temp", shell=True))
        # Divide by a float so Python 2 does not truncate to whole degrees
        temp = int(results) / 1000.0
        device_temp = "{:.1f}".format(temp)
        update_cached_data("device_temp", device_temp)
    except Exception:
        # Best effort: fall through with whatever value we have
        pass
    return device_temp
|
|
|
|
def get_ram_usage():
    """Return RAM usage as a percentage string such as "42.0%".

    The value is cached under "ram_usage"; "..." is returned when psutil
    cannot provide the figure.
    """
    if is_cached("ram_usage", 120):
        return get_cached_data("ram_usage")

    ram_usage = "..."
    try:
        ram_info = psutil.virtual_memory()
        ram_usage = "{:.1f}%".format(ram_info.percent)
        # Store the value so the is_cached() check above can actually hit;
        # previously nothing was ever written to this cache key.
        update_cached_data("ram_usage", ram_usage)
    except Exception:
        # Best effort: fall through with whatever value we have
        pass
    return ram_usage
|
|
|
|
def get_swap_usage():
    """Return swap usage as a percentage string such as "3.5%".

    The value is cached under "swap_usage"; "..." is returned when psutil
    cannot provide the figure.
    """
    if is_cached("swap_usage", 120):
        return get_cached_data("swap_usage")

    swap_usage = "..."
    try:
        swap_info = psutil.swap_memory()
        swap_usage = "{:.1f}%".format(swap_info.percent)
        # Store the value so the is_cached() check above can actually hit;
        # previously nothing was ever written to this cache key.
        update_cached_data("swap_usage", swap_usage)
    except Exception:
        # Best effort: fall through with whatever value we have
        pass
    return swap_usage
|
|
|
|
def get_local_ip():
    """Return the output of /usr/bin/get_local_ip.py, or "error" if it fails."""
    try:
        raw_output = subprocess.check_output('/usr/bin/get_local_ip.py', shell=True)
        return to_string(raw_output.strip())
    except:
        return "error"
|
|
|
|
def get_local_ip_subnet_conflict():
    """Return True when the local IP starts with "172."."""
    return get_local_ip().startswith("172.")
|
|
|
|
def get_device_changelog():
    """Return the contents of /usr/share/mynode/changelog, or "ERROR" on failure."""
    try:
        raw_output = subprocess.check_output(["cat", "/usr/share/mynode/changelog"])
        return to_string(raw_output)
    except:
        return "ERROR"
|
|
|
|
def has_changed_password():
    """Return True when the stored password hash differs from the built-in one.

    Returns False when the hash file cannot be read.
    """
    # presumably the hash of the factory-default password - confirm
    builtin_hash = "d0b3cba71f725563d316ea3516099328042095d10f4571be25c07f9ce31985a5"
    try:
        with open("/home/bitcoin/.mynode/.hashedpw", "r") as pw_file:
            stored_hash = pw_file.read().strip()
    except:
        return False
    return stored_hash != builtin_hash
|
|
|
|
def hide_password_warning():
    """Return True when the hide_password_warning settings marker exists."""
    marker = "/mnt/hdd/mynode/settings/hide_password_warning"
    return os.path.isfile(marker)
|
|
|
|
def is_mount_read_only(mnt):
    """Return True when /proc/mounts lists *mnt* with the 'ro' option.

    Returns False when the mount point is not listed. Each line must split
    into exactly six fields, otherwise ValueError is raised.
    """
    with open('/proc/mounts') as mounts:
        for entry in mounts:
            _device, mount_point, _fstype, options, _dump, _pass = entry.split()
            if mount_point == mnt:
                return 'ro' in options.split(',')
    return False
|
|
|
|
def set_swap_size(size):
    """Write size*1024 as CONF_SWAPSIZE in /etc/dphys-swapfile and save *size* to settings.

    Returns whatever set_file_contents returns.
    """
    size_mb = 1024 * int(size)
    sed_command = "sed -i 's|CONF_SWAPSIZE=.*|CONF_SWAPSIZE={}|' /etc/dphys-swapfile".format(size_mb)
    os.system(sed_command)
    return set_file_contents("/mnt/hdd/mynode/settings/swap_size", size)
|
|
|
|
def get_swap_size():
    """Return the saved swap size setting as a string."""
    setting_path = "/mnt/hdd/mynode/settings/swap_size"
    return to_string(get_file_contents(setting_path))
|
|
|
|
#==================================
|
|
# myNode Status
|
|
#==================================
|
|
# Drive states
STATE_DRIVE_MISSING = "drive_missing"
STATE_DRIVE_CONFIRM_FORMAT = "drive_format_confirm"
STATE_DRIVE_FORMATTING = "drive_formatting"
STATE_DRIVE_MOUNTED = "drive_mounted"
STATE_DRIVE_CLONE = "drive_clone"
STATE_DRIVE_FULL = "drive_full"
# Setup / maintenance states
STATE_DOCKER_RESET = "docker_reset"
STATE_GEN_DHPARAM = "gen_dhparam"
STATE_QUICKSYNC_DOWNLOAD = "quicksync_download"
STATE_QUICKSYNC_COPY = "quicksync_copy"
STATE_QUICKSYNC_RESET = "quicksync_reset"
STATE_STABLE = "stable"
# Error and transitional states
STATE_ROOTFS_READ_ONLY = "rootfs_read_only"
STATE_HDD_READ_ONLY = "hdd_read_only"
STATE_SHUTTING_DOWN = "shutting_down"
STATE_UPGRADING = "upgrading"
STATE_UNKNOWN = "unknown"
|
|
|
|
def get_mynode_status():
    """Return the current myNode state string.

    The state is read from /tmp/.mynode_status (STATE_DRIVE_MISSING when the
    file is absent or unreadable). Once uptime exceeds 120 seconds, read-only
    root or HDD mounts override it. Any unexpected error yields STATE_UNKNOWN.
    """
    try:
        status_file = "/tmp/.mynode_status"
        status = STATE_DRIVE_MISSING

        # Get status
        if os.path.isfile(status_file):
            try:
                with open(status_file, "r") as f:
                    status = f.read().strip()
            except:
                status = STATE_DRIVE_MISSING

        # If its been a while, check for error conditions
        if get_system_uptime_in_seconds() > 120:
            # Check for read-only sd card
            if is_mount_read_only("/"):
                return STATE_ROOTFS_READ_ONLY
            # Check for read-only drive (unless cloning - it purposefully mounts read only)
            if is_mount_read_only("/mnt/hdd") and status != STATE_DRIVE_CLONE:
                return STATE_HDD_READ_ONLY
    except:
        status = STATE_UNKNOWN
    return status
|
|
|
|
#==================================
|
|
# myNode Clone Tool
|
|
#==================================
|
|
# Values of the clone tool state (see get_clone_state)
CLONE_STATE_DETECTING = "detecting"
CLONE_STATE_ERROR = "error"
CLONE_STATE_NEED_CONFIRM = "need_confirm"
CLONE_STATE_IN_PROGRESS = "in_progress"
CLONE_STATE_COMPLETE = "complete"
|
|
|
|
def get_clone_state():
    """Return the contents of /tmp/.clone_state as a string."""
    state_file = "/tmp/.clone_state"
    return to_string(get_file_contents(state_file))
|
|
|
|
def get_clone_error():
    """Return the contents of /tmp/.clone_error as a string."""
    error_file = "/tmp/.clone_error"
    return to_string(get_file_contents(error_file))
|
|
|
|
def get_clone_progress():
    """Return the contents of /tmp/.clone_progress as a string."""
    progress_file = "/tmp/.clone_progress"
    return to_string(get_file_contents(progress_file))
|
|
|
|
def get_clone_source_drive():
    """Return the contents of /tmp/.clone_source as a string."""
    source_file = "/tmp/.clone_source"
    return to_string(get_file_contents(source_file))
|
|
|
|
def get_clone_target_drive():
    """Return the contents of /tmp/.clone_target as a string."""
    target_file = "/tmp/.clone_target"
    return to_string(get_file_contents(target_file))
|
|
|
|
def get_clone_target_drive_has_mynode():
    """Return True when the /tmp/.clone_target_drive_has_mynode marker exists."""
    marker = "/tmp/.clone_target_drive_has_mynode"
    return os.path.isfile(marker)
|
|
|
|
def get_drive_info(drive):
    """Return a dict describing /dev/<drive> parsed from `lsblk` output.

    "name" defaults to "NOT_FOUND". The name, size, model and vendor keys are
    filled from whitespace-separated fields 0, 2, 3 and 4 of the "disk" line,
    stopping silently at the first failure.
    """
    info = {"name": "NOT_FOUND"}
    try:
        command = "lsblk -io KNAME,TYPE,SIZE,MODEL,VENDOR /dev/{} | grep disk".format(drive)
        raw_output = subprocess.check_output(command, shell=True).decode("utf-8")
        fields = to_string(raw_output).split()
        info["name"] = fields[0]
        info["size"] = fields[2]
        info["model"] = fields[3]
        info["vendor"] = fields[4]
    except:
        pass
    return info
|
|
|
|
|
|
#==================================
|
|
# UI Functions
|
|
#==================================
|
|
def init_ui_setting_defaults(ui_settings):
    """Fill in any missing UI settings with their defaults and return the same dict."""
    defaults = (
        ("darkmode", False),
        ("price_ticker", False),
        ("pinned_lightning_details", False),
        ("background", "none"),
    )
    for key, default_value in defaults:
        if key not in ui_settings:
            ui_settings[key] = default_value
    return ui_settings
|
|
|
|
def read_ui_settings():
    """Return the UI settings dict, loading it on first use.

    Settings already held in the module-level ui_settings are returned as-is.
    Otherwise ui.json is read from the HDD, or from the .mynode folder if the
    HDD copy does not exist. If nothing could be loaded, a fresh dict is
    created and written out. The reseller flag and defaults are always applied.
    """
    global ui_settings
    if ui_settings is not None:
        return ui_settings

    ui_hdd_file = '/mnt/hdd/mynode/settings/ui.json'
    ui_mynode_file = '/home/bitcoin/.mynode/ui.json'

    # Load the first file that exists; a read error leaves ui_settings unset
    try:
        for candidate in (ui_hdd_file, ui_mynode_file):
            if os.path.isfile(candidate):
                with open(candidate, 'r') as fp:
                    ui_settings = json.load(fp)
                break
    except Exception:
        # Error reading ui settings
        pass

    # If no files were read, init variable and mark we need to write files
    need_file_write = ui_settings is None
    if need_file_write:
        ui_settings = {}

    # Set reseller
    ui_settings["reseller"] = is_device_from_reseller()

    # Load defaults
    ui_settings = init_ui_setting_defaults(ui_settings)

    if need_file_write:
        write_ui_settings(ui_settings)

    return ui_settings
|
|
|
|
def write_ui_settings(ui_settings_new):
    """Replace the in-memory UI settings and persist them to both ui.json files.

    Each file is written independently. A failure writing the HDD copy
    (for example when the drive is not mounted) no longer prevents the
    .mynode copy from being written. Write errors are ignored.
    """
    global ui_settings
    ui_settings = ui_settings_new

    ui_hdd_file = '/mnt/hdd/mynode/settings/ui.json'
    ui_mynode_file = '/home/bitcoin/.mynode/ui.json'
    for path in (ui_hdd_file, ui_mynode_file):
        try:
            with open(path, 'w') as fp:
                json.dump(ui_settings, fp)
        except Exception:
            # Best effort: the settings stay valid in memory
            pass
|
|
|
|
def get_ui_setting(name):
    """Return the UI setting *name*; raises KeyError if it is not present."""
    return read_ui_settings()[name]
|
|
|
|
def set_ui_setting(name, value):
    """Set the UI setting *name* to *value* and write the settings out."""
    current = read_ui_settings()
    current[name] = value
    write_ui_settings(current)
|
|
|
|
def toggle_ui_setting(name):
    """Store the logical negation of the UI setting *name*."""
    flipped = not get_ui_setting(name)
    set_ui_setting(name, flipped)
|
|
|
|
|
|
# def get_background_choices():
|
|
# choices = []
|
|
# choices.append("none")
|
|
# for filename in os.listdir("/var/www/mynode/static/images/backgrounds/"):
|
|
# if filename.endswith(".png") or filename.endswith(".jpg"):
|
|
# name = filename.replace(".png","").replace(".jpg","")
|
|
# choices.append(name)
|
|
# return choices
|
|
|
|
def is_https_forced():
    """Return True when the https_forced marker file exists."""
    marker = '/home/bitcoin/.mynode/https_forced'
    return os.path.isfile(marker)
|
|
|
|
def force_https(force):
    """Create the https_forced marker when *force* is truthy, otherwise delete it."""
    marker = "/home/bitcoin/.mynode/https_forced"
    action = touch if force else delete_file
    action(marker)
|
|
|
|
# Regen cert
|
|
def regen_https_cert():
    """Delete the existing https folders, run the cert generation script, sync and restart nginx."""
    commands = (
        "rm -rf /home/bitcoin/.mynode/https",
        "rm -rf /mnt/hdd/mynode/settings/https",
        "/usr/bin/mynode_gen_cert.sh https 825",
        "sync",
        "systemctl restart nginx",
    )
    for command in commands:
        os.system(command)
|
|
|
|
def get_flask_secret_key():
    """Return the Flask secret key, generating and saving one on first use.

    A new key is 32 ASCII letters long and is written to the
    flask_secret_key file so later calls return the same value.
    """
    key_file = "/home/bitcoin/.mynode/flask_secret_key"
    if os.path.isfile(key_file):
        key = to_string( get_file_contents(key_file) )
    else:
        # The key is a secret, so draw it from the OS entropy source rather
        # than the default (predictable) pseudo-random generator.
        rng = random.SystemRandom()
        letters = string.ascii_letters
        key = ''.join(rng.choice(letters) for _ in range(32))
        set_file_contents(key_file, key)
    return key
|
|
|
|
def get_flask_session_timeout():
    """Return the session timeout as a (days, hours) tuple of ints.

    The file holds "days,hours". When it is missing, "7,0" is written and
    (7, 0) returned; any error also yields (7, 0).
    """
    timeout_file = "/home/bitcoin/.mynode/flask_session_timeout"
    try:
        if not os.path.isfile(timeout_file):
            set_file_contents(timeout_file, "7,0")
            return 7,0
        fields = to_string( get_file_contents(timeout_file) ).split(",")
        days = fields[0]
        hours = fields[1]
        return int(days),int(hours)
    except:
        return 7,0
|
|
|
|
def set_flask_session_timeout(days, hours):
    """Save the session timeout as "days,hours" and sync the disk."""
    timeout_value = "{},{}".format(days, hours)
    set_file_contents("/home/bitcoin/.mynode/flask_session_timeout", timeout_value)
    os.system("sync")
|
|
|
|
def is_www_python3():
    """Return True when the .www_use_python3 marker file exists."""
    marker = '/home/bitcoin/.mynode/.www_use_python3'
    return os.path.isfile(marker)
|
|
|
|
def set_www_python3(use_python3):
    """Create the .www_use_python3 marker when *use_python3* is truthy, otherwise delete it."""
    marker = "/home/bitcoin/.mynode/.www_use_python3"
    action = touch if use_python3 else delete_file
    action(marker)
|
|
|
|
|
|
#==================================
|
|
# Web Server Functions
|
|
#==================================
|
|
def restart_flask():
    """Restart the www systemd service."""
    command = "systemctl restart www"
    os.system(command)
|
|
|
|
|
|
#==================================
|
|
# Uploader Functions
|
|
#==================================
|
|
def is_uploader():
    """Return True when the uploader settings marker exists."""
    marker = "/mnt/hdd/mynode/settings/uploader"
    return os.path.isfile(marker)
|
|
def set_uploader():
    """Create the uploader settings marker."""
    marker = "/mnt/hdd/mynode/settings/uploader"
    touch(marker)
|
|
def unset_uploader():
    """Delete the uploader settings marker."""
    marker = "/mnt/hdd/mynode/settings/uploader"
    delete_file(marker)
|
|
|
|
|
|
#==================================
|
|
# QuickSync Functions
|
|
#==================================
|
|
def is_quicksync_enabled():
    """Return True unless the quicksync_disabled settings marker exists."""
    disabled_marker = "/mnt/hdd/mynode/settings/quicksync_disabled"
    return not os.path.isfile(disabled_marker)
|
|
def disable_quicksync():
    """Create the quicksync_disabled settings marker."""
    disabled_marker = "/mnt/hdd/mynode/settings/quicksync_disabled"
    touch(disabled_marker)
|
|
def enable_quicksync():
    """Delete the quicksync_disabled settings marker."""
    disabled_marker = "/mnt/hdd/mynode/settings/quicksync_disabled"
    delete_file(disabled_marker)
|
|
|
|
def settings_disable_quicksync():
    """Disable QuickSync, stop bitcoin and quicksync, delete QuickSync data and reboot."""
    steps = (
        disable_quicksync,
        stop_bitcoin,
        stop_quicksync,
        disable_quicksync,      # Try disable again (some users had disable fail)
        delete_quicksync_data,
        reboot_device,
    )
    for step in steps:
        step()
|
|
|
|
def settings_enable_quicksync():
    """Stop bitcoin and quicksync, enable QuickSync, delete its old data and reboot."""
    steps = (
        stop_bitcoin,
        stop_quicksync,
        enable_quicksync,
        delete_quicksync_data,
        reboot_device,
    )
    for step in steps:
        step()
|
|
|
|
def delete_quicksync_data():
    """Remove the quicksync folder and both transmission config folders."""
    targets = (
        "/mnt/hdd/mynode/quicksync",
        "/home/bitcoin/.config/transmission",   # Old dir
        "/mnt/hdd/mynode/.config/transmission",
    )
    for target in targets:
        os.system("rm -rf " + target)
|
|
|
|
def stop_quicksync():
    """Stop the quicksync systemd service."""
    command = "systemctl stop quicksync"
    os.system(command)
|
|
|
|
def restart_quicksync():
    """Restart QuickSync from scratch.

    Writes "quicksync_reset" to the status file, stops bitcoin and quicksync,
    deletes bitcoin and QuickSync data, re-enables QuickSync and reboots.
    """
    os.system('echo "quicksync_reset" > /tmp/.mynode_status')
    steps = (
        stop_bitcoin,
        stop_quicksync,
        delete_bitcoin_data,
        delete_quicksync_data,
        enable_quicksync,
        reboot_device,
    )
    for step in steps:
        step()
|
|
|
|
def get_quicksync_log():
    """Return the QuickSync status text.

    Returns "Disabled" when QuickSync is off and "ERROR" when the status
    command fails.
    """
    if not is_quicksync_enabled():
        return "Disabled"
    try:
        raw_output = subprocess.check_output(["mynode-get-quicksync-status"])
        return to_string(raw_output.decode("utf8"))
    except:
        return "ERROR"
|
|
|
|
|
|
#==================================
|
|
# Product Key Functions
|
|
#==================================
|
|
def set_skipped_product_key():
    """Create the .product_key_skipped marker in both settings locations."""
    for marker in ("/home/bitcoin/.mynode/.product_key_skipped",
                   "/mnt/hdd/mynode/settings/.product_key_skipped"):
        touch(marker)
|
|
def unset_skipped_product_key():
    """Delete the .product_key_skipped marker from both settings locations."""
    for marker in ("/home/bitcoin/.mynode/.product_key_skipped",
                   "/mnt/hdd/mynode/settings/.product_key_skipped"):
        delete_file(marker)
|
|
def skipped_product_key():
    """Return True when a .product_key_skipped marker exists in either location."""
    markers = (
        "/home/bitcoin/.mynode/.product_key_skipped",
        "/mnt/hdd/mynode/settings/.product_key_skipped",
    )
    return any(os.path.isfile(marker) for marker in markers)
|
|
def is_community_edition():
    """Return True when the product key was skipped."""
    skipped = skipped_product_key()
    return skipped
|
|
|
|
def delete_product_key():
    """Delete the saved product key from both settings locations."""
    for key_file in ("/home/bitcoin/.mynode/.product_key",
                     "/mnt/hdd/mynode/settings/.product_key"):
        delete_file(key_file)
|
|
def has_product_key():
    """Return True when the .product_key file exists in the .mynode folder."""
    key_file = "/home/bitcoin/.mynode/.product_key"
    return os.path.isfile(key_file)
|
|
def get_product_key():
    """Return the saved product key or a placeholder string.

    Placeholders are "community_edition" (key skipped), "product_key_missing"
    (no key file) and "product_key_error" (key file unreadable).
    """
    if skipped_product_key():
        return "community_edition"
    if not has_product_key():
        return "product_key_missing"

    try:
        with open("/home/bitcoin/.mynode/.product_key", "r") as key_file:
            return key_file.read().strip()
    except:
        return "product_key_error"
|
|
def is_valid_product_key():
    """Return True unless the .product_key_error marker exists."""
    error_marker = "/home/bitcoin/.mynode/.product_key_error"
    return not os.path.isfile(error_marker)
|
|
def save_product_key(product_key):
    """Save *product_key* (with dashes removed) to both settings locations.

    The key is written directly from Python instead of being interpolated
    into a shell `echo` command, so quotes or shell metacharacters in the
    submitted key can no longer break out of the command. A trailing newline
    is written to match what `echo` produced. Write errors are ignored, as
    the shell command's failures were before.
    """
    pk = product_key.replace("-","")
    for key_file_path in ("/home/bitcoin/.mynode/.product_key",
                          "/mnt/hdd/mynode/settings/.product_key"):
        try:
            with open(key_file_path, "w") as key_file:
                key_file.write(pk + "\n")
        except (IOError, OSError):
            # Best effort: a missing folder must not crash the caller
            pass
|
|
def delete_product_key_error():
    """Remove the .product_key_error marker from both settings locations."""
    for error_marker in ("/home/bitcoin/.mynode/.product_key_error",
                         "/mnt/hdd/mynode/settings/.product_key_error"):
        os.system("rm -rf " + error_marker)
|
|
|
|
def recheck_product_key():
    """Clear the product key error markers and restart the check_in service."""
    delete_product_key_error()
    restart_command = "systemctl restart check_in"
    os.system(restart_command)
|
|
|
|
|
|
#==================================
|
|
# Premium+ Token Functions
|
|
#==================================
|
|
def delete_premium_plus_token():
    """Delete the saved Premium+ token from both settings locations."""
    for token_file in ("/home/bitcoin/.mynode/.premium_plus_token",
                       "/mnt/hdd/mynode/settings/.premium_plus_token"):
        delete_file(token_file)
|
|
def has_premium_plus_token():
    """Return True when a Premium+ token file exists in either location."""
    token_files = (
        "/home/bitcoin/.mynode/.premium_plus_token",
        "/mnt/hdd/mynode/settings/.premium_plus_token",
    )
    return any(os.path.isfile(token_file) for token_file in token_files)
|
|
def get_premium_plus_token():
    """Return the saved Premium+ token.

    Returns "" when no token file exists and "error_2" when reading fails.
    The .mynode copy is preferred over the HDD copy.
    """
    if not has_premium_plus_token():
        return ""

    token = "error_1"
    try:
        for token_file in ("/home/bitcoin/.mynode/.premium_plus_token",
                           "/mnt/hdd/mynode/settings/.premium_plus_token"):
            if os.path.isfile(token_file):
                with open(token_file, "r") as f:
                    token = f.read().strip()
                break
    except:
        token = "error_2"
    return token
|
|
def reset_premium_plus_token_status():
    """Delete the saved Premium+ token status file."""
    status_file = "/home/bitcoin/.mynode/.premium_plus_token_status"
    delete_file(status_file)
|
|
def set_premium_plus_token_status(msg):
    """Save *msg* as the Premium+ token status.

    The message is written directly from Python instead of being interpolated
    into a shell `echo` command, so a message containing quotes or shell
    metacharacters can no longer break or inject into the command. A trailing
    newline is written to match what `echo` produced. Write errors are
    ignored, as the shell command's failures were before.
    """
    status_file_path = "/home/bitcoin/.mynode/.premium_plus_token_status"
    try:
        with open(status_file_path, "w") as status_file:
            status_file.write("{}\n".format(msg))
    except (IOError, OSError):
        # Best effort: a missing folder must not crash the caller
        pass
|
|
def get_premium_plus_token_status():
    """Return the saved Premium+ token status text.

    Returns "No Token Set" without a token, "Updating..." when no status file
    exists yet, and "STATUS_ERROR_2" when the status file cannot be read.
    """
    status_file = "/home/bitcoin/.mynode/.premium_plus_token_status"
    if not has_premium_plus_token():
        return "No Token Set"
    if not os.path.isfile(status_file):
        return "Updating..."
    try:
        with open(status_file, "r") as f:
            return f.read().strip()
    except:
        return "STATUS_ERROR_2"
|
|
def get_premium_plus_is_connected():
    """Return True when the Premium+ token status is exactly "OK"."""
    return get_premium_plus_token_status() == "OK"
|
|
def update_premium_plus_last_sync_time():
    """Write the current epoch time (whole seconds) to the last-sync file."""
    now_seconds = int(round(time.time()))
    command = "echo '{}' > /home/bitcoin/.mynode/.premium_plus_last_sync".format(now_seconds)
    os.system(command)
|
|
def get_premium_plus_last_sync():
    """Return a human-readable age of the last Premium+ sync.

    Returns "Now" when less than a minute has passed, "<n> minute(s) ago"
    otherwise, and "Unknown" when the last-sync file cannot be read or parsed.
    """
    try:
        now = int(round(time.time()))
        last = int(get_file_contents("/home/bitcoin/.mynode/.premium_plus_last_sync"))
        diff_min = int((now - last) / 60)
        if diff_min == 0:
            return "Now"
        # Fixed user-facing typo: the message used to read "minutes(s) ago"
        return "{} minute(s) ago".format(diff_min)
    except Exception:
        return "Unknown"
|
|
def save_premium_plus_token(token):
    """Write *token* to the Premium+ token file in both settings locations."""
    for token_file in ("/home/bitcoin/.mynode/.premium_plus_token",
                       "/mnt/hdd/mynode/settings/.premium_plus_token"):
        set_file_contents(token_file, token)
|
|
|
|
def recheck_premium_plus_token():
    """Clear the saved token status and restart the premium_plus_connect service."""
    reset_premium_plus_token_status()
    restart_command = "systemctl restart premium_plus_connect"
    os.system(restart_command)
|
|
|
|
def get_premium_plus_setting_names():
    """Return a new list with the names of the Premium+ settings."""
    names = [
        "sync_status",
        "sync_bitcoin_and_lightning",
        "backup_scb",
        "watchtower",
    ]
    return names
|
|
def get_premium_plus_settings():
    """Return a dict mapping each Premium+ setting name to settings_file_exists(name)."""
    settings = {}
    for setting_name in get_premium_plus_setting_names():
        settings[setting_name] = settings_file_exists(setting_name)
    return settings
|
|
|
|
#==================================
|
|
# Drive Repair Functions
|
|
#==================================
|
|
def is_drive_being_repaired():
    """Return True when the /tmp/repairing_drive marker file exists."""
    marker = "/tmp/repairing_drive"
    return os.path.isfile(marker)
|
|
def has_fsck_error():
    """Return True when the /tmp/fsck_error marker file exists."""
    marker = "/tmp/fsck_error"
    return os.path.isfile(marker)
|
|
def clear_fsck_error():
    """Remove the /tmp/fsck_error marker file."""
    command = "rm -f /tmp/fsck_error"
    os.system(command)
|
|
def get_fsck_results():
    """Return the contents of /tmp/fsck_results, or "ERROR" if unreadable."""
    results_file = "/tmp/fsck_results"
    try:
        with open(results_file, "r") as f:
            return f.read()
    except:
        return "ERROR"
|
|
|
|
def set_skip_fsck(skip):
    """Create the skip_fsck marker when *skip* is truthy, otherwise delete it."""
    marker = "/home/bitcoin/.mynode/skip_fsck"
    action = touch if skip else delete_file
    action(marker)
|
|
def skip_fsck():
    """Return True when the skip_fsck marker file exists."""
    marker = "/home/bitcoin/.mynode/skip_fsck"
    return os.path.isfile(marker)
|
|
|
|
def has_sd_rw_error():
    """Return True when the /tmp/sd_rw_error marker file exists."""
    marker = "/tmp/sd_rw_error"
    return os.path.isfile(marker)
|
|
|
|
|
|
#==================================
|
|
# Out of Memory Error Functions
|
|
#==================================
|
|
def has_oom_error():
    """Return True when the /tmp/oom_error marker file exists."""
    marker = "/tmp/oom_error"
    return os.path.isfile(marker)
|
|
def clear_oom_error():
    """Delete the OOM error marker and its info file."""
    for oom_file in ("/tmp/oom_error", "/tmp/oom_info"):
        delete_file(oom_file)
|
|
def set_oom_error(oom_error):
    """Create the OOM error marker and store *oom_error* in /tmp/oom_info."""
    marker = "/tmp/oom_error"
    info_file = "/tmp/oom_info"
    touch(marker)
    set_file_contents(info_file, oom_error)
|
|
def get_oom_error_info():
    """Return the contents of /tmp/oom_info, or "ERROR" if unreadable."""
    info_file = "/tmp/oom_info"
    try:
        with open(info_file, "r") as f:
            return f.read()
    except:
        return "ERROR"
|
|
|
|
|
|
#==================================
|
|
# Docker Functions
|
|
#==================================
|
|
def is_installing_docker_images():
    """Return True when the /tmp/installing_docker_images marker file exists."""
    marker = "/tmp/installing_docker_images"
    return os.path.isfile(marker)
|
|
|
|
def get_docker_image_build_status():
    """Return a status message for the docker_images service.

    A non-zero service status code reports a failure; otherwise the message
    depends on whether the installing marker is present.
    """
    if get_service_status_code("docker_images") != 0:
        return "Failed... Retrying Later"
    return "Installing..." if is_installing_docker_images() else "Installation Complete"
|
|
|
|
def get_docker_image_build_status_color():
    """Return "red" when the docker_images service status code is non-zero, else "green"."""
    failed = get_service_status_code("docker_images") != 0
    return "red" if failed else "green"
|
|
|
|
def reset_docker():
    """Flag docker for reset, remove version markers and Dojo files, then sync and reboot."""
    # Marker that requests docker data to be deleted
    touch("/home/bitcoin/reset_docker")

    # Reset marker files
    for version_marker in ("webssh2_version", "mempool_version", "dojo_version"):
        os.system("rm -f /mnt/hdd/mynode/settings/" + version_marker)

    # Delete Dojo files
    for dojo_folder in ("/opt/download/dojo", "/mnt/hdd/mynode/dojo"):
        os.system("rm -rf " + dojo_folder)

    os.system("sync")
    reboot_device()
|
|
|
|
def get_docker_running_containers():
    """Return the names printed by `docker ps`, one list item per line.

    Returns an empty list when the docker service status code is non-zero.
    When the command fails, the list holds the exception text instead.
    """
    # If docker not running, return empty
    if get_service_status_code("docker") != 0:
        return []

    try:
        raw_output = subprocess.check_output("docker ps --format '{{.Names}}'", shell=True, timeout=3)
        return raw_output.decode("utf8").splitlines()
    except Exception as e:
        return [str(e)]
|
|
|
|
|
|
#==================================
|
|
# USB Extras Functions
|
|
#==================================
|
|
def get_usb_extras():
    """Return the device list loaded from /tmp/usb_extras.json.

    Returns an empty list when the file is missing or cannot be parsed.
    The number of devices is logged on every call.
    """
    json_path = "/tmp/usb_extras.json"
    devices = []
    if os.path.isfile(json_path):
        try:
            with open(json_path) as json_file:
                devices = json.load(json_file)
        except Exception as e:
            log_message("EXCEPTION in get_usb_extras: " + str(e))
            devices = []
    log_message("get_usb_extras: {}".format(len(devices)))
    return devices
|
|
|
|
|
|
#==================================
|
|
# Bitcoin Functions
|
|
#==================================
|
|
def get_bitcoin_rpc_password():
    """Return the raw contents of the .btcrpcpw settings file, or "ERROR" if unreadable."""
    password_file = "/mnt/hdd/mynode/settings/.btcrpcpw"
    try:
        with open(password_file, "r") as f:
            return f.read()
    except:
        return "ERROR"
|
|
|
|
def stop_bitcoin():
    """Stop the bitcoin systemd service."""
    command = "systemctl stop bitcoin"
    os.system(command)
|
|
|
|
def get_bitcoin_log_file():
    """Return the debug.log path, using the testnet3 folder when testnet is enabled."""
    testnet_log = "/mnt/hdd/mynode/bitcoin/testnet3/debug.log"
    mainnet_log = "/mnt/hdd/mynode/bitcoin/debug.log"
    return testnet_log if is_testnet_enabled() else mainnet_log
|
|
|
|
def reset_bitcoin_env_file():
    """Overwrite BITCOIN_ENV_FILE with a single empty BTCARGS= line."""
    command = "echo 'BTCARGS=' > "+BITCOIN_ENV_FILE
    os.system(command)
|
|
|
|
def delete_bitcoin_data():
    """Remove the bitcoin data folder and the QuickSync completion marker."""
    for target in ("/mnt/hdd/mynode/bitcoin",
                   "/mnt/hdd/mynode/quicksync/.quicksync_complete"):
        os.system("rm -rf " + target)
    # Removal of the RPC settings is left disabled:
    #os.system("rm -rf /mnt/hdd/mynode/settings/.btcrpc_environment")
    #os.system("rm -rf /mnt/hdd/mynode/settings/.btcrpcpw")
|
|
|
|
def reset_blockchain():
    """Stop bitcoin, delete its data and reboot the device."""
    for step in (stop_bitcoin, delete_bitcoin_data, reboot_device):
        step()
|
|
|
|
|
|
#==================================
|
|
# LND Functions
|
|
#==================================
|
|
def delete_lnd_data():
    """Remove the LND data folder plus related credential and cache files; returns True."""
    targets = (
        LND_DATA_FOLDER,
        "/tmp/lnd_deposit_address",
        "/home/bitcoin/.lnd-admin/credentials.json",
        "/mnt/hdd/mynode/settings/.lndpw",
        "/home/admin/.lnd/",
    )
    for target in targets:
        os.system("rm -rf " + target)
    return True
|
|
|
|
def delete_lnd_watchtower_client_data():
    """Remove the mainnet wtclient.db and its last-compacted file; returns True."""
    for target in ("/mnt/hdd/mynode/lnd/data/graph/mainnet/wtclient.db",
                   "/mnt/hdd/mynode/lnd/data/graph/mainnet/wtclient.db.last-compacted"):
        os.system("rm -rf " + target)
    return True
|
|
|
|
def delete_lnd_watchtower_server_data():
    """Remove the LND watchtower data folder; returns True."""
    command = "rm -rf /mnt/hdd/mynode/lnd/data/watchtower"
    os.system(command)
    return True
|
|
|
|
|
|
#==================================
|
|
# Mainnet / Testnet Functions
|
|
#==================================
|
|
def is_testnet_enabled():
    """Return True when the .testnet_enabled settings marker exists."""
    marker = "/mnt/hdd/mynode/settings/.testnet_enabled"
    return os.path.isfile(marker)
|
|
def enable_testnet():
    """Create the .testnet_enabled settings marker."""
    marker = "/mnt/hdd/mynode/settings/.testnet_enabled"
    touch(marker)
|
|
def disable_testnet():
    """Delete the .testnet_enabled settings marker."""
    marker = "/mnt/hdd/mynode/settings/.testnet_enabled"
    delete_file(marker)
|
|
def toggle_testnet():
    """Flip the testnet setting: disable it when enabled, enable it otherwise."""
    if not is_testnet_enabled():
        enable_testnet()
        return
    disable_testnet()
|
|
|
|
#==================================
|
|
# Electrum Server Functions
|
|
#==================================
|
|
def stop_electrs():
    """Run `systemctl stop electrs` through the shell."""
    command = "systemctl stop electrs"
    os.system(command)
|
|
|
|
def restart_electrs_actual():
    """Run `systemctl restart electrs` through the shell."""
    command = "systemctl restart electrs"
    os.system(command)
|
|
|
|
def restart_electrs():
    """Schedule restart_electrs_actual on a Timer that fires after 0.1 seconds."""
    Timer(0.1, restart_electrs_actual).start()
|
|
|
|
def delete_electrs_data():
    """Remove the electrs `bitcoin` and `testnet` data folders."""
    removal_commands = (
        "rm -rf /mnt/hdd/mynode/electrs/bitcoin",
        "rm -rf /mnt/hdd/mynode/electrs/testnet",
    )
    for command in removal_commands:
        os.system(command)
|
|
|
|
def reset_electrs():
    """Stop electrs, delete its data, then schedule a restart (in that order)."""
    for step in (stop_electrs, delete_electrs_data, restart_electrs):
        step()
|
|
|
|
#==================================
|
|
# RTL Functions
|
|
#==================================
|
|
def reset_rtl_config():
    """Remove RTL-Config.json and restart the rtl service."""
    commands = (
        "rm -rf /mnt/hdd/mynode/rtl/RTL-Config.json",
        "systemctl restart rtl",
    )
    for command in commands:
        os.system(command)
|
|
|
|
#==================================
|
|
# Sphinx Relay Server Functions
|
|
#==================================
|
|
def stop_sphinxrelay():
    """Run `systemctl stop sphinxrelay` through the shell."""
    command = "systemctl stop sphinxrelay"
    os.system(command)
|
|
|
|
def restart_sphinxrelay_actual():
    """Run `systemctl restart sphinxrelay` through the shell."""
    command = "systemctl restart sphinxrelay"
    os.system(command)
|
|
|
|
def restart_sphinxrelay():
    """Schedule restart_sphinxrelay_actual on a Timer that fires after 0.1 seconds."""
    Timer(0.1, restart_sphinxrelay_actual).start()
|
|
|
|
def delete_sphinxrelay_data():
    """Remove the sphinxrelay database file and its connection string file."""
    removal_commands = (
        "rm -rf /mnt/hdd/mynode/sphinxrelay/sphinx.db",
        "rm -rf /opt/mynode/sphinxrelay/connection_string.txt",
    )
    for command in removal_commands:
        os.system(command)
|
|
|
|
def reset_sphinxrelay():
    """Stop sphinxrelay, delete its data, then schedule a restart (in that order)."""
    for step in (stop_sphinxrelay, delete_sphinxrelay_data, restart_sphinxrelay):
        step()
|
|
|
|
#==================================
|
|
# Mempool Functions
|
|
#==================================
|
|
def clear_mempool_cache():
    """Empty the mempool data and mysql data folders, sync, and restart mempool."""
    commands = (
        "rm -rf /mnt/hdd/mynode/mempool/data/*",
        "rm -rf /mnt/hdd/mynode/mempool/mysql/data/*",
        "sync",
        "systemctl restart mempool",
    )
    for command in commands:
        os.system(command)
|
|
|
|
#==================================
|
|
# Specter Functions
|
|
#==================================
|
|
def reset_specter_config():
    """Remove the specter config.json and restart the specter service."""
    commands = (
        "rm -rf /mnt/hdd/mynode/specter/config.json",
        "systemctl restart specter",
    )
    for command in commands:
        os.system(command)
|
|
|
|
#==================================
|
|
# BTC RPC Explorer Functions
|
|
#==================================
|
|
def is_btcrpcexplorer_token_enabled():
    """Return False when the `.btcrpcexplorer_disable_token` marker exists, else True."""
    disable_marker = "/mnt/hdd/mynode/settings/.btcrpcexplorer_disable_token"
    return not os.path.isfile(disable_marker)
|
|
|
|
def enable_btcrpcexplorer_token():
    """Delete the disable-token marker, then restart btcrpcexplorer if it is enabled."""
    service = "btcrpcexplorer"
    delete_file("/mnt/hdd/mynode/settings/.btcrpcexplorer_disable_token")
    if not is_service_enabled(service):
        return
    restart_service(service)
|
|
|
|
|
|
def disable_btcrpcexplorer_token():
    """Create the disable-token marker, then restart btcrpcexplorer if it is enabled."""
    service = "btcrpcexplorer"
    touch("/mnt/hdd/mynode/settings/.btcrpcexplorer_disable_token")
    if not is_service_enabled(service):
        return
    restart_service(service)
|
|
|
|
#==================================
|
|
# Tor Functions
|
|
#==================================
|
|
def reset_tor():
    """Remove Tor state, the Tor backup, and the bitcoin/lnd onion private keys."""
    removal_commands = (
        "rm -rf /var/lib/tor/*",
        "rm -rf /mnt/hdd/mynode/tor_backup/*",
        "rm -rf /mnt/hdd/mynode/bitcoin/onion_private_key",
        "rm -rf /mnt/hdd/mynode/lnd/v2_onion_private_key",
        "rm -rf /mnt/hdd/mynode/lnd/v3_onion_private_key",
    )
    for command in removal_commands:
        os.system(command)
|
|
|
|
def is_btc_lnd_tor_enabled():
    """Return whether the "btc_lnd_tor_enabled" settings file exists."""
    setting_name = "btc_lnd_tor_enabled"
    return settings_file_exists(setting_name)
|
|
def enable_btc_lnd_tor():
    """Create the "btc_lnd_tor_enabled" settings file."""
    setting_name = "btc_lnd_tor_enabled"
    create_settings_file(setting_name)
|
|
def disable_btc_lnd_tor():
    """Delete the "btc_lnd_tor_enabled" settings file."""
    setting_name = "btc_lnd_tor_enabled"
    delete_settings_file(setting_name)
|
|
|
|
def is_streamisolation_tor_enabled():
    """Return True unless the "streamisolation_tor_disabled" settings file exists."""
    disabled = settings_file_exists("streamisolation_tor_disabled")
    return not disabled
|
|
def enable_streamisolation_tor():
    """Enable stream isolation by deleting the "streamisolation_tor_disabled" settings file."""
    setting_name = "streamisolation_tor_disabled"
    delete_settings_file(setting_name)
|
|
def disable_streamisolation_tor():
    """Disable stream isolation by creating the "streamisolation_tor_disabled" settings file."""
    setting_name = "streamisolation_tor_disabled"
    create_settings_file(setting_name)
|
|
|
|
def is_tor_repo_enabled():
    """Return True unless the "tor_repo_disabled" settings file exists."""
    disabled = settings_file_exists("tor_repo_disabled")
    return not disabled
|
|
def enable_tor_repo():
    """Enable the Tor repo by deleting the "tor_repo_disabled" settings file."""
    setting_name = "tor_repo_disabled"
    delete_settings_file(setting_name)
|
|
def disable_tor_repo():
    """Disable the Tor repo by creating the "tor_repo_disabled" settings file."""
    setting_name = "tor_repo_disabled"
    create_settings_file(setting_name)
|
|
|
|
def is_aptget_tor_enabled():
    """Return True when the `torify_apt_get` settings marker file exists."""
    marker_path = "/mnt/hdd/mynode/settings/torify_apt_get"
    return os.path.isfile(marker_path)
|
|
def enable_aptget_tor():
    """Create the `torify_apt_get` settings marker file via touch()."""
    marker_path = "/mnt/hdd/mynode/settings/torify_apt_get"
    touch(marker_path)
|
|
def disable_aptget_tor():
    """Remove the `torify_apt_get` settings marker file via delete_file()."""
    marker_path = "/mnt/hdd/mynode/settings/torify_apt_get"
    delete_file(marker_path)
|
|
|
|
def get_onion_url_ssh():
    """Return the onion hostname stored for the SSH hidden service.

    Returns:
        "not_available" when is_community_edition() is true, the stripped
        contents of /var/lib/tor/mynode_ssh/hostname when that file can be
        read, and "error" otherwise.
    """
    if is_community_edition():
        return "not_available"
    hostname_path = "/var/lib/tor/mynode_ssh/hostname"
    try:
        if os.path.isfile(hostname_path):
            with open(hostname_path) as hostname_file:
                # Strip surrounding whitespace (e.g. a trailing newline) so the
                # value matches what the other get_onion_url_* helpers return.
                return hostname_file.read().strip()
    except Exception:
        # Best effort: an unreadable file is reported as "error" below.
        pass
    return "error"
|
|
|
|
def get_onion_url_general():
    """Return the onion hostname read from /var/lib/tor/mynode/hostname.

    Returns "not_available" when is_community_edition() is true, the stripped
    file contents when the file can be read, and "error" otherwise.
    """
    if is_community_edition():
        return "not_available"
    hostname_path = "/var/lib/tor/mynode/hostname"
    try:
        if os.path.isfile(hostname_path):
            with open(hostname_path) as hostname_file:
                return hostname_file.read().strip()
    except:
        pass
    return "error"
|
|
|
|
def get_onion_url_btc():
    """Return the onion hostname read from /var/lib/tor/mynode_btc/hostname.

    Returns "not_available" when is_community_edition() is true, the stripped
    file contents when the file can be read, and "error" otherwise.
    """
    if is_community_edition():
        return "not_available"
    hostname_path = "/var/lib/tor/mynode_btc/hostname"
    try:
        if os.path.isfile(hostname_path):
            with open(hostname_path) as hostname_file:
                return hostname_file.read().strip()
    except:
        pass
    return "error"
|
|
|
|
def get_onion_url_lnd():
    """Return the onion hostname read from /var/lib/tor/mynode_lnd/hostname.

    Returns "not_available" when is_community_edition() is true, the stripped
    file contents when the file can be read, and "error" otherwise.
    """
    if is_community_edition():
        return "not_available"
    hostname_path = "/var/lib/tor/mynode_lnd/hostname"
    try:
        if os.path.isfile(hostname_path):
            with open(hostname_path) as hostname_file:
                return hostname_file.read().strip()
    except:
        pass
    return "error"
|
|
|
|
def get_onion_url_electrs():
    """Return the onion hostname read from /var/lib/tor/mynode_electrs/hostname.

    Returns "not_available" when is_community_edition() is true, the stripped
    file contents when the file can be read, and "error" otherwise.
    """
    if is_community_edition():
        return "not_available"
    hostname_path = "/var/lib/tor/mynode_electrs/hostname"
    try:
        if os.path.isfile(hostname_path):
            with open(hostname_path) as hostname_file:
                return hostname_file.read().strip()
    except:
        pass
    return "error"
|
|
|
|
def get_onion_url_lndhub():
    """Return the onion hostname read from /var/lib/tor/mynode_lndhub/hostname.

    Returns "not_available" when is_community_edition() is true, the stripped
    file contents when the file can be read, and "error" otherwise.
    """
    if is_community_edition():
        return "not_available"
    hostname_path = "/var/lib/tor/mynode_lndhub/hostname"
    try:
        if os.path.isfile(hostname_path):
            with open(hostname_path) as hostname_file:
                return hostname_file.read().strip()
    except:
        pass
    return "error"
|
|
|
|
def get_onion_url_lnbits():
    """Return the onion hostname read from /var/lib/tor/mynode_lnbits/hostname.

    Returns "not_available" when is_community_edition() is true, the stripped
    file contents when the file can be read, and "error" otherwise.
    """
    if is_community_edition():
        return "not_available"
    hostname_path = "/var/lib/tor/mynode_lnbits/hostname"
    try:
        if os.path.isfile(hostname_path):
            with open(hostname_path) as hostname_file:
                return hostname_file.read().strip()
    except:
        pass
    return "error"
|
|
|
|
def get_onion_url_btcpay():
    """Return the onion hostname read from /var/lib/tor/mynode_btcpay/hostname.

    Returns "not_available" when is_community_edition() is true, the stripped
    file contents when the file can be read, and "error" otherwise.
    """
    if is_community_edition():
        return "not_available"
    hostname_path = "/var/lib/tor/mynode_btcpay/hostname"
    try:
        if os.path.isfile(hostname_path):
            with open(hostname_path) as hostname_file:
                return hostname_file.read().strip()
    except:
        pass
    return "error"
|
|
|
|
def get_onion_url_sphinxrelay():
    """Return the onion hostname read from /var/lib/tor/mynode_sphinx/hostname.

    Returns "not_available" when is_community_edition() is true, the stripped
    file contents when the file can be read, and "error" otherwise.
    """
    if is_community_edition():
        return "not_available"
    hostname_path = "/var/lib/tor/mynode_sphinx/hostname"
    try:
        if os.path.isfile(hostname_path):
            with open(hostname_path) as hostname_file:
                return hostname_file.read().strip()
    except:
        pass
    return "error"
|
|
|
|
def get_onion_url_for_service(short_name):
    """Return the onion hostname read from /var/lib/tor/mynode_<short_name>/hostname.

    Args:
        short_name: value substituted into the hidden-service folder name.

    Returns:
        "not_available" when is_community_edition() is true, the stripped
        file contents when the file can be read, and "error" otherwise.
    """
    if is_community_edition():
        return "not_available"
    try:
        hostname_path = "/var/lib/tor/mynode_{}/hostname".format(short_name)
        if os.path.isfile(hostname_path):
            with open(hostname_path) as hostname_file:
                return hostname_file.read().strip()
    except:
        pass
    return "error"
|
|
|
|
def get_onion_info_btc_v2():
    """Read the url and pass fields from /var/lib/tor/mynode_btc_v2/hostname.

    The stripped file contents are split on single spaces; the first part
    becomes "url" and the second becomes "pass".

    Returns:
        A dict with keys "url" and "pass". Each value stays "unknown" when it
        could not be read (missing file, read error, or too few parts).
    """
    hostname_path = "/var/lib/tor/mynode_btc_v2/hostname"
    info = {"url": "unknown", "pass": "unknown"}
    try:
        if os.path.isfile(hostname_path):
            with open(hostname_path) as hostname_file:
                fields = hostname_file.read().strip().split(" ")
                # Assigned one at a time so "url" is kept even if "pass" is missing.
                info["url"] = fields[0]
                info["pass"] = fields[1]
    except:
        pass
    return info
|
|
|
|
def get_tor_version():
    """Return the Tor version string, caching it in cached_data["tor_version"].

    On a cache miss the version is extracted from the first line of
    `tor --version` by a shell pipeline and stripped of whitespace.
    """
    global cached_data
    if "tor_version" not in cached_data:
        raw_output = subprocess.check_output("tor --version | head -n 1 | egrep -o '[0-9\\.]+[0-9]'", shell=True)
        cached_data["tor_version"] = to_string(raw_output).strip()
    return cached_data["tor_version"]
|
|
|
|
|
|
#==================================
|
|
# Firewall Functions
|
|
#==================================
|
|
def reload_firewall():
    """Run `ufw reload` through the shell."""
    command = "ufw reload"
    os.system(command)
|
|
|
|
def get_firewall_rules():
    """Return the output of `ufw status`, or "ERROR" if anything fails."""
    try:
        raw_output = subprocess.check_output("ufw status", shell=True)
        return to_string(raw_output.decode("utf8"))
    except:
        return "ERROR"
|
|
|
|
|
|
#==================================
|
|
# SSO Functions
|
|
#==================================
|
|
def get_sso_token(short_name):
    """Return the single-sign-on token for the given application.

    Args:
        short_name: application short name. "btcrpcexplorer" and
            "thunderhub" are recognized.

    Returns:
        The token file contents passed through to_string(), or
        "UNKNOWN_APP" for any other short name.
    """
    if short_name == "btcrpcexplorer":
        token = get_file_contents("/opt/mynode/btc-rpc-explorer/token")
    elif short_name in ("thunderhub", "thunerhub"):
        # The original check only matched the misspelling "thunerhub", so the
        # real short name fell through to UNKNOWN_APP. The misspelling is
        # still accepted so existing callers keep working.
        token = get_file_contents("/opt/mynode/thunderhub/.cookie")
    else:
        token = "UNKNOWN_APP"
    return to_string(token)
|
|
|
|
def get_sso_token_enabled(short_name):
    """Return whether SSO token use is enabled for the given application.

    Only "btcrpcexplorer" is looked up (via is_btcrpcexplorer_token_enabled);
    every other short name yields False.
    """
    if short_name != "btcrpcexplorer":
        return False
    return is_btcrpcexplorer_token_enabled()
|
|
|
|
|
|
#==================================
|
|
# QR Code Functions
|
|
#==================================
|
|
def generate_qr_code(url):
    """Build a QR code image encoding `url`.

    Args:
        url: data added to the QR code.

    Returns:
        The image produced by QRCode.make_image(), or None when any
        exception is raised (the exception text is sent to log_message).
    """
    try:
        code = qrcode.QRCode(
            version=1,
            error_correction=qrcode.constants.ERROR_CORRECT_H,
            box_size=5,
            border=1,
        )
        code.add_data(url)
        code.make(fit=True)
        return code.make_image()
    except Exception as e:
        log_message("generate_qr_code exception: {}".format(str(e)))
        return None