Rename host files to hostlist
This commit is contained in:
parent
6c268b3608
commit
857ea8b368
373
main.py
373
main.py
@ -1,129 +1,322 @@
|
||||
import paramiko
|
||||
from dataclasses import dataclass
|
||||
import argparse
|
||||
import re
|
||||
import json
|
||||
from dataclasses import dataclass, field
|
||||
import paramiko
|
||||
from paramiko.ssh_exception import AuthenticationException
|
||||
from paramiko import SSHClient
|
||||
from scp import SCPClient
|
||||
from os import path
|
||||
from os import get_terminal_size
|
||||
from colored import fg, bg, attr
|
||||
|
||||
# import colors
|
||||
"""
|
||||
Author: Oriol Filter
|
||||
Date 06/03/2021
|
||||
"""
|
||||
|
||||
version = '0.9'
|
||||
|
||||
|
||||
def sent_message(filename: str, sent: bool) -> None:
|
||||
print(f'\t[{filename}] ==> [{(fg(10), fg(9))[sent]}{("Sent", "ERROR")[sent]}{attr("reset")}]')
|
||||
|
||||
|
||||
class LOADING:
|
||||
pass
|
||||
|
||||
|
||||
def sending_file_progress(filename: bytes, size: int, sent: int) -> None:
|
||||
# float(sent) / float(size) * 100
|
||||
green_scale = [28, 34, 40, 76, 82, 118]
|
||||
loading_chars = "|/-\\"
|
||||
_filename = filename.decode()
|
||||
screen_size = get_terminal_size()
|
||||
cols = int(screen_size.columns / 100 * 50)
|
||||
|
||||
percent = int(float(sent) / float(size) * 100)
|
||||
_n = percent % len(loading_chars)
|
||||
|
||||
_color: int
|
||||
_color_picker = int((percent / 10) * (len(green_scale) / 10))
|
||||
|
||||
space_filling = " " * int(100 - percent / (100 * cols)) + " "
|
||||
load_bar = f'{"=" * int(percent / 100 * cols)}=>'
|
||||
|
||||
if int(percent) >= 100:
|
||||
_color = green_scale[len(green_scale) - 1]
|
||||
else:
|
||||
_color = green_scale[_color_picker]
|
||||
print(
|
||||
f'\t[{loading_chars[_n]}] {fg(_color)}{load_bar}{attr("reset")} [{percent}%] {space_filling} [{(_filename[:75] + "..") if len(_filename) > 75 else _filename}]',
|
||||
end='\r')
|
||||
|
||||
|
||||
def return_list(txt: str) -> [str]:
|
||||
_list = []
|
||||
if type(txt) in (tuple, list):
|
||||
_list = txt
|
||||
elif type(txt) in (str, int):
|
||||
_list = [str(txt)]
|
||||
return _list
|
||||
|
||||
|
||||
@dataclass
|
||||
class CONFIG:
|
||||
scp_dest="/tmp"
|
||||
files_folder = 'C:/Users/OriolFilterAnson/PycharmProjects/sshtest'
|
||||
files_to_copy = ['t1']
|
||||
files_to_send: str or [str]
|
||||
host_lists: str or [str]
|
||||
credentials_files: str or [str]
|
||||
|
||||
@dataclass
|
||||
class USER:
|
||||
name: str
|
||||
password: str
|
||||
|
||||
# def __init__(self, name, password):
|
||||
# self.name = name
|
||||
# self.password = password
|
||||
|
||||
|
||||
@dataclass
|
||||
class VALIDATIONS:
|
||||
loged_in: bool = False
|
||||
_files_to_send: str or [str] = field(default=None, init=False)
|
||||
_host_lists: str or [str] = field(default="./hostlist", init=False)
|
||||
_credentials_files: str or [str] = field(default="./users.json", init=False)
|
||||
|
||||
def __post_init__(self):
|
||||
pass
|
||||
self.files_to_send = self._files_to_send or CONFIG._files_to_send
|
||||
self.host_lists = self._host_lists or CONFIG._host_lists
|
||||
self.credentials_files = self._credentials_files or CONFIG._credentials_files
|
||||
|
||||
@property
|
||||
def files_to_send(self) -> str or [str]:
|
||||
return self._files_to_send
|
||||
|
||||
@files_to_send.setter
|
||||
def files_to_send(self, files):
|
||||
self._files_to_send = return_list(files)
|
||||
|
||||
@property
|
||||
def host_lists(self) -> str or [str]:
|
||||
return self._host_lists
|
||||
|
||||
@host_lists.setter
|
||||
def host_lists(self, files):
|
||||
self._host_lists = return_list(files)
|
||||
|
||||
@property
|
||||
def credentials_files(self) -> str or [str]:
|
||||
return self._credentials_files
|
||||
|
||||
@credentials_files.setter
|
||||
def credentials_files(self, files):
|
||||
self._credentials_files = return_list(files)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ITEM:
|
||||
class HOST:
|
||||
hostname: str
|
||||
client: paramiko.SSHClient
|
||||
# login: USER
|
||||
validations: VALIDATIONS = None
|
||||
client: paramiko.SSHClient or bool = False
|
||||
files_sent: dict = False
|
||||
|
||||
# def __init__(self, hostname, login):
|
||||
# self.hostname = hostname
|
||||
# self.login = login
|
||||
def __post_init__(self):
|
||||
self.validations = VALIDATIONS()
|
||||
if not self.files_sent: self.files_sent = {}
|
||||
|
||||
@property
|
||||
def username(self) -> str or None:
|
||||
if hasattr(self, "client") and self.client:
|
||||
return self.client.get_transport().get_username()
|
||||
|
||||
|
||||
def attempt_to_login(hostname, user: USER) -> SSHClient or bool:
|
||||
def return_connection(hostname, user: str, password: str = "") -> SSHClient or bool:
|
||||
try:
|
||||
client = paramiko.SSHClient()
|
||||
client.load_system_host_keys()
|
||||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
client.connect(hostname, username=user.name, password=user.password)
|
||||
# ssh_stdin, ssh_stdout, ssh_stderr = client.exec_command("uptime")
|
||||
# print(ssh_stdout.read().decode())
|
||||
client.connect(hostname, username=user, password=password, timeout=2)
|
||||
return client
|
||||
except AuthenticationException as e:
|
||||
except (AuthenticationException, TimeoutError) as e:
|
||||
pass
|
||||
# ssh = paramiko.SSHClient()
|
||||
return False
|
||||
|
||||
|
||||
config=CONFIG()
|
||||
class MANAGER:
|
||||
host_list: [HOST]
|
||||
user_dict: {}
|
||||
config: CONFIG
|
||||
|
||||
bs2 = USER(name='bs2cloud', password='xxxx')
|
||||
ibmuser = USER(name='ibmuser', password='xxxx')
|
||||
home = USER(name='testing', password='testing')
|
||||
pi = USER(name='pifail', password='pifail')
|
||||
def __init__(self, config: CONFIG = None):
|
||||
self.update_config(config)
|
||||
|
||||
## found= user[]
|
||||
result_dic = {
|
||||
"found": {},
|
||||
"discards": [],
|
||||
}
|
||||
def update_config(self, config: CONFIG = None):
|
||||
self.host_list = []
|
||||
self.user_dict = {}
|
||||
if config and type(config) is CONFIG:
|
||||
self.config = config
|
||||
else:
|
||||
self.config = CONFIG(**{})
|
||||
self.load_hosts()
|
||||
self.load_users()
|
||||
|
||||
user_list: [USER] = [home, bs2, ibmuser, pi]
|
||||
host_list = ['192.168.1.3', '192.168.1.2']
|
||||
@property
|
||||
def _files_to_send(self) -> [str]:
|
||||
return self.config.files_to_send
|
||||
|
||||
@property
|
||||
def _hostlist_files(self) -> [str]:
|
||||
return self.config.host_lists
|
||||
|
||||
@property
|
||||
def _credentials_files(self) -> [str]:
|
||||
return self.config.credentials_files
|
||||
|
||||
def load_config(self):
|
||||
self.load_hosts()
|
||||
self.load_users()
|
||||
|
||||
def load_hosts(self):
|
||||
host_list: MANAGER.host_list = []
|
||||
for file in self._hostlist_files:
|
||||
with open(file or "hostlist") as f:
|
||||
lines = f.read().splitlines()
|
||||
split_chars = ";,-"
|
||||
regex_rule = "|".join(split_chars)
|
||||
for line in lines:
|
||||
result = re.split(regex_rule, line)
|
||||
if result:
|
||||
for entry in result:
|
||||
host_list.append(HOST(hostname=entry))
|
||||
self.host_list = host_list
|
||||
|
||||
def load_users(self):
|
||||
user_dict = {}
|
||||
for file in self._credentials_files:
|
||||
with open(file or "users.json") as f:
|
||||
data = json.load(f)
|
||||
for user in data:
|
||||
user_dict[user] = data[user]
|
||||
self.user_dict = user_dict
|
||||
|
||||
def print_connections(self):
|
||||
print("\nPrinting connections:")
|
||||
print("\tHOSTNAME\tUSER")
|
||||
for host in self.host_list:
|
||||
print(f"\t{host.hostname}\t{host.username or 'Ns/Nc'}")
|
||||
|
||||
def test_connections(self, _print=True):
|
||||
print("Attempting to login:")
|
||||
for host in self.host_list:
|
||||
host.client = False
|
||||
print(f"{host.hostname}:", end="\t")
|
||||
for user in self.user_dict:
|
||||
connection = return_connection(hostname=host.hostname, user=user, password=self.user_dict[user])
|
||||
if connection:
|
||||
host.client = connection
|
||||
print("YES")
|
||||
if not host.client: print("NO")
|
||||
if _print: self.print_connections()
|
||||
|
||||
@property
|
||||
def _accesible_hosts(self) -> [HOST]:
|
||||
list = [host for host in self.host_list if host.username]
|
||||
return list
|
||||
|
||||
@property
|
||||
def _not_accesible_hosts(self) -> [HOST]:
|
||||
list = [host for host in self.host_list if not host.username]
|
||||
return list
|
||||
|
||||
def _send_file(self, scp_session: SCPClient, file: str, remote_path: str = "/tmp") -> bool:
|
||||
try:
|
||||
scp_session.put(file, remote_path=remote_path)
|
||||
print()
|
||||
return True
|
||||
except Exception as e:
|
||||
print()
|
||||
print(e)
|
||||
return False
|
||||
|
||||
def send_files(self, load_hosts=False):
|
||||
if load_hosts:
|
||||
self.load_hosts()
|
||||
|
||||
accessible_hosts = self._accesible_hosts
|
||||
|
||||
if len(accessible_hosts) < 1 and load_hosts:
|
||||
self.send_files(load_hosts=True)
|
||||
elif len(accessible_hosts) < 1 and not load_hosts:
|
||||
print("No accessible hosts available")
|
||||
for host in accessible_hosts:
|
||||
print(f"{host.hostname}:")
|
||||
if not self._files_to_send or len(self._files_to_send) < 1:
|
||||
print("No files to send")
|
||||
else:
|
||||
for file in self._files_to_send:
|
||||
host.files_sent: dict
|
||||
if not host.files_sent.get(file):
|
||||
host.files_sent[file] = False
|
||||
scp_session = SCPClient(host.client.get_transport(), progress=sending_file_progress)
|
||||
x = self._send_file(scp_session=scp_session, file=file)
|
||||
print(x)
|
||||
print(
|
||||
f'\t{file} -------------> {["No", "Yes"][x]}')
|
||||
host.files_sent[file] = True
|
||||
|
||||
|
||||
|
||||
for user in user_list:
|
||||
result_dic['found'][user.name] = []
|
||||
|
||||
for host in host_list:
|
||||
for user in user_list:
|
||||
result = attempt_to_login(hostname=host, user=user)
|
||||
if result:
|
||||
session = ITEM(hostname=host, client=result)
|
||||
result_dic['found'][user.name].append(session)
|
||||
host_list.remove(host)
|
||||
session.validations.loged_in = True
|
||||
|
||||
print(result_dic['found'])
|
||||
|
||||
result_dic['discards'] = set(host_list)
|
||||
|
||||
print('discards')
|
||||
print(result_dic['discards'])
|
||||
|
||||
print('SCP')
|
||||
|
||||
for user in user_list:
|
||||
for session in result_dic['found'][user.name]:
|
||||
session: ITEM
|
||||
|
||||
scp = SCPClient(session.client.get_transport())
|
||||
for file in config.files_to_copy:
|
||||
print(f"\t{file}")
|
||||
scp.put(file, remote_path='/tmp')
|
||||
|
||||
|
||||
def test_connections():
|
||||
class MENU:
|
||||
pass
|
||||
|
||||
def menu():
|
||||
|
||||
def print_help():
|
||||
menu_dict = {
|
||||
"0": "Exits the script",
|
||||
"1": "Attempts to login using the loaded credentials",
|
||||
"2": "Attemps to send the files",
|
||||
"-1": "Prints help",
|
||||
"-2": "Reload files"
|
||||
}
|
||||
for key in menu_dict:
|
||||
print(f'{key}:\n\t{menu_dict[key]}')
|
||||
pass
|
||||
|
||||
|
||||
def menu(manager: MANAGER):
|
||||
while True:
|
||||
menu_dict = {
|
||||
"0": exit,
|
||||
"1": manager.test_connections,
|
||||
"2": manager.send_files,
|
||||
"-1": print_help,
|
||||
"-2": manager.update_config
|
||||
}
|
||||
msg = "\nSelect 1 of the following options:\n\
|
||||
1) Test connections\n\
|
||||
2) Send files\n\
|
||||
-1) Help\n\
|
||||
-2) Reload files\n\
|
||||
0) Exit"
|
||||
print(msg)
|
||||
_input = input() or ""
|
||||
if _input in menu_dict:
|
||||
option = menu_dict[_input]
|
||||
option()
|
||||
else:
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
pass
|
||||
# print(json.dumps(result_dic, indent=2, sort_keys=True))
|
||||
# Get ARGV
|
||||
parser = argparse.ArgumentParser(
|
||||
description=f'Python script for testing SSH connections and sending files (does not validate given routes) v{version}')
|
||||
parser.add_argument('-v', '--version', action='store_true', help='Prints the version')
|
||||
parser.add_argument('-hl', '--hosts_lists', action='append',
|
||||
help='File that contains a list of hosts (multiple usage allowed)')
|
||||
parser.add_argument('-f', '--file', action='append',
|
||||
help='File to send through SSH (multiple usage allowed)')
|
||||
parser.add_argument('-c', '--credentials_file', action='append',
|
||||
help='Json file that contains a list of users to use and their password ie: ' \
|
||||
'{"u1":"p1","u2":"p2"}')
|
||||
parser.add_argument('-thi', '--threading_instances', action='store', help='Number of threading instances (NOT ' \
|
||||
'ENABLED)')
|
||||
args = parser.parse_args()
|
||||
if args.version:
|
||||
print(f'Connection tester: v{version}')
|
||||
else:
|
||||
args_dic = {"files_to_send": args.file, "host_list": args.hosts_lists,
|
||||
"credentials_files": args.credentials_file}
|
||||
conf = CONFIG(**args_dic)
|
||||
manager = MANAGER(config=conf)
|
||||
|
||||
# server = "192.168.1.3"
|
||||
# username = "fadm"
|
||||
# password = ""
|
||||
# ssh = paramiko.SSHClient()
|
||||
# ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
# ssh.connect(server, username=username, password=password)
|
||||
# ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command("uptime")
|
||||
# print(ssh_stdout.read().decode())
|
||||
menu(manager=manager)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
331
main_factory.py
331
main_factory.py
@ -1,331 +0,0 @@
|
||||
import argparse
|
||||
import re
|
||||
import json
|
||||
from dataclasses import dataclass, field
|
||||
import paramiko
|
||||
from paramiko.ssh_exception import AuthenticationException
|
||||
from paramiko import SSHClient
|
||||
from scp import SCPClient
|
||||
from os import get_terminal_size
|
||||
from colored import fg, bg, attr
|
||||
|
||||
# import colors
|
||||
"""
|
||||
Author: Oriol Filter
|
||||
Date 01/03/2021
|
||||
"""
|
||||
|
||||
version = '0.8.9.3'
|
||||
|
||||
|
||||
def sent_message(filename: str, sent: bool) -> None:
|
||||
print(f'\t[{filename}] ==> [{(fg(10), fg(9))[sent]}{("Sent", "ERROR")[sent]}{attr("reset")}]')
|
||||
|
||||
|
||||
# def sending_file_progress(filename, size, sent):
|
||||
# print("%s\'s progress: %.2f%% \r" % (filename, float(sent) / float(size) * 100))
|
||||
|
||||
class LOADING:
|
||||
pass
|
||||
|
||||
def sending_file_progress(filename: bytes, size: int, sent: int) -> None:
|
||||
# float(sent) / float(size) * 100
|
||||
green_scale = [28, 34, 40, 76, 82, 118]
|
||||
loading_chars = "|/-\\"
|
||||
_filename = filename.decode()
|
||||
screen_size = get_terminal_size()
|
||||
cols = int(screen_size.columns / 100 * 50)
|
||||
|
||||
percent = int(float(sent) / float(size) * 100)
|
||||
_n = percent % len(loading_chars)
|
||||
|
||||
_color: int
|
||||
_color_picker = int((percent / 10) * (len(green_scale) / 10))
|
||||
|
||||
|
||||
space_filling = " " * int(100 - percent / 100 * cols) + " "
|
||||
load_bar = f'{"=" * int(percent / 100 * cols)}=>'
|
||||
|
||||
if int(percent) == 100:
|
||||
_color = green_scale[len(green_scale)-1]
|
||||
else:
|
||||
_color = green_scale[_color_picker]
|
||||
# print(int(percent / 100 * cols),end="\r")
|
||||
# print("+"*int(percent / 100 * cols),end="")
|
||||
# print(" ", end="")
|
||||
# print("+" * int(100 - percent / 100 * cols), end="\r")
|
||||
print(f'\t[{loading_chars[_n]}] {fg(_color)}{load_bar}{attr("reset")} [{percent}%] {space_filling} [{(_filename[:75] + "..") if len(_filename) > 75 else _filename}]',
|
||||
end='\r')
|
||||
#
|
||||
|
||||
def return_list(txt: str) -> [str]:
|
||||
_list = []
|
||||
if type(txt) in (tuple, list):
|
||||
_list = txt
|
||||
elif type(txt) in (str, int):
|
||||
_list = [str(txt)]
|
||||
return _list
|
||||
|
||||
|
||||
@dataclass
|
||||
class CONFIG:
|
||||
files_to_send: str or [str]
|
||||
host_files: str or [str]
|
||||
credentials_files: str or [str]
|
||||
|
||||
_files_to_send: str or [str] = field(default=None, init=False)
|
||||
_host_files: str or [str] = field(default="./hostlist", init=False)
|
||||
_credentials_files: str or [str] = field(default="./users.json", init=False)
|
||||
|
||||
def __post_init__(self):
|
||||
self.files_to_send = self._files_to_send or CONFIG._files_to_send
|
||||
self.host_files = self._host_files or CONFIG._host_files
|
||||
self.credentials_files = self._credentials_files or CONFIG._credentials_files
|
||||
|
||||
@property
|
||||
def files_to_send(self) -> str or [str]:
|
||||
return self._files_to_send
|
||||
|
||||
@files_to_send.setter
|
||||
def files_to_send(self, files):
|
||||
self._files_to_send = return_list(files)
|
||||
|
||||
@property
|
||||
def host_files(self) -> str or [str]:
|
||||
return self._host_files
|
||||
|
||||
@host_files.setter
|
||||
def host_files(self, files):
|
||||
self._host_files = return_list(files)
|
||||
|
||||
@property
|
||||
def credentials_files(self) -> str or [str]:
|
||||
return self._credentials_files
|
||||
|
||||
@credentials_files.setter
|
||||
def credentials_files(self, files):
|
||||
self._credentials_files = return_list(files)
|
||||
|
||||
|
||||
@dataclass
|
||||
class HOST:
|
||||
hostname: str
|
||||
client: paramiko.SSHClient or bool = False
|
||||
files_sent: dict = False
|
||||
|
||||
def __post_init__(self):
|
||||
if not self.files_sent: self.files_sent = {}
|
||||
|
||||
@property
|
||||
def username(self) -> str or None:
|
||||
if hasattr(self, "client") and self.client:
|
||||
return self.client.get_transport().get_username()
|
||||
|
||||
|
||||
def return_connection(hostname, user: str, password: str = "") -> SSHClient or bool:
|
||||
try:
|
||||
client = paramiko.SSHClient()
|
||||
client.load_system_host_keys()
|
||||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
client.connect(hostname, username=user, password=password, timeout=2)
|
||||
return client
|
||||
except (AuthenticationException, TimeoutError) as e:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
class MANAGER:
|
||||
host_list: [HOST]
|
||||
user_dict: {}
|
||||
config: CONFIG
|
||||
|
||||
def __init__(self, config: CONFIG = None):
|
||||
self.update_config(config)
|
||||
|
||||
def update_config(self, config: CONFIG = None):
|
||||
self.host_list = []
|
||||
self.user_dict = {}
|
||||
if config and type(config) is CONFIG:
|
||||
self.config = config
|
||||
else:
|
||||
self.config = CONFIG(**{})
|
||||
self.load_hosts()
|
||||
self.load_users()
|
||||
|
||||
@property
|
||||
def _files_to_send(self) -> [str]:
|
||||
return self.config.files_to_send
|
||||
|
||||
@property
|
||||
def _hostlist_files(self) -> [str]:
|
||||
return self.config.host_files
|
||||
|
||||
@property
|
||||
def _credentials_files(self) -> [str]:
|
||||
return self.config.credentials_files
|
||||
|
||||
def load_config(self):
|
||||
self.load_hosts()
|
||||
self.load_users()
|
||||
|
||||
def load_hosts(self):
|
||||
host_list: MANAGER.host_list = []
|
||||
for file in self._hostlist_files:
|
||||
with open(file or "hostlist") as f:
|
||||
lines = f.read().splitlines()
|
||||
split_chars = ";,-"
|
||||
regex_rule = "|".join(split_chars)
|
||||
for line in lines:
|
||||
result = re.split(regex_rule, line)
|
||||
if result:
|
||||
for entry in result:
|
||||
host_list.append(HOST(hostname=entry))
|
||||
self.host_list = host_list
|
||||
|
||||
def load_users(self):
|
||||
user_dict = {}
|
||||
for file in self._credentials_files:
|
||||
with open(file or "users.json") as f:
|
||||
data = json.load(f)
|
||||
for user in data:
|
||||
user_dict[user] = data[user]
|
||||
self.user_dict = user_dict
|
||||
|
||||
def print_connections(self):
|
||||
print("\nPrinting connections:")
|
||||
print("\tHOSTNAME\tUSER")
|
||||
for host in self.host_list:
|
||||
print(f"\t{host.hostname}\t{host.username or 'Ns/Nc'}")
|
||||
|
||||
def test_connections(self, _print=True):
|
||||
print("Attempting to login:")
|
||||
for host in self.host_list:
|
||||
host.client = False
|
||||
print(f"{host.hostname}:", end="\t")
|
||||
for user in self.user_dict:
|
||||
connection = return_connection(hostname=host.hostname, user=user, password=self.user_dict[user])
|
||||
if connection:
|
||||
host.client = connection
|
||||
print("YES")
|
||||
if not host.client: print("NO")
|
||||
if _print: self.print_connections()
|
||||
|
||||
@property
|
||||
def _accesible_hosts(self) -> [HOST]:
|
||||
list = [host for host in self.host_list if host.username]
|
||||
return list
|
||||
|
||||
@property
|
||||
def _not_accesible_hosts(self) -> [HOST]:
|
||||
list = [host for host in self.host_list if not host.username]
|
||||
return list
|
||||
|
||||
def _send_file(self, scp_session: SCPClient, file: str, remote_path: str = "/tmp") -> bool:
|
||||
try:
|
||||
scp_session.put(file, remote_path=remote_path)
|
||||
print()
|
||||
return True
|
||||
except Exception as e:
|
||||
print()
|
||||
print(e)
|
||||
return False
|
||||
|
||||
def send_files(self, load_hosts=False):
|
||||
if load_hosts: self.load_hosts()
|
||||
|
||||
accessible_hosts = self._accesible_hosts
|
||||
|
||||
if len(accessible_hosts) < 1 and load_hosts:
|
||||
self.send_files(load_hosts=True)
|
||||
elif len(accessible_hosts) < 1 and not load_hosts:
|
||||
print("No accessible hosts available")
|
||||
for host in accessible_hosts:
|
||||
print(f"{host.hostname}:")
|
||||
if not self._files_to_send or len(self._files_to_send) < 1:
|
||||
print("No files to send")
|
||||
else:
|
||||
for file in self._files_to_send:
|
||||
host.files_sent: dict
|
||||
if not host.files_sent.get(file):
|
||||
host.files_sent[file] = False
|
||||
scp_session = SCPClient(host.client.get_transport(), progress=sending_file_progress)
|
||||
x=self._send_file(scp_session=scp_session, file=file)
|
||||
print(x)
|
||||
print(
|
||||
f'\t{file} -------------> {["No", "Yes"][x]}')
|
||||
host.files_sent[file] = True
|
||||
|
||||
|
||||
class MENU:
|
||||
pass
|
||||
|
||||
|
||||
def print_help():
|
||||
menu_dict = {
|
||||
0: "Exits the script",
|
||||
1: "Attempts to login using the loaded credentials",
|
||||
2: "Attemps to send the files",
|
||||
-1: "Prints help",
|
||||
-2: "Reload files"
|
||||
}
|
||||
for key in menu_dict:
|
||||
print(f'{key}:\n\t{menu_dict[key]}')
|
||||
pass
|
||||
|
||||
|
||||
def menu(manager: MANAGER):
|
||||
while True:
|
||||
menu_dict = {
|
||||
0: exit,
|
||||
1: manager.test_connections,
|
||||
2: manager.send_files,
|
||||
-1: print_help,
|
||||
-2: manager.update_config
|
||||
}
|
||||
msg = "\nSelect 1 of the following options:\n\
|
||||
1) Test connections\n\
|
||||
2) Send files\n\
|
||||
-1) Help\n\
|
||||
-2) Reload files\n\
|
||||
0) Exit"
|
||||
print(msg)
|
||||
_input = int(input() or 999999)
|
||||
if _input in menu_dict:
|
||||
option = menu_dict[_input]
|
||||
option()
|
||||
else:
|
||||
pass
|
||||
|
||||
|
||||
def main():
|
||||
# Get ARGV
|
||||
parser = argparse.ArgumentParser(
|
||||
description=f'Python script for testing SSH connections and sending files (does not validate given routes) v{version}')
|
||||
parser.add_argument('-v', '--version', action='store_true', help='Prints the version')
|
||||
parser.add_argument('-hf', '--hosts_file', action='append',
|
||||
help='File that contains a list of hosts (multiple usage allowed)')
|
||||
parser.add_argument('-f', '--file', action='append',
|
||||
help='File to send through SSH (multiple usage allowed)')
|
||||
parser.add_argument('-c', '--credentials_file', action='append',
|
||||
help='Json file that contains a list of users to use and their password ie: ' \
|
||||
'{"u1":"p1","u2":"p2"}')
|
||||
parser.add_argument('-thi', '--threading_instances', action='store', help='Number of threading instances (NOT ' \
|
||||
'ENABLED)')
|
||||
args = parser.parse_args()
|
||||
if args.version:
|
||||
print(f'Connection tester: v{version}')
|
||||
else:
|
||||
args_dic = {"files_to_send": args.file, "host_files": args.hosts_file,
|
||||
"credentials_files": args.credentials_file}
|
||||
conf = CONFIG(**args_dic)
|
||||
manager = MANAGER(config=conf)
|
||||
# manager.test_connections()
|
||||
|
||||
menu(manager=manager)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# screen_size = get_terminal_size()
|
||||
# print("|"*screen_size.columns)
|
||||
main()
|
||||
# sending_file_progress(filename=b"HA", size=10, sent=10)
|
BIN
requirements.txt
BIN
requirements.txt
Binary file not shown.
6
t4.py
6
t4.py
@ -18,17 +18,13 @@ def sending_file_progress(filename: bytes, size: int, sent: int) -> None:
|
||||
_color: int
|
||||
_color_picker = int((percent / 10) * (len(green_scale) / 10))
|
||||
|
||||
space_filling = " " * int(100 - percent / 100 * cols) + " "
|
||||
space_filling = " " * int(100 - percent / (100 * cols)) + " "
|
||||
load_bar = f'{"=" * int(percent / 100 * cols)}=>'
|
||||
|
||||
if int(percent) >= 100:
|
||||
_color = green_scale[len(green_scale) - 1]
|
||||
else:
|
||||
_color = green_scale[_color_picker]
|
||||
# print(int(percent / 100 * cols),end="\r")
|
||||
# print("+"*int(percent / 100 * cols),end="")
|
||||
# print(" ", end="")
|
||||
# print("+" * int(100 - percent / 100 * cols), end="\r")
|
||||
print(
|
||||
f'\t[{loading_chars[_n]}] {fg(_color)}{load_bar}{attr("reset")} [{percent}%] {space_filling} [{(_filename[:75] + "..") if len(_filename) > 75 else _filename}]',
|
||||
end='\r')
|
||||
|
Loading…
x
Reference in New Issue
Block a user