Files
other/python/26-06-17/SSH 基础1.md
T
2026-06-17 13:12:04 +08:00

5.9 KiB

SSH

  • 本地模拟SSH服务、阻塞SSH客户端、非阻塞IO+多路复用并发客户端
  • Python3.8及以上版本
  • pip install paramiko
  • 本地模拟SSH服务端、阻塞式SSH客户端、非阻塞IO多路复用并发SSH客户端

SSH服务端

  • 实现本地127.0.0.1:2222端口SSH服务,内置RSA密钥、账号密码校验,独立终端运行并保持服务开启。
# ssh_server.py
import paramiko
import socket
import threading

# 生成服务端RSA密钥(SSH握手必备加密凭证)
HOST_KEY = paramiko.RSAKey.generate(2048)
LISTEN_ADDR = ("127.0.0.1", 2222)

class MockSSHServer(paramiko.ServerInterface):
    # 简易账号校验:本地模拟固定账号密码
    def check_auth_password(self, username: str, password: str) -> int:
        if username == "账号" and password == "密码":
            return paramiko.AUTH_SUCCESSFUL
        return paramiko.AUTH_FAILED

    def check_channel_request(self, kind, chanid):
        if kind == "session":
            return paramiko.OPEN_SUCCEEDED
        return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED

def start_ssh_server():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(LISTEN_ADDR)
    sock.listen(5)
    print(f"本地模拟SSH服务启动 {LISTEN_ADDR[0]}:{LISTEN_ADDR[1]}")
    print("登录账号:test  密码:123456\n")

    while True:
        client_sock, addr = sock.accept()
        # SSH加密传输通道
        transport = paramiko.Transport(client_sock)
        transport.add_server_key(HOST_KEY)
        server = MockSSHServer()
        transport.start_server(server=server)

        # 等待客户端认证
        chan = transport.accept(timeout=10)
        if chan is None:
            print(f"{addr} 认证超时,断开连接")
            transport.close()
            continue

        print(f"客户端 {addr} 登录成功,创建交互式Shell")
        chan.send(f"=====本地模拟SSH服务=====\r\n输入任意指令,服务端回显内容\r\n> ")

        # 循环读取客户端输入并回显
        while True:
            try:
                cmd = chan.recv(1024)
                if not cmd:
                    break
                cmd_str = cmd.decode("utf-8").strip()
                resp = f"服务端收到指令:{cmd_str}\r\n> ".encode("utf-8")
                chan.send(resp)
            except Exception:
                break

        chan.close()
        transport.close()
        print(f"{addr} 会话断开\n")

if __name__ == "__main__":
    # 后台线程运行服务端,不阻塞控制台
    server_thread = threading.Thread(target=start_ssh_server, daemon=True)
    server_thread.start()
    input("服务运行中,按回车关闭程序...")

阻塞式SSH客户端

  • 基于paramiko实现基础SSH连接、身份认证、指令收发,理解完整SSH通信流程,新开终端运行测试连通性
import paramiko
import time
SSH_HOST = "127.0.0.1"
SSH_PORT = 2222
USER = "root"
PWD = "admin"

if __name__ == "__main__":
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(
            hostname=SSH_HOST,
            port=SSH_PORT,
            username=USER,
            password=PWD,
            timeout=5
        )
        print("SSH加密通道建立完成")
        shell_chan = client.invoke_shell()
        time.sleep(0.3)
        if shell_chan.recv_ready():
            print(shell_chan.recv(1024).decode("utf-8"))
        while True:
            cmd = input("> ")  # 控制台输入
            if cmd.lower() == 'exit':
                break
            
            shell_chan.send(cmd + "\r\n")
            time.sleep(0.2)

            # 接收服务器响应
            if shell_chan.recv_ready():
                response = shell_chan.recv(1024).decode("utf-8")
                print(response, end="")
        
    except Exception as e:
        print("连接失败:", e)
    finally:
        try:
            shell_chan.close()
        except:
            pass
        client.close()

非阻塞IO+selectors多路复用并发客户端

  • 设置Socket非阻塞模式,结合selectors实现单线程同时监听多条SSH连接,完成多会话并发处理,测试并发效果。
import paramiko
import selectors
import socket
sel = selectors.DefaultSelector()
SSH_HOST = "127.0.0.1"
SSH_PORT = 2222
USER = "test"
PWD = "123456"

def create_nonblock_ssh_conn(conn_name: str):
    raw_sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    raw_sock.setblocking(False)
    try:
        raw_sock.connect((SSH_HOST,SSH_PORT))
    except BlockingIOError:
        pass
    trabsport = paramiko.Transport(raw_sock)
    trabsport.start_client()
    trabsport.auth_password(USER,PWD)
    chan = trabsport.open_session()
    chan.invoke_shell()
    chan.setblocking(False)
    sel.register(chan,selectors.EVENT_READ,{"name": conn_name})
    print(f"[{conn_name}]非阻塞SSH连接成功")
    chan.send(f"test form {conn_name}\r\n".encode("utf-8"))
    return trabsport,chan

def handle_channel_data(key,mask):
    chan = key.fileobj
    info = key.data
    if mask & selectors.EVENT_READ:
        try:
            data = chan.recv(1024)
            if data:
                print(f"\n[{info['name']}] 收到数据:\n{data.decode('utf-8')}")
            else:
                sel.unregister(chan)
                print(f"[{info['name']}] 连接关闭")
        except Exception:
            sel.unregister(chan)

if __name__ == "__main__":
    trans1,chan1 = create_nonblock_ssh_conn("连接A")
    trans2,chan2 = create_nonblock_ssh_conn("连接B")
    try:
        while True:
            events = sel.select(timeout=0.3)
            for key,mask in events:
                handle_channel_data(key,mask)
    except KeyboardInterrupt:
        print("\n程序退出")
        sel.close()
        trans1.close()
        trans2.close()