Files
2026-06-17 13:12:04 +08:00

188 lines
5.9 KiB
Markdown

# SSH
- 本地模拟SSH服务、阻塞SSH客户端、非阻塞IO+多路复用并发客户端
- Python3.8及以上版本
- ``` pip install paramiko ```
- 本地模拟SSH服务端、阻塞式SSH客户端、非阻塞IO多路复用并发SSH客户端
## SSH服务端
- 实现本地127.0.0.1:2222端口SSH服务,内置RSA密钥、账号密码校验,独立终端运行并保持服务开启。
``` python
# ssh_server.py
import paramiko
import socket
import threading
# 生成服务端RSA密钥(SSH握手必备加密凭证)
HOST_KEY = paramiko.RSAKey.generate(2048)
LISTEN_ADDR = ("127.0.0.1", 2222)
class MockSSHServer(paramiko.ServerInterface):
# 简易账号校验:本地模拟固定账号密码
def check_auth_password(self, username: str, password: str) -> int:
if username == "账号" and password == "密码":
return paramiko.AUTH_SUCCESSFUL
return paramiko.AUTH_FAILED
def check_channel_request(self, kind, chanid):
if kind == "session":
return paramiko.OPEN_SUCCEEDED
return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
def start_ssh_server():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(LISTEN_ADDR)
sock.listen(5)
print(f"本地模拟SSH服务启动 {LISTEN_ADDR[0]}:{LISTEN_ADDR[1]}")
print("登录账号:test 密码:123456\n")
while True:
client_sock, addr = sock.accept()
# SSH加密传输通道
transport = paramiko.Transport(client_sock)
transport.add_server_key(HOST_KEY)
server = MockSSHServer()
transport.start_server(server=server)
# 等待客户端认证
chan = transport.accept(timeout=10)
if chan is None:
print(f"{addr} 认证超时,断开连接")
transport.close()
continue
print(f"客户端 {addr} 登录成功,创建交互式Shell")
chan.send(f"=====本地模拟SSH服务=====\r\n输入任意指令,服务端回显内容\r\n> ")
# 循环读取客户端输入并回显
while True:
try:
cmd = chan.recv(1024)
if not cmd:
break
cmd_str = cmd.decode("utf-8").strip()
resp = f"服务端收到指令:{cmd_str}\r\n> ".encode("utf-8")
chan.send(resp)
except Exception:
break
chan.close()
transport.close()
print(f"{addr} 会话断开\n")
if __name__ == "__main__":
# 后台线程运行服务端,不阻塞控制台
server_thread = threading.Thread(target=start_ssh_server, daemon=True)
server_thread.start()
input("服务运行中,按回车关闭程序...")
```
## 阻塞式SSH客户端
- 基于paramiko实现基础SSH连接、身份认证、指令收发,理解完整SSH通信流程,新开终端运行测试连通性
``` Python
import paramiko
import time
SSH_HOST = "127.0.0.1"
SSH_PORT = 2222
USER = "root"
PWD = "admin"
if __name__ == "__main__":
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(
hostname=SSH_HOST,
port=SSH_PORT,
username=USER,
password=PWD,
timeout=5
)
print("SSH加密通道建立完成")
shell_chan = client.invoke_shell()
time.sleep(0.3)
if shell_chan.recv_ready():
print(shell_chan.recv(1024).decode("utf-8"))
while True:
cmd = input("> ") # 控制台输入
if cmd.lower() == 'exit':
break
shell_chan.send(cmd + "\r\n")
time.sleep(0.2)
# 接收服务器响应
if shell_chan.recv_ready():
response = shell_chan.recv(1024).decode("utf-8")
print(response, end="")
except Exception as e:
print("连接失败:", e)
finally:
try:
shell_chan.close()
except:
pass
client.close()
```
## 非阻塞IO+selectors多路复用并发客户端
- 设置Socket非阻塞模式,结合selectors实现单线程同时监听多条SSH连接,完成多会话并发处理,测试并发效果。
``` Python
import paramiko
import selectors
import socket
sel = selectors.DefaultSelector()
SSH_HOST = "127.0.0.1"
SSH_PORT = 2222
USER = "test"
PWD = "123456"
def create_nonblock_ssh_conn(conn_name: str):
raw_sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
raw_sock.setblocking(False)
try:
raw_sock.connect((SSH_HOST,SSH_PORT))
except BlockingIOError:
pass
trabsport = paramiko.Transport(raw_sock)
trabsport.start_client()
trabsport.auth_password(USER,PWD)
chan = trabsport.open_session()
chan.invoke_shell()
chan.setblocking(False)
sel.register(chan,selectors.EVENT_READ,{"name": conn_name})
print(f"[{conn_name}]非阻塞SSH连接成功")
chan.send(f"test form {conn_name}\r\n".encode("utf-8"))
return trabsport,chan
def handle_channel_data(key,mask):
chan = key.fileobj
info = key.data
if mask & selectors.EVENT_READ:
try:
data = chan.recv(1024)
if data:
print(f"\n[{info['name']}] 收到数据:\n{data.decode('utf-8')}")
else:
sel.unregister(chan)
print(f"[{info['name']}] 连接关闭")
except Exception:
sel.unregister(chan)
if __name__ == "__main__":
trans1,chan1 = create_nonblock_ssh_conn("连接A")
trans2,chan2 = create_nonblock_ssh_conn("连接B")
try:
while True:
events = sel.select(timeout=0.3)
for key,mask in events:
handle_channel_data(key,mask)
except KeyboardInterrupt:
print("\n程序退出")
sel.close()
trans1.close()
trans2.close()
```