python scapy发包、抓包、分析

adam2010 / 2024-10-11 / 原文

import os
import telnetlib
 
 
from scapy.all import *
from scapy.layers.inet import *
from scapy.all import sniff
 
# pac = dir(scapy.layers)
# print(pac) #执行代码后,会输出Scapy中的各层
# print(explore(scapy.layers.l2))  #查看各种二层协议
# print(explore(scapy.packet.ls(Ether)))  #查看协议类的属性
# print(explore(scapy.packet.ls(IP)))
# print(explore(scapy.packet.ls(TCP)))
# print(explore(scapy.packet.ls(UDP)))
 
#构造数据包,该包的结构包含IP部分和TCP部分
# pkt = IP(src="192.168.56.1",dst="192.168.56.100") /TCP()
# pkt.show()
#
# #Ether类用于设置发送方和接收方的 MAC 地址
# pkt = Ether(src="01:80:c2:00:00:14",dst="ff-ff-ff-ff-ff-ff")
# pkt.show()
#
# #构造HTTP
# pkt = IP()/TCP()/"GET /HTTP1.0\r\n\r\n"
 
 
#构造ICMP包,数据包查看
# pkt = IP(dst="192.168.56.100")/ICMP()
# res = sr1(pkt)
# res.show()  #查看数据包
# ls(pkt)  #查看数据包格式
 
 
 
#数据包文件的存储、读取
#pkt1=IP(src="192.168.147.238",dst="192.168.147.215")/TCP()
pkt1=IP(dst="183.240.98.198")/ICMP()
pkt2=IP(dst="183.240.98.198")/ICMP()
 
pkts=[pkt1,pkt2]
wrpcap("./temp.cap",pkts)  #将数据包列表pkts存储在文件中
pkt_list=rdpcap("./temp.cap")  #读取存储数据包的文件,列表格式
print(pkt_list[0].summary())  #第一个数据包的摘要
print(pkt_list[1].show())  #第二个数据包的详细数据
res,unres = sr(pkt_list)
res.summary()  #结果显示在一行
#列出本机接口:
print(conf.ifaces)
#列出本机路由:
print(conf.route)
 
a = traceroute(["www.baidu.com"],maxttl=10)
print(a)
 
# def callback(pkt):
#     print(pkt.summary)
#     wrpcap("test.cap",pkt)
#
# sniff(filter="icmp and 192.168.56.100",
#       iface="VirtualBox Host-Only Ethernet Adapter #2",
#       prn=callback,
#       count=100)
 
 
#####################
#########抓包########
#####################
 
result_list=[]
def callback(pkt):
    # print(pkt.summary)
    # wrpcap("test.cap",pkt)
    print(pkt)
    result_list.append(pkt)
 
sniff(filter="icmp and host 183.240.98.198",
      iface="Intel(R) Dual Band Wireless-AC 8265",
      prn=callback,
      count=5)
 
wrpcap("test.cap",result_list)
 
 
#####################
#########分析########
#####################
 
import pyshark
#import nest_asyncio
 
 
#如果出现 RuntimeError: This event loop is already running,避免报错
#nest_asyncio.apply()
 
#capture = pyshark.FileCapture('test.cap', tshark_path='C:\Program Files\Wireshark\WireShark.exe',  display_filter='http')
capture = pyshark.FileCapture('D:/archive/code/Project/OM-ZX/test.pcapng', tshark_path='D:/Program Files/Wireshark/WireShark.exe')
for pkt in capture:
    if (pkt.ip.get_field_by_showname("Source Address") == '192.168.1.5'):
        print("Source:", pkt.ip.get_field_by_showname("Source Address") + "\tDestination: ",pkt.ip.get_field_by_showname("Destination Address"))
capture = pyshark.FileCapture('test.cap',)
 
# path = r'D:/PyProject/allinone/caps/'
# lists = os.listdir(path)
# print(lists)
# for f in lists:
#     file = path+f
#     print(file)
#     caps = pyshark.FileCapture(file,tshark_path="C:\Program Files\Wireshark\WireShark.exe")
#     for pkt in caps:    #遍历每条数据
#         print(pkt)
#         try:
#             # 里面的layer ETH,layer IP,layer BGP,TCP通过pkt.bgp,pkt.ip,pkt.eth来访稳
#             print("Source:", pkt.ip.get_field_by_showname("Source") + "\tDestination: ",pkt.ip.get_field_by_showname("Destination"))
#             if (pkt.bgp.get_field_by_showname("BGP Identifier") is not None):
#                 print("BGP Identifier:", pkt.bgp.get_field_by_showname("BGP Identifier"))
#         except Exception as e:
#                 print("BGP、IP、ETH、TCP其中一个或者多个不存在")