最近因为折腾宿迁服务器,没有宝塔等环境,上传下载文件较为麻烦,使用Python写了个上传下载文件的软件。

代码使用了Tkinter做UI,可同时添加8个服务器(本来想做CMD,但是使用太麻烦了

UI

完整代码

import os
import tkinter as tk
import webbrowser
from tkinter import filedialog,messagebox
import easygui

root = tk.Tk()
import re
import paramiko
from scp import SCPClient


def download_file_pkey(DOWNLOAD_SERVER_REMOTE_PATH,DOWNLOAD_LOCAL_FILE_PATH,HOST,PORT,USERNAME):
    ssh_client = paramiko.SSHClient()
    ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
    try:
        private_key = paramiko.RSAKey.from_private_key_file('pkey')
    except Exception as e:
        try:
            private_key = paramiko.Ed25519Key.from_private_key_file('pkey')
        except Exception as e:
            messagebox.showerror('下载失败', '不支持的密钥类型(必须为RSA或Ed25519)')
    try:
        ssh_client.connect(HOST, PORT, USERNAME, pkey=private_key)
        scp_client = SCPClient(ssh_client.get_transport(), socket_timeout=15.0)
        scp_client.get(DOWNLOAD_SERVER_REMOTE_PATH, DOWNLOAD_LOCAL_FILE_PATH)
    except FileNotFoundError as e:
        print(e)
        print("系统找不到指定文件" + DOWNLOAD_SERVER_REMOTE_PATH)
        ssh_client.close()
        return [False,'系统找不到指定文件:'+ DOWNLOAD_SERVER_REMOTE_PATH]
    except Exception as e:
        ssh_client.close()
        return [False, str(e)]
    else:
        print("文件下载成功")
        ssh_client.close()
        return [True, '']

def upload_file_pkey(UPLOAD_LOCAL_FILE_PATG,UPLOAD_SERVER_REMOTE_PATH,HOST,PORT,USERNAME):
    ssh_client = paramiko.SSHClient()
    ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
    try:
        private_key = paramiko.RSAKey.from_private_key_file('pkey')
    except Exception as e:
        try:
            private_key = paramiko.Ed25519Key.from_private_key_file('pkey')
        except Exception as e:
            messagebox.showerror('上传失败','不支持的密钥类型(必须为RSA或Ed25519)')
    try:
        ssh_client.connect(HOST, PORT, USERNAME, pkey=private_key)
        scp_client = SCPClient(ssh_client.get_transport(), socket_timeout=15.0)
        scp_client.put(UPLOAD_LOCAL_FILE_PATG, UPLOAD_SERVER_REMOTE_PATH)
    except FileNotFoundError as e:
        print(e)
        print("系统找不到指定文件:" + UPLOAD_LOCAL_FILE_PATG)
        ssh_client.close()
        return [False,("系统找不到指定文件:" + UPLOAD_LOCAL_FILE_PATG)]
    except TimeoutError as e:
        print(e)
        print("连接超时")
        return [False, '链接超时,请检查SSH配置和IP']
    except Exception as e:
        print(e)
        ssh_client.close()
        return [False, str(e)]
    else:
        print("文件上传成功")
        ssh_client.close()
        return [True,'文件上传成功']


def upload_file(UPLOAD_LOCAL_FILE_PATG,UPLOAD_SERVER_REMOTE_PATH,HOST,PORT,USERNAME,PASSWORD):

    ssh_client = paramiko.SSHClient()
    ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
    try:
        ssh_client.connect(HOST, PORT, USERNAME, PASSWORD)
        scp_client = SCPClient(ssh_client.get_transport(), socket_timeout=15.0)
        scp_client.put(UPLOAD_LOCAL_FILE_PATG, UPLOAD_SERVER_REMOTE_PATH)
    except FileNotFoundError as e:
        print(e)
        print("系统找不到指定文件:" + UPLOAD_LOCAL_FILE_PATG)
        ssh_client.close()
        return [False,("系统找不到指定文件:" + UPLOAD_LOCAL_FILE_PATG)]
    except TimeoutError as e:
        print(e)
        print("连接超时")
        return [False, '链接超时,请检查SSH配置和IP']
    except Exception as e:
        print(e)
        ssh_client.close()
        return [False, str(e)]
    else:
        print("文件上传成功")
        ssh_client.close()
        return [True,'文件上传成功']

def download_file(DOWNLOAD_SERVER_REMOTE_PATH,DOWNLOAD_LOCAL_FILE_PATH,HOST,PORT,USERNAME,PASSWORD):
    ssh_client = paramiko.SSHClient()
    ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy)
    try:
        ssh_client.connect(HOST, PORT, USERNAME, PASSWORD)
        scp_client = SCPClient(ssh_client.get_transport(), socket_timeout=15.0)
        scp_client.get(DOWNLOAD_SERVER_REMOTE_PATH, DOWNLOAD_LOCAL_FILE_PATH)
    except FileNotFoundError as e:
        print(e)
        print("系统找不到指定文件" + DOWNLOAD_SERVER_REMOTE_PATH)
        ssh_client.close()
        return [False,'系统找不到指定文件:'+ DOWNLOAD_SERVER_REMOTE_PATH]
    except Exception as e:
        ssh_client.close()
        return [False, str(e)]
    else:
        print("文件下载成功")
        ssh_client.close()
        return [True, '']



def check_ip(ipAddr):
  compile_ip=re.compile('^(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|[1-9])\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|\d)\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|\d)\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]\d|\d)$')
  if compile_ip.match(ipAddr):
    return True
  else:
    return False

def changecof():
    with open('./config.json') as f:
        config = f.read()
        config = eval(config)
    backgui=easygui.buttonbox('请选择要对选中服务器进行的操作:',config[str(v.get())][0]+' 设置',['查看当前各项信息','更改ip','更改SSH信息','密钥/密码切换'])
    print(backgui)
    if backgui==None:
        pass
    elif backgui=='查看当前各项信息':
        modole='密钥'
        password=config[str(v.get())][2]
        if config[str(v.get())][4]=='mm':
            modole='密码'
        elif config[str(v.get())][4]=='my':
            password='密钥过长,请点击显示密钥查看当前密钥'
        buildTEXT='IP:%s\nSSH端口:%s\nSSH用户名:%s\nSSH验证模式:%s\nSSH密码:%s\n'%(config[str(v.get())][0],config[str(v.get())][3],config[str(v.get())][1],modole,password)
        backgui_rec=easygui.buttonbox(buildTEXT,config[str(v.get())][0]+'服务器信息',['完成','查看密钥'])
        if backgui_rec=='查看密钥' and modole=='密钥':
            easygui.msgbox(config[str(v.get())][2],config[str(v.get())][0]+' 密钥',ok_button='完成')
        else:
            if modole=='密码':
                messagebox.showwarning('错误','当前SSH验证模式为密码')
    elif backgui=='更改ip':
        changip=easygui.enterbox('请输入新的服务器ip','更改ip')
        if changip!=None:
            print(check_ip(changip))
            if check_ip(changip):
                config[str(v.get())][0]=changip
                with open('./config.json','w') as f:
                    f.write(str(config))
                    messagebox.showinfo('完成','操作完成,UI更新可能有延迟,请重启程序')
            else:
                messagebox.showwarning('IP不合法', '输入的IP不合法,请检查后再试')
        else:
            messagebox.showinfo('取消', '操作取消')


    elif backgui=='更改SSH信息':
        backguisec = easygui.buttonbox('请选择要对选中服务器进行的操作:', '更改SSH信息',
                                    ['更改SSH密码/密钥', '更改SSH用户名', '更改SSH端口'])
        if backguisec==None:
            pass
        elif backguisec=='更改SSH密码/密钥':
            if config[str(v.get())][4]=='mm':
                config[str(v.get())][2] = easygui.enterbox('当前是密码模式,请输入新的SSH密码', '更改SSH密码')
                if config[str(v.get())][2]!='' and config[str(v.get())][2]!=None:
                    with open('./config.json', 'w') as f:
                        f.write(str(config))
                        messagebox.showinfo('完成','操作完成')
                else:
                    #messagebox.showinfo('取消','操作取消')
                    pass
            elif config[str(v.get())][4]=='my':
                config[str(v.get())][2]=easygui.enterbox('当前是密钥模式,请直接将密钥粘贴于此:','更改SSH密钥')
                if config[str(v.get())][2] != '' and config[str(v.get())][2] != None:
                    with open('./config.json', 'w') as f:
                        f.write(str(config))
                    messagebox.showinfo('完成','操作完成')
                else:
                    #messagebox.showinfo('取消','操作取消')
                    pass



        elif backguisec == '更改SSH用户名':
            config[str(v.get())][1] = easygui.enterbox('请输入新的SSH用户名', '更改SSH用户名')
            if config[str(v.get())][1]!='' and config[str(v.get())][1]!=None:
                with open('./config.json', 'w') as f:
                    f.write(str(config))
                    messagebox.showinfo('完成','操作完成')


        elif backguisec == '更改SSH端口':
            config[str(v.get())][3] = easygui.enterbox('请输入新的SSH端口', '更改SSH端口')
            if config[str(v.get())][3] != '' and config[str(v.get())][3] != None:
                with open('./config.json', 'w') as f:
                    f.write(str(config))
                    messagebox.showinfo('完成','操作完成')
    elif backgui=='密钥/密码切换':
        if config[str(v.get())][4]=='mm':
            config[str(v.get())][4]='my'
            with open('./config.json', 'w') as f:
                f.write(str(config))
            messagebox.showinfo('完成','密码类型已更改为密钥,请前往SSH信息更改更改密钥')
        elif config[str(v.get())][4] == 'my':
            config[str(v.get())][4] = 'mm'
            with open('./config.json', 'w') as f:
                f.write(str(config))
            messagebox.showinfo('完成', '密码类型已更改为密码,请前往SSH信息更改更改密码')
        else:
            messagebox.showerror('错误', '配置文件异常(Service-Code:17)')

def showinfo():
    with open('./topmost') as f:
        tpn=f.read()
    if str(tpn)=='1':
        tpns='开'
    else:
        tpns='关'
    backgui=easygui.buttonbox('关于本程序','关于',['是否置顶(当前%s)'%tpns,'本程序介绍&版权声明'])
    if backgui!=None:
        if backgui.find('是否置顶')!=-1:
            if tpns=='开':
                with open('./topmost','w') as f:
                    f.write('0')
                    messagebox.showinfo('完成','操作完成,重启见效果')
            else:
                with open('./topmost','w') as f:
                    f.write('1')
                    messagebox.showinfo('完成','操作完成,重启见效果')
        else:
            webbrowser.open('https://www.lowion.cn/shellfile-v1-0%e9%9a%86%e9%87%8d%e6%8e%a8%e5%87%ba/', new=0, autoraise=True)


def upload():
    uploadfile_path = filedialog.askopenfilename()
    if uploadfile_path!='':
        remote_path = easygui.enterbox('请输入文件上传到服务器的目录,如:/root/upload/','文件上传')
        if not remote_path==None and not remote_path=='':
            with open('./config.json') as f:
                config = f.read()
                config = eval(config)
            config=config[str(v.get())]
            print(config)
            laste_check=easygui.boolbox('确认上传?\n从'+uploadfile_path+' 到 '+config[0]+"@"+remote_path+'\n\n上传期间软件可能会卡死,稍等即可','上传确认',choices=['确认(开始上传)','取消'])
            if laste_check:
                with open('./config.json') as f:
                    config_key = f.read()
                    config_key = eval(config_key)
                if config_key[str(v.get())][4]=='mm':
                    upload_status = upload_file(uploadfile_path, remote_path, config[0], config[3], config[1], config[2])
                else:
                    with open('./pkey','w') as f:
                        f.write(config_key[str(v.get())][2])
                    upload_status=upload_file_pkey(uploadfile_path, remote_path, config[0], config[3], config[1])
                if upload_status[0]:
                    messagebox.showinfo('成功', '上传成功')
                else:
                    messagebox.showerror('上传失败','文件上传失败:'+upload_status[1])
            else:
                messagebox.showinfo('取消', '用户取消')
        else:
            if remote_path == None:
                messagebox.showinfo('取消', '用户取消')
            else:
                messagebox.showinfo('提示', '请不要输入空路径,操作取消')
    else:
        messagebox.showinfo('提示', '请选择文件,操作取消')

def download():
    with open('./config.json') as f:
        config = f.read()
        config = eval(config)
    config = config[str(v.get())]
    with open('./DIR_config.json') as f:
        download_dir=f.read()
    serverfile_path=easygui.enterbox('请输入要下载文件在服务器上的地址,如:/root/123.txt','文件下载')
    if serverfile_path!="" and serverfile_path!=None:
        print(config)
        laste_check = easygui.boolbox('确认下载?\n从'+config[0]+'@'+serverfile_path+'到本地'+download_dir+'\n\n下载期间软件可能会卡死,稍等即可', choices=['确认', '取消'])
        if laste_check:
            with open('./config.json') as f:
                config_key = f.read()
                config_key = eval(config_key)
            if config_key[str(v.get())][4] == 'mm':
                upload_status=download_file(serverfile_path,download_dir,config[0],config[3],config[1],config[2])
            else:
                with open('./pkey', 'w') as f:
                    f.write(config_key[str(v.get())][2])
                upload_status = download_file_pkey(serverfile_path,download_dir,config[0],config[3],config[1])

            if upload_status[0]:
                messagebox.showinfo('成功', '下载成功,文件已下载到:'+download_dir)
            else:
                messagebox.showerror('上传失败', '文件下载失败:' + upload_status[1])
    else:
        if serverfile_path=="":
            messagebox.showwarning('错误', '路径输入错误')
        else:
            messagebox.showinfo('取消', '用户取消')
def setdir():
    cdir=filedialog.askdirectory()
    if cdir=='':
        messagebox.showinfo('取消', '用户取消')
    else:
        with open('./DIR_config.json','w') as f:
            f.write(cdir)
        messagebox.showinfo('完成', '操作完成')
root.title('ShellFile v1.0')

root.geometry('279x320+500+500')
root.resizable(False, False)

tk.Button(root, text='上传文件', width=12, height=2,command=lambda:upload()).place(x=0, y=0)
tk.Button(root, text='下载文件', width=12, height=2,command=lambda:download()).place(x=93, y=0)
tk.Button(root, text='设定下载目录', width=12, height=2,command=lambda:setdir()).place(x=186, y=0)
tk.Button(root, text='编辑选中的服务器', width=39, height=2,command=lambda:changecof()).place(x=0, y=50)
tk.Button(root, text='关于本程序', width=45, height=1,command=lambda:showinfo()).place(x=-27, y=291)

v = tk.IntVar()
v.set(0)


find=False
for i in os.listdir('./'):
    if i=='topmost':
        find=True

default=1
if not find:
    with open('topmost','w') as f:
         f.write(str(default))
with open('topmost','r') as f:
     nstu=f.read()
root.attributes("-topmost",int(nstu))



find=False
for i in os.listdir('./'):
    if i=='config.json':
        find=True

default={'0':['255.255.255.255','root','1234567','22','mm'],'1':['255.255.255.255','root','1234567','22','mm'],'2':['255.255.255.255','root','1234567','22','mm'],'3':['255.255.255.255','root','1234567','22','mm'],'4':['255.255.255.255','root','1234567','22','mm'],'5':['255.255.255.255','root','1234567','22','mm'],'6':['255.255.255.255','root','1234567','22','mm']}
if not find:
    with open('./config.json','w') as f:
         f.write(str(default))

with open('./config.json') as f:
    config=f.read()
    config=eval(config)



find=False
for i in os.listdir('./'):
    if i=='DIR_config.json':
        find=True

default="C:/Users/%s/Downloads" %os.getlogin()
print(default)
if not find:
    with open('./DIR_config.json','w') as f:
         f.write(str(default))



bt1=tk.Radiobutton(root, text=config['0'][0],activebackground='red',variable=v, value=0)
bt1.place(x=80, y=120,anchor='w')

bt2=tk.Radiobutton(root, text=config['1'][0],activebackground='red',variable=v, value=1)
bt2.place(x=80, y=145,anchor='w')

bt3=tk.Radiobutton(root, text=config['2'][0],activebackground='red',variable=v, value=2)
bt3.place(x=80, y=170,anchor='w')

bt4=tk.Radiobutton(root, text=config['3'][0],activebackground='red',variable=v, value=3)
bt4.place(x=80, y=195,anchor='w')

bt5=tk.Radiobutton(root, text=config['4'][0],activebackground='red',variable=v, value=4)
bt5.place(x=80, y=220,anchor='w')

bt6=tk.Radiobutton(root, text=config['5'][0],activebackground='red',variable=v, value=5)
bt6.place(x=80, y=245,anchor='w')

bt7=tk.Radiobutton(root, text=config['6'][0],activebackground='red',variable=v, value=6)
bt7.place(x=80, y=270,anchor='w')


root.mainloop()

Exe版

Preview v1.0:

ShellFile Preview v1.0.zip

Final version v1.0:

ShellFile Final v1.0.zip