24 releases (13 stable)

1.2.1 March 18, 2024
1.1.5 February 28, 2024
1.1.1 December 7, 2023
1.1.0 November 12, 2023
0.3.2 December 31, 2022

#176 in Network programming

47 downloads per month

MIT license

355KB
8K SLoC

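FLUTE - File Delivery over Unidirectional Transport

Massively scalable multicast distribution solution

This library implements unidirectional file delivery, with no return channel required.

RFC

This library implements the following RFCs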

RFC Title Link
RFC 6726 FLUTE - File Delivery over Unidirectional Transport https://www.rfc-editor.org/rfc/rfc6726.html
RFC 5775 Asynchronous Layered Coding (ALC) Protocol Instantiation https://www.rfc-editor.org/rfc/rfc5775.html
RFC 5651 Layered Coding Transport (LCT) Building Block https://www.rfc-editor.org/rfc/rfc5651
RFC 5052 Forward Error Correction (FEC) Building Block https://www.rfc-editor.org/rfc/rfc5052
RFC 5510 Reed-Solomon Forward Error Correction (FEC) Schemes https://www.rfc-editor.org/rfc/rfc5510.html
3GPP TS 26.346 Extended FLUTE FDT Schema (7.2.10) https://www.etsi.org/deliver/etsi_ts/126300_126399/126346/17.03.00_60/ts_126346v170300p.pdf

UDP/IP multicast file sender

Transfer files over a UDP/IP network

use flute::sender::Sender;
use flute::sender::ObjectDesc;
use flute::sender::Cenc;
use flute::core::UDPEndpoint;
use std::net::UdpSocket;
use std::time::SystemTime;

// Create UDP Socket
let udp_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
udp_socket.connect("224.0.0.1:3400").expect("Connection failed");

// Create FLUTE Sender
let tsi = 1;
let oti = Default::default();
let config = Default::default();
let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
let mut sender = Sender::new(endpoint, tsi, &oti, &config);

// Add object(s) (files) to the FLUTE sender (priority queue 0)
let obj = ObjectDesc::create_from_buffer(b"hello world", "text/plain",
&url::Url::parse("file:///hello.txt").unwrap(), 1, None, None, None, Cenc::Null, true, None, true).unwrap();
sender.add_object(0, obj);

// Always call publish after adding objects
sender.publish(SystemTime::now());

// Send FLUTE packets over UDP/IP
while let Some(pkt) = sender.read(SystemTime::now()) {
    udp_socket.send(&pkt).unwrap();
    std::thread::sleep(std::time::Duration::from_millis(1));
}
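
The loop above paces packets with a fixed 1 ms sleep. As a minimal sketch of how such a delay could be derived from a target bitrate, the helper below uses only the standard library. `pacing_interval` is a hypothetical name and not part of the flute crate, and the calculation ignores IP/UDP header overhead and the time spent inside `send`:

```rust
use std::time::Duration;

/// Hypothetical helper (not part of the flute crate): time to wait between
/// two packets of `packet_len` bytes to average `target_bps` bits per second.
fn pacing_interval(packet_len: usize, target_bps: u64) -> Duration {
    assert!(target_bps > 0, "target bitrate must be non-zero");
    let bits = packet_len as u128 * 8;
    // bits / (bits per second) = seconds; work in nanoseconds to keep precision.
    let nanos = bits * 1_000_000_000 / target_bps as u128;
    Duration::from_nanos(nanos as u64)
}
```

For instance, `pacing_interval(1500, 12_000_000)` yields 1 ms, so a fixed 1 ms sleep corresponds to roughly 12 Mbit/s for 1500-byte packets.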

UDP/IP multicast file receiver

Receive files from a UDP/IP network

use flute::receiver::{writer, MultiReceiver};
use flute::core::UDPEndpoint;
use std::net::UdpSocket;
use std::time::SystemTime;
use std::rc::Rc;

// Create UDP/IP socket to receive FLUTE pkt
let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
let udp_socket = UdpSocket::bind(format!("{}:{}", endpoint.destination_group_address, endpoint.port)).expect("Failed to bind");

// Create a writer able to write received files to the filesystem
let writer = Rc::new(writer::ObjectWriterFSBuilder::new(&std::path::Path::new("./flute_dir"))
    .unwrap_or_else(|_| std::process::exit(1)));

// Create a multi-receiver capable of de-multiplexing several FLUTE sessions
let mut receiver = MultiReceiver::new(writer, None, false);

// Receive pkt from UDP/IP socket and push it to the FLUTE receiver
let mut buf = [0; 2048];
loop {
    let (n, _src) = udp_socket.recv_from(&mut buf).expect("Failed to receive data");
    let now = SystemTime::now();
    receiver.push(&endpoint, &buf[..n], now).unwrap();
    receiver.cleanup(now);
}
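
The receiver above binds directly to the group address. On many platforms the socket must also join the multicast group before group traffic is delivered to it. The following is a minimal sketch using only `std::net`. The helper names `multicast_group` and `open_multicast_socket` are illustrative and not part of the flute crate, and passing `0.0.0.0` so the OS picks the interface is an assumption that may not suit multi-homed hosts:

```rust
use std::io;
use std::net::{Ipv4Addr, SocketAddrV4, UdpSocket};

/// Hypothetical helper: parse `addr` and keep it only if it is an IPv4
/// multicast address (224.0.0.0/4).
fn multicast_group(addr: &str) -> Option<Ipv4Addr> {
    addr.parse::<Ipv4Addr>().ok().filter(|ip| ip.is_multicast())
}

/// Hypothetical helper: bind to `port` on all interfaces and join `group`.
fn open_multicast_socket(group: Ipv4Addr, port: u16) -> io::Result<UdpSocket> {
    let socket = UdpSocket::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port))?;
    // Assumption: let the OS choose the interface used to join the group.
    socket.join_multicast_v4(&group, &Ipv4Addr::UNSPECIFIED)?;
    Ok(socket)
}
```

A socket returned by `open_multicast_socket` could then take the place of `udp_socket` in the receive loop.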

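Application-Layer Forward Error Correction (AL-FEC)

The following error recovery algorithms are supported

  • No-code
  • Reed-Solomon GF 2^8
  • Reed-Solomon GF 2^8 Under-Specified
  • Reed-Solomon GF 2^16
  • Reed-Solomon GF 2^m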
  • RaptorQ
  • Raptor

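The Oti module provides an implementation of the Object Transmission Information (OTI) used to configure Forward Error Correction (FEC) encoding in the FLUTE protocol.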

use flute::sender::Oti;
use flute::sender::Sender;
use flute::core::UDPEndpoint;

// Reed Solomon 2^8 with encoding blocks composed of
// 60 source symbols and 4 repair symbols of 1424 bytes per symbol
let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
let oti = Oti::new_reed_solomon_rs28(1424, 60, 4).unwrap();
let mut sender = Sender::new(endpoint, 1, &oti, &Default::default());
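
To make the symbol and block parameters more concrete, the sketch below applies the block partitioning algorithm of RFC 5052 (section 9.1) to an object of a given size. It is standalone arithmetic with hypothetical names (`Partition`, `partition`), not the crate's API, and it assumes the sender partitions objects exactly as the RFC describes:

```rust
/// Outcome of the RFC 5052 section 9.1 block partitioning algorithm.
#[derive(Debug, PartialEq)]
struct Partition {
    nb_blocks: u64,  // N: number of source blocks
    a_large: u64,    // symbols in each of the first `nb_a_large` blocks
    a_small: u64,    // symbols in each remaining block
    nb_a_large: u64, // I: number of blocks of size `a_large`
}

fn ceil_div(a: u64, b: u64) -> u64 {
    (a + b - 1) / b
}

/// `transfer_length` and `symbol_size` are in bytes; `max_block_len` is the
/// maximum number of source symbols per block. Both sizes must be non-zero.
fn partition(transfer_length: u64, symbol_size: u64, max_block_len: u64) -> Partition {
    let t = ceil_div(transfer_length, symbol_size); // total source symbols
    if t == 0 {
        return Partition { nb_blocks: 0, a_large: 0, a_small: 0, nb_a_large: 0 };
    }
    let n = ceil_div(t, max_block_len);
    let a_small = t / n;
    Partition {
        nb_blocks: n,
        a_large: ceil_div(t, n),
        a_small,
        nb_a_large: t - a_small * n,
    }
}
```

For a 1 MiB object (1,048,576 bytes) with 1424-byte symbols and at most 60 source symbols per block, `partition(1_048_576, 1424, 60)` gives 737 source symbols split into 13 blocks: 9 blocks of 57 symbols and 4 blocks of 56.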

Content Encoding (CENC)

The following schemes are supported during transmission and reception

  • Null (no compression)
  • Deflate
  • Zlib
  • Gzip

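File multiplexing / block interleaving

The FLUTE sender can transfer multiple files in parallel by interleaving the packets of each file. For example:

Pkt file1 -> Pkt file2 -> Pkt file3 -> Pkt file1 -> Pkt file2 -> Pkt file3 ...

The sender can also interleave blocks within a single file. In the following example, encoding symbols (ES) from different blocks (B) are interleaved:

(B 1,ES 1)->(B 2,ES 1)->(B 3,ES 1)->(B 1,ES 2)->(B 2,ES 2)...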
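
Both orderings above are round-robin schedules over a bounded set of active queues. The sketch below illustrates that idea with the standard library only. `interleave` is a hypothetical function, and the crate's actual scheduler may order packets differently:

```rust
use std::collections::VecDeque;

/// Hypothetical round-robin interleaver: emits one item from each active
/// queue in turn. At most `max_active` queues are served at once (values
/// below 1 are treated as 1); when a queue runs dry, the next pending
/// queue takes its slot.
fn interleave<T>(queues: Vec<Vec<T>>, max_active: usize) -> Vec<T> {
    let max_active = max_active.max(1);
    let mut pending: VecDeque<VecDeque<T>> =
        queues.into_iter().map(VecDeque::from).collect();
    let mut active: VecDeque<VecDeque<T>> = VecDeque::new();
    let mut out = Vec::new();
    loop {
        // Admit pending queues while there is a free slot.
        while active.len() < max_active {
            match pending.pop_front() {
                Some(queue) => active.push_back(queue),
                None => break,
            }
        }
        let mut queue = match active.pop_front() {
            Some(queue) => queue,
            None => break, // nothing active and nothing pending
        };
        if let Some(item) = queue.pop_front() {
            out.push(item);
        }
        // Re-queue at the back only if items remain.
        if !queue.is_empty() {
            active.push_back(queue);
        }
    }
    out
}
```

With three blocks of two symbols each, written as `(block, symbol)` pairs, and `max_active = 3`, the result is `(1,1), (2,1), (3,1), (1,2), (2,2), (3,2)`, matching the pattern shown above.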

To configure multiplexing, use the following Config struct

use flute::sender::Sender;
use flute::sender::Config;
use flute::sender::PriorityQueue;
use flute::core::UDPEndpoint;

let mut config = Config {
    // Interleave a maximum of 3 blocks within each file
    interleave_blocks: 3,
    ..Default::default()
};

// Interleave a maximum of 3 files in priority queue '0'
config.set_priority_queue(PriorityQueue::HIGHEST, PriorityQueue::new(3));

let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
let mut sender = Sender::new(endpoint, 1, &Default::default(), &config);

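Priority queues

The FLUTE sender can be configured with multiple queues, each with a different priority level. Files in higher-priority queues are always transferred before files in lower-priority queues. Transfer of files in a lower-priority queue is paused while a higher-priority queue still has files to send.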

use flute::sender::Sender;
use flute::sender::Config;
use flute::sender::PriorityQueue;
use flute::core::UDPEndpoint;
use flute::sender::ObjectDesc;
use flute::sender::Cenc;

// Create a default configuration
let mut config: flute::sender::Config = Default::default();

// Configure the HIGHEST priority queue with a capacity of 3 simultaneous file transfers
config.set_priority_queue(PriorityQueue::HIGHEST, PriorityQueue::new(3));

// Configure the LOW priority queue with a capacity of 1 file transfer at a time
config.set_priority_queue(PriorityQueue::LOW, PriorityQueue::new(1));

let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
let mut sender = Sender::new(endpoint, 1, &Default::default(), &config);

// Create an ObjectDesc for a low priority file
let low_priority_obj = ObjectDesc::create_from_buffer(b"low priority", "text/plain",
&url::Url::parse("file:///low_priority.txt").unwrap(), 1, None, None, None, Cenc::Null, true, None, true).unwrap();

// Create an ObjectDesc for a high priority file
let high_priority_obj = ObjectDesc::create_from_buffer(b"high priority", "text/plain",
&url::Url::parse("file:///high_priority.txt").unwrap(), 1, None, None, None, Cenc::Null, true, None, true).unwrap();

// Put Object to the low priority queue
sender.add_object(PriorityQueue::LOW, low_priority_obj);

// Put Object to the high priority queue
sender.add_object(PriorityQueue::HIGHEST, high_priority_obj);
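
The pausing behaviour described in this section is a strict-priority schedule. As a rough illustration only, the standalone sketch below models it with the standard library. `StrictPriority` is a hypothetical type, not the crate's implementation, and it assumes that a lower number means a higher priority:

```rust
use std::collections::{BTreeMap, VecDeque};

/// Hypothetical strict-priority scheduler.
/// Assumption: a lower key means a higher priority.
struct StrictPriority<T> {
    queues: BTreeMap<u32, VecDeque<T>>,
}

impl<T> StrictPriority<T> {
    fn new() -> Self {
        Self { queues: BTreeMap::new() }
    }

    /// Append `item` to the queue with the given priority.
    fn push(&mut self, priority: u32, item: T) {
        self.queues.entry(priority).or_default().push_back(item);
    }

    /// Return the next item from the highest-priority non-empty queue.
    /// Lower-priority queues are only reached once every higher one is empty.
    fn pop(&mut self) -> Option<T> {
        self.queues.values_mut().find_map(|queue| queue.pop_front())
    }
}
```

If two low-priority items are queued, one is popped, and a high-priority item then arrives, the next `pop` returns the high-priority item before the remaining low-priority one.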

Python bindings

Installation

pip install flute-alc

Example

FLUTE sender Python example

    from flute import sender

    # Flute Sender config parameters
    sender_config = sender.Config()

    # Object transmission parameters (no_code => no FEC)
    # encoding symbol size : 1400 bytes
    # Max source block length : 64 encoding symbols
    oti = sender.Oti.new_no_code(1400, 64)

    # Create FLUTE Sender
    flute_sender = sender.Sender(1, oti, sender_config)

    # Transfer a file 
    flute_sender.add_file("/path/to/file", 0, "application/octet-stream", None, None)
    flute_sender.publish()

    while True:
        alc_pkt = flute_sender.read()
        if alc_pkt is None:
            break

        #TODO Send alc_pkt over UDP/IP

FLUTE receiver Python example

    from flute import receiver

    # Write received objects to a destination folder
    receiver_writer = receiver.ObjectWriterBuilder("/path/to/dest")

    # FLUTE Receiver configuration parameters
    receiver_config = receiver.Config()

    tsi = 1

    # Create a FLUTE receiver with the specified endpoint, tsi, writer, and configuration
    udp_endpoint = receiver.UDPEndpoint("224.0.0.1", 1234)
    flute_receiver = receiver.Receiver(udp_endpoint, tsi, receiver_writer, receiver_config)

    while True:
        # Receive LCT/ALC packet from UDP/IP multicast (Implement your own receive_from_udp_socket() function)
        # Note: FLUTE does not handle the UDP/IP layer, you need to implement the socket reception mechanism yourself
        pkt = receive_from_udp_socket()

        # Push the received packet to the FLUTE receiver
        flute_receiver.push(bytes(pkt))

Dependencies

~6–14MB
~188K SLoC