first commit

This commit is contained in:
Oriol Filter Anson 2022-03-05 23:42:29 +01:00
commit 5140df5df6
10 changed files with 576 additions and 0 deletions

28
README.md Normal file
View File

@ -0,0 +1,28 @@
# Hi
## Deployment
### Create venv to install requiered packages
```bash
$ python3 -m venv /path/to/new/virtual/environment
```
### Use the venv created
```bash
$ source venv/bin/activate
pip install -r requirements.txt
```
USAGE
By default uses the files ./hostlist and ./users.json
>
Json example
```json
{
"user1":"passwd",
"user2":"passwd"
}
```

0
config.json Normal file
View File

1
hostlist Normal file
View File

@ -0,0 +1 @@
srv.filter.home

0
hostlist2 Normal file
View File

129
main.py Normal file
View File

@ -0,0 +1,129 @@
import paramiko
from dataclasses import dataclass
import json
from paramiko.ssh_exception import AuthenticationException
from paramiko import SSHClient
from scp import SCPClient
from os import path
class CONFIG:
scp_dest="/tmp"
files_folder = 'C:/Users/OriolFilterAnson/PycharmProjects/sshtest'
files_to_copy = ['t1']
@dataclass
class USER:
name: str
password: str
# def __init__(self, name, password):
# self.name = name
# self.password = password
@dataclass
class VALIDATIONS:
loged_in: bool = False
def __post_init__(self):
pass
@dataclass
class ITEM:
hostname: str
client: paramiko.SSHClient
# login: USER
validations: VALIDATIONS = None
# def __init__(self, hostname, login):
# self.hostname = hostname
# self.login = login
def __post_init__(self):
self.validations = VALIDATIONS()
def attempt_to_login(hostname, user: USER) -> SSHClient or bool:
try:
client = paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname, username=user.name, password=user.password)
# ssh_stdin, ssh_stdout, ssh_stderr = client.exec_command("uptime")
# print(ssh_stdout.read().decode())
return client
except AuthenticationException as e:
pass
# ssh = paramiko.SSHClient()
return False
config=CONFIG()
bs2 = USER(name='bs2cloud', password='xxxx')
ibmuser = USER(name='ibmuser', password='xxxx')
home = USER(name='testing', password='testing')
pi = USER(name='pifail', password='pifail')
## found= user[]
result_dic = {
"found": {},
"discards": [],
}
user_list: [USER] = [home, bs2, ibmuser, pi]
host_list = ['192.168.1.3', '192.168.1.2']
for user in user_list:
result_dic['found'][user.name] = []
for host in host_list:
for user in user_list:
result = attempt_to_login(hostname=host, user=user)
if result:
session = ITEM(hostname=host, client=result)
result_dic['found'][user.name].append(session)
host_list.remove(host)
session.validations.loged_in = True
print(result_dic['found'])
result_dic['discards'] = set(host_list)
print('discards')
print(result_dic['discards'])
print('SCP')
for user in user_list:
for session in result_dic['found'][user.name]:
session: ITEM
scp = SCPClient(session.client.get_transport())
for file in config.files_to_copy:
print(f"\t{file}")
scp.put(file, remote_path='/tmp')
def test_connections():
pass
def menu():
pass
def main():
pass
# print(json.dumps(result_dic, indent=2, sort_keys=True))
# server = "192.168.1.3"
# username = "fadm"
# password = ""
# ssh = paramiko.SSHClient()
# ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# ssh.connect(server, username=username, password=password)
# ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command("uptime")
# print(ssh_stdout.read().decode())

331
main_factory.py Normal file
View File

@ -0,0 +1,331 @@
import argparse
import re
import json
from dataclasses import dataclass, field
import paramiko
from paramiko.ssh_exception import AuthenticationException
from paramiko import SSHClient
from scp import SCPClient
from os import get_terminal_size
from colored import fg, bg, attr
# import colors
"""
Author: Oriol Filter
Date 01/03/2021
"""
version = '0.8.9.3'
def sent_message(filename: str, sent: bool) -> None:
print(f'\t[{filename}] ==> [{(fg(10), fg(9))[sent]}{("Sent", "ERROR")[sent]}{attr("reset")}]')
# def sending_file_progress(filename, size, sent):
# print("%s\'s progress: %.2f%% \r" % (filename, float(sent) / float(size) * 100))
class LOADING:
pass
def sending_file_progress(filename: bytes, size: int, sent: int) -> None:
# float(sent) / float(size) * 100
green_scale = [28, 34, 40, 76, 82, 118]
loading_chars = "|/-\\"
_filename = filename.decode()
screen_size = get_terminal_size()
cols = int(screen_size.columns / 100 * 50)
percent = int(float(sent) / float(size) * 100)
_n = percent % len(loading_chars)
_color: int
_color_picker = int((percent / 10) * (len(green_scale) / 10))
space_filling = " " * int(100 - percent / 100 * cols) + " "
load_bar = f'{"=" * int(percent / 100 * cols)}=>'
if int(percent) == 100:
_color = green_scale[len(green_scale)-1]
else:
_color = green_scale[_color_picker]
# print(int(percent / 100 * cols),end="\r")
# print("+"*int(percent / 100 * cols),end="")
# print(" ", end="")
# print("+" * int(100 - percent / 100 * cols), end="\r")
print(f'\t[{loading_chars[_n]}] {fg(_color)}{load_bar}{attr("reset")} [{percent}%] {space_filling} [{(_filename[:75] + "..") if len(_filename) > 75 else _filename}]',
end='\r')
#
def return_list(txt: str) -> [str]:
_list = []
if type(txt) in (tuple, list):
_list = txt
elif type(txt) in (str, int):
_list = [str(txt)]
return _list
@dataclass
class CONFIG:
files_to_send: str or [str]
host_files: str or [str]
credentials_files: str or [str]
_files_to_send: str or [str] = field(default=None, init=False)
_host_files: str or [str] = field(default="./hostlist", init=False)
_credentials_files: str or [str] = field(default="./users.json", init=False)
def __post_init__(self):
self.files_to_send = self._files_to_send or CONFIG._files_to_send
self.host_files = self._host_files or CONFIG._host_files
self.credentials_files = self._credentials_files or CONFIG._credentials_files
@property
def files_to_send(self) -> str or [str]:
return self._files_to_send
@files_to_send.setter
def files_to_send(self, files):
self._files_to_send = return_list(files)
@property
def host_files(self) -> str or [str]:
return self._host_files
@host_files.setter
def host_files(self, files):
self._host_files = return_list(files)
@property
def credentials_files(self) -> str or [str]:
return self._credentials_files
@credentials_files.setter
def credentials_files(self, files):
self._credentials_files = return_list(files)
@dataclass
class HOST:
hostname: str
client: paramiko.SSHClient or bool = False
files_sent: dict = False
def __post_init__(self):
if not self.files_sent: self.files_sent = {}
@property
def username(self) -> str or None:
if hasattr(self, "client") and self.client:
return self.client.get_transport().get_username()
def return_connection(hostname, user: str, password: str = "") -> SSHClient or bool:
try:
client = paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname, username=user, password=password, timeout=2)
return client
except (AuthenticationException, TimeoutError) as e:
pass
return False
class MANAGER:
host_list: [HOST]
user_dict: {}
config: CONFIG
def __init__(self, config: CONFIG = None):
self.update_config(config)
def update_config(self, config: CONFIG = None):
self.host_list = []
self.user_dict = {}
if config and type(config) is CONFIG:
self.config = config
else:
self.config = CONFIG(**{})
self.load_hosts()
self.load_users()
@property
def _files_to_send(self) -> [str]:
return self.config.files_to_send
@property
def _hostlist_files(self) -> [str]:
return self.config.host_files
@property
def _credentials_files(self) -> [str]:
return self.config.credentials_files
def load_config(self):
self.load_hosts()
self.load_users()
def load_hosts(self):
host_list: MANAGER.host_list = []
for file in self._hostlist_files:
with open(file or "hostlist") as f:
lines = f.read().splitlines()
split_chars = ";,-"
regex_rule = "|".join(split_chars)
for line in lines:
result = re.split(regex_rule, line)
if result:
for entry in result:
host_list.append(HOST(hostname=entry))
self.host_list = host_list
def load_users(self):
user_dict = {}
for file in self._credentials_files:
with open(file or "users.json") as f:
data = json.load(f)
for user in data:
user_dict[user] = data[user]
self.user_dict = user_dict
def print_connections(self):
print("\nPrinting connections:")
print("\tHOSTNAME\tUSER")
for host in self.host_list:
print(f"\t{host.hostname}\t{host.username or 'Ns/Nc'}")
def test_connections(self, _print=True):
print("Attempting to login:")
for host in self.host_list:
host.client = False
print(f"{host.hostname}:", end="\t")
for user in self.user_dict:
connection = return_connection(hostname=host.hostname, user=user, password=self.user_dict[user])
if connection:
host.client = connection
print("YES")
if not host.client: print("NO")
if _print: self.print_connections()
@property
def _accesible_hosts(self) -> [HOST]:
list = [host for host in self.host_list if host.username]
return list
@property
def _not_accesible_hosts(self) -> [HOST]:
list = [host for host in self.host_list if not host.username]
return list
def _send_file(self, scp_session: SCPClient, file: str, remote_path: str = "/tmp") -> bool:
try:
scp_session.put(file, remote_path=remote_path)
print()
return True
except Exception as e:
print()
print(e)
return False
def send_files(self, load_hosts=False):
if load_hosts: self.load_hosts()
accessible_hosts = self._accesible_hosts
if len(accessible_hosts) < 1 and load_hosts:
self.send_files(load_hosts=True)
elif len(accessible_hosts) < 1 and not load_hosts:
print("No accessible hosts available")
for host in accessible_hosts:
print(f"{host.hostname}:")
if not self._files_to_send or len(self._files_to_send) < 1:
print("No files to send")
else:
for file in self._files_to_send:
host.files_sent: dict
if not host.files_sent.get(file):
host.files_sent[file] = False
scp_session = SCPClient(host.client.get_transport(), progress=sending_file_progress)
x=self._send_file(scp_session=scp_session, file=file)
print(x)
print(
f'\t{file} -------------> {["No", "Yes"][x]}')
host.files_sent[file] = True
class MENU:
pass
def print_help():
menu_dict = {
0: "Exits the script",
1: "Attempts to login using the loaded credentials",
2: "Attemps to send the files",
-1: "Prints help",
-2: "Reload files"
}
for key in menu_dict:
print(f'{key}:\n\t{menu_dict[key]}')
pass
def menu(manager: MANAGER):
while True:
menu_dict = {
0: exit,
1: manager.test_connections,
2: manager.send_files,
-1: print_help,
-2: manager.update_config
}
msg = "\nSelect 1 of the following options:\n\
1) Test connections\n\
2) Send files\n\
-1) Help\n\
-2) Reload files\n\
0) Exit"
print(msg)
_input = int(input() or 999999)
if _input in menu_dict:
option = menu_dict[_input]
option()
else:
pass
def main():
# Get ARGV
parser = argparse.ArgumentParser(
description=f'Python script for testing SSH connections and sending files (does not validate given routes) v{version}')
parser.add_argument('-v', '--version', action='store_true', help='Prints the version')
parser.add_argument('-hf', '--hosts_file', action='append',
help='File that contains a list of hosts (multiple usage allowed)')
parser.add_argument('-f', '--file', action='append',
help='File to send through SSH (multiple usage allowed)')
parser.add_argument('-c', '--credentials_file', action='append',
help='Json file that contains a list of users to use and their password ie: ' \
'{"u1":"p1","u2":"p2"}')
parser.add_argument('-thi', '--threading_instances', action='store', help='Number of threading instances (NOT ' \
'ENABLED)')
args = parser.parse_args()
if args.version:
print(f'Connection tester: v{version}')
else:
args_dic = {"files_to_send": args.file, "host_files": args.hosts_file,
"credentials_files": args.credentials_file}
conf = CONFIG(**args_dic)
manager = MANAGER(config=conf)
# manager.test_connections()
menu(manager=manager)
if __name__ == '__main__':
# screen_size = get_terminal_size()
# print("|"*screen_size.columns)
main()
# sending_file_progress(filename=b"HA", size=10, sent=10)

BIN
requirements.txt Normal file

Binary file not shown.

33
t2.py Normal file
View File

@ -0,0 +1,33 @@
from dataclasses import dataclass, field
def return_list(txt: str) -> [str]:
_list = []
if type(txt) in (tuple, list):
_list = txt
elif type(txt) in (str, int):
_list = [str(txt)]
return _list
@dataclass
class CONFIG:
host_files: str or [str]
_host_files: str or [str] = field(init=False,default="F")
def __post_init__(self):
print(">>>",self._host_files)
self.host_files = self._host_files or CONFIG._host_files
@property
def host_files(self) -> str or [str]:
return self._host_files
@host_files.setter
def host_files(self, files):
self._host_files = return_list(files)
print(self._host_files)
x=CONFIG(host_files=2)
print(x.__dict__)

12
t3.py Normal file
View File

@ -0,0 +1,12 @@
from dataclasses import dataclass
def return_list(txt: str or [str]) -> [str]:
_list = []
if type(txt) in (tuple, list):
_list = txt
elif type(txt) in (str, int):
_list = [txt]
return _list
print(return_list("xxx"))

42
t4.py Normal file
View File

@ -0,0 +1,42 @@
import time
from os import get_terminal_size
from colored import fg, bg, attr # para los colores
from time import sleep
def sending_file_progress(filename: bytes, size: int, sent: int) -> None:
# float(sent) / float(size) * 100
green_scale = [28, 34, 40, 76, 82, 118]
loading_chars = "|/-\\"
_filename = filename.decode()
screen_size = get_terminal_size()
cols = int(screen_size.columns / 100 * 50)
percent = int(float(sent) / float(size) * 100)
_n = percent % len(loading_chars)
_color: int
_color_picker = int((percent / 10) * (len(green_scale) / 10))
space_filling = " " * int(100 - percent / 100 * cols) + " "
load_bar = f'{"=" * int(percent / 100 * cols)}=>'
if int(percent) >= 100:
_color = green_scale[len(green_scale) - 1]
else:
_color = green_scale[_color_picker]
# print(int(percent / 100 * cols),end="\r")
# print("+"*int(percent / 100 * cols),end="")
# print(" ", end="")
# print("+" * int(100 - percent / 100 * cols), end="\r")
print(
f'\t[{loading_chars[_n]}] {fg(_color)}{load_bar}{attr("reset")} [{percent}%] {space_filling} [{(_filename[:75] + "..") if len(_filename) > 75 else _filename}]',
end='\r')
x = 0
while x <= 100:
sending_file_progress(b"file", 100, x) # bytes string, not flat string
time.sleep(0.05)
x += 1
print()