.
This commit is contained in:
@@ -0,0 +1,188 @@
|
||||
# SSH
|
||||
- 本地模拟SSH服务、阻塞SSH客户端、非阻塞IO+多路复用并发客户端
|
||||
- Python3.8及以上版本
|
||||
- ``` pip install paramiko ```
|
||||
- 本地模拟SSH服务端、阻塞式SSH客户端、非阻塞IO多路复用并发SSH客户端
|
||||
|
||||
## SSH服务端
|
||||
- 实现本地127.0.0.1:2222端口SSH服务,内置RSA密钥、账号密码校验,独立终端运行并保持服务开启。
|
||||
``` python
|
||||
# ssh_server.py
|
||||
import paramiko
|
||||
import socket
|
||||
import threading
|
||||
|
||||
# 生成服务端RSA密钥(SSH握手必备加密凭证)
|
||||
HOST_KEY = paramiko.RSAKey.generate(2048)
|
||||
LISTEN_ADDR = ("127.0.0.1", 2222)
|
||||
|
||||
class MockSSHServer(paramiko.ServerInterface):
|
||||
# 简易账号校验:本地模拟固定账号密码
|
||||
def check_auth_password(self, username: str, password: str) -> int:
|
||||
if username == "账号" and password == "密码":
|
||||
return paramiko.AUTH_SUCCESSFUL
|
||||
return paramiko.AUTH_FAILED
|
||||
|
||||
def check_channel_request(self, kind, chanid):
|
||||
if kind == "session":
|
||||
return paramiko.OPEN_SUCCEEDED
|
||||
return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
|
||||
|
||||
def start_ssh_server():
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
sock.bind(LISTEN_ADDR)
|
||||
sock.listen(5)
|
||||
print(f"本地模拟SSH服务启动 {LISTEN_ADDR[0]}:{LISTEN_ADDR[1]}")
|
||||
print("登录账号:test 密码:123456\n")
|
||||
|
||||
while True:
|
||||
client_sock, addr = sock.accept()
|
||||
# SSH加密传输通道
|
||||
transport = paramiko.Transport(client_sock)
|
||||
transport.add_server_key(HOST_KEY)
|
||||
server = MockSSHServer()
|
||||
transport.start_server(server=server)
|
||||
|
||||
# 等待客户端认证
|
||||
chan = transport.accept(timeout=10)
|
||||
if chan is None:
|
||||
print(f"{addr} 认证超时,断开连接")
|
||||
transport.close()
|
||||
continue
|
||||
|
||||
print(f"客户端 {addr} 登录成功,创建交互式Shell")
|
||||
chan.send(f"=====本地模拟SSH服务=====\r\n输入任意指令,服务端回显内容\r\n> ")
|
||||
|
||||
# 循环读取客户端输入并回显
|
||||
while True:
|
||||
try:
|
||||
cmd = chan.recv(1024)
|
||||
if not cmd:
|
||||
break
|
||||
cmd_str = cmd.decode("utf-8").strip()
|
||||
resp = f"服务端收到指令:{cmd_str}\r\n> ".encode("utf-8")
|
||||
chan.send(resp)
|
||||
except Exception:
|
||||
break
|
||||
|
||||
chan.close()
|
||||
transport.close()
|
||||
print(f"{addr} 会话断开\n")
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 后台线程运行服务端,不阻塞控制台
|
||||
server_thread = threading.Thread(target=start_ssh_server, daemon=True)
|
||||
server_thread.start()
|
||||
input("服务运行中,按回车关闭程序...")
|
||||
```
|
||||
|
||||
## 阻塞式SSH客户端
|
||||
- 基于paramiko实现基础SSH连接、身份认证、指令收发,理解完整SSH通信流程,新开终端运行测试连通性
|
||||
|
||||
``` Python
|
||||
import paramiko
|
||||
import time
|
||||
SSH_HOST = "127.0.0.1"
|
||||
SSH_PORT = 2222
|
||||
USER = "root"
|
||||
PWD = "admin"
|
||||
|
||||
if __name__ == "__main__":
|
||||
client = paramiko.SSHClient()
|
||||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
try:
|
||||
client.connect(
|
||||
hostname=SSH_HOST,
|
||||
port=SSH_PORT,
|
||||
username=USER,
|
||||
password=PWD,
|
||||
timeout=5
|
||||
)
|
||||
print("SSH加密通道建立完成")
|
||||
shell_chan = client.invoke_shell()
|
||||
time.sleep(0.3)
|
||||
if shell_chan.recv_ready():
|
||||
print(shell_chan.recv(1024).decode("utf-8"))
|
||||
while True:
|
||||
cmd = input("> ") # 控制台输入
|
||||
if cmd.lower() == 'exit':
|
||||
break
|
||||
|
||||
shell_chan.send(cmd + "\r\n")
|
||||
time.sleep(0.2)
|
||||
|
||||
# 接收服务器响应
|
||||
if shell_chan.recv_ready():
|
||||
response = shell_chan.recv(1024).decode("utf-8")
|
||||
print(response, end="")
|
||||
|
||||
except Exception as e:
|
||||
print("连接失败:", e)
|
||||
finally:
|
||||
try:
|
||||
shell_chan.close()
|
||||
except:
|
||||
pass
|
||||
client.close()
|
||||
```
|
||||
|
||||
## 非阻塞IO+selectors多路复用并发客户端
|
||||
- 设置Socket非阻塞模式,结合selectors实现单线程同时监听多条SSH连接,完成多会话并发处理,测试并发效果。
|
||||
|
||||
``` Python
|
||||
import paramiko
|
||||
import selectors
|
||||
import socket
|
||||
sel = selectors.DefaultSelector()
|
||||
SSH_HOST = "127.0.0.1"
|
||||
SSH_PORT = 2222
|
||||
USER = "test"
|
||||
PWD = "123456"
|
||||
|
||||
def create_nonblock_ssh_conn(conn_name: str):
|
||||
raw_sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
|
||||
raw_sock.setblocking(False)
|
||||
try:
|
||||
raw_sock.connect((SSH_HOST,SSH_PORT))
|
||||
except BlockingIOError:
|
||||
pass
|
||||
trabsport = paramiko.Transport(raw_sock)
|
||||
trabsport.start_client()
|
||||
trabsport.auth_password(USER,PWD)
|
||||
chan = trabsport.open_session()
|
||||
chan.invoke_shell()
|
||||
chan.setblocking(False)
|
||||
sel.register(chan,selectors.EVENT_READ,{"name": conn_name})
|
||||
print(f"[{conn_name}]非阻塞SSH连接成功")
|
||||
chan.send(f"test form {conn_name}\r\n".encode("utf-8"))
|
||||
return trabsport,chan
|
||||
|
||||
def handle_channel_data(key,mask):
|
||||
chan = key.fileobj
|
||||
info = key.data
|
||||
if mask & selectors.EVENT_READ:
|
||||
try:
|
||||
data = chan.recv(1024)
|
||||
if data:
|
||||
print(f"\n[{info['name']}] 收到数据:\n{data.decode('utf-8')}")
|
||||
else:
|
||||
sel.unregister(chan)
|
||||
print(f"[{info['name']}] 连接关闭")
|
||||
except Exception:
|
||||
sel.unregister(chan)
|
||||
|
||||
if __name__ == "__main__":
|
||||
trans1,chan1 = create_nonblock_ssh_conn("连接A")
|
||||
trans2,chan2 = create_nonblock_ssh_conn("连接B")
|
||||
try:
|
||||
while True:
|
||||
events = sel.select(timeout=0.3)
|
||||
for key,mask in events:
|
||||
handle_channel_data(key,mask)
|
||||
except KeyboardInterrupt:
|
||||
print("\n程序退出")
|
||||
sel.close()
|
||||
trans1.close()
|
||||
trans2.close()
|
||||
```
|
||||
Reference in New Issue
Block a user