A few minor changes to the API. (#52)

The only salvagable remains from a multi-day refactoring effort. The
main benefit is that Setup messages are no longer part of the Message
enum, so match will be a lot easier.
This commit is contained in:
kixelated 2023-08-23 15:28:27 -07:00 committed by GitHub
parent c5d8873e4e
commit 5c3f794053
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 110 additions and 101 deletions

View File

@ -6,4 +6,5 @@ pub mod setup;
pub use coding::VarInt; pub use coding::VarInt;
pub use message::Message; pub use message::Message;
pub use object::Object;
pub use session::Session; pub use session::Session;

View File

@ -19,7 +19,6 @@ pub use subscribe_error::*;
pub use subscribe_ok::*; pub use subscribe_ok::*;
use crate::coding::{DecodeError, EncodeError, VarInt}; use crate::coding::{DecodeError, EncodeError, VarInt};
use crate::setup;
use std::fmt; use std::fmt;
@ -81,16 +80,12 @@ macro_rules! message_types {
} }
} }
// Just so we can use the macro above.
type SetupClient = setup::Client;
type SetupServer = setup::Server;
// Each message is prefixed with the given VarInt type. // Each message is prefixed with the given VarInt type.
message_types! { message_types! {
// NOTE: Object and Setup are in other modules. // NOTE: Object and Setup are in other modules.
// Object = 0x0 // Object = 0x0
SetupClient = 0x1, // SetupClient = 0x1
SetupServer = 0x2, // SetupServer = 0x2
Subscribe = 0x3, Subscribe = 0x3,
SubscribeOk = 0x4, SubscribeOk = 0x4,
SubscribeError = 0x5, SubscribeError = 0x5,

View File

@ -1,54 +0,0 @@
use crate::coding::{DecodeError, EncodeError, VarInt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use webtransport_generic::{RecvStream, SendStream};
#[derive(Debug)]
pub struct Header {
// An ID for this track.
// Proposal: https://github.com/moq-wg/moq-transport/issues/209
pub track: VarInt,
// The group sequence number.
pub group: VarInt,
// The object sequence number.
pub sequence: VarInt,
// The priority/send order.
// Proposal: int32 instead of a varint.
pub send_order: i32,
}
impl Header {
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
let typ = VarInt::decode(r).await?;
if typ.into_inner() != 0 {
return Err(DecodeError::InvalidType(typ));
}
// NOTE: size has been omitted
let track = VarInt::decode(r).await?;
let group = VarInt::decode(r).await?;
let sequence = VarInt::decode(r).await?;
let send_order = r.read_i32().await?; // big-endian
Ok(Self {
track,
group,
sequence,
send_order,
})
}
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
VarInt::from_u32(0).encode(w).await?;
self.track.encode(w).await?;
self.group.encode(w).await?;
self.sequence.encode(w).await?;
w.write_i32(self.send_order).await?;
Ok(())
}
}

View File

@ -1,7 +1,60 @@
mod header;
mod receiver; mod receiver;
mod sender; mod sender;
pub use header::*;
pub use receiver::*; pub use receiver::*;
pub use sender::*; pub use sender::*;
use crate::coding::{DecodeError, EncodeError, VarInt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use webtransport_generic::{RecvStream, SendStream};
#[derive(Debug)]
pub struct Object {
// An ID for this track.
// Proposal: https://github.com/moq-wg/moq-transport/issues/209
pub track: VarInt,
// The group sequence number.
pub group: VarInt,
// The object sequence number.
pub sequence: VarInt,
// The priority/send order.
// Proposal: int32 instead of a varint.
pub send_order: i32,
}
impl Object {
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
let typ = VarInt::decode(r).await?;
if typ.into_inner() != 0 {
return Err(DecodeError::InvalidType(typ));
}
// NOTE: size has been omitted
let track = VarInt::decode(r).await?;
let group = VarInt::decode(r).await?;
let sequence = VarInt::decode(r).await?;
let send_order = r.read_i32().await?; // big-endian
Ok(Self {
track,
group,
sequence,
send_order,
})
}
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
VarInt::from_u32(0).encode(w).await?;
self.track.encode(w).await?;
self.group.encode(w).await?;
self.sequence.encode(w).await?;
w.write_i32(self.send_order).await?;
Ok(())
}
}

View File

@ -1,4 +1,4 @@
use crate::object::Header; use crate::Object;
use anyhow::Context; use anyhow::Context;
@ -10,7 +10,7 @@ pub struct Receiver<S: Session> {
session: S, session: S,
// Streams that we've accepted but haven't read the header from yet. // Streams that we've accepted but haven't read the header from yet.
streams: JoinSet<anyhow::Result<(Header, S::RecvStream)>>, streams: JoinSet<anyhow::Result<(Object, S::RecvStream)>>,
} }
impl<S: Session> Receiver<S> { impl<S: Session> Receiver<S> {
@ -21,7 +21,7 @@ impl<S: Session> Receiver<S> {
} }
} }
pub async fn recv(&mut self) -> anyhow::Result<(Header, S::RecvStream)> { pub async fn recv(&mut self) -> anyhow::Result<(Object, S::RecvStream)> {
loop { loop {
tokio::select! { tokio::select! {
res = self.session.accept_uni() => { res = self.session.accept_uni() => {
@ -35,8 +35,8 @@ impl<S: Session> Receiver<S> {
} }
} }
async fn read(mut stream: S::RecvStream) -> anyhow::Result<(Header, S::RecvStream)> { async fn read(mut stream: S::RecvStream) -> anyhow::Result<(Object, S::RecvStream)> {
let header = Header::decode(&mut stream).await?; let header = Object::decode(&mut stream).await?;
Ok((header, stream)) Ok((header, stream))
} }
} }

View File

@ -1,6 +1,6 @@
use anyhow::Context; use anyhow::Context;
use crate::object::Header; use crate::Object;
use webtransport_generic::{SendStream, Session}; use webtransport_generic::{SendStream, Session};
@ -16,11 +16,11 @@ impl<S: Session> Sender<S> {
Self { session } Self { session }
} }
pub async fn open(&mut self, header: Header) -> anyhow::Result<S::SendStream> { pub async fn open(&mut self, object: Object) -> anyhow::Result<S::SendStream> {
let mut stream = self.session.open_uni().await.context("failed to open uni stream")?; let mut stream = self.session.open_uni().await.context("failed to open uni stream")?;
stream.set_priority(header.send_order); stream.set_priority(object.send_order);
header.encode(&mut stream).await.context("failed to write header")?; object.encode(&mut stream).await.context("failed to write header")?;
// log::info!("created stream: {:?}", header); // log::info!("created stream: {:?}", header);

View File

@ -14,15 +14,11 @@ impl<S: WTSession> Session<S> {
/// Called by a server with an established WebTransport session. /// Called by a server with an established WebTransport session.
// TODO close the session with an error code // TODO close the session with an error code
pub async fn accept(session: S, role: setup::Role) -> anyhow::Result<Self> { pub async fn accept(session: S, role: setup::Role) -> anyhow::Result<Self> {
let (send, recv) = session.accept_bi().await.context("failed to accept bidi stream")?; let (mut send, mut recv) = session.accept_bi().await.context("failed to accept bidi stream")?;
let mut send_control = message::Sender::new(send); let setup_client = setup::Client::decode(&mut recv)
let mut recv_control = message::Receiver::new(recv); .await
.context("failed to read CLIENT SETUP")?;
let setup_client = match recv_control.recv().await.context("failed to read SETUP")? {
message::Message::SetupClient(setup) => setup,
_ => anyhow::bail!("expected CLIENT SETUP"),
};
setup_client setup_client
.versions .versions
@ -39,11 +35,14 @@ impl<S: WTSession> Session<S> {
version: setup::Version::DRAFT_00, version: setup::Version::DRAFT_00,
}; };
send_control setup_server
.send(message::Message::SetupServer(setup_server)) .encode(&mut send)
.await .await
.context("failed to send setup server")?; .context("failed to send setup server")?;
let send_control = message::Sender::new(send);
let recv_control = message::Receiver::new(recv);
let send_objects = object::Sender::new(session.clone()); let send_objects = object::Sender::new(session.clone());
let recv_objects = object::Receiver::new(session.clone()); let recv_objects = object::Receiver::new(session.clone());
@ -57,10 +56,7 @@ impl<S: WTSession> Session<S> {
/// Called by a client with an established WebTransport session. /// Called by a client with an established WebTransport session.
pub async fn connect(session: S, role: setup::Role) -> anyhow::Result<Self> { pub async fn connect(session: S, role: setup::Role) -> anyhow::Result<Self> {
let (send, recv) = session.open_bi().await.context("failed to oen bidi stream")?; let (mut send, mut recv) = session.open_bi().await.context("failed to oen bidi stream")?;
let mut send_control = message::Sender::new(send);
let mut recv_control = message::Receiver::new(recv);
let setup_client = setup::Client { let setup_client = setup::Client {
role, role,
@ -68,15 +64,12 @@ impl<S: WTSession> Session<S> {
path: "".to_string(), path: "".to_string(),
}; };
send_control setup_client
.send(message::Message::SetupClient(setup_client)) .encode(&mut send)
.await .await
.context("failed to send SETUP CLIENT")?; .context("failed to send SETUP CLIENT")?;
let setup_server = match recv_control.recv().await.context("failed to read SETUP")? { let setup_server = setup::Server::decode(&mut recv).await.context("failed to read SETUP")?;
message::Message::SetupServer(setup) => setup,
_ => anyhow::bail!("expected SERVER SETUP"),
};
if setup_server.version != setup::Version::DRAFT_00 { if setup_server.version != setup::Version::DRAFT_00 {
anyhow::bail!("unsupported version: {:?}", setup_server.version); anyhow::bail!("unsupported version: {:?}", setup_server.version);
@ -86,6 +79,9 @@ impl<S: WTSession> Session<S> {
anyhow::bail!("incompatible roles: {:?} {:?}", role, setup_server.role); anyhow::bail!("incompatible roles: {:?} {:?}", role, setup_server.role);
} }
let send_control = message::Sender::new(send);
let recv_control = message::Receiver::new(recv);
let send_objects = object::Sender::new(session.clone()); let send_objects = object::Sender::new(session.clone());
let recv_objects = object::Receiver::new(session.clone()); let recv_objects = object::Receiver::new(session.clone());

View File

@ -1,5 +1,8 @@
use super::{Role, Versions}; use super::{Role, Versions};
use crate::coding::{decode_string, encode_string, DecodeError, EncodeError}; use crate::{
coding::{decode_string, encode_string, DecodeError, EncodeError},
VarInt,
};
use webtransport_generic::{RecvStream, SendStream}; use webtransport_generic::{RecvStream, SendStream};
@ -22,6 +25,11 @@ pub struct Client {
impl Client { impl Client {
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> { pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
let typ = VarInt::decode(r).await?;
if typ.into_inner() != 1 {
return Err(DecodeError::InvalidType(typ));
}
let versions = Versions::decode(r).await?; let versions = Versions::decode(r).await?;
let role = Role::decode(r).await?; let role = Role::decode(r).await?;
let path = decode_string(r).await?; let path = decode_string(r).await?;
@ -30,6 +38,7 @@ impl Client {
} }
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> { pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
VarInt::from_u32(1).encode(w).await?;
self.versions.encode(w).await?; self.versions.encode(w).await?;
self.role.encode(w).await?; self.role.encode(w).await?;
encode_string(&self.path, w).await?; encode_string(&self.path, w).await?;

View File

@ -1,5 +1,8 @@
use super::{Role, Version}; use super::{Role, Version};
use crate::coding::{DecodeError, EncodeError}; use crate::{
coding::{DecodeError, EncodeError},
VarInt,
};
use webtransport_generic::{RecvStream, SendStream}; use webtransport_generic::{RecvStream, SendStream};
@ -18,6 +21,11 @@ pub struct Server {
impl Server { impl Server {
pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> { pub async fn decode<R: RecvStream>(r: &mut R) -> Result<Self, DecodeError> {
let typ = VarInt::decode(r).await?;
if typ.into_inner() != 2 {
return Err(DecodeError::InvalidType(typ));
}
let version = Version::decode(r).await?; let version = Version::decode(r).await?;
let role = Role::decode(r).await?; let role = Role::decode(r).await?;
@ -25,6 +33,7 @@ impl Server {
} }
pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> { pub async fn encode<W: SendStream>(&self, w: &mut W) -> Result<(), EncodeError> {
VarInt::from_u32(2).encode(w).await?;
self.version.encode(w).await?; self.version.encode(w).await?;
self.role.encode(w).await?; self.role.encode(w).await?;

View File

@ -7,7 +7,7 @@ use tokio::sync::mpsc;
use tokio::task::JoinSet; // lock across await boundaries use tokio::task::JoinSet; // lock across await boundaries
use moq_transport::message::{Announce, AnnounceError, AnnounceOk, Subscribe, SubscribeError, SubscribeOk}; use moq_transport::message::{Announce, AnnounceError, AnnounceOk, Subscribe, SubscribeError, SubscribeOk};
use moq_transport::{object, VarInt}; use moq_transport::{object, Object, VarInt};
use webtransport_generic::Session as WTSession; use webtransport_generic::Session as WTSession;
use bytes::BytesMut; use bytes::BytesMut;
@ -89,15 +89,15 @@ impl<S: WTSession> Session<S> {
} }
} }
async fn receive_object(&mut self, header: object::Header, stream: S::RecvStream) -> anyhow::Result<()> { async fn receive_object(&mut self, obj: Object, stream: S::RecvStream) -> anyhow::Result<()> {
let track = header.track; let track = obj.track;
// Keep objects in memory for 10s // Keep objects in memory for 10s
let expires = time::Instant::now() + time::Duration::from_secs(10); let expires = time::Instant::now() + time::Duration::from_secs(10);
let segment = segment::Info { let segment = segment::Info {
sequence: header.sequence, sequence: obj.sequence,
send_order: header.send_order, send_order: obj.send_order,
expires: Some(expires), expires: Some(expires),
}; };

View File

@ -4,7 +4,7 @@ use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet; // allows locking across await use tokio::task::JoinSet; // allows locking across await
use moq_transport::message::{Announce, AnnounceError, AnnounceOk, Subscribe, SubscribeError, SubscribeOk}; use moq_transport::message::{Announce, AnnounceError, AnnounceOk, Subscribe, SubscribeError, SubscribeOk};
use moq_transport::{object, VarInt}; use moq_transport::{object, Object, VarInt};
use webtransport_generic::Session as WTSession; use webtransport_generic::Session as WTSession;
use crate::model::{segment, track}; use crate::model::{segment, track};
@ -163,7 +163,7 @@ impl<S: WTSession> Session<S> {
track_id: VarInt, track_id: VarInt,
mut segment: segment::Subscriber, mut segment: segment::Subscriber,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let object = object::Header { let object = Object {
track: track_id, track: track_id,
group: segment.sequence, group: segment.sequence,
sequence: VarInt::from_u32(0), // Always zero since we send an entire group as an object sequence: VarInt::from_u32(0), // Always zero since we send an entire group as an object