24 versions (13 stable)
Version | Release date |
---|---|
1.2.1 | Mar 18, 2024 |
1.1.5 | Feb 28, 2024 |
1.1.1 | Dec 7, 2023 |
1.1.0 | Nov 12, 2023 |
0.3.2 | Dec 31, 2022 |
#176 in Network programming
47 downloads per month
355KB
8K SLoC
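FLUTE - File Delivery over Unidirectional Transport
Massively scalable multicast distribution solution
This library implements unidirectional file delivery and does not require a return channel.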
RFC
This library implements the following RFCs and specifications
RFC | Title | Link |
---|---|---|
RFC 6726 | FLUTE - File Delivery over Unidirectional Transport | https://www.rfc-editor.org/rfc/rfc6726.html |
RFC 5775 | Asynchronous Layered Coding (ALC) Protocol Instantiation | https://www.rfc-editor.org/rfc/rfc5775.html |
RFC 5651 | Layered Coding Transport (LCT) Building Block | https://www.rfc-editor.org/rfc/rfc5651 |
RFC 5052 | Forward Error Correction (FEC) Building Block | https://www.rfc-editor.org/rfc/rfc5052 |
RFC 5510 | Reed-Solomon Forward Error Correction (FEC) Schemes | https://www.rfc-editor.org/rfc/rfc5510.html |
3GPP TS 26.346 | Extended FLUTE FDT Schema (7.2.10) | https://www.etsi.org/deliver/etsi_ts/126300_126399/126346/17.03.00_60/ts_126346v170300p.pdf |
UDP/IP Multicast File Sender
Transfer files over a UDP/IP network
use flute::sender::Sender;
use flute::sender::ObjectDesc;
use flute::sender::Cenc;
use flute::core::UDPEndpoint;
use std::net::UdpSocket;
use std::time::SystemTime;
// Create UDP Socket
let udp_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
udp_socket.connect("224.0.0.1:3400").expect("Connection failed");
// Create FLUTE Sender
let tsi = 1;
let oti = Default::default();
let config = Default::default();
let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
let mut sender = Sender::new(endpoint, tsi, &oti, &config);
// Add object(s) (files) to the FLUTE sender (priority queue 0)
let obj = ObjectDesc::create_from_buffer(b"hello world", "text/plain",
&url::Url::parse("file:///hello.txt").unwrap(), 1, None, None, None, Cenc::Null, true, None, true).unwrap();
sender.add_object(0, obj);
// Always call publish after adding objects
sender.publish(SystemTime::now());
// Send FLUTE packets over UDP/IP
while let Some(pkt) = sender.read(SystemTime::now()) {
    udp_socket.send(&pkt).unwrap();
    std::thread::sleep(std::time::Duration::from_millis(1));
}
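The loop above sleeps a fixed 1 ms after every packet, which caps the sender at roughly one packet per millisecond whatever the packet size. If a target bitrate is wanted instead, the sleep can be derived from the packet length. The following is a minimal sketch using only the standard library; `inter_packet_delay` is a hypothetical helper, not part of the flute crate, and it ignores IP/UDP header overhead and timer inaccuracy.

```rust
use std::time::Duration;

/// Time to wait after sending one packet so that the average
/// rate stays at `bitrate_bps` (bits per second).
/// Returns `None` for a zero bitrate or on arithmetic overflow.
pub fn inter_packet_delay(packet_len_bytes: u64, bitrate_bps: u64) -> Option<Duration> {
    if bitrate_bps == 0 {
        return None;
    }
    // Packet size in bits.
    let bits = packet_len_bytes.checked_mul(8)?;
    // bits / (bits per second) gives seconds; scale to nanoseconds first
    // to keep integer precision.
    let nanos = bits.checked_mul(1_000_000_000)? / bitrate_bps;
    Some(Duration::from_nanos(nanos))
}
```

In the send loop, the fixed `Duration::from_millis(1)` could then be replaced by `inter_packet_delay(pkt.len() as u64, target_bps)`, where `target_bps` is whatever rate the network budget allows.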
UDP/IP Multicast File Receiver
Receive files from a UDP/IP network
use flute::receiver::{writer, MultiReceiver};
use flute::core::UDPEndpoint;
use std::net::UdpSocket;
use std::time::SystemTime;
use std::rc::Rc;
// Create UDP/IP socket to receive FLUTE pkt
let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
let udp_socket = UdpSocket::bind(format!("{}:{}", endpoint.destination_group_address, endpoint.port)).expect("Failed to bind");
// Create a writer able to write received files to the filesystem
let writer = Rc::new(writer::ObjectWriterFSBuilder::new(&std::path::Path::new("./flute_dir"))
    .unwrap_or_else(|_| std::process::exit(0)));
// Create a multi-receiver capable of de-multiplexing several FLUTE sessions
let mut receiver = MultiReceiver::new(writer, None, false);
// Receive pkt from UDP/IP socket and push it to the FLUTE receiver
let mut buf = [0; 2048];
loop {
    let (n, _src) = udp_socket.recv_from(&mut buf).expect("Failed to receive data");
    let now = SystemTime::now();
    receiver.push(&endpoint, &buf[..n], now).unwrap();
    receiver.cleanup(now);
}
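The receiver example uses 224.0.0.1, the all-hosts group, which hosts belong to by default. For any other multicast group, the socket normally has to join the group before packets are delivered. The sketch below shows one way to do that with the standard library only; `parse_multicast_group` and `open_multicast_receiver` are hypothetical helper names, and binding to the unspecified address with the default interface is an assumption that may not suit multi-homed hosts.

```rust
use std::io;
use std::net::{Ipv4Addr, UdpSocket};

/// Parse `addr` and keep it only if it is an IPv4 multicast address
/// (224.0.0.0/4).
pub fn parse_multicast_group(addr: &str) -> Option<Ipv4Addr> {
    addr.parse::<Ipv4Addr>().ok().filter(|ip| ip.is_multicast())
}

/// Bind a UDP socket on `port` and join `group` on the default interface.
pub fn open_multicast_receiver(group: Ipv4Addr, port: u16) -> io::Result<UdpSocket> {
    // Bind to 0.0.0.0 so the socket accepts datagrams sent to the group.
    let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, port))?;
    // Ask the OS to subscribe to the group; UNSPECIFIED lets it pick the interface.
    socket.join_multicast_v4(&group, &Ipv4Addr::UNSPECIFIED)?;
    Ok(socket)
}
```

The returned socket can be used in place of `udp_socket` in the receive loop; the packets are still pushed to the FLUTE receiver unchanged.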
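Application-Layer Forward Error Correction (AL-FEC)
The following error recovery algorithms are supported
- No-code
- Reed-Solomon GF 2^8
- Reed-Solomon GF 2^8 Under-Specified
- Reed-Solomon GF 2^16
- Reed-Solomon GF 2^m
- RaptorQ
- Raptor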
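To give an intuition for what these schemes do, the sketch below recovers one lost symbol from a single XOR parity symbol. This is deliberately a toy erasure code, not Reed-Solomon, Raptor or RaptorQ, and it is not how the flute crate implements FEC; `xor_parity` and `recover_one` are hypothetical names, and all symbols are assumed to have the same length.

```rust
/// Build one repair symbol as the byte-wise XOR of all source symbols.
pub fn xor_parity(symbols: &[Vec<u8>]) -> Vec<u8> {
    let len = symbols.first().map_or(0, |s| s.len());
    let mut parity = vec![0u8; len];
    for sym in symbols {
        for (p, b) in parity.iter_mut().zip(sym) {
            *p ^= *b;
        }
    }
    parity
}

/// Rebuild the single missing symbol (`None` entry) from the received
/// symbols and the parity symbol. Returns `None` unless exactly one
/// symbol is missing, because one parity symbol can repair one erasure.
pub fn recover_one(received: &[Option<Vec<u8>>], parity: &[u8]) -> Option<Vec<u8>> {
    let missing = received.iter().filter(|s| s.is_none()).count();
    if missing != 1 {
        return None;
    }
    // XOR-ing the parity with every received symbol cancels them out,
    // leaving the bytes of the missing symbol.
    let mut out = parity.to_vec();
    for sym in received.iter().flatten() {
        for (o, b) in out.iter_mut().zip(sym) {
            *o ^= *b;
        }
    }
    Some(out)
}
```

The schemes listed above generalize this idea: they produce several repair symbols per block, so that several lost packets can be rebuilt without a return channel.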
Oti
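The Oti module provides an implementation of the Object Transmission Information (OTI) used to configure Forward Error Correction (FEC) encoding in the FLUTE protocol.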
use flute::sender::Oti;
use flute::sender::Sender;
use flute::core::UDPEndpoint;
// Reed Solomon 2^8 with encoding blocks composed of
// 60 source symbols and 4 repair symbols of 1424 bytes per symbol
let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
let oti = Oti::new_reed_solomon_rs28(1424, 60, 4).unwrap();
let mut sender = Sender::new(endpoint, 1, &oti, &Default::default());
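To see what a symbol size of 1424 bytes and a maximum of 60 source symbols per block mean for a given file, the block partitioning algorithm from RFC 5052 (section 9.1) can be sketched as follows. The `Partition` struct and `partition` function are hypothetical names for illustration; this document does not confirm that the crate follows the algorithm step by step.

```rust
/// Result of splitting an object into source blocks (RFC 5052, section 9.1).
#[derive(Debug, PartialEq, Eq)]
pub struct Partition {
    /// T: total number of source symbols in the object.
    pub total_symbols: u64,
    /// N: number of source blocks.
    pub nb_blocks: u64,
    /// A_large: symbols in each of the first `nb_large_blocks` blocks.
    pub a_large: u64,
    /// A_small: symbols in each remaining block.
    pub a_small: u64,
    /// I: number of blocks that hold `a_large` symbols.
    pub nb_large_blocks: u64,
}

/// Integer division rounded up; `b` must be non-zero.
fn ceil_div(a: u64, b: u64) -> u64 {
    (a + b - 1) / b
}

/// `transfer_length` (L) in bytes, `symbol_size` (E) in bytes,
/// `max_block_len` (B) in source symbols.
pub fn partition(transfer_length: u64, symbol_size: u64, max_block_len: u64) -> Option<Partition> {
    if transfer_length == 0 || symbol_size == 0 || max_block_len == 0 {
        return None;
    }
    let t = ceil_div(transfer_length, symbol_size); // T = ceil(L / E)
    let n = ceil_div(t, max_block_len); // N = ceil(T / B)
    let a_large = ceil_div(t, n); // A_large = ceil(T / N)
    let a_small = t / n; // A_small = floor(T / N)
    let i = t - a_small * n; // I = T - A_small * N
    Some(Partition {
        total_symbols: t,
        nb_blocks: n,
        a_large,
        a_small,
        nb_large_blocks: i,
    })
}
```

For a 1,000,000-byte object with the parameters above, `partition(1_000_000, 1424, 60)` yields 703 source symbols in 12 blocks: 7 blocks of 59 symbols followed by 5 blocks of 58.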
Content Encoding (CENC)
The following schemes are supported during transmission/reception
- Null (no compression)
- Deflate
- Zlib
- Gzip
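On the wire, FLUTE identifies the content encoding with a small numeric code. The sketch below maps the four schemes to the Content Encoding Algorithm values as I understand them from RFC 6726 (0 null, 1 ZLIB, 2 DEFLATE, 3 GZIP); treat these values as an assumption and check the RFC before relying on them. `ContentEncoding` is a hypothetical type for illustration and is not the crate's `Cenc`.

```rust
/// Content encodings listed above, with their assumed one-byte codes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContentEncoding {
    Null,
    Zlib,
    Deflate,
    Gzip,
}

impl ContentEncoding {
    /// Numeric code carried in the packet (assumed values, see lead-in).
    pub fn to_code(self) -> u8 {
        match self {
            ContentEncoding::Null => 0,
            ContentEncoding::Zlib => 1,
            ContentEncoding::Deflate => 2,
            ContentEncoding::Gzip => 3,
        }
    }

    /// Inverse of `to_code`; unknown codes give `None` so a receiver
    /// can reject objects it cannot decode.
    pub fn from_code(code: u8) -> Option<Self> {
        match code {
            0 => Some(ContentEncoding::Null),
            1 => Some(ContentEncoding::Zlib),
            2 => Some(ContentEncoding::Deflate),
            3 => Some(ContentEncoding::Gzip),
            _ => None,
        }
    }
}
```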
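File multiplexing / Block interleaving
The FLUTE sender can transmit several files in parallel by interleaving the packets of each file. For example:
Pkt file1 -> Pkt file2 -> Pkt file3 -> Pkt file1 -> Pkt file2 -> Pkt file3 ...
The sender can also interleave blocks within a single file. In the following example, encoding symbols (ES) from different blocks (B) are interleaved:
(B 1,ES 1)->(B 2,ES 1)->(B 3,ES 1)->**(B 1,ES 2)**->(B 2,ES 2)...
To configure multiplexing, use the Config struct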
use flute::sender::Sender;
use flute::sender::Config;
use flute::sender::PriorityQueue;
use flute::core::UDPEndpoint;
let mut config = Config {
    // Interleave a maximum of 3 blocks within each file
    interleave_blocks: 3,
    ..Default::default()
};
// Interleave a maximum of 3 files in priority queue '0'
config.set_priority_queue(PriorityQueue::HIGHEST, PriorityQueue::new(3));
let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
let mut sender = Sender::new(endpoint, 1, &Default::default(), &config);
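The ordering that results from such a limit can be pictured as a round-robin over at most `window` sources at a time, where a source is either a file (file multiplexing) or a block (block interleaving). The following sketch is a standalone model of that ordering, not the crate's scheduler; `interleave` and `window` are hypothetical names.

```rust
use std::collections::VecDeque;

/// Emit items round-robin from at most `window` sources at a time.
/// When a source runs out, the next pending source takes its slot.
pub fn interleave<T>(sources: Vec<Vec<T>>, window: usize) -> Vec<T> {
    let window = window.max(1); // a window of 0 would never send anything
    let mut pending: VecDeque<VecDeque<T>> =
        sources.into_iter().map(VecDeque::from).collect();
    let mut active: VecDeque<VecDeque<T>> = VecDeque::new();
    let mut out = Vec::new();
    loop {
        // Fill free slots with sources that are still waiting.
        while active.len() < window {
            match pending.pop_front() {
                Some(src) => active.push_back(src),
                None => break,
            }
        }
        // Take the source whose turn it is; stop when none is left.
        let mut src = match active.pop_front() {
            Some(src) => src,
            None => break,
        };
        if let Some(item) = src.pop_front() {
            out.push(item);
            // Requeue the source at the back only if it has more to send.
            if !src.is_empty() {
                active.push_back(src);
            }
        }
    }
    out
}
```

With three files of two packets each and a window of 3, the output order is file1, file2, file3, file1, file2, file3, which matches the packet sequence described in this section.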
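Priority Queues
The FLUTE sender can be configured with several queues, each with a different priority. Files in a higher-priority queue are always transmitted before files in lower-priority queues, and transmission of lower-priority files is paused while a higher-priority queue still has files to send.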
use flute::sender::Sender;
use flute::sender::Config;
use flute::sender::PriorityQueue;
use flute::core::UDPEndpoint;
use flute::sender::ObjectDesc;
use flute::sender::Cenc;
// Create a default configuration
let mut config: flute::sender::Config = Default::default();
// Configure the HIGHEST priority queue with a capacity of 3 simultaneous file transfers
config.set_priority_queue(PriorityQueue::HIGHEST, PriorityQueue::new(3));
// Configure the LOW priority queue with a capacity of 1 file transfer at a time
config.set_priority_queue(PriorityQueue::LOW, PriorityQueue::new(1));
let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
let mut sender = Sender::new(endpoint, 1, &Default::default(), &config);
// Create an ObjectDesc for a low priority file
let low_priority_obj = ObjectDesc::create_from_buffer(b"low priority", "text/plain",
&url::Url::parse("file:///low_priority.txt").unwrap(), 1, None, None, None, Cenc::Null, true, None, true).unwrap();
// Create an ObjectDesc for a high priority file
let high_priority_obj = ObjectDesc::create_from_buffer(b"high priority", "text/plain",
&url::Url::parse("file:///high_priority.txt").unwrap(), 1, None, None, None, Cenc::Null, true, None, true).unwrap();
// Put Object to the low priority queue
sender.add_object(PriorityQueue::LOW, low_priority_obj);
// Put Object to the high priority queue
sender.add_object(PriorityQueue::HIGHEST, high_priority_obj);
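The "higher priority always goes first" rule amounts to strict-priority scheduling. The sketch below models it with the standard library only; `StrictPriorityScheduler` is a hypothetical type, and the numeric levels (0 for the most urgent queue, 3 for a low one) are placeholders chosen for illustration rather than values taken from the crate.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Queues keyed by priority level; a smaller key means more urgent.
#[derive(Default)]
pub struct StrictPriorityScheduler {
    queues: BTreeMap<u32, VecDeque<String>>,
}

impl StrictPriorityScheduler {
    pub fn new() -> Self {
        Self::default()
    }

    /// Append a packet to the queue of the given priority level.
    pub fn push(&mut self, priority: u32, packet: &str) {
        self.queues
            .entry(priority)
            .or_default()
            .push_back(packet.to_string());
    }

    /// Return the next packet to send. BTreeMap visits keys in ascending
    /// order, so the first non-empty queue is the most urgent one, and a
    /// lower-priority queue is served only when all others are empty.
    pub fn pop(&mut self) -> Option<String> {
        self.queues.values_mut().find_map(|q| q.pop_front())
    }
}
```

If a high-priority packet is pushed while a low-priority transfer is in progress, the next `pop` returns the high-priority packet, and the low-priority queue resumes only afterwards.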
Python bindings
Installation
pip install flute-alc
Example
FLUTE sender Python example
from flute import sender
# Flute Sender config parameters
sender_config = sender.Config()
# Object transmission parameters (no_code => no FEC)
# encoding symbol size : 1400 bytes
# Max source block length : 64 encoding symbols
oti = sender.Oti.new_no_code(1400, 64)
# Create FLUTE Sender
flute_sender = sender.Sender(1, oti, sender_config)
# Transfer a file
flute_sender.add_file("/path/to/file", 0, "application/octet-stream", None, None)
flute_sender.publish()
while True:
    alc_pkt = flute_sender.read()
    if alc_pkt is None:
        break
    # TODO: send alc_pkt over UDP/IP
FLUTE receiver Python example
from flute import receiver
# Write received objects to a destination folder
receiver_writer = receiver.ObjectWriterBuilder("/path/to/dest")
# FLUTE Receiver configuration parameters
receiver_config = receiver.Config()
tsi = 1
# Create a FLUTE receiver with the specified endpoint, tsi, writer, and configuration
udp_endpoint = receiver.UDPEndpoint("224.0.0.1", 1234)
flute_receiver = receiver.Receiver(udp_endpoint, tsi, receiver_writer, receiver_config)
while True:
    # Receive an LCT/ALC packet from UDP/IP multicast (implement your own receive_from_udp_socket() function)
    # Note: FLUTE does not handle the UDP/IP layer; you need to implement the socket reception yourself
    pkt = receive_from_udp_socket()
    # Push the received packet to the FLUTE receiver
    flute_receiver.push(bytes(pkt))
Dependencies
~6–14MB
~188K SLoC