使用libcorrect 库,实现视频FEC过程记录


使用异或实现冗余包方式,冗余包大,并且很难抵抗连续丢包,这里尝试用里德所罗门编码,实现丢包测试。

如下代码效果:25个包分一组,能任意恢复其中的3个包,冗余包比率为25%,抗丢包能力为10%左右。


呱牛笔记

图仅为一个示例,该情况下可能恢复不出来包。

分块分组实现FEC纠错码,就是为了更大范围的保护数据包,在该包丢失情况下,能被恢复出来。这个思路的理解,对代码实现尤为关键。


组包规则:

每个rtp包每4个字节拆分为1组,每组200个字节,所以理论上可以最多50个包一组,这里设置为25个包一组

#define DATA_PACKETS_PER_BLOCK 25  //25个包,能任意恢复3个包

#define NUM_BLOCKS 350    //350*8 = 1400   350*32=11200  

#define REAL_BLOCK_SIZE 4 // 

#define MAX_RESTORE_PACKET 3

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <errno.h>
#include <time.h>

// 引入 libcorrect 库头文件
#include <correct/correct.h>
#include <correct/reed-solomon.h>

// 定义系统参数
#define RTP_HEADER_SIZE sizeof(RTPHeader)
#define FEC_HEADER_SIZE sizeof(FECHeader)
#define MAX_PACKET_SIZE 1400
#define FEC_PAYLOAD_TYPE 127  // FEC 专用载荷类型
#define DATA_PACKETS_PER_BLOCK 25  // 每块 25 个 RTP 包
#define FEC_PACKETS_PER_BLOCK 1    // 
#define TOTAL_PACKETS_PER_BLOCK (DATA_PACKETS_PER_BLOCK + FEC_PACKETS_PER_BLOCK) // 每块总数据包数
#define MAX_BLOCKS_IN_BUFFER 100   // 最大块缓冲区数量
#define RS_PRIMITIVE_POLYNOMIAL 0x11D  // 原始多项式 (GF(2^8))
#define RS_FIRST_CONSECUTIVE_ROOT 1   // 第一个连续根
#define RS_GENERATOR_ROOT_GAP 2       // 生成器根间隔
#define RS_NUM_ROOTS 32 // 校验符号数 = M=4)

#define MAX_RTP_SIZE MAX_PACKET_SIZE
 
#define BLOCK_SIZE 223           // 保持RS(255,223)标准块
#define SYMBLE_SIZE 32
#define REED_SOLOMON_LEN 255
#define SYMBOL_SIZE MAX_RTP_SIZE // 符号大小=包最大长度

#define K_RS            223     // RS编码消息长度(需K_RS ≥ BLOCK_SIZE)
#define M_RS            32      // RS编码校验长度
#define K DATA_PACKETS_PER_BLOCK       // 原始包数
#define M FEC_PACKETS_PER_BLOCK        // 冗余包数
//#define NUM_BLOCKS 175    //175*8 = 1400   每块 1 个 FEC 包  175*32=5600 = 1400 * 2
//#define REAL_BLOCK_SIZE 8 // 

//#define NUM_BLOCKS 200    //175*8 = 1400   200*32=6400  
//#define REAL_BLOCK_SIZE 7 // 

#define NUM_BLOCKS 350    //350*8 = 1400   350*32=11200  
#define REAL_BLOCK_SIZE 4 // 
#define MAX_RESTORE_PACKET 3
//#define NUM_BLOCKS 35 // 分块数=35 ,200个字节一组,5个rtp一个fec包,每包分成35个块,每块40个字节,一共35个分组 每包35*32个字节 = 1120个字节
 //40*35 = 1400
//#define REAL_BLOCK_SIZE 40
#define FEC_PACKETS_SIZE NUM_BLOCKS*SYMBLE_SIZE
// 定义RS编码块大小
#define RS_MAX_DATA_LENGTH BLOCK_SIZE  // // RS(255,223) 分块大小

// RTP 头部结构定义
typedef struct {
    uint8_t version:2;     // 版本号
    uint8_t padding:1;     // 填充标志
    uint8_t extension:1;   // 扩展标志
    uint8_t cc:4;          // CSRC 计数器
    uint8_t marker:1;      // 标记位
    uint8_t payload:7;     // 载荷类型
    uint16_t seq;          // 序列号
    uint32_t timestamp;    // 时间戳
    uint32_t ssrc;         // 同步源标识符
    // 可选的 CSRC 列表、扩展头等
} __attribute__((packed)) RTPHeader;

// FEC 包头部结构(简化版)
typedef struct {
    uint8_t version:2;     // RTP 版本
    uint8_t padding:1;     // 填充标志
    uint8_t extension:1;   // 扩展标志
    uint8_t cc:4;          // CSRC 计数器
    uint8_t marker:1;      // 标记位
    uint8_t payload:7;     // 载荷类型(FEC 专用)
    uint16_t seq;          // 序列号
    uint32_t timestamp;    // 时间戳
    uint32_t ssrc;         // 同步源标识符
    uint16_t block_id;     // 块 ID
    uint8_t fec_index;     // FEC 包索引
    uint8_t original_packet_count; // 原始数据包数量 
} __attribute__((packed)) FECHeader;

typedef struct { 
    int block_num;  // 块号
	/*
	组合1 200个字节一组,5个rtp一个fec包,每包分成35个块,每块40个字节,一共35个分组 每包35*32个字节 = 1120个字节
	组合2 200个字节一组,25个rtp4个fec包 5600/4 = 1400个字节,每包分成175个块,每块8个字节,一共175个分组 每包175*32个字节 = 5600 字节
	*组合2能恢复2个RTP包
    */
	uint8_t fec_data[NUM_BLOCKS * SYMBLE_SIZE]; 
} __attribute__((packed))  FECPacket;

// 发送缓冲区结构
typedef struct {
    uint8_t *data;         // 数据包数据
    int length;            // 数据包长度
    int seq_num;           // 序列号
    int timestamp;         // 时间戳
    int is_fec;            // 是否为 FEC 包
    int block_id;          // 所属块 ID
    int packet_id;         // 在块中的 ID
} PacketBuffer;


// 编码器上下文
typedef struct {
    correct_reed_solomon *rs;      // libcorrect Reed-Solomon编码器
    PacketBuffer packets[DATA_PACKETS_PER_BLOCK]; // 原始数据包
    FECPacket fecData[M];    // 内存池:  M冗余包
    int packet_count;              // 当前块中的数据包数量
    int next_seq_num;              // 下一个序列号
    int next_block_id;             // 下一个块 ID
    int ssrc;                      // 同步源标识符
    // 网络相关
    int sockfd;                    // 套接字描述符
    struct sockaddr_in dest_addr;  // 目标地址
} EncoderContext;

// 解码器上下文
typedef struct {
    correct_reed_solomon *rs;      // libcorrect Reed-Solomon解码器
    PacketBuffer *received_packets[MAX_BLOCKS_IN_BUFFER]; // 接收的数据包
    int block_map[MAX_BLOCKS_IN_BUFFER];  // 块ID映射
    int block_packet_count[MAX_BLOCKS_IN_BUFFER]; // 每个块收到的包数量
    int block_recovered[MAX_BLOCKS_IN_BUFFER]; // 块是否已恢复
    int ssrc;                      // 同步源标识符
    // 网络相关
    int sockfd;                    // 套接字描述符
    struct sockaddr_in src_addr;   // 源地址
    // 统计信息
    int total_packets;             // 接收的总数据包数
    int recovered_packets;         // 成功恢复的数据包数
    int total_blocks;              // 处理的总块数
    int recovered_blocks;          // 成功恢复的块数
} DecoderContext;
 

void printf_hex(uint8_t *data, int len);

// 初始化编码器
EncoderContext* init_encoder(const char* ip, int port) {
    EncoderContext* ctx = malloc(sizeof(EncoderContext));
    if (!ctx) return NULL;
    
    // 初始化libcorrect Reed-Solomon编码器
    ctx->rs = correct_reed_solomon_create(
        RS_PRIMITIVE_POLYNOMIAL,
        RS_FIRST_CONSECUTIVE_ROOT,
        RS_GENERATOR_ROOT_GAP,
        RS_NUM_ROOTS
    );
    
    if (!ctx->rs) {
        free(ctx);
        return NULL;
    }
	//int k = correct_reed_solomon_message_length(ctx->rs);
	//int m = correct_reed_solomon_block_length(ctx->rs);
	//printf("k:%d, M:%d\r\n", k , m);
    // 验证参数兼容性
    //assert(correct_reed_solomon_message_length(enc->rs) == K);
    //assert(correct_reed_solomon_block_length(enc->rs) == K+M);
	
    ctx->packet_count = 0;
    ctx->next_seq_num = 0;
    ctx->next_block_id = 0;
    ctx->ssrc = rand();
    
    // 创建 UDP 套接字
    ctx->sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (ctx->sockfd < 0) {
        perror("socket creation failed");
        correct_reed_solomon_destroy(ctx->rs);
        free(ctx);
        return NULL;
    }
    
    memset(&ctx->dest_addr, 0, sizeof(ctx->dest_addr));
    ctx->dest_addr.sin_family = AF_INET;
    ctx->dest_addr.sin_port = htons(port);
    if (inet_pton(AF_INET, ip, &ctx->dest_addr.sin_addr) <= 0) {
        perror("invalid address");
        close(ctx->sockfd);
        correct_reed_solomon_destroy(ctx->rs);
        free(ctx);
        return NULL;
    }
    
    // 初始化数据包缓冲区
    for (int i = 0; i < DATA_PACKETS_PER_BLOCK; i++) {
        ctx->packets[i].data = NULL;
    } 
    return ctx;
}

// 初始化解码器
DecoderContext* init_decoder(int port) {
    DecoderContext* ctx = malloc(sizeof(DecoderContext));
    if (!ctx) return NULL;
    
    // 初始化libcorrect Reed-Solomon解码器(与编码器参数一致)
    ctx->rs = correct_reed_solomon_create(
        RS_PRIMITIVE_POLYNOMIAL,
        RS_FIRST_CONSECUTIVE_ROOT,
        RS_GENERATOR_ROOT_GAP,
        RS_NUM_ROOTS
    );
    
    if (!ctx->rs) {
        free(ctx);
        return NULL;
    }
    
    // 初始化块映射和计数器
    memset(ctx->block_map, -1, sizeof(ctx->block_map));
    memset(ctx->block_packet_count, 0, sizeof(ctx->block_packet_count));
    memset(ctx->block_recovered, 0, sizeof(ctx->block_recovered));
    
    ctx->ssrc = 0;
    
    // 初始化统计信息
    ctx->total_packets = 0;
    ctx->recovered_packets = 0;
    ctx->total_blocks = 0;
    ctx->recovered_blocks = 0;
    
    // 创建 UDP 套接字
    ctx->sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (ctx->sockfd < 0) {
        perror("socket creation failed");
        correct_reed_solomon_destroy(ctx->rs);
        free(ctx);
        return NULL;
    }
    
    memset(&ctx->src_addr, 0, sizeof(ctx->src_addr));
    ctx->src_addr.sin_family = AF_INET;
    ctx->src_addr.sin_addr.s_addr = INADDR_ANY;
    ctx->src_addr.sin_port = htons(port);
    
    if (bind(ctx->sockfd, (const struct sockaddr *)&ctx->src_addr, 
             sizeof(ctx->src_addr)) < 0) {
        perror("bind failed");
        close(ctx->sockfd);
        correct_reed_solomon_destroy(ctx->rs);
        free(ctx);
        return NULL;
    }
    
    // 初始化数据包缓冲区
    for (int i = 0; i < MAX_BLOCKS_IN_BUFFER; i++) {
        ctx->received_packets[i] = (PacketBuffer *)malloc(sizeof(PacketBuffer));
		memset(ctx->received_packets[i], 0x00, sizeof(PacketBuffer));
    }
    
    return ctx;
}

// 清理编码器资源
void cleanup_encoder(EncoderContext* ctx) {
    if (ctx) {
        // 释放数据包缓冲区
        for (int i = 0; i < DATA_PACKETS_PER_BLOCK; i++) {
            if (ctx->packets[i].data) {
                free(ctx->packets[i].data);
            }
        }
         
        if (ctx->rs) correct_reed_solomon_destroy(ctx->rs);
        if (ctx->sockfd >= 0) close(ctx->sockfd);
        free(ctx);
    }
}

// 清理解码器资源
void cleanup_decoder(DecoderContext* ctx) {
    if (ctx) {
        // 释放数据包缓冲区
        for (int i = 0; i < MAX_BLOCKS_IN_BUFFER; i++) {
            if (ctx->received_packets[i]) {
                free(ctx->received_packets[i]);
            }
        }
        
        if (ctx->rs) correct_reed_solomon_destroy(ctx->rs);
        if (ctx->sockfd >= 0) close(ctx->sockfd);
        
        // 打印统计信息
        printf("解码器统计信息:\n");
        printf("  处理的总块数: %d\n", ctx->total_blocks);
        printf("  成功恢复的块数: %d\n", ctx->recovered_blocks);
        printf("  接收的总数据包: %d\n", ctx->total_packets);
        printf("  成功恢复的数据包: %d\n", ctx->recovered_packets);
        printf("  块恢复成功率: %.2f%%\n", 
              (float)ctx->recovered_blocks / ctx->total_blocks * 100);
        printf("  数据包恢复成功率: %.2f%%\n", 
              (float)ctx->recovered_packets / (ctx->total_blocks * DATA_PACKETS_PER_BLOCK) * 100);
        
        free(ctx);
    }
}

// 使用Reed-Solomon码编码并发送RTP包
int encode_and_send(EncoderContext* ctx, const uint8_t* payload, int payload_len, int timestamp) {
    if (!ctx || !payload || payload_len <= 0) {
        errno = EINVAL;
        return -1;
    }
    
    // 确保有效载荷长度不超过最大值
    if (payload_len > MAX_PACKET_SIZE ) {
        payload_len = MAX_PACKET_SIZE;
    }
    
    // 计算所需缓冲区大小
    int packet_size = RTP_HEADER_SIZE + payload_len;
    
    // 分配或调整数据包缓冲区大小
    if (ctx->packets[ctx->packet_count].data == NULL) {
        ctx->packets[ctx->packet_count].data = malloc(packet_size);
        if (!ctx->packets[ctx->packet_count].data) {
            perror("内存分配失败");
            return -1;
        }
    } else if (ctx->packets[ctx->packet_count].length < packet_size) {
        ctx->packets[ctx->packet_count].data = realloc(ctx->packets[ctx->packet_count].data, packet_size);
        if (!ctx->packets[ctx->packet_count].data) {
            perror("内存分配失败");
            return -1;
        }
    }
    
    // 创建 RTP 头部
    RTPHeader header;
    header.version = 2;
    header.padding = 0;
    header.extension = 0;
    header.cc = 0;
    header.marker = 0;
    header.payload = 96;  // 假设默认载荷类型 96
    header.seq = htons(ctx->next_seq_num++);
    header.timestamp = htonl(timestamp);
    header.ssrc = htonl(ctx->ssrc);
    
    // 构建完整的 RTP 包
    memcpy(ctx->packets[ctx->packet_count].data, &header, RTP_HEADER_SIZE);
    memcpy(ctx->packets[ctx->packet_count].data + RTP_HEADER_SIZE, payload, payload_len);
    ctx->packets[ctx->packet_count].length = packet_size;
    ctx->packets[ctx->packet_count].seq_num = ntohs(header.seq);
    ctx->packets[ctx->packet_count].timestamp = ntohl(header.timestamp);
    ctx->packets[ctx->packet_count].is_fec = 0;
    ctx->packets[ctx->packet_count].block_id = ctx->next_block_id;
    ctx->packets[ctx->packet_count].packet_id = ctx->packet_count;
    
    // 发送原始 RTP 包
    ssize_t sent = sendto(ctx->sockfd, 
                         ctx->packets[ctx->packet_count].data, 
                         ctx->packets[ctx->packet_count].length, 
                         0, 
                         (const struct sockaddr *)&ctx->dest_addr, 
                         sizeof(ctx->dest_addr));
    
    if (sent < 0) {
        perror("发送失败");
        return -1;
    }
    
    printf("发送原始包 #%d (块 %d/%d), 长度 %d 字节\n", 
          ctx->packets[ctx->packet_count].seq_num,
          ctx->next_block_id, ctx->packet_count + 1, payload_len);
    
    // 增加包计数
    int current_packet = ctx->packet_count++;
    
    // 当收集到足够的包时,生成并发送 FEC 包
    if (ctx->packet_count == DATA_PACKETS_PER_BLOCK) {
        generate_and_send_fec(ctx);
        ctx->packet_count = 0;
        ctx->next_block_id++;
    }
    
    return current_packet;
}

// 处理变长包:填充至SYMBOL_SIZE
void pad_packet(char *pkt, size_t actual_len) {
    if (actual_len < SYMBOL_SIZE) {
        memset(pkt + actual_len, 0, SYMBOL_SIZE - actual_len);
    }
}

// 使用Reed-Solomon码生成并发送FEC包
void generate_and_send_fec(EncoderContext* ctx) {
    int block_id = ctx->next_block_id;  
	
	uint8_t msg[BLOCK_SIZE];  // 当前块数据(223字节)
	uint8_t encoded[BLOCK_SIZE + SYMBLE_SIZE]; // 编码结果(255字节) 
         
     
	printf("generate_and_send_fec  NUM_BLOCKS:%d\n",  NUM_BLOCKS);
	
	int group_index = 0;
	//35个块*40 ==1400
	//175*8=1400
	for (int block=0; block<NUM_BLOCKS;block++){
		int offset = block * REAL_BLOCK_SIZE;
		int size = REAL_BLOCK_SIZE;
		 
		memset(msg, 0x00, sizeof(msg));
		// 2. 分块处理(每块223字节)-200字节+23字节补0
		//每个包取40个字节,5个rtp包 一共200个字节
		for (int i=0; i<DATA_PACKETS_PER_BLOCK; i++) 
        {
			// 1.准备数据
			uint8_t *full_data = ctx->packets[i].data + RTP_HEADER_SIZE; 
			int payload_len = ctx->packets[i].length - RTP_HEADER_SIZE; 
			// 从完整数据中提取当前块
			if (block == NUM_BLOCKS - 1){
				int reserved = payload_len - REAL_BLOCK_SIZE*block;
				if (reserved < size){
					size = reserved;
				}
			}
			memcpy((void *)msg+i*REAL_BLOCK_SIZE, (void *)full_data + offset, size);
		}
		// RS编码
		ssize_t ret = correct_reed_solomon_encode(ctx->rs, msg, BLOCK_SIZE, encoded);
		if (ret != REED_SOLOMON_LEN){
			printf("correct_reed_solomon_encode not ret:%d\r\n", ret);
		} 
		
		if (block == 0){
			printf_hex(encoded+BLOCK_SIZE, SYMBLE_SIZE);
		}
		FECPacket *hdr = (FECPacket*)&(ctx->fecData[group_index]);
		hdr->block_num = block; // 标记保护的块
		memcpy(hdr->fec_data + block*SYMBLE_SIZE, encoded + BLOCK_SIZE, SYMBLE_SIZE); //SYMBLE_SIZE为32个标记位
	}
  
    
    for (int fec_idx = 0; fec_idx < FEC_PACKETS_PER_BLOCK; fec_idx++) {
		FECPacket *fecPacket = (FECPacket*)&(ctx->fecData[fec_idx]);
        // 创建FEC头部
        FECHeader fec_header;
        fec_header.version = 2;
        fec_header.padding = 0;
        fec_header.extension = 0;
        fec_header.cc = 0;
        fec_header.marker = 0;
        fec_header.payload = FEC_PAYLOAD_TYPE;
        fec_header.seq = htons(ctx->next_seq_num++);
        fec_header.timestamp = htonl(ctx->packets[0].timestamp);
        fec_header.ssrc = htons(ctx->ssrc);
        fec_header.block_id = htons(block_id);
        fec_header.fec_index = fec_idx;
        fec_header.original_packet_count = DATA_PACKETS_PER_BLOCK;
		int fec_packet_len = FEC_HEADER_SIZE +NUM_BLOCKS * SYMBLE_SIZE;  //sizeof(FECPacket) 
         
        // 计算FEC包总长度
		printf("fec_idx:%d, fec_packet_len:%d,sizeof(FECPacket):%d\n", fec_idx, fec_packet_len,sizeof(FECPacket));
        uint8_t* fec_packet = malloc(fec_packet_len);
        if (!fec_packet) {
            perror("FEC包内存分配失败");
            continue;
        }

        // 复制头部数据
        memcpy(fec_packet, (void *)&fec_header, FEC_HEADER_SIZE);
        uint8_t *fec_data = fecPacket->fec_data;
		memcpy((uint8_t *)(fec_packet+FEC_HEADER_SIZE), fec_data, NUM_BLOCKS * SYMBLE_SIZE);
  
        // 发送FEC包
        ssize_t sent = sendto(ctx->sockfd, fec_packet, fec_packet_len, 0, 
                            (const struct sockaddr *)&ctx->dest_addr, 
                            sizeof(ctx->dest_addr));
        
        if (sent < 0) {
            perror("FEC包发送失败");
        } else {
            printf("发送FEC包 #%d (块 %d/%d), 保护 %d 个RTP包\n", 
                  ntohs(fec_header.seq), block_id, fec_idx + 1, DATA_PACKETS_PER_BLOCK);
        }
        
        free(fec_packet);
    } 
    
    // 重置包计数器,准备下一个块
    ctx->packet_count = 0;
    ctx->next_block_id++;
}

//MAX_PACKET_SIZE +  RTP_HEADER_SIZE + 1
#define MAX_UDP_PACKET NUM_BLOCKS*32+120

// 使用Reed-Solomon码接收并解码RTP包
int receive_and_decode(DecoderContext* ctx, uint8_t** output_buffer, int* output_len, int* seq_num) {
    if (!ctx || !output_buffer || !output_len || !seq_num) {
        errno = EINVAL;
        return -1;
    }
    
    uint8_t buffer[MAX_UDP_PACKET];
    socklen_t addr_len = sizeof(ctx->src_addr);
    
    // 接收数据包
    int recv_len = recvfrom(ctx->sockfd, buffer, MAX_UDP_PACKET, 0, 
                           (struct sockaddr *)&ctx->src_addr, &addr_len);
    if (recv_len < 0) {
        perror("recvfrom failed");
        return -1;
    }
    
    // 解析RTP头部
    RTPHeader* rtp_header = (RTPHeader*)buffer;
    uint16_t packet_seq = ntohs(rtp_header->seq);
    
    // 检查是否是FEC包
    int is_fec = (rtp_header->payload == FEC_PAYLOAD_TYPE);
    
    // 计算块ID和包ID
    int block_id = 0;
    int packet_id = 0;
    
    if (is_fec) {
        // 解析FEC头部
        FECHeader* fec_header = (FECHeader*)buffer;
        packet_id = packet_seq % TOTAL_PACKETS_PER_BLOCK;
        block_id = ntohs(fec_header->block_id);
        
        
        // 计算FEC包在块中的索引
        int fec_index =  fec_header->fec_index;
        
        printf("接收到FEC包 #%d (块 %d) %d len:%d\n", 
              packet_seq, block_id, fec_index, recv_len);
    } else {
        // 原始 RTP 包
        block_id = packet_seq / TOTAL_PACKETS_PER_BLOCK;
        packet_id = packet_seq % TOTAL_PACKETS_PER_BLOCK;
        
        printf("接收到原始包 #%d (块 %d/%d) len:%d\n", 
              packet_seq, block_id, packet_id, recv_len );
    }
     
    
    // 查找或分配块缓冲区
    int block_index = packet_id; 
	// 验证块ID是否有效
	if (block_index < 0 || block_index >= MAX_BLOCKS_IN_BUFFER) {
		printf("警告: 无效的包块ID %d\n", block_id);
		return 0;
	}
    
    // 保存数据包,只缓存一组
    int group_index = 0;
    PacketBuffer* packet = ctx->received_packets[block_index];
    
    // 分配或调整缓冲区大小
    if (packet->data == NULL) {
        packet->data = malloc( MAX_UDP_PACKET);//
        if (!packet->data) {
            perror("内存分配失败");
            return -1;
        }
    } else if (packet->length < recv_len) {
        packet->data = realloc(packet->data, recv_len);
        if (!packet->data) {
            perror("[2]内存分配失败");
            return -1;
        }
    }
    
    memcpy(packet->data, buffer, recv_len);
    packet->length = recv_len;
    packet->seq_num = packet_seq;
    packet->timestamp = ntohl(rtp_header->timestamp);
    packet->is_fec = is_fec;
    packet->block_id = block_id;
    packet->packet_id = packet_id;
    
    // 更新接收计数
    ctx->block_packet_count[block_index]++;
    
    // 更新统计信息
    ctx->total_packets++;
    
    // 尝试恢复块数据
    if (block_index >= DATA_PACKETS_PER_BLOCK) {
        int recovered = recover_lost_packets(ctx, block_id);
        if (recovered) {
            ctx->block_recovered[block_index] = 1;
            ctx->recovered_blocks++;
            ctx->recovered_packets += DATA_PACKETS_PER_BLOCK;
            
            printf("成功恢复块 %d 的所有数据包\n", block_id);
            // 返回第一个可用的原始数据包
            for (int i = 0; i < TOTAL_PACKETS_PER_BLOCK; i++) {
                if (ctx->received_packets[i]->data) {
					packet = ctx->received_packets[i];
                    *output_buffer = ctx->received_packets[i]->data + RTP_HEADER_SIZE;
                    *output_len = ctx->received_packets[i]->length - RTP_HEADER_SIZE;
                    *seq_num = ctx->received_packets[i]->seq_num;
                    
					
                    // 标记为已处理
                    free(ctx->received_packets[i]->data);
                    ctx->received_packets[i]->data = NULL;
                     
					 memset((void *)packet, 0x00 ,sizeof(packet));
                }
            }
        }
    }
    
    return 0; // 没有可用的完整数据包
}

void printf_hex(uint8_t *data, int len){
    printf("*****print len:%d***\n", len);
    for (int i=0; i<len; i++){
        printf("%02x", data[i]);
    } 
    printf("******end**\n");
}

// 使用Reed-Solomon码恢复丢失的数据包
int recover_lost_packets(DecoderContext* ctx, int block_index) { 
	int group_index = 0;
	int missing_packets  = 0;
    int received[TOTAL_PACKETS_PER_BLOCK] = {0};
    int erasures[BLOCK_SIZE] = {0}, num_erasures = 0;
	 
	 int random =  rand() % (TOTAL_PACKETS_PER_BLOCK -1);
	 int random2 =   rand() % (TOTAL_PACKETS_PER_BLOCK -1);// random+1;//
	 int random3 =  rand() % (TOTAL_PACKETS_PER_BLOCK -1);
	 
	//random = random2;
	 
    printf("尝试恢复块 %d,%d,%d: %d 的数据包...\n", random, random2, random3, block_index);
    // 检查是否有FEC包可用
    int available_fec_packets = 0;
    int available_fec_packet_index = 0;
    for (int i = 0; i < TOTAL_PACKETS_PER_BLOCK; i++) {
        PacketBuffer* packet = ctx->received_packets[i];
		if (packet->data){
			if (random == i || random2 == i || random3 == i){
				received[i] = 0; 
				uint8_t *data = packet->data+RTP_HEADER_SIZE;
				printf("test lost:%d ,orign data \n", i);
				printf_hex(data, packet->length - RTP_HEADER_SIZE);
				missing_packets++; 
			}else{
				received[i] = 1;
				if (packet->is_fec) {
					available_fec_packets++;
					available_fec_packet_index = i;
					
					//printf("fec lost:%d ,orign data \n", i);
					//printf_hex(packet->data+FEC_HEADER_SIZE, 32);
				}
			}
		}else{
			received[i] = 0; 
		} 
    } 
    
    // 如果FEC包全丢,不进行恢复
    if (available_fec_packets == 0) {
        printf("错误: 没有可用的FEC包,无法恢复\n");
        return 0;
    } 
     
    
    printf("块 %d 有 %d 个丢失的数据包,%d 个可用的FEC包,fec_i:%d\n", 
          block_index, missing_packets, available_fec_packets, available_fec_packet_index);
    
    // 检查是否有足够的FEC包来恢复
    if (missing_packets > MAX_RESTORE_PACKET) {
        printf("错误: 可用FEC包不足,无法恢复 (需要至少 %d 个,实际 %d 个)\n", 
              missing_packets, available_fec_packets);
        return 0;
    } 
      
    uint8_t symbols[REED_SOLOMON_LEN], restored[REED_SOLOMON_LEN];
    PacketBuffer* fec_packet = ctx->received_packets[available_fec_packet_index] ;
	FECHeader* fec_header = (FECHeader*)fec_packet;
    // 1. 分块恢复(共35块,每块40个字节)
	// 2. 分块恢复(共175块,每块8个字节)
    for (int block=0; block<NUM_BLOCKS; block++) {
        int offset = block * REAL_BLOCK_SIZE;
        memset(symbols, 0, REED_SOLOMON_LEN);
        memset(restored, 0, REED_SOLOMON_LEN);
        memset(erasures, 0, BLOCK_SIZE);
		num_erasures = 0;
        // 组包1:收集当前块的原始数据 5个包,每个包40个字节
        // 组包2:收集当前块的原始数据 175个包,每个包8个字节
        for (int i=0; i<DATA_PACKETS_PER_BLOCK; i++) {
            if (received[i] ) {
				uint8_t *data =  ctx->received_packets[i]->data;
			    uint8_t *full_data = data + RTP_HEADER_SIZE; 
                memcpy(symbols + i*REAL_BLOCK_SIZE, full_data + offset, REAL_BLOCK_SIZE);
            }else{
				
				for (int j=0; j<REAL_BLOCK_SIZE; j++){
					erasures[num_erasures++] = i*REAL_BLOCK_SIZE+j;
				}
				//num_erasures += REAL_BLOCK_SIZE;
			}
        }
 
        // 收集当前块的校验数据(1个FEC包) 
        memcpy(symbols + BLOCK_SIZE , (void*)fec_packet->data + FEC_HEADER_SIZE + block*SYMBLE_SIZE, SYMBLE_SIZE);  
        
		if (block == 0){
			
			printf("***************num_erasures:%d data:",num_erasures);
			for (int j=0; j<num_erasures; j++){ 
				printf("%d,",erasures[j]);
			}
			printf("****\n",num_erasures);
			printf_hex(symbols, REED_SOLOMON_LEN);
		}
        // RS解码
        int success = correct_reed_solomon_decode_with_erasures(
            ctx->rs, 
            symbols, 
			REED_SOLOMON_LEN,
            erasures, 
            num_erasures, 
            restored
        );
		if (block == 0){
		 printf_hex(restored, REED_SOLOMON_LEN);
		 printf("#########success:%d#############\n",success);
		}

        // 3. 回写恢复的原始数据 
        if (success) {
			for (int i=0; i<DATA_PACKETS_PER_BLOCK; i++) {  
				if (!received[i]) { 
				    PacketBuffer *packetbuffer = ctx->received_packets[i] ;
					if (packetbuffer && packetbuffer->data == NULL){
						packetbuffer->data = (uint8_t *)malloc(MAX_PACKET_SIZE + RTP_HEADER_SIZE);
					}  
					uint8_t *full_data = packetbuffer->data + RTP_HEADER_SIZE; 
                    memcpy(full_data + offset, restored + i*REAL_BLOCK_SIZE, REAL_BLOCK_SIZE);
				}
			}  
        }
    }  
    
	for (int i=0; i<DATA_PACKETS_PER_BLOCK; i++) {  
		if (!received[i]) { 
		    int missing_idx= i;
			if (ctx->received_packets[missing_idx]->data == NULL){
				ctx->received_packets[missing_idx]->data = (uint8_t *)malloc(RTP_HEADER_SIZE+MAX_PACKET_SIZE);
			}
			// 创建恢复包的头部
			RTPHeader* recovered_header = (RTPHeader*)ctx->received_packets[missing_idx]->data;  
			 
			// 复制RTP头部(从第一个可用的包复制)
			RTPHeader* header = NULL;
			for (int k = 0; k < DATA_PACKETS_PER_BLOCK; k++) {
				if (ctx->received_packets[k]->data) {
					header = (RTPHeader*)ctx->received_packets[k]->data;
					break;
				}
			}
			*recovered_header = *header;
			ctx->received_packets[missing_idx]->length = RTP_HEADER_SIZE + MAX_PACKET_SIZE;
			ctx->received_packets[missing_idx]->seq_num = recovered_header->seq;
			ctx->received_packets[missing_idx]->timestamp = ntohl(recovered_header->timestamp);
			ctx->received_packets[missing_idx]->is_fec = 0;
			ctx->received_packets[missing_idx]->block_id = fec_header->block_id;
			ctx->received_packets[missing_idx]->packet_id = missing_idx;
			
			uint8_t *data = ctx->received_packets[missing_idx]->data + RTP_HEADER_SIZE;
			printf_hex(data, MAX_PACKET_SIZE);
			printf("成功恢复块 %d 的数据包 #%d\n", fec_header->block_id, i);
		}
	} 
    printf("块 %d 的恢复尝试完成: 成功\n", fec_header->block_id);
    return 1;
}

// 编码器主函数示例
void encoder_main(const char* dest_ip, int dest_port) {
    EncoderContext* ctx = init_encoder(dest_ip, dest_port);
    if (!ctx) {
        printf("编码器初始化失败\n");
        return;
    }
    
    printf("编码器开始发送数据到 %s:%d...\n", dest_ip, dest_port);
    
    // 模拟发送 100 个 RTP 包
    for (int i = 0; i < 100; i++) {
        // 创建随机数据作为载荷
        uint8_t payload[MAX_PACKET_SIZE];
        for (int j = 0; j < sizeof(payload); j++) {
            payload[j] = rand() % 256;
        }
        
        // 编码并发送
        encode_and_send(ctx, payload, sizeof(payload), i * 90);
        
        // 控制发送速率
        usleep(10000); // 10ms
    }
    
    cleanup_encoder(ctx);
}

// 解码器主函数示例
void decoder_main(int listen_port) {
    DecoderContext* ctx = init_decoder(listen_port);
    if (!ctx) {
        printf("解码器初始化失败\n");
        return;
    }
    
    printf("解码器开始在端口 %d 接收数据...\n", listen_port);
    
    uint8_t* output_buffer = NULL;
    int output_len = 0;
    int seq_num = 0;
    
    // 持续接收和解码数据包
    while (1) {
        int result = receive_and_decode(ctx, &output_buffer, &output_len, &seq_num);
        if (result > 0) {
            printf("处理接收到的数据包 #%d, 长度 %d 字节\n", seq_num, output_len);
            
            // 这里可以处理接收到的数据,例如写入文件或播放
            // 示例中只是简单打印信息
            
            // 注意:output_buffer 指向的是 ctx 内部缓冲区,
            // 在下一次调用 receive_and_decode 前保持有效
        } else if (result < 0) {
            printf("接收数据时发生错误\n");
            break;
        }
        
        // 控制循环速率
        usleep(1000); // 1ms
    }
    
    cleanup_decoder(ctx);
}

int main(int argc, char* argv[]) {
    // 初始化随机数生成器
    srand(time(NULL));
    
    if (argc < 2) {
        printf("用法: %s [encoder|decoder] [参数...]\n", argv[0]);
        printf("编码器: %s encoder <目标IP> <目标端口>\n", argv[0]);
        printf("解码器: %s decoder <监听端口>\n", argv[0]);
        return 1;
    }
    
    if (strcmp(argv[1], "encoder") == 0) {
        if (argc < 4) {
            printf("编码器需要目标IP和端口参数\n");
            return 1;
        }
        
        const char* dest_ip = argv[2];
        int dest_port = atoi(argv[3]);
        
        encoder_main(dest_ip, dest_port);
    } else if (strcmp(argv[1], "decoder") == 0) {
        if (argc < 3) {
            printf("解码器需要监听端口参数\n");
            return 1;
        }
        
        int listen_port = atoi(argv[2]);
        
        decoder_main(listen_port);
    } else {
        printf("未知命令: %s\n", argv[1]);
        return 1;
    }
    
    return 0;
}


测试:

接收端:

./testc  decoder 5000


发送端:

./testc  encoder 127.0.0.1 5000

-------------------广告线---------------
项目、合作,欢迎勾搭,邮箱:promall@qq.com


本文为呱牛笔记原创文章,转载无需和我联系,但请注明来自呱牛笔记 ,it3q.com

请先登录后发表评论
  • 最新评论
  • 总共0条评论