From 2654c26d80165572fe634e7b73f783182fb7eff9 Mon Sep 17 00:00:00 2001 From: noob_xiaoyu Date: Wed, 17 Jun 2026 13:12:04 +0800 Subject: [PATCH] . --- python/26-06-17/SSH 基础1.md | 188 +++++++++++++++++++++++++++++++++ 1 file changed, 188 insertions(+) create mode 100644 python/26-06-17/SSH 基础1.md diff --git a/python/26-06-17/SSH 基础1.md b/python/26-06-17/SSH 基础1.md new file mode 100644 index 0000000..d987670 --- /dev/null +++ b/python/26-06-17/SSH 基础1.md @@ -0,0 +1,188 @@ +# SSH +- 本地模拟SSH服务、阻塞SSH客户端、非阻塞IO+多路复用并发客户端 +- Python3.8及以上版本 +- ``` pip install paramiko ``` +- 本地模拟SSH服务端、阻塞式SSH客户端、非阻塞IO多路复用并发SSH客户端 + +## SSH服务端 +- 实现本地127.0.0.1:2222端口SSH服务,内置RSA密钥、账号密码校验,独立终端运行并保持服务开启。 +``` python +# ssh_server.py +import paramiko +import socket +import threading + +# 生成服务端RSA密钥(SSH握手必备加密凭证) +HOST_KEY = paramiko.RSAKey.generate(2048) +LISTEN_ADDR = ("127.0.0.1", 2222) + +class MockSSHServer(paramiko.ServerInterface): + # 简易账号校验:本地模拟固定账号密码 + def check_auth_password(self, username: str, password: str) -> int: + if username == "账号" and password == "密码": + return paramiko.AUTH_SUCCESSFUL + return paramiko.AUTH_FAILED + + def check_channel_request(self, kind, chanid): + if kind == "session": + return paramiko.OPEN_SUCCEEDED + return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED + +def start_ssh_server(): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind(LISTEN_ADDR) + sock.listen(5) + print(f"本地模拟SSH服务启动 {LISTEN_ADDR[0]}:{LISTEN_ADDR[1]}") + print("登录账号:test 密码:123456\n") + + while True: + client_sock, addr = sock.accept() + # SSH加密传输通道 + transport = paramiko.Transport(client_sock) + transport.add_server_key(HOST_KEY) + server = MockSSHServer() + transport.start_server(server=server) + + # 等待客户端认证 + chan = transport.accept(timeout=10) + if chan is None: + print(f"{addr} 认证超时,断开连接") + transport.close() + continue + + print(f"客户端 {addr} 登录成功,创建交互式Shell") + chan.send(f"=====本地模拟SSH服务=====\r\n输入任意指令,服务端回显内容\r\n> ") + + # 循环读取客户端输入并回显 + while True: + try: + cmd = chan.recv(1024) + if not cmd: + break + cmd_str = cmd.decode("utf-8").strip() + resp = f"服务端收到指令:{cmd_str}\r\n> ".encode("utf-8") + chan.send(resp) + except Exception: + break + + chan.close() + transport.close() + print(f"{addr} 会话断开\n") + +if __name__ == "__main__": + # 后台线程运行服务端,不阻塞控制台 + server_thread = threading.Thread(target=start_ssh_server, daemon=True) + server_thread.start() + input("服务运行中,按回车关闭程序...") +``` + +## 阻塞式SSH客户端 +- 基于paramiko实现基础SSH连接、身份认证、指令收发,理解完整SSH通信流程,新开终端运行测试连通性 + +``` Python +import paramiko +import time +SSH_HOST = "127.0.0.1" +SSH_PORT = 2222 +USER = "root" +PWD = "admin" + +if __name__ == "__main__": + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + try: + client.connect( + hostname=SSH_HOST, + port=SSH_PORT, + username=USER, + password=PWD, + timeout=5 + ) + print("SSH加密通道建立完成") + shell_chan = client.invoke_shell() + time.sleep(0.3) + if shell_chan.recv_ready(): + print(shell_chan.recv(1024).decode("utf-8")) + while True: + cmd = input("> ") # 控制台输入 + if cmd.lower() == 'exit': + break + + shell_chan.send(cmd + "\r\n") + time.sleep(0.2) + + # 接收服务器响应 + if shell_chan.recv_ready(): + response = shell_chan.recv(1024).decode("utf-8") + print(response, end="") + + except Exception as e: + print("连接失败:", e) + finally: + try: + shell_chan.close() + except: + pass + client.close() +``` + +## 非阻塞IO+selectors多路复用并发客户端 +- 设置Socket非阻塞模式,结合selectors实现单线程同时监听多条SSH连接,完成多会话并发处理,测试并发效果。 + +``` Python +import paramiko +import selectors +import socket +sel = selectors.DefaultSelector() +SSH_HOST = "127.0.0.1" +SSH_PORT = 2222 +USER = "test" +PWD = "123456" + +def create_nonblock_ssh_conn(conn_name: str): + raw_sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM) + raw_sock.setblocking(False) + try: + raw_sock.connect((SSH_HOST,SSH_PORT)) + except BlockingIOError: + pass + trabsport = paramiko.Transport(raw_sock) + trabsport.start_client() + trabsport.auth_password(USER,PWD) + chan = trabsport.open_session() + chan.invoke_shell() + chan.setblocking(False) + sel.register(chan,selectors.EVENT_READ,{"name": conn_name}) + print(f"[{conn_name}]非阻塞SSH连接成功") + chan.send(f"test form {conn_name}\r\n".encode("utf-8")) + return trabsport,chan + +def handle_channel_data(key,mask): + chan = key.fileobj + info = key.data + if mask & selectors.EVENT_READ: + try: + data = chan.recv(1024) + if data: + print(f"\n[{info['name']}] 收到数据:\n{data.decode('utf-8')}") + else: + sel.unregister(chan) + print(f"[{info['name']}] 连接关闭") + except Exception: + sel.unregister(chan) + +if __name__ == "__main__": + trans1,chan1 = create_nonblock_ssh_conn("连接A") + trans2,chan2 = create_nonblock_ssh_conn("连接B") + try: + while True: + events = sel.select(timeout=0.3) + for key,mask in events: + handle_channel_data(key,mask) + except KeyboardInterrupt: + print("\n程序退出") + sel.close() + trans1.close() + trans2.close() +``` \ No newline at end of file