用Python写一个UDP端口测试工具(二)
需求
最近有个运维需求,需要测试客户端的UDP端口与服务器的连通性。
需求也很简单:客户端往服务端发UDP包,服务端收到包后响应客户端,当客户端能收到服务端的响应时,则可断定端口是可达的。但是服务端需要测试的端口有很多,需要一款工具来实现。
思路
虽然nc等工具可以测试端口,但是面对多端口测试场景,就显得捉襟见肘了,因此就想到使用Python的socket编程自己写一个工具来实现这个功能。
具体的思路如下:
- 起一个TCP线程,用于客户端、服务端之间协商需要测试的端口
- 起一个UDP线程,用于测试端口是否可达
- 由客户端指定需要测试哪些端口,用逗号分开端口号,端口范围使用“-”或“:”连接符指定
show you the code
服务端
#!/usr/bin/env python
#!/usr/libexec/platform-python
# encoding: utf-8
import os
import signal
import socket
import time
from threading import Thread, Event
import json
try:
import queue
except ImportError:
import Queue as queue
class TCPThread(Thread):
    """Control channel of the UDP port test server.

    Listens on TCP port 4000 and serves one client connection at a time.
    Every request is expected to be a JSON object of the form::

        {
            "proto": "udpping",
            "port": number,
            "available": false
        }

    For each request the thread checks whether the requested UDP port can be
    bound locally and echoes the request back with ``available`` set to the
    result.  When the port is usable the request is also put on ``queue`` and
    the thread blocks on ``event`` until it is set.
    """

    def __init__(self, queue, event):
        """Store the queue and event shared with the UDP worker."""
        Thread.__init__(self)
        self.queue = queue
        self.event = event

    @staticmethod
    def _decode_request(data_recv):
        """Parse one raw control message.

        Returns a ``(request, error)`` tuple: ``request`` is the decoded dict
        and ``error`` is None on success; otherwise ``request`` is None and
        ``error`` is the message to log.
        """
        try:
            data_json = json.loads(data_recv.decode())
        except ValueError:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError on
            # Python 3 and is what json.loads raises on Python 2.
            return None, "json decode error"
        if (not isinstance(data_json, dict)
                or "proto" not in data_json
                or "port" not in data_json):
            return None, "proto error"
        return data_json, None

    @staticmethod
    def _udp_port_available(port):
        """Return True if a UDP socket can be bound to ``port`` locally."""
        us = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
        try:
            us.bind(('0.0.0.0', port))
        except (socket.error, OverflowError, TypeError):
            # OverflowError / TypeError: the peer sent an out-of-range or
            # non-integer port; treat it like any other unusable port.
            return False
        finally:
            # Always release the probe socket, also when bind() failed.
            us.close()
        return True

    def _serve(self, conn):
        """Answer requests on ``conn`` until the peer closes the connection."""
        while True:
            data_recv = conn.recv(65535)
            if data_recv == b'':
                break
            data_json, error = self._decode_request(data_recv)
            if error is not None:
                print(error)
                continue
            if self._udp_port_available(data_json['port']):
                data_json["available"] = True
                conn.sendall(json.dumps(data_json).encode())
                self.queue.put(data_json)
                self.event.wait()   # wait until the UDP worker is done
                self.event.clear()  # re-arm the event for the next port
            else:
                print("port %s unavailable" % data_json['port'])
                # Do not trust the flag value supplied by the client.
                data_json["available"] = False
                conn.sendall(json.dumps(data_json).encode())

    def run(self):
        """Accept clients forever; a broken connection only ends that client."""
        sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        sock.bind(('0.0.0.0', 4000))
        sock.listen(1)
        while True:
            conn, addr = sock.accept()
            try:
                self._serve(conn)
            except socket.error as exc:
                # E.g. connection reset: keep the server accepting clients.
                print("connection from %s failed: %s" % (addr[0], exc))
            finally:
                conn.close()
class UDPThread(Thread):
    """UDP echo worker of the port test server.

    Takes requests from ``queue``; for each one it binds the requested UDP
    port, echoes back the first datagram received within ``WAIT_SECONDS`` and
    then sets ``event`` so that the waiting TCP thread can continue.
    """

    # How long to wait for the client's datagram on a port, in seconds.
    WAIT_SECONDS = 3

    def __init__(self, queue, event):
        """Store the queue and event shared with the TCP thread."""
        Thread.__init__(self)
        self.queue = queue
        self.event = event

    def run(self):
        """Process queued requests forever."""
        while True:
            try:
                data = self.queue.get(timeout=3)
            except queue.Empty:
                continue
            self._handle(data)

    def _handle(self, data, wait=None):
        """Serve one request and always notify the TCP thread afterwards.

        ``data`` is the request dict (its ``port`` entry is used); ``wait``
        overrides ``WAIT_SECONDS`` when given.
        """
        if wait is None:
            wait = self.WAIT_SECONDS
        try:
            self._echo_once(data['port'], wait)
        except socket.error as exc:
            # E.g. the port was taken between the TCP thread's probe and now.
            print("udp port %s error: %s" % (data['port'], exc))
        finally:
            # Must run even on failure, otherwise the TCP thread blocks in
            # event.wait() forever.
            self.event.set()

    @staticmethod
    def _echo_once(port, wait):
        """Bind ``port``, echo one datagram back to its sender, close the socket.

        Returns True if a datagram was echoed, False if none arrived within
        ``wait`` seconds.  Socket errors other than the timeout propagate.
        """
        sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
        try:
            sock.bind(('0.0.0.0', port))
            sock.settimeout(wait)
            try:
                data_recv, addr = sock.recvfrom(65535)
            except socket.timeout:
                return False
            sock.sendto(data_recv, addr)
            return True
        finally:
            # Release the port so that it can be probed again later.
            sock.close()
class Utils(object):
    """Namespace for process-level helper functions."""

    @staticmethod
    def signal_handler(signal, frame):
        """Signal handler that ends the process at once with exit status 0.

        Both arguments (signal number and current stack frame) are ignored.
        ``os._exit`` is used, so the process terminates immediately.
        """
        exit_status = 0
        os._exit(exit_status)
if __name__ == '__main__':
    # Server entry point: start the TCP control thread and the UDP echo
    # thread, which cooperate through one queue and one event.
    signal.signal(signal.SIGINT, Utils.signal_handler)
    q = queue.Queue()
    event = Event()
    tcpthread = TCPThread(q, event)
    udpthread = UDPThread(q, event)
    tcpthread.start()
    udpthread.start()
    # Keep the main thread in short, timed joins.  Signal handlers only run
    # in the main thread, and on Python 2 an untimed join (including the
    # implicit one at interpreter shutdown) cannot be interrupted, which
    # would make Ctrl+C ineffective.
    for worker in (tcpthread, udpthread):
        while worker.is_alive():
            worker.join(1)
客户端
#!/usr/libexec/platform-python
# encoding: utf-8
import re
import signal
import socket
from threading import Thread, Event
import json
import string
import random
import os
import sys
try:
import queue
except ImportError:
import Queue as queue
class TCPThread(Thread):
    """Client-side control thread that negotiates test ports with the server.

    For every port in ``testportlist`` the thread sends a JSON request over
    one TCP connection and reads the server's JSON reply.  When the reply has
    ``available`` set, it is put on ``queue`` and the thread blocks on the
    event until it is set before moving on to the next port.
    """

    # Inclusive bounds of the port numbers accepted for testing.
    MIN_PORT = 1
    MAX_PORT = 65535

    def __init__(self, queue, tevent):
        """Store the shared queue/event and set default connection settings."""
        Thread.__init__(self)
        self.queue = queue
        self.event = tevent
        self.server = '127.0.0.1'
        self.port = 4000
        # Request template; "port" is overwritten for every tested port.
        self.proto_json = {
            "proto": "udpping",
            "server": "127.0.0.1",
            "port": 4000,
            "available": False,
        }
        self.testportlist = []

    def setserver(self, server, port):
        """Set the server address and the TCP control port."""
        self.server = server
        self.port = port
        self.proto_json['server'] = server

    def _valid_port(self, number):
        """Return True if ``number`` lies within MIN_PORT..MAX_PORT."""
        return self.MIN_PORT <= number <= self.MAX_PORT

    def _setport(self, port):
        """Append a single port given as text; report and skip invalid input."""
        try:
            number = int(port)
        except ValueError:
            print("%s not a number, ignore" % port)
            return
        if self._valid_port(number):
            self.testportlist.append(number)
        else:
            print("%s is not a valid port (1-65535), ignore" % port)

    def _setportrange(self, portrange):
        """Append every port of an inclusive ``start-end`` / ``start:end`` range."""
        bounds = re.split(':|-', portrange)
        if len(bounds) != 2:
            print("%s is a illegal port range, ignore" % portrange)
            return
        _start, _end = bounds
        try:
            first = int(_start)
            last = int(_end)
        except ValueError:
            print("%s or %s not a number, ignore %s" % (_start, _end, portrange))
            return
        if first <= last and self._valid_port(first) and self._valid_port(last):
            self.testportlist.extend(range(first, last + 1))
        else:
            print("%s is a illegal port range, ignore" % portrange)

    def settestports(self, port):
        """Parse a spec such as ``'5000,6000,7000-8000'`` into ``testportlist``.

        Items are separated by commas; an item containing exactly one ``-`` or
        ``:`` separator is a range, anything else is a single port.  Empty
        items (for example from a trailing comma) are skipped.
        """
        for _slice in port.split(','):
            _slice = _slice.strip()
            if not _slice:
                continue
            if len(re.split('-|:', _slice)) == 2:
                self._setportrange(_slice)
            else:
                self._setport(_slice)

    def run(self):
        """Negotiate all queued ports with the server, one at a time."""
        sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        try:
            sock.connect((self.server, self.port))
            # Take the whole list at once: same end state as popping every
            # element, without the quadratic cost of repeated pop(0).
            pending, self.testportlist = self.testportlist, []
            for testport in pending:
                self.proto_json["port"] = testport
                sock.sendall(json.dumps(self.proto_json).encode())
                data_recv = sock.recv(65535)
                if not data_recv:
                    print("server closed the control connection")
                    break
                try:
                    data_json = json.loads(data_recv.decode())
                except ValueError:
                    print("invalid reply from server for port %s" % testport)
                    break
                if isinstance(data_json, dict) and data_json.get("available"):
                    self.queue.put(data_json)
                    self.event.wait()   # wait for the UDP thread's result
                    self.event.clear()  # re-arm the event for the next port
                else:
                    # Without this message an untested port would be
                    # indistinguishable from a reachable one.
                    print("port %s unavailable on server, skipped" % testport)
        except socket.error as exc:
            print("control connection to %s:%s failed: %s"
                  % (self.server, self.port, exc))
        finally:
            sock.close()
class UDPThread(Thread):
    """Client-side UDP prober.

    Takes negotiated requests from ``queue``, sends a random payload to the
    requested UDP port and expects the same payload back from that port.
    After each request the event is set so the TCP thread can continue.
    """

    # Number of datagrams sent per port and seconds waited after each one.
    RETRIES = 3
    TIMEOUT = 0.5

    def __init__(self, queue, tevent):
        """Store the queue and event shared with the TCP thread."""
        Thread.__init__(self)
        self.queue = queue
        self.event = tevent

    def run(self):
        """Probe queued ports until the queue stays empty for 3 seconds."""
        while True:
            try:
                data = self.queue.get(timeout=3)
            except queue.Empty:
                # Nothing queued for 3 seconds: treated as "TCP thread done".
                break
            try:
                payload = Utils.random_string(64).encode()
                if not self._probe(data["server"], data["port"], payload):
                    print("port %s unreachable" % data["port"])
            finally:
                # Must run even on failure, otherwise the TCP thread blocks
                # in event.wait() forever.
                self.event.set()

    def _probe(self, server, port, payload, retries=None, timeout=None):
        """Return True if ``payload`` is echoed back from ``server:port``.

        The payload is sent up to ``retries`` times (default ``RETRIES``);
        after each send the reply is awaited for ``timeout`` seconds (default
        ``TIMEOUT``).  Datagrams with another payload or source port are
        ignored.  Socket errors count as a failed attempt instead of being
        retried without limit.
        """
        if retries is None:
            retries = self.RETRIES
        if timeout is None:
            timeout = self.TIMEOUT
        try:
            sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
        except socket.error:
            return False
        try:
            sock.settimeout(timeout)
            for _ in range(retries):
                try:
                    sock.sendto(payload, (server, port))
                    while True:
                        data_recv, addr = sock.recvfrom(65535)
                        if data_recv == payload and addr[1] == port:
                            return True
                except socket.timeout:
                    continue
                except socket.error:
                    continue
            return False
        finally:
            sock.close()
class Utils(object):
    """Namespace for small helper functions used by the client."""

    @staticmethod
    def random_string(length):
        """Return ``length`` random characters drawn from ASCII letters and digits."""
        alphabet = string.ascii_letters + string.digits
        picked = []
        for _ in range(length):
            picked.append(random.choice(alphabet))
        return ''.join(picked)

    @staticmethod
    def signal_handler(signal, frame):
        """Signal handler that ends the process at once with exit status 0.

        Both arguments (signal number and current stack frame) are ignored.
        """
        exit_status = 0
        os._exit(exit_status)
def h():
    """Print the command-line usage text, one line per ``print`` call."""
    usage_lines = (
        " usage:",
        " this_program <dest_ip> <dest_port> <test_ports>",
        '',
        " examples:",
        " ./udpclient.py 192.168.1.1 4000 '5000,6000,7000-8000'",
        '',
    )
    for text in usage_lines:
        print(text)
if __name__ == '__main__':
    # Client entry point: <dest_ip> <dest_port> <test_ports> are all
    # required; the port list is read from sys.argv[3].
    if len(sys.argv) != 4:
        h()
        sys.exit(1)
    signal.signal(signal.SIGINT, Utils.signal_handler)
    try:
        server = socket.gethostbyname(sys.argv[1])
    except socket.error as exc:
        print("cannot resolve %s: %s" % (sys.argv[1], exc))
        sys.exit(1)
    try:
        port = int(sys.argv[2])
    except ValueError:
        print("%s not a number" % sys.argv[2])
        h()
        sys.exit(1)
    testports = sys.argv[3]
    q = queue.Queue()
    event = Event()
    tcpthread = TCPThread(q, event)
    tcpthread.setserver(server, port)
    tcpthread.settestports(testports)
    udpthread = UDPThread(q, event)
    tcpthread.start()
    udpthread.start()
    # Timed joins keep the main thread interruptible: signal handlers only
    # run in the main thread, and on Python 2 an untimed join() would delay
    # Ctrl+C until the worker has finished.
    for worker in (tcpthread, udpthread):
        while worker.is_alive():
            worker.join(1)
    print('')
已知问题
- 程序一旦执行,不能使用Ctrl+C来停止,因为使用了多线程,不接收Ctrl+C信号,有解决方法。又不是不能用,就这样吧
- 服务端缺少重发机制,当服务端发给客户端的回应包丢包,则显示这个端口不可达(客户端有重发机制,当客户端到服务端的包丢包,会重发3次)