Traffic Control (TC): Introduction and Usage Guide

守望人间 / 2023-08-12 / Original post

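Traffic Control (TC) is the Linux facility, driven by the tc command, for controlling network traffic: it can rate-limit, queue, classify, and otherwise manage packets. TC is used to implement QoS (Quality of Service) and traffic shaping, which gives finer control over network resources and a better user experience. We use it mainly for network impairment control, to test how devices behave on low-quality networks.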

Basic Concepts

Queue Discipline (qdisc)

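  • A qdisc is the mechanism that controls how packets are queued and processed. Different qdisc types implement different traffic control policies.
  • Common qdisc types include pfifo, bfifo, htb, and fq_codel.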

Class

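  • In a classful qdisc such as htb (Hierarchical Token Bucket), classes are used to classify and split traffic.
  • Each class can be given its own parameters, such as bandwidth and priority.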

Filter

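  • A filter matches packets and assigns those that meet its conditions to the corresponding class for processing.
  • Filters make it possible to split traffic by source IP, destination IP, protocol, port, and similar criteria.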
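
Qdiscs, classes, and filters refer to each other through handles written major:minor, and tc reads both parts as hexadecimal. The following is a minimal sketch of that notation; parse_handle and format_handle are hypothetical helper names, not part of tc or of any library.

```python
from typing import Tuple


def parse_handle(text: str) -> Tuple[int, int]:
    """Split a tc handle such as '1:' or '1:a' into (major, minor).

    tc interprets both parts as hexadecimal; an omitted part counts as 0.
    """
    major, sep, minor = text.partition(':')
    if not sep:
        raise ValueError('expected major:minor, got %r' % text)
    return int(major or '0', 16), int(minor or '0', 16)


def format_handle(major: int, minor: int = 0) -> str:
    """Render (major, minor) in tc notation; a minor of 0 is left empty, as in '1:'."""
    return '%x:%s' % (major, '%x' % minor if minor else '')


if __name__ == '__main__':
    print(parse_handle('1:'))    # a qdisc handle
    print(parse_handle('1:10'))  # a class ID whose minor part is 0x10
    print(format_handle(1, 16))
```

The hexadecimal reading matters whenever handles are generated from decimal counters: a counter value of 10 written as 1:10 is stored by tc as minor 0x10.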

Basic Commands

qdisc

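  • tc qdisc add dev <interface> root <qdisc type> <qdisc parameters>: add the root qdisc.
  • tc qdisc show dev <interface>: list all qdiscs on the interface.
  • tc qdisc del dev <interface> parent <parent class ID>: delete the qdisc attached to that parent.
  • tc qdisc del dev <interface> root: delete the root qdisc, which clears every setting.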

class

  • tc class add dev <interface> parent <parent class ID> classid <class ID> <class parameters>: add a class.
  • tc qdisc add dev <interface> parent <parent class ID> <qdisc type> <qdisc parameters>: attach a child qdisc to an existing class.
  • tc class show dev <interface>: list all classes on the interface.
  • tc class del dev <interface> classid <class ID>: delete a class.

filter

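  • tc filter add dev <interface> protocol <protocol> prio <priority> <filter expression> <action>: add a filter rule.
  • tc filter del dev <interface> protocol <protocol> prio <priority>: delete the filter rules at that priority.
  • tc filter show dev <interface>: list all filters on the interface.
  • tc filter del dev <interface> protocol ip prio <priority> u32 match ip src <ip>/32: delete a u32 filter that matches a source IP.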
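
When these commands are issued from a program, it helps to build them as argument lists and to be able to print them without executing anything. The sketch below illustrates one way to do that; tc_argv, run_tc, and the three templates are hypothetical names introduced here, and dry-run mode is the default because real tc calls need root privileges.

```python
import shlex
import subprocess
from typing import List, Optional


def tc_argv(template: str, **fields: object) -> List[str]:
    """Fill a tc command template and split it into an argument list."""
    return shlex.split(template.format(**fields))


def run_tc(argv: List[str], dry_run: bool = True) -> Optional[int]:
    """Print the command; execute it only when dry_run is False (needs root)."""
    print(' '.join(argv))
    if dry_run:
        return None
    # No shell is involved, so interface names and IPs are passed as plain arguments
    return subprocess.run(argv, check=False).returncode


# Templates that mirror the command forms listed above
ADD_ROOT = 'tc qdisc add dev {dev} root handle {handle} {kind} {params}'
ADD_CLASS = 'tc class add dev {dev} parent {parent} classid {classid} {params}'
SHOW_QDISC = 'tc qdisc show dev {dev}'

if __name__ == '__main__':
    run_tc(tc_argv(ADD_ROOT, dev='eth0', handle='1:', kind='htb', params='default 1'))
    run_tc(tc_argv(ADD_CLASS, dev='eth0', parent='1:', classid='1:1', params='htb rate 1mbit'))
    run_tc(tc_argv(SHOW_QDISC, dev='eth0'))
```

Returning the exit code instead of raising leaves it to the caller to decide whether a failed command, such as deleting a qdisc that does not exist, is an error.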

Examples

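  1. Limit the bandwidth on eth0 to 1 Mbit/s. The default 1 option sends unclassified traffic to class 1:1; if the default class does not exist, htb passes that traffic through unshaped:

    tc qdisc add dev eth0 root handle 1: htb default 1
    tc class add dev eth0 parent 1: classid 1:1 htb rate 1mbit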
    
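  2. Add filter rules that steer traffic to different classes by source IP (class 1:2 has to be created in the same way as class 1:1):

    tc filter add dev eth0 parent 1: protocol ip prio 1 u32 match ip src 192.168.1.1/32 flowid 1:1
    tc filter add dev eth0 parent 1: protocol ip prio 2 u32 match ip src 192.168.1.2/32 flowid 1:2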
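
The two examples combine into a per-IP rate limit: one htb class per source address and one u32 filter pointing at it. The sketch below only generates the command strings and does not run them; per_ip_limit_commands is a hypothetical helper, and numbering classes and priorities in insertion order is an assumption made for illustration.

```python
from typing import Dict, List


def per_ip_limit_commands(dev: str, limits_kbit: Dict[str, int]) -> List[str]:
    """Return the tc commands that give each source IP its own htb class.

    limits_kbit maps a source IP to its rate in kbit/s. Classes are numbered
    1:1, 1:2, ... and filter priorities follow the same order.
    """
    commands = ['tc qdisc add dev {} root handle 1: htb'.format(dev)]
    for index, (ip, rate) in enumerate(limits_kbit.items(), start=1):
        minor = format(index, 'x')  # tc reads class minors as hexadecimal
        commands.append(
            'tc class add dev {} parent 1: classid 1:{} htb rate {}kbit'.format(dev, minor, rate))
        commands.append(
            'tc filter add dev {} parent 1: protocol ip prio {} '
            'u32 match ip src {}/32 flowid 1:{}'.format(dev, index, ip, minor))
    return commands


if __name__ == '__main__':
    for line in per_ip_limit_commands('eth0', {'192.168.1.1': 1000, '192.168.1.2': 2000}):
        print(line)
```

Each class is created before the filter that references it, so every flowid names a class that already exists.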
    

Notes

  1. A root qdisc is required.
  2. A qdisc can have multiple classes attached.
  3. A class can have multiple child classes.
  4. A class can have only one qdisc attached.
  5. A class can have multiple filters attached.
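
These rules can be pictured as a small tree structure. The sketch below models only the five notes above and does not talk to tc at all; ClassNode and its methods are hypothetical names chosen for illustration.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ClassNode:
    """One class in the hierarchy, following the notes above."""
    classid: str
    children: List['ClassNode'] = field(default_factory=list)  # note 3: many child classes
    filters: List[str] = field(default_factory=list)           # note 5: many filters
    qdisc: Optional[str] = None                                # note 4: at most one qdisc

    def add_child(self, child: 'ClassNode') -> None:
        self.children.append(child)

    def add_filter(self, expression: str) -> None:
        self.filters.append(expression)

    def attach_qdisc(self, kind: str) -> None:
        if self.qdisc is not None:
            raise ValueError('%s already has qdisc %s' % (self.classid, self.qdisc))
        self.qdisc = kind


if __name__ == '__main__':
    base = ClassNode('1:1')
    leaf = ClassNode('1:2')
    base.add_child(leaf)
    leaf.add_filter('match ip src 192.168.1.1/32')
    leaf.attach_qdisc('netem')
    print(base)
```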

References

  • https://blog.csdn.net/dog250/article/details/40483627
  • ChatGPT

Configuring TC with Python

import subprocess
from enum import Enum, unique


GLOBAL_LOSS_IP = '0.0.0.0/0'
MAX_PRIO = 65535

# class
LIMIT_CLASS_ADD_CMD = 'tc class add dev {} parent 1:1 classid 1:{} htb rate {}Kbit'
LIMIT_CLASS_CHANGE_CMD = 'tc class change dev {} parent 1:1 classid 1:{} htb rate {}Kbit'
NO_LIMIT_CLASS_CMD = 'tc class add dev {} parent 1:1 classid 1:{} htb rate 10Gbit ceil 20Gbit'
# filter
UP_FILTER_CMD = 'tc filter add dev {} protocol ip parent 1: prio {} u32 match ip dst {} flowid 1:{}'
DOWN_FILTER_CMD = 'tc filter add dev {} protocol ip parent 1: prio {} u32 match ip src {} flowid 1:{}'
# To remove the impairment for one IP, delete the filter and the qdisc first, then the class
DEL_QDISC_CMD = 'tc qdisc del dev {} parent 1:{}'
DEL_CLASS_CMD = 'tc class del dev {} classid 1:{}'
DEL_FILTER_CMD = 'tc filter del dev {} protocol ip prio {} u32 match ip src {}/32'

"""
IP filtering with tc; the commands build this structure:
         root qdisc
             |
  class 1         class 2
     |               |
qdisc filter    qdisc filter
"""


@unique
class TYPE(Enum):
    DELAY = 0
    LOSS = 1
    DUP = 2
    DISORDER = 3
    IMPAIR = 4
    SPEED_LIMIT = 5
    SPEED_LOSS = 6
    PACKAGE_LOSS = 7
    PACKAGE_SPEED = 8
    LOSS_DELAY = 9
    BRUST_LOSS = 10


class Netem:
    """qdisc"""
    LOSS_TYPE_CMD_MAP = {
        TYPE.DELAY.value: 'delay {}ms',
        TYPE.LOSS.value: 'loss {}%',
        TYPE.DUP.value: 'duplicate {}%',
        TYPE.LOSS_DELAY.value: 'delay {}ms loss {}%',
        TYPE.DISORDER.value: 'reorder {}% delay {}ms',
        TYPE.IMPAIR.value: 'corrupt {}%',
        TYPE.BRUST_LOSS.value: 'loss gemodel {}% {}% 100% 0%',
    }

    def __init__(self, interface: str, parent: int, handle: int):
        self.interface = interface
        self.parent = parent
        self.handle = handle
        self.base_cmd = 'tc qdisc {} dev {} parent 1:{} handle {}: netem '
        self.loss_config_map = {}

    def generate_cmd(self, action: str, network_loss_type: int, values: tuple):
        base_cmd = self.base_cmd.format(action, self.interface, self.parent, self.handle)
        loss_cmd = self.LOSS_TYPE_CMD_MAP.get(network_loss_type)
        self.loss_config_map[network_loss_type] = loss_cmd.format(*values)
        return base_cmd + ' '.join(self.loss_config_map.values())

    def add_netem(self, network_loss_type: int, values: tuple):
        cmd = self.generate_cmd('add', network_loss_type, values)
        subprocess.run(cmd, shell=True)

    def change_netem(self, network_loss_type: int, values: tuple):
        cmd = self.generate_cmd('change', network_loss_type, values)
        subprocess.run(cmd, shell=True)

    def del_netem(self):
        cmd = DEL_QDISC_CMD.format(self.interface, self.parent)
        subprocess.run(cmd, shell=True)


class TrafficControlClass:
    """class"""
    def __init__(self, interface: str, handle: int):
        self.interface = interface
        self.handle = handle
        self.child = None

    def set_base_class(self):
        cmd = NO_LIMIT_CLASS_CMD.format(self.interface, self.handle)
        subprocess.run(cmd, shell=True)

    def set_limit_class(self, limit: int):
        cmd = LIMIT_CLASS_ADD_CMD.format(self.interface, self.handle, limit)
        subprocess.run(cmd, shell=True)

    def change_limit_class(self, limit: int):
        cmd = LIMIT_CLASS_CHANGE_CMD.format(self.interface, self.handle, limit)
        subprocess.run(cmd, shell=True)

    def del_class(self):
        cmd = DEL_CLASS_CMD.format(self.interface, self.handle)
        subprocess.run(cmd, shell=True)


class TrafficControlFilter:
    """filter"""
    def __init__(self, interface: str, prio: int, ip: str, flow_class: TrafficControlClass):
        self.interface = interface
        self.prio = prio
        self.ip = ip
        self.flow_class = flow_class

    def set_filter(self):
        # Add the filters that select traffic for this IP,
        # matching it both as destination address and as source address
        cmd = UP_FILTER_CMD.format(self.interface, self.prio, self.ip, self.flow_class.handle)
        subprocess.run(cmd, shell=True)
        cmd = DOWN_FILTER_CMD.format(self.interface, self.prio, self.ip, self.flow_class.handle)
        subprocess.run(cmd, shell=True)

    def del_filter(self):
        if self.ip == GLOBAL_LOSS_IP:
            cmd = DEL_FILTER_CMD.format(self.interface, self.prio, '0.0.0.0')
        else:
            cmd = DEL_FILTER_CMD.format(self.interface, self.prio, self.ip)
        subprocess.run(cmd, shell=True)

    def change_prio(self, prio: int):
        print('change prio: ', prio)
        self.del_filter()
        self.prio = prio
        self.set_filter()


class TrafficControl:
    """Traffic Control"""
    def __init__(self, interface: str = 'eth0', cycle_time: int = 1000):
        self.interface = interface  # network interface
        self.cycle_time = cycle_time
        self.prio = 1  # next filter priority
        self.class_handle = 1  # next class handle
        self.qdisc_handle = 1  # next qdisc handle
        self.ip_filter_map = {}
        self.init_tc()

    def init_tc(self):
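        """
        Reset the TC state: delete any existing root qdisc, then create a fresh root qdisc and base class
        """
        print('init_tc')
        # Clear existing impairments; tc reports an error if no root qdisc was configured, which can be ignored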
        cmd = 'tc qdisc del dev {} root'.format(self.interface)
        subprocess.run(cmd, shell=True)
        # Reset the counters
        self.qdisc_handle = 1
        self.class_handle = 1
        self.prio = 1
        self.ip_filter_map = {}

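        # Create the root qdisc without a default class, so traffic that matches no filter is not rate-limited;
        # r2q 1 sets the divisor htb uses to derive each class's quantum from its rate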
        cmd = 'tc qdisc add dev {} root handle 1: htb r2q 1'.format(self.interface)
        subprocess.run(cmd, shell=True)
        self.qdisc_handle += 1
        # Create a class with a very high rate; the classes that carry netem qdiscs (loss etc.) and rate limits are attached under it
        cmd = 'tc class add dev {} parent 1: classid 1:1 htb rate 10Gbit ceil 20Gbit'.format(self.interface)
        subprocess.run(cmd, shell=True)
        self.class_handle += 1

    def remove_ip_loss(self, ip: str):
        """Remove the impairment settings for an IP"""
        print('remove_loss: %s' % ip)
        tc_filter = self.ip_filter_map.pop(ip, None)  # type: TrafficControlFilter
        if tc_filter:
            tc_filter.del_filter()
            if tc_filter.flow_class.child:  # type: Netem
                tc_filter.flow_class.child.del_netem()
            tc_filter.flow_class.del_class()

    def process(self, network_loss_type: int, ip: str, data: dict, value=None):
        """
        Apply a network impairment
        :param network_loss_type: impairment type
        :param ip: target IP
        :param data: request data
        :param value: value for the current cycle; passing it means this is a periodic setting
        :return:
        """
        if not ip:
            ip = GLOBAL_LOSS_IP

        values = None
        if network_loss_type == TYPE.DELAY.value:
            if value is None:
                delay = data.get('delay', 0)
            else:
                delay = value
            values = (delay,)
        elif network_loss_type == TYPE.LOSS.value:
            if value is None:
                loss = data.get('loss', 0)
            else:
                loss = value
            values = (loss,)
        elif network_loss_type == TYPE.DUP.value:
            if value is None:
                dup = data.get('dup', 0)
            else:
                dup = value
            values = (dup,)
        elif network_loss_type == TYPE.DISORDER.value:
            disorder = data['disorder']['value']
            delay = data['disorder']['delay']
            values = (disorder, delay)
        elif network_loss_type == TYPE.IMPAIR.value:
            if value is None:
                impair = data.get('impair')
            else:
                impair = value
            values = (impair,)
        elif network_loss_type == TYPE.SPEED_LIMIT.value:
            if value is None:
                speed = data['speed_limit'].get('speed', 10000000)  # 10_000_000
            else:
                speed = value
            delay = data['speed_limit'].get('delay', 50)
            self.set_speed_limit(ip, speed, delay)
        elif network_loss_type == TYPE.LOSS_DELAY.value:
            delay = data['loss_delay']['delay']
            loss = data['loss_delay']['loss']
            values = (delay, loss)
        elif network_loss_type == TYPE.BRUST_LOSS.value:
            loss = data['brust']['loss']
            package = int(data['brust']['package'])
            p = float(loss) / 100  # p is the probability of transferring from Good State to the bad state
            loss_p = p * package
            r = p / loss_p - p  # r is the probability of transferring from the bad state to the Good
            values = (p * 100, r * 100)
        self.set_base(ip, network_loss_type, values)

    def set_speed_limit(self, ip: str, limit: float, latency: int = None):
        """
        Apply a rate limit
        :param ip: target IP
        :param limit: rate limit in Kbit
        :param latency: delay in ms
        :return:
        """
        if ip not in self.ip_filter_map:
            # Add a class that carries the rate limit
            tc_class = TrafficControlClass(self.interface, self.class_handle)
            tc_class.set_limit_class(limit)
            self.class_handle += 1
            # Create the filter object
            prio = MAX_PRIO if ip == GLOBAL_LOSS_IP else self.prio  # the global rule gets the largest prio number, i.e. the lowest priority
            tc_filter = TrafficControlFilter(self.interface, prio, ip, tc_class)
            tc_filter.set_filter()
            self.ip_filter_map[ip] = tc_filter
            self.prio += 1
        else:
            tc_filter = self.ip_filter_map[ip]
            tc_class = tc_filter.flow_class  # type: TrafficControlClass
            tc_class.change_limit_class(limit)
        if latency:
            self.set_base(ip, TYPE.DELAY.value, (latency,))

    def set_base(self, ip: str, network_loss_type: int, values: tuple = None):
        """
        Apply the basic netem impairments: delay, loss, duplication, reordering, corruption, delay plus loss, and burst loss
        :param ip: target IP
        :param network_loss_type: impairment type
        :param values: impairment values
        :return:
        """
        if values is not None:
            if ip not in self.ip_filter_map:
                # No impairment exists for this IP yet, so add a class and a filter
                # The class is where the netem qdisc gets attached
                tc_class = TrafficControlClass(self.interface, self.class_handle)
                tc_class.set_base_class()
                self.class_handle += 1
                # Create the filter object
                prio = MAX_PRIO if ip == GLOBAL_LOSS_IP else self.prio  # the global rule gets the largest prio number, i.e. the lowest priority
                tc_filter = TrafficControlFilter(self.interface, prio, ip, tc_class)
                tc_filter.set_filter()
                self.ip_filter_map[ip] = tc_filter
                self.prio += 1
                netem = Netem(self.interface, tc_class.handle, self.qdisc_handle)
                netem.add_netem(network_loss_type, values)
                tc_class.child = netem
                self.qdisc_handle += 1
            else:
                # An impairment already exists for this IP, so reuse its class, filter, and qdisc
                tc_filter = self.ip_filter_map[ip]
                tc_class = tc_filter.flow_class  # type: TrafficControlClass
                if tc_class.child:
                    netem = tc_class.child  # type: Netem
                    netem.change_netem(network_loss_type, values)
                else:
                    netem = Netem(self.interface, tc_class.handle, self.qdisc_handle)
                    netem.add_netem(network_loss_type, values)
                    tc_class.child = netem
                    self.qdisc_handle += 1

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.init_tc()


if __name__ == '__main__':
    tc = TrafficControl('eth0')
    # Apply a 20 ms delay to all traffic
    tc.process(TYPE.DELAY.value, '0.0.0.0/0', {'delay': 20})
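
The BRUST_LOSS branch of process() turns a loss percentage and a burst length into the two transition probabilities of netem's Gilbert-Elliott model (loss gemodel p r 1-h 1-k). The sketch below restates that arithmetic as a standalone function; gemodel_params and steady_state_loss are hypothetical names, and the range checks are an addition that the script above does not perform.

```python
from typing import Tuple


def gemodel_params(loss_percent: float, package: int) -> Tuple[float, float]:
    """Return (p, r) in percent for 'loss gemodel p% r% 100% 0%'.

    p is the good-to-bad transition probability, r the bad-to-good one.
    Same arithmetic as the script: r = p / (p * package) - p = 1 / package - p.
    """
    if package < 1:
        raise ValueError('package must be at least 1')
    p = float(loss_percent) / 100
    r = 1.0 / package - p
    if not (0 < p <= 1 and 0 < r <= 1):
        raise ValueError('loss and package give a probability outside (0, 1]')
    return p * 100, r * 100


def steady_state_loss(p_percent: float, r_percent: float) -> float:
    """Long-run share of packets sent in the bad state, p / (p + r), in percent."""
    return 100 * p_percent / (p_percent + r_percent)


if __name__ == '__main__':
    p, r = gemodel_params(1, 10)
    print('loss gemodel {}% {}% 100% 0%'.format(p, r))
    print(steady_state_loss(p, r))
```

Because every packet is dropped in the bad state and none in the good state, the long-run loss rate equals p / (p + r). With the script's formula p + r is always 1 / package, so that rate works out to p times package, and r becomes non-positive as soon as p reaches 1 / package, which is what the range check guards against.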