使用异或实现冗余包方式,冗余包大,并且很难抵抗连续丢包,这里尝试用里德所罗门编码,实现丢包测试。
如下代码效果:25个包分一组,能任意恢复其中的3个包,冗余包比率为25%,抗丢包能力为10%左右。
图仅为一个示例,该情况下可能恢复不出来包。
分块分组实现FEC纠错码,就是为了更大范围的保护数据包,在该包丢失情况下,能被恢复出来。这个思路的理解,对代码实现尤为关键。
组包规则:
每个rtp包每4个字节拆分为1组,每组200个字节,所以理论上可以最多50个包一组,这里设置为25个包一组
#define DATA_PACKETS_PER_BLOCK 25 //25个包,能任意恢复3个包
#define NUM_BLOCKS 350 //350*8 = 1400 350*32=11200
#define REAL_BLOCK_SIZE 4 //
#define MAX_RESTORE_PACKET 3
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <stdint.h> #include <unistd.h> #include <arpa/inet.h> #include <errno.h> #include <time.h> // 引入 libcorrect 库头文件 #include <correct/correct.h> #include <correct/reed-solomon.h> // 定义系统参数 #define RTP_HEADER_SIZE sizeof(RTPHeader) #define FEC_HEADER_SIZE sizeof(FECHeader) #define MAX_PACKET_SIZE 1400 #define FEC_PAYLOAD_TYPE 127 // FEC 专用载荷类型 #define DATA_PACKETS_PER_BLOCK 25 // 每块 25 个 RTP 包 #define FEC_PACKETS_PER_BLOCK 1 // #define TOTAL_PACKETS_PER_BLOCK (DATA_PACKETS_PER_BLOCK + FEC_PACKETS_PER_BLOCK) // 每块总数据包数 #define MAX_BLOCKS_IN_BUFFER 100 // 最大块缓冲区数量 #define RS_PRIMITIVE_POLYNOMIAL 0x11D // 原始多项式 (GF(2^8)) #define RS_FIRST_CONSECUTIVE_ROOT 1 // 第一个连续根 #define RS_GENERATOR_ROOT_GAP 2 // 生成器根间隔 #define RS_NUM_ROOTS 32 // 校验符号数 = M=4) #define MAX_RTP_SIZE MAX_PACKET_SIZE #define BLOCK_SIZE 223 // 保持RS(255,223)标准块 #define SYMBLE_SIZE 32 #define REED_SOLOMON_LEN 255 #define SYMBOL_SIZE MAX_RTP_SIZE // 符号大小=包最大长度 #define K_RS 223 // RS编码消息长度(需K_RS ≥ BLOCK_SIZE) #define M_RS 32 // RS编码校验长度 #define K DATA_PACKETS_PER_BLOCK // 原始包数 #define M FEC_PACKETS_PER_BLOCK // 冗余包数 //#define NUM_BLOCKS 175 //175*8 = 1400 每块 1 个 FEC 包 175*32=5600 = 1400 * 2 //#define REAL_BLOCK_SIZE 8 // //#define NUM_BLOCKS 200 //175*8 = 1400 200*32=6400 //#define REAL_BLOCK_SIZE 7 // #define NUM_BLOCKS 350 //350*8 = 1400 350*32=11200 #define REAL_BLOCK_SIZE 4 // #define MAX_RESTORE_PACKET 3 //#define NUM_BLOCKS 35 // 分块数=35 ,200个字节一组,5个rtp一个fec包,每包分成35个块,每块40个字节,一共35个分组 每包35*32个字节 = 1120个字节 //40*35 = 1400 //#define REAL_BLOCK_SIZE 40 #define FEC_PACKETS_SIZE NUM_BLOCKS*SYMBLE_SIZE // 定义RS编码块大小 #define RS_MAX_DATA_LENGTH BLOCK_SIZE // // RS(255,223) 分块大小 // RTP 头部结构定义 typedef struct { uint8_t version:2; // 版本号 uint8_t padding:1; // 填充标志 uint8_t extension:1; // 扩展标志 uint8_t cc:4; // CSRC 计数器 uint8_t marker:1; // 标记位 uint8_t payload:7; // 载荷类型 uint16_t seq; // 序列号 uint32_t timestamp; // 时间戳 uint32_t ssrc; // 同步源标识符 // 可选的 CSRC 列表、扩展头等 } __attribute__((packed)) RTPHeader; // FEC 包头部结构(简化版) typedef struct { uint8_t version:2; // RTP 版本 uint8_t padding:1; // 填充标志 uint8_t extension:1; // 扩展标志 uint8_t cc:4; // CSRC 计数器 uint8_t marker:1; // 标记位 uint8_t payload:7; // 载荷类型(FEC 专用) uint16_t seq; // 序列号 uint32_t timestamp; // 时间戳 uint32_t ssrc; // 同步源标识符 uint16_t block_id; // 块 ID uint8_t fec_index; // FEC 包索引 uint8_t original_packet_count; // 原始数据包数量 } __attribute__((packed)) FECHeader; typedef struct { int block_num; // 块号 /* 组合1 200个字节一组,5个rtp一个fec包,每包分成35个块,每块40个字节,一共35个分组 每包35*32个字节 = 1120个字节 组合2 200个字节一组,25个rtp4个fec包 5600/4 = 1400个字节,每包分成175个块,每块8个字节,一共175个分组 每包175*32个字节 = 5600 字节 *组合2能恢复2个RTP包 */ uint8_t fec_data[NUM_BLOCKS * SYMBLE_SIZE]; } __attribute__((packed)) FECPacket; // 发送缓冲区结构 typedef struct { uint8_t *data; // 数据包数据 int length; // 数据包长度 int seq_num; // 序列号 int timestamp; // 时间戳 int is_fec; // 是否为 FEC 包 int block_id; // 所属块 ID int packet_id; // 在块中的 ID } PacketBuffer; // 编码器上下文 typedef struct { correct_reed_solomon *rs; // libcorrect Reed-Solomon编码器 PacketBuffer packets[DATA_PACKETS_PER_BLOCK]; // 原始数据包 FECPacket fecData[M]; // 内存池: M冗余包 int packet_count; // 当前块中的数据包数量 int next_seq_num; // 下一个序列号 int next_block_id; // 下一个块 ID int ssrc; // 同步源标识符 // 网络相关 int sockfd; // 套接字描述符 struct sockaddr_in dest_addr; // 目标地址 } EncoderContext; // 解码器上下文 typedef struct { correct_reed_solomon *rs; // libcorrect Reed-Solomon解码器 PacketBuffer *received_packets[MAX_BLOCKS_IN_BUFFER]; // 接收的数据包 int block_map[MAX_BLOCKS_IN_BUFFER]; // 块ID映射 int block_packet_count[MAX_BLOCKS_IN_BUFFER]; // 每个块收到的包数量 int block_recovered[MAX_BLOCKS_IN_BUFFER]; // 块是否已恢复 int ssrc; // 同步源标识符 // 网络相关 int sockfd; // 套接字描述符 struct sockaddr_in src_addr; // 源地址 // 统计信息 int total_packets; // 接收的总数据包数 int recovered_packets; // 成功恢复的数据包数 int total_blocks; // 处理的总块数 int recovered_blocks; // 成功恢复的块数 } DecoderContext; void printf_hex(uint8_t *data, int len); // 初始化编码器 EncoderContext* init_encoder(const char* ip, int port) { EncoderContext* ctx = malloc(sizeof(EncoderContext)); if (!ctx) return NULL; // 初始化libcorrect Reed-Solomon编码器 ctx->rs = correct_reed_solomon_create( RS_PRIMITIVE_POLYNOMIAL, RS_FIRST_CONSECUTIVE_ROOT, RS_GENERATOR_ROOT_GAP, RS_NUM_ROOTS ); if (!ctx->rs) { free(ctx); return NULL; } //int k = correct_reed_solomon_message_length(ctx->rs); //int m = correct_reed_solomon_block_length(ctx->rs); //printf("k:%d, M:%d\r\n", k , m); // 验证参数兼容性 //assert(correct_reed_solomon_message_length(enc->rs) == K); //assert(correct_reed_solomon_block_length(enc->rs) == K+M); ctx->packet_count = 0; ctx->next_seq_num = 0; ctx->next_block_id = 0; ctx->ssrc = rand(); // 创建 UDP 套接字 ctx->sockfd = socket(AF_INET, SOCK_DGRAM, 0); if (ctx->sockfd < 0) { perror("socket creation failed"); correct_reed_solomon_destroy(ctx->rs); free(ctx); return NULL; } memset(&ctx->dest_addr, 0, sizeof(ctx->dest_addr)); ctx->dest_addr.sin_family = AF_INET; ctx->dest_addr.sin_port = htons(port); if (inet_pton(AF_INET, ip, &ctx->dest_addr.sin_addr) <= 0) { perror("invalid address"); close(ctx->sockfd); correct_reed_solomon_destroy(ctx->rs); free(ctx); return NULL; } // 初始化数据包缓冲区 for (int i = 0; i < DATA_PACKETS_PER_BLOCK; i++) { ctx->packets[i].data = NULL; } return ctx; } // 初始化解码器 DecoderContext* init_decoder(int port) { DecoderContext* ctx = malloc(sizeof(DecoderContext)); if (!ctx) return NULL; // 初始化libcorrect Reed-Solomon解码器(与编码器参数一致) ctx->rs = correct_reed_solomon_create( RS_PRIMITIVE_POLYNOMIAL, RS_FIRST_CONSECUTIVE_ROOT, RS_GENERATOR_ROOT_GAP, RS_NUM_ROOTS ); if (!ctx->rs) { free(ctx); return NULL; } // 初始化块映射和计数器 memset(ctx->block_map, -1, sizeof(ctx->block_map)); memset(ctx->block_packet_count, 0, sizeof(ctx->block_packet_count)); memset(ctx->block_recovered, 0, sizeof(ctx->block_recovered)); ctx->ssrc = 0; // 初始化统计信息 ctx->total_packets = 0; ctx->recovered_packets = 0; ctx->total_blocks = 0; ctx->recovered_blocks = 0; // 创建 UDP 套接字 ctx->sockfd = socket(AF_INET, SOCK_DGRAM, 0); if (ctx->sockfd < 0) { perror("socket creation failed"); correct_reed_solomon_destroy(ctx->rs); free(ctx); return NULL; } memset(&ctx->src_addr, 0, sizeof(ctx->src_addr)); ctx->src_addr.sin_family = AF_INET; ctx->src_addr.sin_addr.s_addr = INADDR_ANY; ctx->src_addr.sin_port = htons(port); if (bind(ctx->sockfd, (const struct sockaddr *)&ctx->src_addr, sizeof(ctx->src_addr)) < 0) { perror("bind failed"); close(ctx->sockfd); correct_reed_solomon_destroy(ctx->rs); free(ctx); return NULL; } // 初始化数据包缓冲区 for (int i = 0; i < MAX_BLOCKS_IN_BUFFER; i++) { ctx->received_packets[i] = (PacketBuffer *)malloc(sizeof(PacketBuffer)); memset(ctx->received_packets[i], 0x00, sizeof(PacketBuffer)); } return ctx; } // 清理编码器资源 void cleanup_encoder(EncoderContext* ctx) { if (ctx) { // 释放数据包缓冲区 for (int i = 0; i < DATA_PACKETS_PER_BLOCK; i++) { if (ctx->packets[i].data) { free(ctx->packets[i].data); } } if (ctx->rs) correct_reed_solomon_destroy(ctx->rs); if (ctx->sockfd >= 0) close(ctx->sockfd); free(ctx); } } // 清理解码器资源 void cleanup_decoder(DecoderContext* ctx) { if (ctx) { // 释放数据包缓冲区 for (int i = 0; i < MAX_BLOCKS_IN_BUFFER; i++) { if (ctx->received_packets[i]) { free(ctx->received_packets[i]); } } if (ctx->rs) correct_reed_solomon_destroy(ctx->rs); if (ctx->sockfd >= 0) close(ctx->sockfd); // 打印统计信息 printf("解码器统计信息:\n"); printf(" 处理的总块数: %d\n", ctx->total_blocks); printf(" 成功恢复的块数: %d\n", ctx->recovered_blocks); printf(" 接收的总数据包: %d\n", ctx->total_packets); printf(" 成功恢复的数据包: %d\n", ctx->recovered_packets); printf(" 块恢复成功率: %.2f%%\n", (float)ctx->recovered_blocks / ctx->total_blocks * 100); printf(" 数据包恢复成功率: %.2f%%\n", (float)ctx->recovered_packets / (ctx->total_blocks * DATA_PACKETS_PER_BLOCK) * 100); free(ctx); } } // 使用Reed-Solomon码编码并发送RTP包 int encode_and_send(EncoderContext* ctx, const uint8_t* payload, int payload_len, int timestamp) { if (!ctx || !payload || payload_len <= 0) { errno = EINVAL; return -1; } // 确保有效载荷长度不超过最大值 if (payload_len > MAX_PACKET_SIZE ) { payload_len = MAX_PACKET_SIZE; } // 计算所需缓冲区大小 int packet_size = RTP_HEADER_SIZE + payload_len; // 分配或调整数据包缓冲区大小 if (ctx->packets[ctx->packet_count].data == NULL) { ctx->packets[ctx->packet_count].data = malloc(packet_size); if (!ctx->packets[ctx->packet_count].data) { perror("内存分配失败"); return -1; } } else if (ctx->packets[ctx->packet_count].length < packet_size) { ctx->packets[ctx->packet_count].data = realloc(ctx->packets[ctx->packet_count].data, packet_size); if (!ctx->packets[ctx->packet_count].data) { perror("内存分配失败"); return -1; } } // 创建 RTP 头部 RTPHeader header; header.version = 2; header.padding = 0; header.extension = 0; header.cc = 0; header.marker = 0; header.payload = 96; // 假设默认载荷类型 96 header.seq = htons(ctx->next_seq_num++); header.timestamp = htonl(timestamp); header.ssrc = htonl(ctx->ssrc); // 构建完整的 RTP 包 memcpy(ctx->packets[ctx->packet_count].data, &header, RTP_HEADER_SIZE); memcpy(ctx->packets[ctx->packet_count].data + RTP_HEADER_SIZE, payload, payload_len); ctx->packets[ctx->packet_count].length = packet_size; ctx->packets[ctx->packet_count].seq_num = ntohs(header.seq); ctx->packets[ctx->packet_count].timestamp = ntohl(header.timestamp); ctx->packets[ctx->packet_count].is_fec = 0; ctx->packets[ctx->packet_count].block_id = ctx->next_block_id; ctx->packets[ctx->packet_count].packet_id = ctx->packet_count; // 发送原始 RTP 包 ssize_t sent = sendto(ctx->sockfd, ctx->packets[ctx->packet_count].data, ctx->packets[ctx->packet_count].length, 0, (const struct sockaddr *)&ctx->dest_addr, sizeof(ctx->dest_addr)); if (sent < 0) { perror("发送失败"); return -1; } printf("发送原始包 #%d (块 %d/%d), 长度 %d 字节\n", ctx->packets[ctx->packet_count].seq_num, ctx->next_block_id, ctx->packet_count + 1, payload_len); // 增加包计数 int current_packet = ctx->packet_count++; // 当收集到足够的包时,生成并发送 FEC 包 if (ctx->packet_count == DATA_PACKETS_PER_BLOCK) { generate_and_send_fec(ctx); ctx->packet_count = 0; ctx->next_block_id++; } return current_packet; } // 处理变长包:填充至SYMBOL_SIZE void pad_packet(char *pkt, size_t actual_len) { if (actual_len < SYMBOL_SIZE) { memset(pkt + actual_len, 0, SYMBOL_SIZE - actual_len); } } // 使用Reed-Solomon码生成并发送FEC包 void generate_and_send_fec(EncoderContext* ctx) { int block_id = ctx->next_block_id; uint8_t msg[BLOCK_SIZE]; // 当前块数据(223字节) uint8_t encoded[BLOCK_SIZE + SYMBLE_SIZE]; // 编码结果(255字节) printf("generate_and_send_fec NUM_BLOCKS:%d\n", NUM_BLOCKS); int group_index = 0; //35个块*40 ==1400 //175*8=1400 for (int block=0; block<NUM_BLOCKS;block++){ int offset = block * REAL_BLOCK_SIZE; int size = REAL_BLOCK_SIZE; memset(msg, 0x00, sizeof(msg)); // 2. 分块处理(每块223字节)-200字节+23字节补0 //每个包取40个字节,5个rtp包 一共200个字节 for (int i=0; i<DATA_PACKETS_PER_BLOCK; i++) { // 1.准备数据 uint8_t *full_data = ctx->packets[i].data + RTP_HEADER_SIZE; int payload_len = ctx->packets[i].length - RTP_HEADER_SIZE; // 从完整数据中提取当前块 if (block == NUM_BLOCKS - 1){ int reserved = payload_len - REAL_BLOCK_SIZE*block; if (reserved < size){ size = reserved; } } memcpy((void *)msg+i*REAL_BLOCK_SIZE, (void *)full_data + offset, size); } // RS编码 ssize_t ret = correct_reed_solomon_encode(ctx->rs, msg, BLOCK_SIZE, encoded); if (ret != REED_SOLOMON_LEN){ printf("correct_reed_solomon_encode not ret:%d\r\n", ret); } if (block == 0){ printf_hex(encoded+BLOCK_SIZE, SYMBLE_SIZE); } FECPacket *hdr = (FECPacket*)&(ctx->fecData[group_index]); hdr->block_num = block; // 标记保护的块 memcpy(hdr->fec_data + block*SYMBLE_SIZE, encoded + BLOCK_SIZE, SYMBLE_SIZE); //SYMBLE_SIZE为32个标记位 } for (int fec_idx = 0; fec_idx < FEC_PACKETS_PER_BLOCK; fec_idx++) { FECPacket *fecPacket = (FECPacket*)&(ctx->fecData[fec_idx]); // 创建FEC头部 FECHeader fec_header; fec_header.version = 2; fec_header.padding = 0; fec_header.extension = 0; fec_header.cc = 0; fec_header.marker = 0; fec_header.payload = FEC_PAYLOAD_TYPE; fec_header.seq = htons(ctx->next_seq_num++); fec_header.timestamp = htonl(ctx->packets[0].timestamp); fec_header.ssrc = htons(ctx->ssrc); fec_header.block_id = htons(block_id); fec_header.fec_index = fec_idx; fec_header.original_packet_count = DATA_PACKETS_PER_BLOCK; int fec_packet_len = FEC_HEADER_SIZE +NUM_BLOCKS * SYMBLE_SIZE; //sizeof(FECPacket) // 计算FEC包总长度 printf("fec_idx:%d, fec_packet_len:%d,sizeof(FECPacket):%d\n", fec_idx, fec_packet_len,sizeof(FECPacket)); uint8_t* fec_packet = malloc(fec_packet_len); if (!fec_packet) { perror("FEC包内存分配失败"); continue; } // 复制头部数据 memcpy(fec_packet, (void *)&fec_header, FEC_HEADER_SIZE); uint8_t *fec_data = fecPacket->fec_data; memcpy((uint8_t *)(fec_packet+FEC_HEADER_SIZE), fec_data, NUM_BLOCKS * SYMBLE_SIZE); // 发送FEC包 ssize_t sent = sendto(ctx->sockfd, fec_packet, fec_packet_len, 0, (const struct sockaddr *)&ctx->dest_addr, sizeof(ctx->dest_addr)); if (sent < 0) { perror("FEC包发送失败"); } else { printf("发送FEC包 #%d (块 %d/%d), 保护 %d 个RTP包\n", ntohs(fec_header.seq), block_id, fec_idx + 1, DATA_PACKETS_PER_BLOCK); } free(fec_packet); } // 重置包计数器,准备下一个块 ctx->packet_count = 0; ctx->next_block_id++; } //MAX_PACKET_SIZE + RTP_HEADER_SIZE + 1 #define MAX_UDP_PACKET NUM_BLOCKS*32+120 // 使用Reed-Solomon码接收并解码RTP包 int receive_and_decode(DecoderContext* ctx, uint8_t** output_buffer, int* output_len, int* seq_num) { if (!ctx || !output_buffer || !output_len || !seq_num) { errno = EINVAL; return -1; } uint8_t buffer[MAX_UDP_PACKET]; socklen_t addr_len = sizeof(ctx->src_addr); // 接收数据包 int recv_len = recvfrom(ctx->sockfd, buffer, MAX_UDP_PACKET, 0, (struct sockaddr *)&ctx->src_addr, &addr_len); if (recv_len < 0) { perror("recvfrom failed"); return -1; } // 解析RTP头部 RTPHeader* rtp_header = (RTPHeader*)buffer; uint16_t packet_seq = ntohs(rtp_header->seq); // 检查是否是FEC包 int is_fec = (rtp_header->payload == FEC_PAYLOAD_TYPE); // 计算块ID和包ID int block_id = 0; int packet_id = 0; if (is_fec) { // 解析FEC头部 FECHeader* fec_header = (FECHeader*)buffer; packet_id = packet_seq % TOTAL_PACKETS_PER_BLOCK; block_id = ntohs(fec_header->block_id); // 计算FEC包在块中的索引 int fec_index = fec_header->fec_index; printf("接收到FEC包 #%d (块 %d) %d len:%d\n", packet_seq, block_id, fec_index, recv_len); } else { // 原始 RTP 包 block_id = packet_seq / TOTAL_PACKETS_PER_BLOCK; packet_id = packet_seq % TOTAL_PACKETS_PER_BLOCK; printf("接收到原始包 #%d (块 %d/%d) len:%d\n", packet_seq, block_id, packet_id, recv_len ); } // 查找或分配块缓冲区 int block_index = packet_id; // 验证块ID是否有效 if (block_index < 0 || block_index >= MAX_BLOCKS_IN_BUFFER) { printf("警告: 无效的包块ID %d\n", block_id); return 0; } // 保存数据包,只缓存一组 int group_index = 0; PacketBuffer* packet = ctx->received_packets[block_index]; // 分配或调整缓冲区大小 if (packet->data == NULL) { packet->data = malloc( MAX_UDP_PACKET);// if (!packet->data) { perror("内存分配失败"); return -1; } } else if (packet->length < recv_len) { packet->data = realloc(packet->data, recv_len); if (!packet->data) { perror("[2]内存分配失败"); return -1; } } memcpy(packet->data, buffer, recv_len); packet->length = recv_len; packet->seq_num = packet_seq; packet->timestamp = ntohl(rtp_header->timestamp); packet->is_fec = is_fec; packet->block_id = block_id; packet->packet_id = packet_id; // 更新接收计数 ctx->block_packet_count[block_index]++; // 更新统计信息 ctx->total_packets++; // 尝试恢复块数据 if (block_index >= DATA_PACKETS_PER_BLOCK) { int recovered = recover_lost_packets(ctx, block_id); if (recovered) { ctx->block_recovered[block_index] = 1; ctx->recovered_blocks++; ctx->recovered_packets += DATA_PACKETS_PER_BLOCK; printf("成功恢复块 %d 的所有数据包\n", block_id); // 返回第一个可用的原始数据包 for (int i = 0; i < TOTAL_PACKETS_PER_BLOCK; i++) { if (ctx->received_packets[i]->data) { packet = ctx->received_packets[i]; *output_buffer = ctx->received_packets[i]->data + RTP_HEADER_SIZE; *output_len = ctx->received_packets[i]->length - RTP_HEADER_SIZE; *seq_num = ctx->received_packets[i]->seq_num; // 标记为已处理 free(ctx->received_packets[i]->data); ctx->received_packets[i]->data = NULL; memset((void *)packet, 0x00 ,sizeof(packet)); } } } } return 0; // 没有可用的完整数据包 } void printf_hex(uint8_t *data, int len){ printf("*****print len:%d***\n", len); for (int i=0; i<len; i++){ printf("%02x", data[i]); } printf("******end**\n"); } // 使用Reed-Solomon码恢复丢失的数据包 int recover_lost_packets(DecoderContext* ctx, int block_index) { int group_index = 0; int missing_packets = 0; int received[TOTAL_PACKETS_PER_BLOCK] = {0}; int erasures[BLOCK_SIZE] = {0}, num_erasures = 0; int random = rand() % (TOTAL_PACKETS_PER_BLOCK -1); int random2 = rand() % (TOTAL_PACKETS_PER_BLOCK -1);// random+1;// int random3 = rand() % (TOTAL_PACKETS_PER_BLOCK -1); //random = random2; printf("尝试恢复块 %d,%d,%d: %d 的数据包...\n", random, random2, random3, block_index); // 检查是否有FEC包可用 int available_fec_packets = 0; int available_fec_packet_index = 0; for (int i = 0; i < TOTAL_PACKETS_PER_BLOCK; i++) { PacketBuffer* packet = ctx->received_packets[i]; if (packet->data){ if (random == i || random2 == i || random3 == i){ received[i] = 0; uint8_t *data = packet->data+RTP_HEADER_SIZE; printf("test lost:%d ,orign data \n", i); printf_hex(data, packet->length - RTP_HEADER_SIZE); missing_packets++; }else{ received[i] = 1; if (packet->is_fec) { available_fec_packets++; available_fec_packet_index = i; //printf("fec lost:%d ,orign data \n", i); //printf_hex(packet->data+FEC_HEADER_SIZE, 32); } } }else{ received[i] = 0; } } // 如果FEC包全丢,不进行恢复 if (available_fec_packets == 0) { printf("错误: 没有可用的FEC包,无法恢复\n"); return 0; } printf("块 %d 有 %d 个丢失的数据包,%d 个可用的FEC包,fec_i:%d\n", block_index, missing_packets, available_fec_packets, available_fec_packet_index); // 检查是否有足够的FEC包来恢复 if (missing_packets > MAX_RESTORE_PACKET) { printf("错误: 可用FEC包不足,无法恢复 (需要至少 %d 个,实际 %d 个)\n", missing_packets, available_fec_packets); return 0; } uint8_t symbols[REED_SOLOMON_LEN], restored[REED_SOLOMON_LEN]; PacketBuffer* fec_packet = ctx->received_packets[available_fec_packet_index] ; FECHeader* fec_header = (FECHeader*)fec_packet; // 1. 分块恢复(共35块,每块40个字节) // 2. 分块恢复(共175块,每块8个字节) for (int block=0; block<NUM_BLOCKS; block++) { int offset = block * REAL_BLOCK_SIZE; memset(symbols, 0, REED_SOLOMON_LEN); memset(restored, 0, REED_SOLOMON_LEN); memset(erasures, 0, BLOCK_SIZE); num_erasures = 0; // 组包1:收集当前块的原始数据 5个包,每个包40个字节 // 组包2:收集当前块的原始数据 175个包,每个包8个字节 for (int i=0; i<DATA_PACKETS_PER_BLOCK; i++) { if (received[i] ) { uint8_t *data = ctx->received_packets[i]->data; uint8_t *full_data = data + RTP_HEADER_SIZE; memcpy(symbols + i*REAL_BLOCK_SIZE, full_data + offset, REAL_BLOCK_SIZE); }else{ for (int j=0; j<REAL_BLOCK_SIZE; j++){ erasures[num_erasures++] = i*REAL_BLOCK_SIZE+j; } //num_erasures += REAL_BLOCK_SIZE; } } // 收集当前块的校验数据(1个FEC包) memcpy(symbols + BLOCK_SIZE , (void*)fec_packet->data + FEC_HEADER_SIZE + block*SYMBLE_SIZE, SYMBLE_SIZE); if (block == 0){ printf("***************num_erasures:%d data:",num_erasures); for (int j=0; j<num_erasures; j++){ printf("%d,",erasures[j]); } printf("****\n",num_erasures); printf_hex(symbols, REED_SOLOMON_LEN); } // RS解码 int success = correct_reed_solomon_decode_with_erasures( ctx->rs, symbols, REED_SOLOMON_LEN, erasures, num_erasures, restored ); if (block == 0){ printf_hex(restored, REED_SOLOMON_LEN); printf("#########success:%d#############\n",success); } // 3. 回写恢复的原始数据 if (success) { for (int i=0; i<DATA_PACKETS_PER_BLOCK; i++) { if (!received[i]) { PacketBuffer *packetbuffer = ctx->received_packets[i] ; if (packetbuffer && packetbuffer->data == NULL){ packetbuffer->data = (uint8_t *)malloc(MAX_PACKET_SIZE + RTP_HEADER_SIZE); } uint8_t *full_data = packetbuffer->data + RTP_HEADER_SIZE; memcpy(full_data + offset, restored + i*REAL_BLOCK_SIZE, REAL_BLOCK_SIZE); } } } } for (int i=0; i<DATA_PACKETS_PER_BLOCK; i++) { if (!received[i]) { int missing_idx= i; if (ctx->received_packets[missing_idx]->data == NULL){ ctx->received_packets[missing_idx]->data = (uint8_t *)malloc(RTP_HEADER_SIZE+MAX_PACKET_SIZE); } // 创建恢复包的头部 RTPHeader* recovered_header = (RTPHeader*)ctx->received_packets[missing_idx]->data; // 复制RTP头部(从第一个可用的包复制) RTPHeader* header = NULL; for (int k = 0; k < DATA_PACKETS_PER_BLOCK; k++) { if (ctx->received_packets[k]->data) { header = (RTPHeader*)ctx->received_packets[k]->data; break; } } *recovered_header = *header; ctx->received_packets[missing_idx]->length = RTP_HEADER_SIZE + MAX_PACKET_SIZE; ctx->received_packets[missing_idx]->seq_num = recovered_header->seq; ctx->received_packets[missing_idx]->timestamp = ntohl(recovered_header->timestamp); ctx->received_packets[missing_idx]->is_fec = 0; ctx->received_packets[missing_idx]->block_id = fec_header->block_id; ctx->received_packets[missing_idx]->packet_id = missing_idx; uint8_t *data = ctx->received_packets[missing_idx]->data + RTP_HEADER_SIZE; printf_hex(data, MAX_PACKET_SIZE); printf("成功恢复块 %d 的数据包 #%d\n", fec_header->block_id, i); } } printf("块 %d 的恢复尝试完成: 成功\n", fec_header->block_id); return 1; } // 编码器主函数示例 void encoder_main(const char* dest_ip, int dest_port) { EncoderContext* ctx = init_encoder(dest_ip, dest_port); if (!ctx) { printf("编码器初始化失败\n"); return; } printf("编码器开始发送数据到 %s:%d...\n", dest_ip, dest_port); // 模拟发送 100 个 RTP 包 for (int i = 0; i < 100; i++) { // 创建随机数据作为载荷 uint8_t payload[MAX_PACKET_SIZE]; for (int j = 0; j < sizeof(payload); j++) { payload[j] = rand() % 256; } // 编码并发送 encode_and_send(ctx, payload, sizeof(payload), i * 90); // 控制发送速率 usleep(10000); // 10ms } cleanup_encoder(ctx); } // 解码器主函数示例 void decoder_main(int listen_port) { DecoderContext* ctx = init_decoder(listen_port); if (!ctx) { printf("解码器初始化失败\n"); return; } printf("解码器开始在端口 %d 接收数据...\n", listen_port); uint8_t* output_buffer = NULL; int output_len = 0; int seq_num = 0; // 持续接收和解码数据包 while (1) { int result = receive_and_decode(ctx, &output_buffer, &output_len, &seq_num); if (result > 0) { printf("处理接收到的数据包 #%d, 长度 %d 字节\n", seq_num, output_len); // 这里可以处理接收到的数据,例如写入文件或播放 // 示例中只是简单打印信息 // 注意:output_buffer 指向的是 ctx 内部缓冲区, // 在下一次调用 receive_and_decode 前保持有效 } else if (result < 0) { printf("接收数据时发生错误\n"); break; } // 控制循环速率 usleep(1000); // 1ms } cleanup_decoder(ctx); } int main(int argc, char* argv[]) { // 初始化随机数生成器 srand(time(NULL)); if (argc < 2) { printf("用法: %s [encoder|decoder] [参数...]\n", argv[0]); printf("编码器: %s encoder <目标IP> <目标端口>\n", argv[0]); printf("解码器: %s decoder <监听端口>\n", argv[0]); return 1; } if (strcmp(argv[1], "encoder") == 0) { if (argc < 4) { printf("编码器需要目标IP和端口参数\n"); return 1; } const char* dest_ip = argv[2]; int dest_port = atoi(argv[3]); encoder_main(dest_ip, dest_port); } else if (strcmp(argv[1], "decoder") == 0) { if (argc < 3) { printf("解码器需要监听端口参数\n"); return 1; } int listen_port = atoi(argv[2]); decoder_main(listen_port); } else { printf("未知命令: %s\n", argv[1]); return 1; } return 0; }
测试:
接收端:
./testc decoder 5000
发送端:
./testc encoder 127.0.0.1 5000
-------------------广告线---------------
项目、合作,欢迎勾搭,邮箱:promall@qq.com
本文为呱牛笔记原创文章,转载无需和我联系,但请注明来自呱牛笔记 ,it3q.com