mynode/rootfs/standard/var/pynode/utilities.py

314 lines
8.7 KiB
Python
Raw Normal View History

from flask import send_from_directory
import os
2022-03-07 04:02:30 +00:00
import time
import json
import subprocess
2021-11-07 04:40:57 +00:00
import sys
import codecs
import urllib
import requests
2021-11-07 04:40:57 +00:00
2021-12-07 05:26:26 +00:00
mynode_logger = None
2021-11-07 04:40:57 +00:00
#==================================
# Python Info
#==================================
def isPython3():
if (sys.version_info > (3, 0)):
return True
return False
def to_bytes(s):
if type(s) is bytes:
return s
elif type(s) is str or (not isPython3() and type(s) is unicode):
2021-11-07 04:40:57 +00:00
return codecs.encode(s, 'utf-8', 'ignore')
else:
raise TypeError("to_bytes: Expected bytes or string, but got %s." % type(s))
def to_string(s):
b = to_bytes(s)
r = b.decode("utf-8")
return r
2021-11-07 04:40:57 +00:00
def quote_plus(s):
if isPython3():
2021-11-07 04:40:57 +00:00
return urllib.parse.quote_plus(s)
else:
return urllib.quote_plus(s)
def unquote_plus(s):
if isPython3():
2021-11-07 04:40:57 +00:00
return urllib.parse.unquote_plus(s)
else:
return urllib.unquote_plus(s)
#==================================
# Utilities
#==================================
def touch(file_path):
# In rare cases, touch seems to fail, so try both python and system call
# Touch via python
if os.path.exists(file_path):
os.utime(file_path, None)
else:
open(file_path, 'a').close()
# Touch via system
os.system("touch {}".format(file_path))
# Sync
os.system("sync")
def delete_file(file_path):
try:
if os.path.exists(file_path):
os.remove(file_path)
os.system("rm -f {}".format(file_path))
except Exception as e:
log_message("FAILED TO DELETE {}".format(file_path))
def get_file_contents(filename):
contents = "UNKNOWN"
try:
with open(filename, "r") as f:
contents = to_string(f.read()).strip()
except:
contents = "ERROR"
2021-11-07 04:40:57 +00:00
return to_bytes(contents)
def set_file_contents(filename, data):
2021-06-23 04:57:32 +00:00
data = data.replace('\r\n','\n')
try:
with open(filename, "w") as f:
f.write(data)
os.system("sync")
return True
except:
return False
return False
2022-03-07 04:02:30 +00:00
#==================================
# Cache Functions
#==================================
utilities_cached_data = {}
def is_cached(key, refresh_time=3600): # refresh=1hr
global utilities_cached_data
cache_time_key = key + "_cache_time"
now_time = int(time.time())
if key in utilities_cached_data and cache_time_key in utilities_cached_data:
if utilities_cached_data[cache_time_key] + refresh_time < now_time:
return False
else:
return True
else:
return False
def get_cached_data(key):
global utilities_cached_data
if key in utilities_cached_data:
return utilities_cached_data[key]
return None
def update_cached_data(key, value):
global utilities_cached_data
cache_time_key = key + "_cache_time"
now_time = int(time.time())
utilities_cached_data[key] = value
utilities_cached_data[cache_time_key] = now_time
def set_dictionary_file_cache(data, file_path):
try:
with open(file_path, 'w') as file:
json.dump(data, file)
return True
except Exception as e:
log_message("ERROR set_dictionary_file_cache ({}):{} ".format(file_path, str(e)))
log_message(str(data))
return False
def get_dictionary_file_cache(file_path):
try:
with open(file_path) as file:
data = json.load(file)
return data
except Exception as e:
log_message("ERROR get_dictionary_file_cache ({}): {}".format(file_path, str(e)))
return None
#==================================
# Settings File Functions
#==================================
def create_settings_file(name):
from drive_info import is_mynode_drive_mounted
folder_1="/home/bitcoin/.mynode/"
folder_2="/mnt/hdd/mynode/settings/"
path_1="{}{}".format(folder_1, name)
path_2="{}{}".format(folder_2, name)
touch(path_1)
if is_mynode_drive_mounted():
touch(path_2)
def delete_settings_file(name):
folder_1="/home/bitcoin/.mynode/"
folder_2="/mnt/hdd/mynode/settings/"
path_1="{}{}".format(folder_1, name)
path_2="{}{}".format(folder_2, name)
path_3="{}.{}".format(folder_1, name)
path_4="{}.{}".format(folder_2, name)
2022-03-07 04:02:30 +00:00
delete_file(path_1)
delete_file(path_2)
delete_file(path_3)
delete_file(path_4)
2022-03-07 04:02:30 +00:00
def settings_file_exists(name):
from drive_info import is_mynode_drive_mounted
folder_1="/home/bitcoin/.mynode/"
folder_2="/mnt/hdd/mynode/settings/"
path_1="{}{}".format(folder_1, name)
path_2="{}{}".format(folder_2, name)
path_3="{}.{}".format(folder_1, name)
path_4="{}.{}".format(folder_2, name)
# Migrate hidden files to non-hidden
if os.path.isfile(path_3) or os.path.isfile(path_4):
# Make sure backup file is in place
touch(path_1)
if is_mynode_drive_mounted():
touch(path_2)
delete_file(path_3)
delete_file(path_4)
2022-03-07 04:02:30 +00:00
if os.path.isfile(path_1) and os.path.isfile(path_2):
return True
elif os.path.isfile(path_1) or os.path.isfile(path_2):
# Make sure backup file is in place
touch(path_1)
if is_mynode_drive_mounted():
touch(path_2)
return True
return False
2021-12-07 05:26:26 +00:00
#==================================
# Logging Functions
#==================================
def log_message(msg):
# Logs to www log
global mynode_logger
if mynode_logger != None:
print(msg)
2021-12-07 05:26:26 +00:00
mynode_logger.info(msg)
def set_logger(logger):
2021-12-07 05:26:26 +00:00
global mynode_logger
mynode_logger = logger
2021-12-07 05:26:26 +00:00
def get_logger():
global mynode_logger
return mynode_logger
#==================================
# Log functions (non-systemd based)
#==================================
def get_file_log(file_path):
status_log = ""
if not os.path.isfile(file_path):
return "MISSING FILE"
try:
2022-01-30 19:58:00 +00:00
status_log = to_string(subprocess.check_output(["tail","-n","250",file_path]).decode("utf8"))
lines = status_log.split('\n')
lines.reverse()
status_log = '\n'.join(lines)
except Exception as e:
status_log = "ERROR ({})".format(str(e))
return status_log
#==================================
# Data Storage Functions
#==================================
def set_data(key, value):
r = redis.Redis(host='localhost', port=6379, db=0)
mynode_key = "mynode_" + key
return r.set(mynode_key, value)
def get_data(key):
r = redis.Redis(host='localhost', port=6379, db=0)
mynode_key = "mynode_" + key
return r.get(mynode_key)
#==================================
# UI Format Functions
#==================================
def format_sat_amount(amount):
try:
r = "{:,}".format(int(amount))
except:
r = amount
return r
#==================================
# Flask Functions
#==================================
def download_file(directory, filename, downloaded_file_name=None, as_attachment=True):
if isPython3():
return send_from_directory(directory=directory, path=filename, filename=None, as_attachment=as_attachment)
else:
return send_from_directory(directory=directory, filename=filename, as_attachment=as_attachment)
#==================================
# Hashing Functions
#==================================
def get_md5_file_hash(path):
import hashlib
if not os.path.isfile(path):
return "MISSING_FILE"
try:
return hashlib.md5(open(path,'rb').read()).hexdigest()
except Exception as e:
return "ERROR ({})".format(e)
#==================================
# Network Functions
#==================================
def make_tor_request(url, data, file_data=None, max_retries=5, fallback_to_ip=True, fail_delay=5):
# Return data
r = None
# Setup tor proxy
session = requests.session()
session.proxies = {}
session.proxies['http'] = 'socks5h://localhost:9050'
session.proxies['https'] = 'socks5h://localhost:9050'
# Check In
for fail_count in range(max_retries):
try:
# Use tor for check in unless there have been tor 5 failures in a row
r = None
if fallback_to_ip and fail_count >= (max_retries - 1):
r = requests.post(url, data=data, files=file_data, timeout=20)
else:
r = session.post(url, data=data, files=file_data, timeout=20)
if r.status_code == 200:
return r
else:
log_message("Connection to {} failed. Retrying... Code {}".format(url, r.status_code))
except Exception as e:
log_message("Connection to {} failed. Retrying... Exception {}".format(url, e))
# Check in failed, try again
time.sleep(fail_delay)
return r